我第一次使用ASGI(异步服务器网关接口)是通过通道1.0,当时ASGI规范还在起草阶段。这是我的第一个面试项目,帮助我找到了现在的工作。当时感觉很神奇,可以很容易地将WebSocket功能添加到我的Django应用程序中,并为我无缝地处理身份验证和其他与Django相关的事情。
撰写本文时,ASGI规范已经是版本3,并且ASGI和通道都已成为Django Software Foundation的一部分。与草案版相比,它已经成熟了很多,增加了生命周期调用和更好的应用程序格式等。最令人兴奋的是,一个健康而快速增长的社区正在形成,我们看到越来越多的ASGI服务器运行在生产环境中。在我的公司,我们每天通过运行在Daphne上的ASGI服务几百万个请求,Netflix的Dispatch基于FastAPI,一个流行的ASGI web应用程序框架,显然,微软也在使用它。
我谦虚地建议任何用Python构建Web服务的人都要学习ASGI。而学习东西的最好方法就是用它来构建东西,所以在这篇博客文章中,我将逐步介绍构建一个支持ASGI的微型Web应用程序框架的步骤。我希望它能帮助解释ASGI是如何工作的。
在编写第一行代码之前,我们需要对ASGI是什么以及我们要构建的目标有一个基本的了解。
图形TD A[客户端]-->;|HTTP、WebSocket、.|B(ASGI服务器)B-->;|作用域、发送、接收|C(ASGI应用程序)。
简单地说,浏览器(客户端)使用某种类型的请求(HTTP或WebSocket)建立到ASGI服务器的连接,然后,ASGI服务器使用封装在名为Scope的python字典中的有关连接的信息调用ASGI应用程序,并使用两个回调函数(名为Send和Receive),应用程序可以使用它们在服务器和客户端之间发送和接收消息。
{";type";:";http";,";http_version";:";1.1";,";服务器";:(";127.0.0.1";,8,000),";客户端";:(";127.0.0.1";,60457),";方案";:";http";,";方法";:";获取";,";根路径";:";";,";路径";:";/hello/a";,";RAW_path";:B";/hello/a";,";QUERY_STRING";:B";";,";Headers";:[(B";host";,b";localhost:8000";),(b";Connection";,b";Keep-Alive";),(b";User-agent";,b";Mozilla/5.0(Macintosh;英特尔Mac OS X 10_14_6)AppleWebKit/537.36(khtml,如壁虎)Chrome/83.0.4103.106 Safari/537.36";,),(b";Accept";,b";text/html,application/xhtml+xml,application/xml;q=0.9,image/apng,*/*;q=0.8,application/签名交换;v=b3;q=0.9";,)。,b";gzip,deflate,br";),(b";Accept-Language";,b";EN-US,EN;q=0.9";),(b";cookie";,b';csrftoken=dDA2IAPrvgPc7hkyBSyctxDk78KmhHAzUqR0LUpjXI3Xgki0QrGEWazE3RGZuLGl';,),],}。
您可能会注意到,作用域与WSGI环境没有太大不同。事实上,ASGI接口与WSGI接口非常相似,但是ASGI不是使用environ和start_response来发送报头并使用WSGI应用程序的返回值作为响应主体,而是与连接进行接口,并允许我们在连接的生命周期中多次异步地接收和发送消息,直到连接关闭。这为WebSocket和HTTP都提供了一个很好的接口。
也完全可以将WSGI应用程序包装在ASGI应用程序中,只需根据范围准备一个WSGI环境和start_response,接收和发送,然后调用WSGI应用程序,它就可以工作了。如果将调用委托到线程池或类似内容中,则只是使WSGI应用程序异步。这就是频道围绕姜戈的大致方式。
当我说ASGI框架时,我指的是使构建ASGI应用程序更容易的框架,这不包括ASGI服务器部分。我之所以提到这一点,是因为一些较早的Python异步Web框架有自己的服务器实现,也会接管诸如解析HTTP请求、处理网络连接等任务。我们不会在ASGI Web框架中做这些工作。作为WSGI的精神继承者,在WSGI中,Web服务器(如Gunicorn和uwsgi)和Web框架(如Flask和Django)是分开的,ASGI也有这种分离。
异步定义应用程序(作用域、接收、发送):名称=作用域[";路径";]。拆分(";/";,1)[-1]或";WORLD";等待发送({";type";:";http.response se.start";,";status";:200,";Header";:[[B";Content-type";,b";Text/Plain";],],})正在等待发送({";type";:";http.response.body";,";body";:F";Hello,{name}!";)。encode(),";MORE_BODY";:FALSE,})。
start启动HTTP响应,发送状态代码和响应头。在本例中,它以200OK状态代码响应,并在标题中将Content-type设置为text/PLAN。body发送响应正文,more_body键告诉服务器响应是否完成。ASGI服务器可以使用它来知道连接是否应该关闭,或者自动决定是内容长度报头还是分块编码。
你应该可以访问http://localhost:8000/并获得“你好,世界”。访问http://localhost:8000/tom会让你获得“你好,汤姆”。
顺便说一句,uvicorn相当快,这是一个简单的基准测试,WRK-D10S http://localhost:8000/hi的2018年最低规格MacBook Air的请求数/秒为27857.87。
虽然这种方法适用于一个简单的hello world示例,但是以这种方式编写更复杂的应用程序并不太方便。首先,它不做路由,如果你想对不同的路径做出不同的响应,你可能会得到一个巨大的如果…。否则如果..。Else子句。其次,每次都要以Python字典的形式编写ASGI消息是相当困难的。第三,在复杂的应用程序中,跟踪连接的状态变得更加困难,例如响应是否已启动、响应是否已结束、是否应在此处启动响应等。
使用新框架,我希望能够编写如下ASGI应用程序:
从aAF导入asyncio从aaf导入aAF#另一个ASGI框架从aaf.route导入来自aaf.response导入HttpResponse路由器=Router()@Router。route(';/';)@Router。route(';/<;name>;';)async def hello(connection,name=';world';):return HttpResponse(f";hello,{name}";)@Router。route(';/count';)@Router。ROUTE(';/Count/<;int:Number&>;';)异步定义计数(Connection,Number=10):对于范围内的i(Number):等待连接。发送(f';count{i}\n&39;,Finish=false)等待异步。休眠(%1)等待连接。发送(';';,Finish=True)@Router。ROUTE(';/ECHO';)Async def ECHO(连接):Body=等待连接。Body()正在等待连接。发送(Body,Finish=True)APP=AAF([路由器])。
我希望这个我想要的框架外观的片段是不言而喻的。但以下是我想要实现的一些关键目标:
Connection类将表示ASGI HTTP或WebSocket连接。它是一个类,封装了ASGI中的三个基本元素,即Scope、Send和Receive,并公开了一些方便的方法和属性,这样用户就不需要冗长地写出所有的ASGI消息,并从Scope中解析所有内容,如cookie和Header。但它应该允许用户访问原来的作用域,在他们想要的时候发送和接收,这样就保持了ASGI应用程序的可组合性。例如,它应该允许用户通过调用Another_Asgi_app(connection.scope,connectionn.asgi_send,connection.asgi_Receive)将某些连接委托给另一个ASGI应用程序。
从枚举导入枚举FROM函数工具从http.Cookie导入Cached_Property通过键入import any,Awaable,Callable,Optional,Union from urllib导入SimpleCookie。parse import parse_qsl,un引号_plus from werkzeug.datastructs导入标头,MultiDict CoroutineFunction=Callable[[any],Awaable]class ConnectionType(Enum):http=";HTTP";WebSocket=。类连接:def__init__(self,作用域:dict,*,send:CoroutineFunction,Receive:CoroutineFunction):self。作用域=作用域本身。ASGI_SEND=发送自身。ASGI_RECEIVE=接收自身。开始=错误的自我。完成=错误的自我。RESP_HEADERS=HEADERS()SELF。resp_cookies:SimpleCookie=SimpleCookie()self。RESP_STATUS_CODE:可选[INT]=无自身。http_body=b";";自身。HTTP_HAS_MORE_BODY=真我。HTTP_RECEIVED_BODY_LENGTH=0@cached_property def Req_Headers(Self)->;Headers:Self中(k,v)的Headers=Headers()。范围[";Headers";]:Header。添加(k.。解码(";ascii";),v.。decode(";ascii";))return headers@cached_property def req_cookies(Self)->;SimpleCookie:Cookie=SimpleCookie()cookie。加载(自身。请求标题(_H)。get(";cookie";,{})return cookie@cached_property def type(Self)->;ConnectionType:return(ConnectionType.。如果为SELF,则为WebSocket。范围。get(";type";)==";websocket";Else ConnectionType。http)@cached_property def方法(Self)->;str:返回self。作用域[";Method";]@CACHED_PROPERTY定义路径(SELF)->;字符串:返回SELF。作用域[";path";]@cached_property def query(Self)->;MultiDict:return MultiDict(parse_qsl(UNQUOTE_PLUS(sel.。作用域[";QUERY_STRING";]。decode()Async def send(self,data:Union[bytes,str]=b";";,Finish:可选[bool]=false):if self。已完成:如果是SELF,则引发ValueError(";当连接关闭时不能发送消息";)。type==ConnectionType。HTTP:if isinstance(data,str):data=data。encode()等待自己。_http_send(data,Finish=Finish)否则:引发NotImplementedError()async def_http_send(self,data:bytes=b";";,*,Finish:bool=false):如果不是self。已开始:如果完成:自我。PUT_RESP_HEADER(";Content-Length";,str(len(Data)等待自己。start_resp()等待Self。如果完成:等待自我,则asgi_send({";type";:";http.response.body";,";body";:data or b";";,";more_body";:true})。Finish()异步定义Finish(SELF,CLOSE_CODE:OPTIONAL[INT]=1000):如果是SELF。type==ConnectionType。HTTP:如果是Self。已完成:如果不是自身,则引发ValueError(";连接已完成";)。开始:赛尔夫。RESP_STATUS_CODE=204等待自我。start_resp()等待Self。asgi_send({";type";:";http.response.body";,";body";:B";";,";more_body";:false})否则:引发NotImp
在HTTP请求-响应周期中,通常不需要对何时发送内容进行精细控制,返回知道如何设置头部和发送正文的响应更为方便和熟悉。为此,我们可以编写一个简单的HttpResponse助手类。
#response.py导入json来自键入import Union,Optional,Mapping,Any from.connection导入连接类HttpResponse:def__init__(self,body:Optional[Union[Bytes,str]]=b";";,connection:Optional[Connection]=None,*,Status_code:int=200,Headers:Optional[Mapping[str,str]]=None):self。身体=身体自我。连接=连接本身。STATUS_CODE=STATUS_CODE自身。Headers=Header def__await__(Self):如果不是self,则为。连接:提高值错误(";无连接";)自身。连接。RESP_STATUS_CODE=SELF。如果是SELF,则为STATUS_CODE。标题:表示k,表示self中的v。标题。项目():自我。连接。put_resp_header(k,v)返回self。连接。发送(自己。Body,Finish=True)。__await__()class JsonResponse(HttpResponse):def__init__(self,data:any,connection:可选[连接]=无,*args,**kwargs):body=json。转储(数据)标头=kwargs。如果Headers为None,则获取(";Headers";):Headers={}Headers[";Content-type";]=";application/json";Super()。__init__(body,connection,*args,**kwargs)。
HttpResponse类中没有太多内容。它所做的就是提供一个熟悉的接口,允许我们传入响应主体、可选头、可选状态代码,并在Connection类中调用底层方法。在JsonResponse类的示例中,它还设置了内容类型标头。
此应用程序在访问时应以JSON对象的形式返回所有查询参数。
您可能已经注意到,这与Goal部分中的用法不完全相同,在Goal部分中,我们可以只返回响应对象,而不是等待它。这是因为此示例是一个普通的ASGI应用程序,而原始目标部分中的应用程序位于路由器的上下文中,路由器为我们调用等待。出于同样的原因,构造函数中允许连接参数为NONE。
路由器根据请求的URL和HTTP方法将请求分派到不同的处理程序。大多数路由器实施还会解析URL中的参数。例如,如果我们定义一个路由器。
然后告诉路由器匹配URL/a/foo/bar,它应该给我们提供处理程序函数以及参数param_a和params_b。
这确实不容易,但幸运的是,wekzeug附带了一个路由模块,它可以做更多的事情,比如在缺少尾部斜杠的情况下自动重定向。在它的帮助下,我们可以用大约60行代码实现我们的路由模块。
#routing.py import functools from键入import Callable,Iterable,Optional from werkzeug.routing import Map,MethodNotAllowed,NotFound,RequestRedirect,Rule from.connection import Connection from.response import HttpResponse类路由器:def__init__(Self):Super()。__init__()自我。url_map=Map()self。ENDPOINT_TO_HANDLER={}def route(self,Rule,Methods=None,Name=None):Methods=Set(Methods)If Methods不是None If Methods和Not";Options";in Methods:Methods。添加(";options";)def修饰符(名称:可选[str],处理程序:可调用):self。add_route(RULE_STRING=RULE,HANDLER=HANDLER,Methods=Methods,Name=name)返回处理程序返回函数工具。Partial(修饰符,名称)def add_route(self,*,Rule_string:str,Handler:Callable,Name:Optional[str]=None,Methods:Optional[Iterable[str]]=None,):如果不是Name:Name=Handler。__NAME__EXISTING_HANDLER=SELF。Endpoint_to_Handler。如果EXISTING_HANDLER和EXISTING_HANDLER不是处理程序,则GET(NAME):引发ValueError(";重复的路由名称:%s";%(Name))自身。url_map。添加(Rule(RULE_STRING,ENDPOINT=name,Methods=Methods))self。ENDPOINT_TO_HANDLER[名称]=处理程序定义get_url_binding_for_connection(self,connection:connection):Scope=Connection。作用域返回Self。url_map。绑定(连接。请求标题(_H)。GET(";HOST";),PATH_INFO=作用域。GET(";路径";),SCRIPT_NAME=作用域。GET(";root_path";)或NONE,url_schema=Scope。GET(";SCHEMA";),QUERY_ARGS=SCOPE。Get(";query_string";,b";";),)Async def__call__(自身,连接:连接。
..