自己写一个Telegram Bot是非常简单的,无论是使用HTTP API还是MTProto协议。
最简单的echo bot大概是下面的这种样子
import logging import time from telebot import apihelper import telebot logging.basicConfig(level=logging.INFO) bot = telebot.TeleBot("3XI") apihelper.proxy = {'https': 'socks5h://127.0.0.1:1086'} @bot.message_handler(commands=['start', 'help']) def send_welcome(message): logging.info("Received message") bot.reply_to(message, "Howdy, how are you doing?") if __name__ == '__main__': bot.polling()
很棒是不是!
当你的bot开始有更多的业务,比如说查询数据库,去访问其他网站然后解析,你就会发现这个bot似乎有点力不从心,所有的响应都慢了一拍。
比如这样,这个bot每次都要去访问另外一个网站,然后返回,可惜的是这个网站响应太慢了,要5秒才能返回
@bot.message_handler(commands=['start']) def send_welcome(message): logging.info("Received start message") text = requests.get("http://127.0.0.1:5000/").text bot.reply_to(message, text) @bot.message_handler(commands=['help']) def send_help(message): logging.info("Received help message") bot.reply_to(message, "help")
此时你就会惊奇地发现,所有其他的操作都被阻塞了,直到start结束。
这要怎么办呢?一定是有办法的对吧
增大thread数量
无论是pyrogram还是pytelegrambotapi,都有一个额外的参数用来设置线程数量。
对于pytelegrambotapi来说,参数是num_threads=2
,构造bot的时候就可以指定;
对于pyrogram来说,参数是workers: int = Scaffold.WORKERS
,更准确的说是min(32, os.cpu_count() + 4)
。
那么只要我们在一定范围内增加这个数字,那么就应该在一定程度内不会反应迟钝太多。比如说:
bot = telebot.TeleBot("token", num_threads=100)
是这个道理的!
使用async
使用async也可以恰到好处的解决我们以上阻塞的问题,此时我们需要用AsyncTeleBot
from telebot.async_telebot import AsyncTeleBot bot = AsyncTeleBot("TOKEN")
但是这个时候我们就不能直接用requests了,因为requests是sync的,简单起见我们这用aiohttp作为演示例子
@bot.message_handler(commands=['start']) async def send_welcome(message): logging.info("Received start message") async with aiohttp.ClientSession() as session: async with session.get('http://127.0.0.1:5000/') as response: html = await response.text() await bot.reply_to(message, html) @bot.message_handler(commands=['help']) async def send_help(message): logging.info("Received help message") await bot.reply_to(message, "help") if __name__ == '__main__': asyncio.run(bot.polling())
嗯,不会阻塞的。
这种做法也是一种很流行的办法,但是我个人不是很喜欢。一个是因为菜,还有一个是因为很多库都是非async的,切换过去难度可能比较大。尽管也有些monkey patch可以使用,但是总感觉这种做法太过于魔法。
更复杂的场景
通常来说,简单加大线程数量或者用async即可解决我们的问题。
但是比如你的场景比较复杂,比如说是从YouTube上下载视频,需要占用大量的带宽,即使线程数量足够,那么并发量高的时候,下载速度也会被限制于本机的带宽。
既然MTProto协议最高允许10个客户端同时在线,那么有没有办法同时在多台机器上登录,然后分发任务,每个机器接收到任务之后只负责处理自己的任务,这样整体的吞吐量就可以非常容易的水平扩展。
Celery - 分布式任务队列
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
没错的,我们可以在多台机器之间通过WireGuard组一个大内网,然后通过celery分发任务。
使用Celery非常简单,我们需要先准备好broker,比如说使用redis吧!
from celery import Celery app = Celery('hello', broker='redis://localhost:6379/0', timezone="Asia/Shanghai" @app.task def hello(): return 'hello world'
然后运行worker
celery -A tasks worker --loglevel=info
这里我们可以看到,celery连接到了redis,concurrency
是8,使用prefork
之后我们只要使用delay方法提交任务就可以了
> python3 master [bd0f485] deleted untracked Python 3.9.9 (main, Nov 21 2021, 03:22:47) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> from tasks import add >>> add.delay(8,1) <AsyncResult: cf06f416-bdf7-4ea9-99bc-7ba136329b7f> >>>
celery worker会拿到任务,然后进行处理。如果有多个worker并且全部空闲,那么大家拿到任务的几率是均等的,也就是round robin。
celery+pyrogram
好极了,那就这么搞,用pyrogram只要确保session名字不同,那么就是不同的客户端。
于是你兴致勃勃的写好了代码
# bot.py from pyrogram import filters from client_init import create_app from tasks import send_hello app = create_app() @app.on_message(filters.command(["/start"])) def hello(client, message): send_hello.delay(message.chat.id) if __name__ == '__main__': app.run() # tasks.py from celery import Celery from client_init import create_app app = Celery('tasks', broker='redis://localhost:6379/0', timezone="Asia/Shanghai") tg_client = create_app("development") tg_client.start() @app.task def send_hello(user_id): tg_client.send_message(user_id, "Hello")
然后发送了一个/start
,奇迹并没有发生。事实就像社会主义铁拳一样,总会把你的脸打得啪啪作响。
行吧,这是macOS/Python.org/Celery的锅,猜测是和subprocess在不同平台上的实现有关。因为我们用的是prefork模式。
那既然这样,我们换到Linux试一下看看
虽然没有报错了,但是为何收到了消息却没有执行?或者说执行卡住了?多打点log试试看?
@app.task def send_hello(user_id): logging.info("receive tasks") print(tg_client.get_me()) tg_client.send_message(user_id, "Hello") logging.info("finish tasks")
为什么?
为什么client卡住了?
celery默认是prefork模式,意味celery会fork出来一堆子进程来干活,然后自己成为包工头。这也是我们在日志中看到 ForkPoolWorker-x的执行日志的原因。
但是我们是在file-level定义的tg_client
,而执行任务的worker在收到下载请求时要去访问tg_client
,这是一个大问题。再加上pyrogram是异步库。所以问题就更严重了
解决方案之一是,我们在tasks中使用with
,这样相当于每次tasks执行都要经历完成的连接步骤,就好像开机-干活-关机一样。
@app.task def send_hello(user_id): logging.info("receive tasks") with create_app("development") as tg_client: print(tg_client.get_me()) tg_client.send_message(user_id, "Hello") logging.info("finish tasks")
第一次会比较慢,后续会使用到session文件,能稍微快一点。
如果突然来了10个并发的任务……💣那这就完蛋啦
solo模式
celery worker除了prefork模式,还有gevent、eventlet、threads和solo这四种。它们的区别大致如下:
1. prefork(默认):worker会开启多个进程来执行具体的任务实例(task instance),适合于CPU密集型应用;这会开启一个worker主进程,和一组工作进程(如果并行度设置为2,当使用ps -ef | grep celery的时候,会看到3个进程,多出来的一个就是主进程)。
celery -A djangogo worker -P prefork -c 2
2. eventlet:适用于I/O密集型应用;底层使用epoll或者libevent来驱动多路复用。要注意不要在这样的worker中运行CPU密集型的任务实例。
celery -A djangogo worker -P eventlet -c 20000
3. gevent:类似于eventlet,基于libev或者libuv事件循环。
celery -A djangogo worker -P gevent -c 20000
4.solo:接收控制指令同运行任务实例在同一个进程里执行,如果任务实例执行时间较长会阻塞控制指令请求的响应,客户端需要适度增加超时时间设置。(一般不使用)
5. threads:任务实例在线程中执行。这里的线程就是我们通常认知上的线程,线程维护通常要明显大于协程,所以并行度的设置也需要考虑到维护的代价。
那么用solo模式似乎可行
tg_client = create_app("development") tg_client.start() @app.task def send_hello(user_id): logging.info("receive tasks") # with create_app("development") as tg_client: text = requests.get("http://127.0.0.1:5000").text tg_client.send_message(user_id, text) logging.info("finish tasks") # run celery celery -A tasks worker --loglevel=info --pool=solo
这样当然也行,但是一个很严肃的问题就是,solo情况下如果函数执行时间过长,会使得后续tasks被阻塞,也就是说solo下concurrency是没有用的。这……
threads模式
那么用threads模式如何?反正大部分情况下也只是网络相关的操作,不太涉及到计算。
celery -A tasks worker --loglevel=info --pool=threads --concurrency=20
此时你会发现,虽然可以并发,但是似乎又像prefork模式一样,卡住了……
根本原因是celery tasks是阻塞主线程的,而pyrogram作为异步库是要依赖主线程的。
但是改改还能抢救一下
如果你使用的是pyrogram 1.x版本的话
from pyrogram import idle logging.basicConfig(level=logging.INFO) app = Celery('tasks', broker='redis://host.docker.internal:6379/0', timezone="Asia/Shanghai") tg_client = create_app("development") @app.task def send_hello(user_id): logging.info("receive tasks") # with create_app("development") as tg_client: text = requests.get("http://127.0.0.1:5000").text tg_client.send_message(user_id, text) logging.info("finish tasks") def run_celery(): argv = ["-A", "tasks", 'worker', '--loglevel=info', "--pool=threads", "--concurrency=200"] app.worker_main(argv) if __name__ == '__main__': tg_client.start() threading.Thread(target=run_celery, daemon=True).start() idle() tg_client.stop() # run celery python3 tasks.py
这样就可以顺利收发消息啦😄
其他模式
至于gevent、eventlet,反正我是没搞出来……
其他配置
监控worker
在有了worker后,总想做一些监控什么的,celery flower正好是干这件事情的
celery -A tasks worker --loglevel=info
由于我们的tg_client是file-level,并且由于flower和最新版本的celery似乎有些兼容性问题,没办法不用-A喧嚣。因此为了避免浪费一个client并加快启动速度,我们可以弄一个dummy tasks
# flower_tasks.py app = Celery('tasks', broker=BROKER, timezone="Asia/Shanghai")
然后
celery -A flower_tasks flower
甚至可以根据个人情况使用--persistent
、basic auth
之类的。看看这效果(
还可以看到哪些任务是失败的,修复了好几个bug
持续部署
worker这么多,每次更新都要ssh上去很麻烦,即使是用了docker-compose
也一样。方法之一是用ansible,方法之二是用GitHub Actions。
我的用法是在Github Actions上ssh到机器,然后执行我写好的Makefile
。Makefile
的内容大概就是docker pull docker-compose up -d
之类的啦。
可以参考这里 https://github.com/tgbot-collection/ytdlbot/blob/master/.github/workflows/upgrade.yml
总结
总结一下,大概有三种可行的方法,以tg_client.get_me()
和我的慢速梯子为例:
- prefork+with,每次都要开机-干活-关机。第一次会比较慢大概15-20秒,后续会快一点。参考时间2-3s
- solo模式,只能处理一个,每次处理参考时间0.5s或更低
- threads模式,只要不涉及到大量计算那么问题就不大,那么每次处理参考时间为0.5s或更低
最后……
欢迎流量太多的盆友来跟我一起做worker!名额有限速来!
也希望有人能够帮我解决prefork下的那个问题……🌚 🌚
参考
https://github.com/pyrogram/pyrogram/issues/480
https://blog.csdn.net/cnweike/article/details/105840936
https://github.com/tgbot-collection/ytdlbot