本周,我在调试一个行为不端的Python程序,该程序大量使用了Python的异步。该程序最终会花费很长时间来响应网络请求。我的第一个怀疑是CPU负载很大的协程程序占用了线程,阻止了套接字协程程序的运行,但PDB的检查显示情况并非如此。相反,该程序的作者在使用asyncio时犯了几个根本性的错误。让我们用一些小例子来讨论它们。
布景:有一个每秒“跳动”一次的心跳协程。一个真正的程序会发送一个数据包作为心跳,但在那里它只打印计划的时间有多晚。
异步延迟心跳():当True:Start=Time时。时间()在等待异步。睡眠(1)延迟=时间。Time()-开始打印(f';心跳延迟={Delay:.3f}s';)。
它总是迟到1毫秒,但已经足够好了,特别是考虑到即将发生的事情。只发送心跳的程序非常没用,所以真正的程序会同时忙于处理其他事情。在本例中,我们有很少的10ms有效负载的工作待办事项,它们由下面的process()函数表示:
这是同步睡眠,因为它代表实际的CPU工作,可能是在循环中解析JSON或在NumPy中处理数字。发挥你的想象力。在这10毫秒内,不能调度其他协程,因为毕竟这仍然只是一个单线程程序。
JOB_COUNT=200 Async def main():Asyncio。CREATE_TASK(HEARTER_BEAT())等待异步。休眠(2.5)打印(';开始处理';)count=job_count for_in range(Job_Count):Asyncio。create_task(process())等待异步。睡眠(5)。
此程序在任务中启动心跳协程。除非有人在等待,否则协作组不会取得进展,而这可能是一项任务。因此,它将继续独立地前进,而不会受到刺激。
任意的2.5秒睡眠模拟等待,比方说,等待网络请求。在输出中,我们将看到心跳滴答几次,然后它将并发创建和处理200个作业。在真正的程序中,我们会有一些方法来收集结果,但我们现在可以忽略这一部分。它们只有10毫秒,所以对心跳的影响应该很小,对吗?
心跳延迟=0.001剪切延迟=0.001开始处理心跳延迟=1.534剪切延迟=0.001剪切延迟=0.001s。
心跳延迟了1.5秒,因为只有200个任务,每个任务都只做了10毫秒的工作。发生了什么?
Python调用调度任务的对象为循环,这不是巧合。所有要调度的东西都被放入一个循环中,一个接一个地调度循环。这200个任务在心跳之前被调度,因此直到这些任务中的每一个要么放弃(等待)要么完成,它都不会再次被调度。
它真的不需要太多时间来显著阻碍心跳,而且,使用愚蠢的字节码编译器,10ms可能根本不是什么工作。这里的教训是,如果延迟是一个重要的考虑因素,那么要避免产生许多任务。
我在解决方案中的第一个想法是:如果我们使用信号量来限制一次“活动”任务的数量,会怎么样?那么也许心跳就不需要和这么多其他任务争夺时间了。
Worker_count=4#max";一次活动";个作业异步定义main_with_semaphore():asyncio。CREATE_TASK(HEARTER_BEAT())等待异步。睡眠(2.5)sem=异步。信号量(Worker_Count)异步定义进程():等待sem。获取()时间。睡眠(JOB_DATION)sem。RELEASE()PRINT(';开始处理';)for_in range(Job_Count):Asyncio。create_task(process())等待异步。睡眠(5)。
当心跳休眠完成时,大约一半的作业将完成,另一半在信号量上被阻塞。那么,也许心跳会跳过所有阻塞的任务,因为它们还没有准备好运行?
心跳延迟=0.001剪切延迟=0.001开始处理心跳延迟=1.537剪切延迟=0.001剪切延迟=0.001s
这没有任何区别,因为每个任务在循环中都“保持了自己的位置”!即使将Worker_Count减少到1也不会有任何效果。一旦任务完成,它就会释放队列中等待下一个的任务。信号量在这里几乎什么也不做。
这就是真正有效的方法:作业队列。创建一个用协程(而不是任务)填充的队列,并让少量任务从队列中运行作业。既然这是一个真正的解决方案,我就把这个例子做得更完整。
异步def main_with_queue():异步。CREATE_TASK(HEARTER_BEAT())等待异步。睡眠(2.5)队列=异步。Queue(maxSize=1)Async def Worker():While True:CORO=等待队列。get()等待coro队列。TASK_DONE()工作进程=[异步。create_task(worker())for_in range(Worker_Count)]print(';开始处理';)for_in range(Job_Count):等待队列。将(process())放入等待队列。Join()为工人w打印(';结束处理';):w。Cancel()等待异步。睡眠(2)。
TASK_DONE()和JOIN()方法使完全作业完成时同步变得微不足道。我还会花时间销毁工作任务,让它们在队列中被阻塞是无害的。他们会被收集垃圾,所以不会造成资源泄漏。然而,CPython抱怨垃圾收集正在运行的任务,因为它看起来像是一个错误-而且通常是错误的。
如果您仔细阅读,您可能已经注意到队列的最大大小被设置为1:这不算什么“队列”!Go开发人员将认识到这是一个无缓冲的通道,这是默认和最常见的通道,因此它更像是生产者(put())和消费者(get())之间的同步交汇点。制片人拿着一份工作排队等待,直到ATASK有空来接。任务在队列中等待,直到生产者带着它的作业到达。
心跳延迟=0.001剪切信号延迟=0.001开始处理心跳延迟=0.014剪切信号延迟=0.020发送处理心跳延迟=0.002剪切信号延迟=0.001s。
输出显示,对心跳的影响不大--大约是我们从异步/等待中所能期望的最好结果--并且在作业运行期间心跳仍在继续。并发越多-队列上运行的工作任务越多-延迟就越大。
注意:在这个玩具示例中增加Worker_Count不会对延迟产生影响,因为作业实际上并不是并发的。它们在另一个工作任务可以从队列中提取之前开始、运行和完成。推送一对正在进行中的等待()允许并发:
心跳延迟=0.001剪切信号延迟=0.001开始处理心跳延迟=1.655发送处理心跳延迟=0.001剪切信号延迟=0.001s。
这是同一个程序的另一个缺陷。创建一个无界队列、一个生产者和一个消费者。消费者打印队列大小,这样我们就可以看到发生了什么:
Async def Producer_Consumer():queue=asyncio。Queue()Done=异步。Condition()Async def Producer():对于范围(100_000)内的i:正在等待队列。把(I)放入等待队列。Join()Async with Done:Done。Notify()Async def Consumer():While True:等待队列。get()print(f';qsize={queue e.qsize()}';)队列。TASK_Done()异步。create_task(Producer())asyncio。CREATE_TASK(Consumer())异步完成:等待完成。等待()。
因此,在消费者执行任何操作之前就填充了整个队列:无论正在消费的是什么,都会有大量的延迟。由于队列是无界的,生产者永远不需要让步。您可能会忍不住在生产者中使用asyncio.sleep(0)来显式地放弃:
Async def Producer():对于范围(100_000)内的i:正在等待队列。把(I)放在等待异步。SLEEP(0)#产生等待队列。Join()Async with Done:Done。Notify()。
然而,这是脆弱的,不是一个真正的解决方案。如果消费者在自己的循环中只做出了两次让步,那么它几乎又回到了我们开始的地方:
Async def Consumer():While True:等待队列。get()print(f';qsize={queue e.qsize()}';)队列。TASK_Done()等待异步。睡眠(0)等待异步。睡眠(0)。
产量表明,生产者逐渐领先于消费者。在每个使用者迭代中,生产者迭代两次:
有一个非常简单的解决方案:永远不要使用无边界队列。实际上,每一个无界异步.Queue()都是一个错误,异步允许创建无界队列,这是API的一个严重缺陷。默认的maxSize应该是1,而不是无限大,并且maxSize=0应该会引发错误。
对这篇文章有什么评论吗?通过发送电子邮件至~Skeeto/[email protected][邮件列表礼仪]开始我的公共收件箱中的讨论,或查看现有讨论。