几个月前,我开始学习Elixir,主要是通过黑客攻击纸张。我愤怒地说,我的大部分Elixir教育一直是通过审判和错误的,在我走的时候弄清楚。所以过去的一周我决定从纸上掉一段时间休息,更深入地进入语言。
特别是,我正在拍摄更多关于在Elixir中处理并发性的更多信息。当然,这让我感到了惩罚者。
' SOMEDERY熟悉ELIXIR的人可能至少听说了Genserver。 (如果你没有,那也没有,那个' SOOD!)严格来说,它'虽然你可以逃避一段时间,但在使用这样的东西时仍然具有合理的效率凤凰框架。
但它'是您可以添加到工具箱的语言的令人难以置信的有用功能!你可以用它做很多好的东西。例如,使用Genservers,您可以:
设置每隔几个小时的重复任务每隔几个小时(例如,检查何塞'答案到"如何安排在Elixir中每隔几个小时运行代码?"堆栈溢出)
更轻松地处理异步流程,具有追踪,错误报告和重试逻辑的内置功能
Genserver是一个过程,就像任何其他Elixir进程一样。它可用于异步管理状态并执行代码,并包括围绕跟踪和错误报告的一些方便的功能。
另一种思考方式 - 一个咒语就像一个孤立的小盒子,其中有一些东西:
其内部状态(可以是任何Elixir数据结构,从简单的数字到复杂的地图/结构),
和一个回调模块,它定义了根据邮箱中收到的消息运行的代码
在学习Genservers时,我发现许多例子有点是一个......很多"钥匙值"商店,购物列表和类似的东西。我想了解Genservers是如何使用的,并且在野外"所以我转向了我最喜欢的elixir开源repos:https://github.com/plausible/analytics
有点背景:合理的是一个开源分析平台,具有> 1000支付客户。因为它们是服务许多用户的真实生产应用程序,所以我想我可以学习很多诊断他们的模块。 🤓
为了这篇文章,我将通过他们的repo中的一个更简单的genservers之一:plausible.event.writeBuffer,它可以在/lib/plausible/fent/write_buffer.ex中找到
在高级别中,此Genserver允许合理的信息以批量将大量事件插入数据库中,而不是为每个单独事件进行插入。 (这对像合理的产品似乎尤为重要,在那里他们可能会摄取每秒成千上万的事件!)
将它们添加到过程中的缓冲区' s内部状态,由列表表示(即[])
"冲洗"每5秒的缓冲区,或者它达到其容量为10,000个事件 - 无论哪个是第一次在其中"冲洗"意味着将缓冲区中的所有事件保存到数据库(在本例中,单击小时)
(对于完整的上下文,让' s看看代码库:我们可以看到plausible.event.writeBuffer Genserver用于PlausibleWeb.api.externalController.Event / 2方法,它处理逻辑post / api /事件API端点,又从他们的JavaScript跟踪片段调用。)
在我们跳入它之前,这里的完整模块,带有一些小型风格调整和注释I' ve添加了一些其他背景:
DefModule合理的.Event .writeBuffer确实使用Genserver要求记录器5_000 10_000#客户端API Def Start_Link(_Opts)Do Genserver .start_link(__module__,[],__module__)结束def插入(事件)do genserver .cast(__module__,{:插入,事件}){:确定,事件}结束def flush()do genserver .call(__module__,:flush,:Infinity):确定#server(回调)true def init(buffer)do process .flag(:trap_exit,true )timer = process .send_after(self(),:tick,){:ok,%{buffer,timer}}结束true def handle_cast({:插入,事件},%{buffer,timer} = _state)do new_buffer = [事件|缓冲区]如果长度(new_buffer)> = do logger .info(" buffer full,刷新到磁盘")进程.cancel_timer(timer)do_flush(new_buffer)new_timer = process .send_after(self(),:勾选,){:noreply,%{[],new_timer}} else {:noreply,%{state | new_buffer}}结束结束true def handle_info(:tick,%{buffer} = _state)do do_flush(buffer)timer = process .send_after(self(),:tick,){::noreply,%{[],timer}}结束真实def handle_call(:flush,_from,%{buffer,timer} = _state)do进程.cancel_timer(timer)do_flush(beaffer)new_timer = process .send_after(self(),:tick,){:回复,nil, %{[],new_timer}}结束try def tendine(_reason,%{buffer} = _state)do logger .info("关闭前刷新事件缓冲区...")do_flush(缓冲区)结束#私人/实用方法DEFP DO_FLUSH(缓冲区)DO CASE BUFFER DO [] - >零事件 - > logger .info("刷新#{长度(事件)}事件")events = enum .map(事件,&(map.from_struct(& 1)|>地图.delete(:__元__) ))合理的.clickhouseepo .insert_all(合理的.clickhouseEvent,活动)结束结束
注意:如果您'重新能够阅读上面的代码,并确切地知道每一步,你可能还要进一步阅读'
让' s开始逻辑的核心实际生活的核心:在Genserver回调中。
句柄_call / 3 - 调用genserver.call/3时调用,同步处理消息genserver.call/3将阻止,直到收到回复(除非呼叫超时或节点断开连接)
handle_info / 2 - 调用以处理所有其他消息(即由Genserver.Call/3和Genserver.Cast / 2触发的那些邮件(即,和#34;系统"消息)
启动Genserver时调用init / 1回调,并处理设置服务器进程的初始内部状态。
true def init(缓冲区)do进程.flag(:trap_exit,true)timer = process .send_after(self(),:tick,){:确定,%{buffer,timer}}结束
此方法的第一行(process.flag(:trap_exit,true)设置我们最多"陷阱退出"这使我们能够处理任何"清理"在过程之前任务终止,在终止/ 2回调中。 (我们' ll在下面讨论这个!)
接下来,我们在5秒内设置一个计时器以在5秒内运行:勾选事件(@flush_interval_ms == 5_000)。当我们' ll见下文,这个:勾选事件还要递归调用自己,以便在服务器活跃时有效地运行每5秒一次。 (这解释了为什么它' s被叫"勾选&#34 ;!)
最后,我们存储初始缓冲区(在这种情况下,空列表,即内部状态中的计时器。
True Def Handle_cast({:插入,事件},%{buffer,timer} = _state)do new_buffer = [事件|缓冲区]如果长度(new_buffer)> = do logger .info(" buffer full,刷新到磁盘")进程.cancel_timer(timer)do_flush(new_buffer)new_timer = process .send_after(self(),:勾选,){:noreply,%{[],new_timer}} else {:noreply,%{state | new_buffer}}结束结束
将新事件插入内部状态和#39; s缓冲区,通过预先配置到列表
检查缓冲区是否已达到容量,如果是,请设置一个新的计时器,它将在5秒内触发下一个自动刷新
将内部状态重置为空列表,并更新计时器
此原因这在Handle_cast / 2回调中处理而不是hange_call / 3回调是为了使客户端可以异步执行,而不会在完成潜在DO_FLUSH / 1方法调用的情况下阻止。 (作为我们' ll见下文,"冲洗" /挽救10,000个事件是一个可能需要几秒钟的潜在昂贵的操作。)
让'快速看看Do_Flush / 1方法,以查看那里的内容和#39;
defp do_flush(缓冲区)do case buffer do [] - >零事件 - > logger .info("刷新#{长度(事件)}事件")events = enum .map(事件,&(map.from_struct(& 1)|>地图.delete(:__元__) ))合理的.clickhouterepo .insert_all(合理的.clickhouseEvent,活动)结束结束
这种方法只是标准的elixir。所有'正在做的是在缓冲区中取出事件,将它们格式化为纯地图(而不是结构),然后持续到数据库(点击屋)。 (如果我们想要这种方法稍微明确的名称,我们可以称之为save_to_clickhouse / 1。)
请注意,如果我们想保存一些空格,这也可以写类似:
defp do_flush([]),nil defp do_flush(事件)do logger .info("刷新#{length_ {lence}事件")events = enum .map(事件,&(map.from_struct() & 1)|>地图。地图.delete(:__ meta__)))合理的.clickhoutersepo .insert_all(合理的.clickhouseEvent,活动)结束
调用genserver.call/3时调用handle_call / 3回调,以处理同步消息。请注意,Genserver.Call/3将阻止,直到收到回复(除非呼叫超时或节点断开连接)。
true def handle_call(:flush,_from,%{buffer,timer} = _state)do进程.cancel_timer(timer)do_flush(buffer)new_timer = process .send_after(self(),:tick,){:回复,nil,% {[],new_timer}}结束
此代码看起来非常类似于上面的逻辑,在Handle_cast / 2回调中。在这种情况下,我们'重新允许手动调用:刷新事件,这需要当前处于状态和#39; s缓冲区并将其保存到数据库(在do_flush / 1方法中)。再次,缓冲区将重置为空列表,并且还复位计时器。
调用handle_info / 2回调以处理所有其他消息(即由genserver.call/3和genserver.cest / 2触发的那些邮件)。在我们的情况下,我们使用它来处理内部在Genserver本身内部传递的消息。
true def handle_info(:tick,%{buffer} = _state)do do_flush(buffer)timer = process .send_after(self(),:tick,){:norecly,%{[],timer}}结束
答案:刻度处理在常规间隔中刷新我们缓冲区中的事件。 (在这种情况下,每5秒钟。)
当服务器即将退出时,可以调用此回调,并可以处理任何清理任务。
true def terminate(_reason,%{buffer} = _state)do logger .info("关闭前刷新事件缓冲区...")do_flush(缓冲区)结束
通过陷阱存在于上面的init / 1回调中,我们可以确保在此Genserver过程终止之前,我们互化的任何'留在我们的缓冲区中。否则,我们可能会在内存中的一些事件之前终止进程的情况结束,其中有机会保存到数据库,并且会丢失。
def start_link(_opts)do genserver .start_link(__module__,[],__module__)结束def插入(事件)do genserver .cast(__module__,{:插入,事件}){:确定,事件}结束def flush()do genserver .call(__module__,:flush,:无限):ok结束
这可能是上面三种方法的最少直观 - 一旦我们理解这里的代码,其余的应该或多或少地到位。
正如您可能猜到的那样,start_link / 1方法是负责,嗯,启动Genserver进程。所有它' s正在致电genserver本身上调用start_link / 3,有一些奇怪的参数:
__Module__,这只是模块本身的别名(在这种情况下,Plausible.Event.WriteBuffer)。此第一参数将Plausible.Event.Went.WriteBuffer作为回调模块,以便在通过Genserver.Call和Genserver.Cast传播到服务器进程时,我们可以使用我们在上面实现的服务器回调。请注意,调用genserver.start_link(plausible.event.writeBuffer,[],名称:Plausible.Event.WriteBuffer)将导致相同的行为。使用__module__的主要优点是,如果我们决定重命名模块,我们只需要在一个地方更新它!
空列表(即[])作为第二个参数传递。这将传递给我们的init / 1回调作为默认缓冲区。我们' ll在下面更详细地看一下这一点。
最后一个参数是选项的关键字列表。在这里,我们为该过程分配名称:__module__(与名称相同:plausible.event.writeBuffer)。这允许我们使用模块名称(igenserver.call(__模块__,消息,超时)/ genserver.cest(__模块__,消息))传递给Genserver的消息。否则,在启动Genserver之后,我们必须继续参考其进程ID(或" pid"在start_link / 1方法中返回的pid&#34),然后明确传递它。 (这是一个非常常见的做法。)
(Genserver还有一种称为sign start / 3的方法,它采用与start_link / 3相同的参数 - 唯一的区别是start_link / 3"链接" Genserver进程到当前过程,这是有用的我们希望将Genserver作为A&#34的一部分启动;通过在主管下使用Start_Link开始一个进程,它允许应用程序监视任何错误/崩溃的过程,并自动重新启动此过程必要的。)
defmodule合理的。应用程序def start(_type,_args)do shopts = [#...合理的..writebuffer,#...] opts = [:one_for_one,plausible .supervisor]#... supervisor .start_link (儿童,选择)结束#...结束
Genserver' s start_link / 3方法是让我们在此主管下启动过程。 (当我们包括Plausible.Event.Went.writeBuffer在主管的子女中,当调用Supervisor.Start_Link / 2时,会自动调用其Start_Link / 3方法。)
希望在这一点上,我们非常了解在Plausible.event.writeBuffer.start_link / 1中进行的' start.start_link / 1!现在,左盖上的所有这些都是我们的客户端API中暴露的两种方法:插入/ 1和Flush / 0
在这里,我们看到所有这些都在做一个事件并通过演员/ 2方法向我们的Genserver发送异步请求。 (因为我们在这里使用CAST / 2而不是呼叫/ 3,我们知道这是一个非阻塞呼叫,它将立即返回,无论在回调中发生什么。)
Genserver.Cast / 2的第一个参数采用进程ID(即PID)或已注册的服务器名称。在这种情况下,我们将名称设置为__module__在我们的genserver.start_link / 3的第3个参数中,这就是为什么我们在这里使用它。 (这是Genservers的一种非常常见的做法。)
第二个论点是"消息"我们想常常以元度或原子的形式发送。在这里,我们发送{:插入,事件},它将在上面讨论的Handle_cast({:插入,事件},...)回调中处理。
由于我们不等待返回值,因此插入/ 1方法刚刚返回传递的事件的回声,以{:确定,事件}的形式。
Genserver.Call/3,与Genserver.Cast / 2不同,阻止该过程直到回调完成。因此,如果:刷新事件最终需要10秒钟才能执行,则刷新/ 0方法将不会返回:OK直到10秒过去。
呼叫/ 3采取第三个论点,我们在这里看到了:无限。这代表了在发生超时错误之前允许该方法采取的最大时间。默认情况下,这设置为5_000或5秒。通过进入:无限,我们'只要需要完成,就允许过程需要。
让'最后一看一下完整的模块定义,要仔细检查我们了解的一切'我在代码中添加了一些评论来帮助解决一点。 😉
DefModule合理的..Event .writeBuffer确实使用Genserver要求记录记录器#每5秒5秒5秒5_000#允许在我们的缓冲区列表中最多10,000个事件10_000 ################### ######################################## ########客户端API ################################## ###################################"""启动链接的Genserver进程,传递在当前模块(`__module__` =='plausible.fent.writebuuster`)作为回调模块和服务器进程的别名/名称,以便将来参考。我们还通过空列表(`[]`)作为内部状态的初始值' s`缓冲区的初始值(在下面的`init / 1`回调中处理)。 """ def start_link(_opts)do genserver .start_link(__module__,[],__module__)结束""发送到当前模块的“:INSERT`”消息,并将所提供的“事件”添加到内部状态和#39; S“缓冲区”的Genserver进程中。 """ def插入(事件)do genserver .cast(__module__,{:插入,事件}){:好的,事件} End"""发送(同步)`:flush`:手动' s的genserver过程,它手动" flushes"内部状态的所有事件' s`缓冲区ob到数据库。将`: timeout`选项设置为`:Infinity`,只要需要完成此调用即可。 """ def flush()do genserver .call(__module__,:flush,:无穷大):确定############################ #######################################服务器回调# ######################################## #########################"""设置Genserver的初始状态,陷阱退出,以便在下面的“终止/ 2”回调中优雅地处理流程终止。 """ true def init(缓冲区)do进程.flag(:trap_exit,true)timer = process .send_after(self(),:tick,){:确定,%{buffer,timer}} End"" "处理`:插入的呼叫传播到`genserver.cest / 2`。在此回调中,我们将新事件添加到内部状态和#39; s`缓冲区。如果我们达到“缓冲区”的最大容量,我们" Flush"它到数据库并重置我们的州。否则,我们只需使用更新的`Buffer`更新内部状态。 """ True Def Handle_cast({:插入,事件},%{buffer,timer} = _state)do new_buffer = [事件|缓冲区]如果长度(new_buffer)> = do logger .info(" buffer full,刷新到磁盘")进程.cancel_timer(timer)do_flush(new_buffer)new_timer = process .send_after(self(),:勾选,){:noreply,%{[],new_timer}} else {:noreply,%{state | new_buffer}}结束"""处理内部`:勾号的回调。在此回调中,我们每5秒(即'@ flush_interval_ms`)自动互化`缓冲区填充""" true def handle_info(:tick,%{buffer} = _state)do do_flush(buffer)timer = process .send_after(self(),:tick,){:noreply,%{[],timer}}末端" ""处理`:刷新的消息传递给`genserver.call / 3`。在此回调中,我们处理目前内部状态的任何内容互化的东西' s`缓冲区ob the数据库。 """ true def handle_call(:flush,_from,%{buffer,timer} = _state)do process .cancel_timer(timer)do_flush(buffer)new_timer = process .send_after(self
......