登录
  • 人们都希望被别人需要 却往往事与愿违
  • 判断一个人的人品, 不是看他好起来做什么好事, 而是看他坏起来不做什么坏事。

分布式Telegram Bot

编程 Benny小土豆 4211次浏览 7261字 1个评论
文章目录[显示]
这篇文章在 2023年01月04日22:23:59 更新了哦~

自己写一个Telegram Bot是非常简单的,无论是使用HTTP API还是MTProto协议。

最简单的echo bot大概是下面的这种样子

import logging
import time

from telebot import apihelper
import telebot

logging.basicConfig(level=logging.INFO)

bot = telebot.TeleBot("3XI")
apihelper.proxy = {'https': 'socks5h://127.0.0.1:1086'}

@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    logging.info("Received message")
    bot.reply_to(message, "Howdy, how are you doing?")

if __name__ == '__main__':
    bot.polling()

分布式Telegram Bot

很棒是不是!

当你的bot开始有更多的业务,比如说查询数据库,去访问其他网站然后解析,你就会发现这个bot似乎有点力不从心,所有的响应都慢了一拍。

比如这样,这个bot每次都要去访问另外一个网站,然后返回,可惜的是这个网站响应太慢了,要5秒才能返回

@bot.message_handler(commands=['start'])
def send_welcome(message):
    logging.info("Received start message")
    text = requests.get("http://127.0.0.1:5000/").text
    bot.reply_to(message, text)

@bot.message_handler(commands=['help'])
def send_help(message):
    logging.info("Received help message")
    bot.reply_to(message, "help")

此时你就会惊奇地发现,所有其他的操作都被阻塞了,直到start结束。

分布式Telegram Bot

这要怎么办呢?一定是有办法的对吧

增大thread数量

无论是pyrogram还是pytelegrambotapi,都有一个额外的参数用来设置线程数量。

对于pytelegrambotapi来说,参数是num_threads=2,构造bot的时候就可以指定;

对于pyrogram来说,参数是workers: int = Scaffold.WORKERS,更准确的说是min(32, os.cpu_count() + 4)

那么只要我们在一定范围内增加这个数字,那么就应该在一定程度内不会反应迟钝太多。比如说:

bot = telebot.TeleBot("token", num_threads=100)

是这个道理的!

使用async

使用async也可以恰到好处的解决我们以上阻塞的问题,此时我们需要用AsyncTeleBot

from telebot.async_telebot import AsyncTeleBot
bot = AsyncTeleBot("TOKEN")

但是这个时候我们就不能直接用requests了,因为requests是sync的,简单起见我们这用aiohttp作为演示例子

@bot.message_handler(commands=['start'])
async def send_welcome(message):
    logging.info("Received start message")
    async with aiohttp.ClientSession() as session:
        async with session.get('http://127.0.0.1:5000/') as response:
            html = await response.text()
    await bot.reply_to(message, html)

@bot.message_handler(commands=['help'])
async def send_help(message):
    logging.info("Received help message")
    await bot.reply_to(message, "help")

if __name__ == '__main__':
    asyncio.run(bot.polling())

嗯,不会阻塞的。

分布式Telegram Bot

这种做法也是一种很流行的办法,但是我个人不是很喜欢。一个是因为菜,还有一个是因为很多库都是非async的,切换过去难度可能比较大。尽管也有些monkey patch可以使用,但是总感觉这种做法太过于魔法。

更复杂的场景

通常来说,简单加大线程数量或者用async即可解决我们的问题。

但是比如你的场景比较复杂,比如说是从YouTube上下载视频,需要占用大量的带宽,即使线程数量足够,那么并发量高的时候,下载速度也会被限制于本机的带宽。

既然MTProto协议最高允许10个客户端同时在线,那么有没有办法同时在多台机器上登录,然后分发任务,每个机器接收到任务之后只负责处理自己的任务,这样整体的吞吐量就可以非常容易的水平扩展。

Celery - 分布式任务队列

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。

没错的,我们可以在多台机器之间通过WireGuard组一个大内网,然后通过celery分发任务。

使用Celery非常简单,我们需要先准备好broker,比如说使用redis吧!

from celery import Celery
app = Celery('hello', broker='redis://localhost:6379/0', timezone="Asia/Shanghai"

@app.task
def hello():
    return 'hello world'

然后运行worker

celery -A tasks worker --loglevel=info

这里我们可以看到,celery连接到了redis,concurrency是8,使用prefork

分布式Telegram Bot

之后我们只要使用delay方法提交任务就可以了

> python3 master [bd0f485] deleted untracked
Python 3.9.9 (main, Nov 21 2021, 03:22:47)
[Clang 12.0.0 (clang-1200.0.32.29)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> add.delay(8,1)
<AsyncResult: cf06f416-bdf7-4ea9-99bc-7ba136329b7f>
>>>

celery worker会拿到任务,然后进行处理。如果有多个worker并且全部空闲,那么大家拿到任务的几率是均等的,也就是round robin。

celery+pyrogram

好极了,那就这么搞,用pyrogram只要确保session名字不同,那么就是不同的客户端。

于是你兴致勃勃的写好了代码

# bot.py
from pyrogram import filters
from client_init import create_app
from tasks import send_hello

app = create_app()

@app.on_message(filters.command(["/start"]))
def hello(client, message):
    send_hello.delay(message.chat.id)

if __name__ == '__main__':
    app.run()


# tasks.py
from celery import Celery
from client_init import create_app

app = Celery('tasks', broker='redis://localhost:6379/0', timezone="Asia/Shanghai")
tg_client = create_app("development")
tg_client.start()

@app.task
def send_hello(user_id):
    tg_client.send_message(user_id, "Hello")

然后发送了一个/start,奇迹并没有发生。事实就像社会主义铁拳一样,总会把你的脸打得啪啪作响。

分布式Telegram Bot

行吧,这是macOS/Python.org/Celery的锅,猜测是和subprocess在不同平台上的实现有关。因为我们用的是prefork模式。

那既然这样,我们换到Linux试一下看看

虽然没有报错了,但是为何收到了消息却没有执行?或者说执行卡住了?多打点log试试看?

@app.task
def send_hello(user_id):
    logging.info("receive tasks")
    print(tg_client.get_me())
    tg_client.send_message(user_id, "Hello")
    logging.info("finish tasks")

分布式Telegram Bot

为什么?

为什么client卡住了?

celery默认是prefork模式,意味celery会fork出来一堆子进程来干活,然后自己成为包工头。这也是我们在日志中看到 ForkPoolWorker-x的执行日志的原因。

但是我们是在file-level定义的tg_client,而执行任务的worker在收到下载请求时要去访问tg_client,这是一个大问题。再加上pyrogram是异步库。所以问题就更严重了

解决方案之一是,我们在tasks中使用with,这样相当于每次tasks执行都要经历完成的连接步骤,就好像开机-干活-关机一样。

@app.task
def send_hello(user_id):
    logging.info("receive tasks")
    with create_app("development") as tg_client:
        print(tg_client.get_me())
        tg_client.send_message(user_id, "Hello")
    logging.info("finish tasks")

第一次会比较慢,后续会使用到session文件,能稍微快一点。

如果突然来了10个并发的任务……💣那这就完蛋啦

solo模式

celery worker除了prefork模式,还有gevent、eventlet、threads和solo这四种。它们的区别大致如下:

1. prefork(默认):worker会开启多个进程来执行具体的任务实例(task instance),适合于CPU密集型应用;这会开启一个worker主进程,和一组工作进程(如果并行度设置为2,当使用ps -ef | grep celery的时候,会看到3个进程,多出来的一个就是主进程)。

celery -A djangogo worker -P prefork -c 2

2. eventlet:适用于I/O密集型应用;底层使用epoll或者libevent来驱动多路复用。要注意不要在这样的worker中运行CPU密集型的任务实例。

celery -A djangogo worker -P eventlet -c 20000

3. gevent:类似于eventlet,基于libev或者libuv事件循环。

celery -A djangogo worker -P gevent -c 20000

4.solo:接收控制指令同运行任务实例在同一个进程里执行,如果任务实例执行时间较长会阻塞控制指令请求的响应,客户端需要适度增加超时时间设置。(一般不使用)

5. threads:任务实例在线程中执行。这里的线程就是我们通常认知上的线程,线程维护通常要明显大于协程,所以并行度的设置也需要考虑到维护的代价。

那么用solo模式似乎可行

tg_client = create_app("development")
tg_client.start()

@app.task
def send_hello(user_id):
    logging.info("receive tasks")
    # with create_app("development") as tg_client:
    text = requests.get("http://127.0.0.1:5000").text
    tg_client.send_message(user_id, text)
    logging.info("finish tasks")

# run celery
celery -A tasks worker --loglevel=info --pool=solo

分布式Telegram Bot

这样当然也行,但是一个很严肃的问题就是,solo情况下如果函数执行时间过长,会使得后续tasks被阻塞,也就是说solo下concurrency是没有用的。这……

threads模式

那么用threads模式如何?反正大部分情况下也只是网络相关的操作,不太涉及到计算。

celery -A tasks worker --loglevel=info --pool=threads --concurrency=20

此时你会发现,虽然可以并发,但是似乎又像prefork模式一样,卡住了……

根本原因是celery tasks是阻塞主线程的,而pyrogram作为异步库是要依赖主线程的

但是改改还能抢救一下

如果你使用的是pyrogram 1.x版本的话

from pyrogram import idle

logging.basicConfig(level=logging.INFO)
app = Celery('tasks', broker='redis://host.docker.internal:6379/0', timezone="Asia/Shanghai")
tg_client = create_app("development")

@app.task
def send_hello(user_id):
    logging.info("receive tasks")
    # with create_app("development") as tg_client:
    text = requests.get("http://127.0.0.1:5000").text
    tg_client.send_message(user_id, text)
    logging.info("finish tasks")

def run_celery():
    argv = ["-A", "tasks", 'worker', '--loglevel=info', "--pool=threads", "--concurrency=200"]
    app.worker_main(argv)

if __name__ == '__main__':
    tg_client.start()
    threading.Thread(target=run_celery, daemon=True).start()
    idle()
    tg_client.stop()

# run celery
python3 tasks.py

这样就可以顺利收发消息啦😄

其他模式

至于gevent、eventlet,反正我是没搞出来……

其他配置

监控worker

在有了worker后,总想做一些监控什么的,celery flower正好是干这件事情的

celery -A tasks worker --loglevel=info

由于我们的tg_client是file-level,并且由于flower和最新版本的celery似乎有些兼容性问题,没办法不用-A喧嚣。因此为了避免浪费一个client并加快启动速度,我们可以弄一个dummy tasks

# flower_tasks.py
app = Celery('tasks', broker=BROKER, timezone="Asia/Shanghai")

然后

celery -A flower_tasks flower

甚至可以根据个人情况使用--persistentbasic auth之类的。看看这效果(

分布式Telegram Bot

还可以看到哪些任务是失败的,修复了好几个bug

分布式Telegram Bot

持续部署

worker这么多,每次更新都要ssh上去很麻烦,即使是用了docker-compose也一样。方法之一是用ansible,方法之二是用GitHub Actions。

我的用法是在Github Actions上ssh到机器,然后执行我写好的MakefileMakefile的内容大概就是docker pull docker-compose up -d之类的啦。

可以参考这里 https://github.com/tgbot-collection/ytdlbot/blob/master/.github/workflows/upgrade.yml

总结

总结一下,大概有三种可行的方法,以tg_client.get_me()和我的慢速梯子为例:

  1. prefork+with,每次都要开机-干活-关机。第一次会比较慢大概15-20秒,后续会快一点。参考时间2-3s
  2. solo模式,只能处理一个,每次处理参考时间0.5s或更低
  3. threads模式,只要不涉及到大量计算那么问题就不大,那么每次处理参考时间为0.5s或更低

最后……

欢迎流量太多的盆友来跟我一起做worker!名额有限速来!

也希望有人能够帮我解决prefork下的那个问题……🌚 🌚

参考

https://github.com/pyrogram/pyrogram/issues/480

https://blog.csdn.net/cnweike/article/details/105840936

https://github.com/tgbot-collection/ytdlbot


文章版权归原作者所有丨本站默认采用CC-BY-NC-SA 4.0协议进行授权|
转载必须包含本声明,并以超链接形式注明原作者和本文原始地址:
https://dmesg.app/celery-bot.html
喜欢 (33)
分享:-)
关于作者:
If you have any further questions, feel free to contact me in English or Chinese.
发表我的评论
取消评论

                     

去你妹的实名制!

  • 昵称 (必填)
  • 邮箱 (必填,不要邮件提醒可以随便写)
  • 网址 (选填)
(1)个小伙伴在吐槽
  1. 你好小土豆,我发现我用telethon接收消息的时候总有1-60秒的延迟,需要开多个client搞分布式来试图将低延迟吗,还是更改一些参数和代码逻辑就可以的了
    迈乐斯2024-12-04 17:34 回复