在这篇文章中,我将探索AWS中提供的简单工作流服务(SWF)。
要了解SWF的好处,我们首先需要了解什么是工作流。维基百科对此的定义如下:
工作流程由编排的、可重复的活动模式组成,通过将资源系统地组织成流程来转换物料、提供服务或流程信息。它可以被描述为一系列操作、个人或团体的工作、工作人员组织的工作或一个或多个简单或复杂的机制。
在计算机系统中,我们关心的是处理信息的部分。可以将一些内容建模为工作流:
部署管道:我们可以接收一些代码作为输入,然后在工作者机器上构建它。我们可以在不同的机器上并行运行测试。如果所有测试都通过,我们可以将二进制文件部署到另一组机器上。
协调发货:用户在网上商店购买产品,然后在系统上下单。人工监控该系统,并负责在仓库中查找产品并将其运送到正确的地址。当发货时,信息被输入到系统中。工作流程会注意到此信息,并通过电子邮件向用户发送送货详细信息。
异步图像处理:系统将文件上传到系统进行处理(比方说,创建缩略图)。工作流使用多个工作器来执行任务。如果任何一台机器在处理一组文件时出现故障,它们可以由另一名工人接管相同的工作。
这些都是一些高级的例子。在这篇文章中,我将更详细地复习一个例子。
在开始构建工作流之前,让我们先了解一下SWF的组件:
工作流:一组活动,以及定义这些活动如何协同工作以实现某些目标的一些逻辑。
域:工作流位于域中。一个域可以包含多个工作流。不同域中的工作流无法交互。
活动:需要执行的任务类型,例如:调整图像大小、运行测试等。
为了帮助我们熟悉SWF,我们将创建一个工作流来对修复机群中的故障机器的流程进行建模。它将如下所示:
此工作流可用于运行多台机器的数据中心。我们可以使用工作流探测机来查看它们是否工作正常。如果它发现了错误,它会在数据库中将机器状态设置为Maintenance。如果它没有发现任何错误,它将完成执行。
一旦机器耗尽,我们将做两件事。我们会派人来检查一下机器并修理它,然后我们会趁机重塑机器的形象,这样当机器回来时,我们就有了干净的机器。
一旦修复和重映像完成,我们就可以将状态设置回Available并完成执行。
对于我们的活动和决策者,我们将需要AWS SDK。在这一节中,我将展示如何将其准备好。
我们将在我们的示例中使用Ruby,因为它很容易运行,并且得到了很好的支持。撰写本文时,Ruby SDK的最新版本是版本3。我们将创建一个带有依赖项的Gemfile:
AWS SDK将需要与AWS通信,因此我们需要一些凭据,这些凭据可以在如下环境变量中设置:
上图中的每个框都是一项活动。活动可以非常独立,所以我们将开始构建它们。
活动通过轮询挂起任务的工作流来工作。如果它发现有可以执行的任务,它就会这样做,并返回结果。然后继续轮询,直到有更多的工作要做。
我们的活动将与Decider共享一些代码,因此让我们创建一个将在它们之间共享的基类(Workflow_base.rb):
要求';AWS-SDK-SWF';class WorkflowBase domain_name=';datacenter-domain';region=';ap-东南-2';task_list_name=';Repair-Workflow-task-list';version=';14';def initialize@SWF=Aws::swf::client。new(REGION:REGION)REGISTER_DOMAIN(REGION,DOMAIN_NAME)end#为我们的工作流注册一个域(如果它不存在)def REGISTER_DOMAIN(REGION,DOMAIN_NAME)swf=Aws::swf::client。新建(Region:Region)开始SWF。register_domain({name:domain_name,Workflow_Execution_Retention_Period_in_Days:';3';})放置";域#{domain_name}注册";救援Aws::SWF::Errors::DomainAlreadyExistsFaultPut";Domain#{domain_name}已存在";end。
这个类定义了一些在决策器和活动之间共享的常量。它还初始化SWF客户端并注册域(如果域尚不存在)。
由于SWF的工作方式,我们所有活动的代码最好由单个程序处理。此程序将轮询该域中的任何新任务。每次看到任务时,它都会执行该任务,并将结果发送回SWF。因为每个任务都是阻塞的,所以我们可以旋转此程序的多个副本,以便在需要时并行执行任务。
Required';AWS-SDK-SWF';Required_Relative&39;Workflow_base.rb';课堂活动<;Workflow Base活动=[';Probe_Machines';,';DRAIN_MACHINE';,';FIX_MACHINE';,';REIMAGE_MACHINE';,';ENABLE_MACHINE';]def initialize SUPERFIZE SUPER()REGISTER_ACTIVATIONS轮询结束#用域def register_Activity()活动注册活动。每个do|Activity|Begin@swf。REGISTER_ACTIVITY_TYPE({DOMAIN:DOMAIN_NAME,NAME:ACTIVATION,VERSION:VERSION,#处理活动DEFAULT_TASK_START_TO_CLOSE_TIMEOUT:';60';})放置";活动#{活动}注册";救援Aws::SWF::Errors::TypeAlreadyExistsFaults放置";活动#{Activity}已存在";end#在true options={domain:domain_name,task_list:{name:task_list_name}}task=@swf时轮询域以查找此活动的任务def poll。如果是任务,则为Poll_for_Activity_Task(选项)。TASK_TOKEN==对于过期的活动,无PUT';轮询过期。重试';下一个End If!活动。包括?(任务。activeid)提出";活动#{任务.。active_id}未知";end#如果EXECUTE成功,将返回结果,否则ti将#抛出PUT";执行#{TASK。Activity_id}";Begin#调用Activity Result=Send(TASK.。active_id、任务、任务。输入)投入";完成#{任务。active_id}";@swf。Response_Activity_Task_Complete({TASK_TOKEN:TASK。TASK_TOKEN,#SWF不提供知道此结果#属于哪个活动的方法,因此我们将在结果前面加上它的结果:";#{task。active_id}:#{result}";})救援=>;e放置";失败#{task。active_id}";@swf。Response_Activity_TASK_FAILED({TASK_TOKEN:TASK。TASK_TOKEN,原因:@FAILURE})end def探测_MACHINES(TASK,INPUT)#因为这只是一个示例,我实际上没有要测试的计算机,#i&39;我将使用一些模拟数据MACHINES=[';MACHINE-A459Z';,';MACHINE-M3992';,';MACHINE-A873R&39;]计算机。如果CHECK_MACHINE(MACHINE)==';FAIL';#则将其设置为Maintenance。在实际情况中,我们会使用新状态更新数据库。将MACHINE#{MACHINE}设置为Maintenance";RETURN MACHINE END END PUT";RETURN MACHINE END PUTS';未发现损坏的计算机';RETURN';';END#随机决定是否耗尽。在真实情况下,我们将与#the机器通信,或检查数据库def drain_Machine(task,input)put";DRANDER MACHINE#{INPUT}";RANDOM_NUMBER=RAND(5)如果RANDOM_NUMBER==4 PUT';Machine is defined';return end raise";#{input}没有排出";end#在现实生活中,我们会检查人类是否已将任务标记为已修复。在这个#案例中,我们将只休眠并使用随机数def fix_Machine(task,input)put";检查机器#{input}是否固定";sleep(1)Random_number=rand(2)如果Random_number==1放置";机器#{input}已经固定";return end raise";#{input}尚未固定";end#在现实生活中,我们会在这里使用类似chef的东西来重新镜像机器#我们将只睡眠并使用随机数def reimage_Machine(task,input)put";reimage#{input}";sleep(1)Random_number=rand(2)如果Random_number==1放置";机器已经重新镜像";return end raise";#{input}reimage仍在进行";end#假设我们重新启用了机器def enable_machine(task,input)put";将#{input}标记为可用";end#如果Random_number==4 return';FAIL';end return';SUCCESS';END END ACTIONS,则返回机器def CHECK_MACHINE(MACHINE)RANDOM_NUMBER=RAND(5)的随机状态。新的。
上述代码中最重要的部分是轮询循环。POLL_FOR_ACTIVATION_TASK方法将启动与SWF的连接,并将返回要执行的活动(如果有活动可用)。如果在60秒内没有要执行的活动,它将返回一个空的TASK_TOKEN。如果发生这种情况,我们只需再次继续投票即可。
处理每个活动的代码只是伪代码,用来假装我们正在做一些工作。
决定者是整个过程中最复杂的部分。
Required_Relative';Workflow_base.rb';class ExecutionStarter<;WorkflowBase Workflow_Name=';Repair-Workflow';def Initialize Super@SWF。START_WORKFORK_EXECUTION({域:域名,工作流ID:SecureRandom。uuid,工作流类型:{Name:Workflow_Name,Version:Version},TASK_LIST:{Name:TASK_LIST_NAME},EXECUTION_START_TO_CLOSE_TIMEOUT:';36000';,CHILD_POLICY:';TERMINATE';})结束执行启动程序。新的。
现在我们已经准备好了,我们需要打开一个终端窗口,其中包含以下活动:
使用SWF比我预想的要复杂得多。必须对所有事件进行解析才能确定下一个活动,这似乎很容易出错,并且使代码变得混乱。文档也不是很清楚应该如何做到这一点,所以希望这个例子能帮助感兴趣的人。
[架构·自动化·AWS·生产力指数]