具有Python WebSocket的高通量游戏消息服务器

2020-12-06 07:11:36

效率我的比赛中出现错误。 80名选手中的一位陷入困境。好像真的卡住了:一个严重的缺陷!该错误应该不会发生,不可能发生,但是确实发生了。唯一的可能性是我使用的Web套接字堆栈。原来,该图层未按预期工作,并且无法对其进行修复。 las,我寻求替代解决方案。

在这里,我描述了我的构想:使用Python websockets的,更具针对性的新堆栈。许多协程,异步和队列。我在本文结尾处有完整的工作示例。这可能是它可以独立运行的最后一个状态。我将对其进行更改,以使其更适合我的游戏代码。

我的游戏是由消息协调的多人益智游戏。我将游戏的每个实例与其他实例分开。对于消息,我使用经典的“房间”概念来实现。这也是Flask-SocketIO所使用的名称,这也是我第一个实现所使用的名称。

通常,游戏中的消息不需要定义的总订单。他们可以以不同的顺序到达不同的客户。在某些情况下,情况并非如此,在某些情况下,我需要一些消息来确定总订单。它们不多,这可能就是为什么我较早没有注意到缺陷的原因。

当我第一次启动该项目时,我问如果在单个客户端系统上使用该库,按顺序回显,该库是否可以维持对客户端的订单。答案是肯定的,但事实并非如此。给定负载主要是受网络限制的,它通常会保持顺序。仅在几次压力下,或者由于运气不好,它就会无序发送消息。

很好,我可能可以在它周围排队。糟糕,吞吐量下降到了惊人的70msgs / s。没有队列,它已经慢了1200msg / s,但这对我的游戏来说已经足够了。经过反复的反复,我和库作者在可接受的吞吐量上存在分歧。

因此,我改为使用websockets库,一起进行概念验证,结果得到了12,000msgs / s。是的,这更像我期望的那样。

实际上,我期望更多。从长远来看,如果我有足够的流量,我会用C ++重写。吞吐量应完全受网络限制,但仍受服务器上CPU的限制。在知道可以将其推向更高的水平之前,我已经做过很多底层的联网工作,但是就我现在的需求而言,12K / s远远超过了。在担心优化其中一台服务器之前,我可能会扩展服务器的数量。

“ websockets”模块是WebSockets的最小实现。听起来像我想要的。我不想从底层处理协议。这让我写了所有高级逻辑,尤其是客户室。

该库易于使用。我得到了一个不费吹灰之力的基本示例。当然,还有很多细节需要介绍。这里是我需要支持的快速清单,首先是功能:

允许客户端加入游戏室(每个客户端只能加入我系统中的一个室,以简化代码)

在我的最终代码中,我将消息持久保存到Redis存储,然后最终保存到MongoDB。那不是我的示例代码的一部分。

而且,我不得不处理几种情况或错误。

@dataclass类Client:socket:任何#什么类型? id:int断开连接:bool = False @dataclass类Room:密钥:str客户:Dict [int,Client] =字段(default_factory = dict)new_clients:列表[Client] =字段(default_factory = list)msg_id:int = 0 event_queue :异步队列=字段(default_factory = asyncio。Queue)侦听:bool =虚假的未来:任何=无#什么类型?

我将Python和MyPy一起使用类型注释,以检查类型。 las,我不确定几个库类的类型。由于其中许多是自动创建的或从其他函数返回的,因此很难确定类型。我最终将找出所有类型。

在这些数据类型中,套接字是直接连接到“ websockets”模块的唯一部分。它跟踪传入的连接,并用于发送和接收数据。

简而言之,listen_room函数处理传入的客户端连接。我将所有消息推送到会议室的event_queue上。 listen_room函数侦听此队列,并将消息发送到会议室中的所有客户端。

最初,我只有一个监听队列,可以处理所有房间。当我最终使用C ++编写较低层的服务器时,我会保留这种结构。当您获得足够低的级别时,您可以控制更多细节,完全不需要协程。

但是在Python中,有几个原因使我在每个房间使用一个侦听器:

我写的所有Python代码和库代码都在开销上,围绕着写给客户的内容。数量不多,但可以增加很多活动。我怀疑JSON解析和格式化是其中最大的一部分。但这不是每个房间只有一个听众的原因。由于Python代码是作为单个真实线程运行的,因此该代码是在一个侦听器中还是在多个侦听器中发生都是无关紧要的。这些都是不可避免的计算量。

真正的第一个真正的原因是Redis,这是举止得体的动机。对于每个外发消息,我必须创建一个唯一的消息ID。在示例代码中,我在Room类中的Python中对此进行了跟踪。在我的最终服务器上,我将在Redis整数键中对此进行跟踪。此外,我会将所有邮件存储在Redis列表中。一个单独的过程将定期清除此问题,并将消息持久保存到MongoDB。到Redis的调用需要花费时间,服务器需要花费一些时间来处理其他房间的消息。因此,我想分隔房间。当一个房间等待Redis时,其他房间可以继续处理。

第二个原因,不好的客户,是不幸的需求。客户端可能会断开连接,或者无法足够迅速地处理邮件。在大多数情况下,这是由缓冲区处理的。对socket.send的调用实际上是异步的,至少直到队列填满为止。发生这种情况时,发送将等待直到队列中有空间。在等待时,所有其他房间都将停止,无法发送任何消息。通过每个房间只有一个队列,我将客户的损害仅限于该房间。

这不太可能发生。首先,websockets库具有超时功能。无响应的客户端将在传出套接字缓冲区被填满之前很长时间断开连接。我的游戏根本无法产生足够的消息来填充缓冲区。根据压力测试推断出的平均消息大小,在标准缓冲区中可以容纳25K游戏消息。和我的团队一起进行的典型游戏过程,仅生成3至4000条消息。

拥有单个真实线程的优点之一是无需担心实际的数据争用。它们根本不会像在多线程应用程序中那样发生。好极了!没有内存损坏的可能。

这并不意味着种族情况不会发生。并发的逻辑问题仍然存在,尽管程度较小。感谢合作线程!我的代码中最重要的问题是使用clients对象。队列侦听器遍历客户端。如果列表在迭代过程中被修改,Python将抛出并发修改异常。这是严格的禁止,因为迭代器不知道应该做什么。

最初,我在listen_socket函数中处理了断开连接,但是通过测试发现它可以是先检测断开连接的socket.send()调用。因此,断开连接发生在多个地方。在这两种情况下,我仅在Client结构中将客户端标记为已断开连接。 listen_room在发送消息时跳过断开连接的客户端。它会跟踪它们,并在迭代循环后安全地将它们从房间中移出。

当新客户端加入会议室时,listen_socket会将其添加到new_clients列表中。然后,listen_room将在每个消息循环之前添加新客户端。它在检索消息后立即执行此操作,以确保所有新客户端都收到该消息。这意味着会议室消息可以在加入会议室的“加入”响应之前到达客户端。在我的游戏中,获得此顺序以及发送旧消息对于客户端获得一致的游戏状态非常重要。我可能需要稍微调整一下此代码。

由于listen_socket不能确定listen_room是在循环内还是循环外,因此它永远不会知道与客户合作是否安全。锁定并不是一个坏主意,但是它会在传入的监听端引入可避免的延迟,并在房间监听器中引入延迟。为什么不用时锁定?

回想起来,许多协程并行性是隐式的可能是一个缺点,尤其是在使用诸如eventlet之类的方法时。作为程序员,逻辑线程切换发生在哪里不太明显。您只需要知道每个等待操作,每个asyncio调用和每个websocket调用都是线程切换的潜在位置。很高兴地说,您应该假设随时都可以进行切换,但是那样我就不能依靠它在某些地方不进行切换,并且需要一堆锁。

如果不确定,请使用锁。如果您的游戏中断,则性能无关紧要。叽

我添加了一个简单的Stats类来跟踪服务器上的吞吐量。它为每个房间的所有传入和传出消息发出计时。如果我有多个客户端连接到唯一的房间,则将发生12K / s的速度。我的机器达到了12K的限制,服务器进程占用了100%的CPU使用率。

不幸的是,我必须将我的数字调低到10K。将房间移至各个听众之后,我的开销大大增加。我不确定为什么-我无法想象这是协程的额外数量。可能在异步方面做了一些调整以改进它,但是它仍然足够快,我不关心。

出于好奇,我测量了连接到服务器的单个客户端。速度已经超过5Kmsgs / s。由于这是客户端和服务器,因此我有两个过程。它们的CPU使用率均为58%。理想情况下,它们应该使用50%的CPU,因为它们来回发送消息。那额外的8%是用于处理除处理消息之外的工作。也许如果我用C ++编写系统,那将接近50%,但永远无法完全达到目标。但是吞吐量应该上升。

我说C ++会更快,这是因为我的经验。我对所发生的事情有更好的控制,并且知道如何使用该控件。比起干净的Python版本,更容易出错并产生大量错误。服务器代码很难!

统计信息无法直接衡量响应时间。但是知道了乒乓球测试的性质后,我可以大致计算出这将是什么。与部署时的实际网络开销相比,每条消息的毫秒数为零。

Warning: Can only detect less than 5000 characters

您会注意到,我的客户端测试代码比服务器代码更粗糙,并且缺少许多类型定义。该代码将不会长期使用,但服务器代码将被长期使用。

从输入import *导入asyncio,json,websockets,time,sys,如果len(sys。argv) 2:打印(f&Sytnax {sys.argv [0]}房间(延迟)")sys。出口(-1)房间= sys。 argv [1]#非零慢速会创建无法跟上的客户端。如果房间#中还有其他客户端,它将最终中断,从而导致服务器断开连接。如果len(sys。argv)>慢= 0.0。 2:slow =浮点(sys。argv [2])def encode_msg(msg:Dict)-> str:返回json。转储(msg,sure_ascii = False)def encode_msg(text:str)->字典:返回json。 loads(text)#如果慢速>则统计跟踪器比服务器的trigger_count = 5000.0更简单。 0:trigger_count / =(1 +慢)* 100异步def阅读器(websocket):count = 0 seq = 0 last_time = time。单调()client_id =无last_msg_id = websocket中message_raw的异步:count + = 1 msg =解码_msg(message_raw),如果msg [' type' ] ==' joined' :client_id = msg [' client_id' ] else:#确保消息的总顺序为msg_id = msg [' msg_id' ]如果last_msg_id为None:last_msg_id == msg_id else:如果msg_id!=(last_msg_id + 1):print(last_msg_id,msg_id)引发Exception(" bad msg sequence")如果msg [' type& #39; ] ==' ping'和client_id == msg [' client_id' ]:#如果味精[' seq' ]!= seq:打印(seq,message_raw)引发异常("错误消息seq")#如果count> = trigger_count:next_time = time,则跟踪粗略吞吐量。单调()p

......