Elixir:实用并发食谱

2021-07-24 14:45:27

虽然 Erlang 运行时以作为 Elixir 编译的高并发平台而闻名,但我们大多数人最终在日常工作中解决了相同的问题。我们使用 Phoenix 来引导我们的 Web 应用程序,编写 JSON API 并在我们的页面上撒上一些 javascript。尽管确实通过使用 Phoenix 我们已经免费获得了整个并发模型,但我们为我们的产品开发的大多数功能并没有利用所有经常分布式的 Erlang、复杂的监督树或 GenServers 队列。在这篇文章中,我的目的是强调一些常见的场景,我们可以利用 Erlang 的并发模型以及 Elixir 的抽象来构建更好、更快和更安全的软件。把它想象成……一本食谱。我们可能要解决的最简单的问题是如何进行“即发即忘”的计算。换句话说,我们如何告诉我们的系统异步执行一些代码而不关心它何时完成,也不关心结果。为此,Elixir 提供了 Task 抽象。任务是旨在在其生命周期内运行单个操作的进程。它可以是一个长期的操作,比如记录的批处理,也可以是一个短的操作,比如发送 Slack 通知。通常,运行一次性任务最简单的方法是使用 Task.start/1。但是,这是第一个提示:不要那样做。在 Elixir 中运行一次性操作的最佳方法是使用 Task.Supervisor.start_child/2 在监督树下生成任务。最好在它们自己的监督树下运行任务的主要原因是允许对进程进行适当的清理。当一个 Supervisor 被删除时,它的所有子元素也会被删除,这使您可以干净地删除应用程序。这并不意味着您希望重新启动您的任务。实际上,Task.Supervisor 的默认策略是 :temporary,这意味着它们永远不会重新启动。如果事情在某个时候变得奇怪,这只是一种避免悬空进程的方法。他们总是这样做。

defmodule FireAndForgetExample.Application do use Application def start(_type, _args) do children = [ # 启动OTP应用下的监督树。 {Task.Supervisor, name: FireAndForgetExample.TaskSupervisor} ] Supervisor.start_link(children, strategy: :one_for_one) endenddefmodule FireAndForgetExample.OtherModule do def process_event(event) do # 启动监督树下的任务。 Task.Supervisor.start_child(FireAndForgetExample.TaskSupervisor, fn -> send_slack_notification("Hey! We got an event!") end) event |> do_something() |> do_something_else() endend 但是,如果我们真的关心结果呢?有时运行某个操作并忘记它是有用的,但大多数时候我们确实想对它的结果做一些事情。如果我们手头的问题是由多个操作组成的,您可以异步运行这些操作,因为它们不相互依赖,例如,将一堆文档上传到 S3,或向不同的人发送一批电子邮件,最简单的解决方案是实施扇入/扇出策略。这也可以通过使用任务来完成。我们可以在不使用主管的情况下天真地做到这一点,或者我们可以像我们之前提到的那样在主管下启动它们。我总是建议在将要交付到生产环境的代码中使用 Supervisor,但是,为了简单起见,让我们看一个没有它的例子: defmodule FanInFanOutExample do def send_notifications(notifications) do notice # Spin a task per element |> Enum.map(&Task.async(fn -> send_single_notification(&1) end) # 等待所有这些 |> Enum.map(&Task.await/1) end def send_single_notification(notification) do # ... endend 好的事情这种方法是它只需要最长的任务,当函数完成时,我们将有一个包含所有结果的列表。在 Elixir 1.11 中还有 Task.await_many/2,它在引擎盖下,它所做的不仅仅是简单的迭代和等待,但最终,确实让我们到达了同一个地方。我们可能遇到的另一种问题是“_How can we can run some work每隔 N 分钟/秒定期/etc."。利用 Eli 中可用的不同抽象,这是相当简单的xir 但最终在 OTP 中:GenServer。

GenServer 是“通用服务器”的缩写。它基本上是一个可以接收消息并允许我们指定回调的过程,因此它可以对这些消息执行不同的操作。我不会详细介绍 GenServers,因为我之前已经写过关于它们的文章。那么,我们如何使用 GenServer 进行调度?简单:利用 handle_info/2 回调和 Process.send_after/3。换句话说,我们将向执行工作的 GenServer 添加一个回调,然后使用 Process.send_after/3 调度消息。最后,为了确保它在一段时间后再次运行,我们确保在回调返回之前再次调用 Process.send_after/3。让我们看看它的外观: defmodule SchedulingExample do use GenServer @default_minutes 3 def start_link(args \\ []) do GenServer.start_link(__MODULE__, to_map(args)) end defp to_map(args) do %{ minutes: Keyword.get( args, :minutes, @default_minutes), always: Keyword.get(args, :forever, true), } end def init(%{minutes: minutes} = state) do schedule_work(minutes) {:ok, state} end def handle_info(:work, %{minutes:minutes,forever:forever} = state) do # 在这里做我的工作...如果永远做 schedule_work(minutes) end {:noreply, state} end defp schedule_work(minutes) do milliseconds = to_milliseconds(minutes) Process.send_after(self(), :work, milliseconds) end defp to_milliseconds(minutes) do minutes |> :timer.minutes() |> Kernel.trunc() endend 此外,当利用 GenServers 和其他抽象流程,通常是一个很好的调用,使服务器模块具有尽可能少的业务逻辑。在这种特殊情况下,如果 GenServer 可以简单地从模块调用一个函数,我们就是黄金。通过这种方式,我们可以将流程管理与我们的业务完全分离,从而更轻松地进行测试……以及更轻松的时间。最后,我们可能也想在 Supervisor 下生成这种工作进程,但可能使用不同的策略,例如 :one_for_one,以便它们在崩溃时重新启动。在某些情况下,我们可能希望在特定时间运行我们的代码。不一定每 3 分钟一次,而是每天早上 8 点。虽然利用相同的工具完全可以实现这一点,但我会务实并推荐 Quantum。它允许您使用 cron 语法来安排函数的执行,并消除管理流程的所有复杂性。这是一个经验丰富的库,在社区内被广泛采用,非常轻巧......而且非常简单。有时,您会发现自己有一个端点,该端点的查询时间太长,或者一个过程必须始终处理大量数据并提供糟糕的用户体验。在这些情况下,有时缓存结果可能是有意义的。有时,花几个小时调整查询本身或重新设计解决方案可能更有意义,但有时缓存可能更有意义。让我们谈谈它什么时候出现。

如果您已经使用过 Elixir 或 Erlang,您就会知道它的数据结构是不可变的,但是,它有自己的方式来处理共享的可变状态:进程。为了保存一些状态,访问它并更改它,我们可以通过一个进程以多种不同的方式来做到这一点。保持某种状态的最简单的解决方案是创建一个代理。代理是围绕状态的最简单的抽象,有时,如果我们需要的是一个没有太多电池的简单解决方案,它实际上可能是最好的选择。 Agents 的一个好处是它是一个单一进程,这意味着许多并发客户端将依次获得它们的 Agent 份额,这意味着您不必担心竞争条件。另一方面,如果它开始成为瓶颈,那也可能是一件坏事。其他时候,如果特工没有为你剪断它,你可能会更快一些。在这些情况下,ETS 可能是一个不错的选择。 ETS 的好处是它总是更快,因为它不通过 Erlang 调度器,此外它还支持并发读取和写入,而代理不支持。但是,当您想要进行原子操作时,它会受到更多限制。总的来说,它非常适合简单的共享键/值存储,但是否更适合或不适合您的特定问题,这取决于您。一个简单的方法可能如下所示: defmodule EtsCacheExample do def init!(seed, table_name) when is_atom(table_name) do case :ets.info(table_name) do :undefined -> :ets.new(table_name, [:set , :public, :named_table]) _ -> 引发“名称为 #{table_name(pool_name)} 的 ETS 表已经存在。” end add(seed, pool_name) end def teardown(table_name) do :ets.delete(table_name) end def add(value, table_name) do :ets.insert_new(table_name, {value}) end def exists?(value, pool_name) do case :ets.lookup(table_name, value) do [] -> false _ -> true end end def retrieve_all(table_name) do table_name |> :ets.match({:"$1"}) |> List.flatten( ) endend 我要提到的第三个选项是 GenServers。大多数情况下,代理或 ETS 就足够了,但是,在某些情况下,为每个 API 用户提供自己的小缓存可能是有意义的。一个很好的理由可能是因为我们需要为读/写提供一定的原子性,而代理将成为瓶颈。 GenServers 的优势之一是它们允许我们非常轻松地为每个用户启动一个,因此它们不会成为 Agent 的瓶颈。然而,虽然 Elixir 确实提供了使缓存变得容易的必要抽象,但我对此的建议通常是依靠社区的肩膀。例如,Saša Jurić 不久前写过 ConCache 就是这样做的,但还有很多其他的。不实现它自己的一个好处是在处理并发时有很多边缘情况,并且在最初几次很容易出错。正如他们所说,计算机科学中两个最难的问题是命名和缓存。有了这一切,我希望能够为您可能遇到的一些问题提供一些潜在的解决方案。就像 IT 中的所有问题一样,每个解决方案有时都有意义,有时却没有,但归根结底,我相信这些都是 Elixir 或 Erlang 开发人员的主要技术。 Elixir 使处理并发代码变得如此容易,大多数问题都可以解决,而无需引入第三方库。当然还有很多不能提及的内容,例如使用 gen_statem 进行状态机,或使用工作池进行节流工作,但我将留待改天再说。

哦,最后一件事,不要忘记监督你的过程......这是免费的容错:)