Go具有许多有用的内置功能,可用于安全,并发和并行代码。不管这些功能多么实用,它们都无法为您编写程序。与许多语言一样,最重要的知识不是语言的特征,而是众所周知的模式,这些模式将这些特征组合成可以解决经常发生的问题的解决方案。我刚开始将Go用作日常的基本语言,最近遇到了一种有用的模式,我认为值得分享。有人告诉我,在Palantir,这种方式称为发痒器图案。
您可以选择限制服务中的并行度,即一次处理多少个请求。
我们将永远拥有服务循环,等待信号重新考虑队列的内容,
让我们看一下代码并更详细地讨论这些步骤。我们从类型开始:
//虚拟请求-通常是一些Protobuf / Thrift结构类型Request int type Service结构{mu sync。互斥体队列*列表。列出sema chan int loopSignal chan结构{}}
我们将使用一个通道来表示一个有限的信号量。尽管您也可以使用sync.Semaphore,但在Go中这是非常习惯的做法。我们对所需解决方案的明确要求之一是,调用者在发出请求时绝不能阻塞。我们将始终首先使请求入队,而处理却要异步完成,这一事实可以满足这一要求。请注意,我的解决方案不会提供检索请求响应的方法。
func(s * Service)EnqueueRequest(请求请求)错误{s。亩Lock()延迟s。亩解锁()。排队。 PushBack(请求)日志。 Printf("将请求添加到长度为%d \ n",s。queue。Len())的队列中。 tickleLoop()返回nil}
我们为队列获取了一个互斥锁,因为我们将有多个goroutine并发访问它。我们排队请求,然后通知循环。最后一部分进行了解释。现在我们有了添加请求的方法,我们可以处理代码来处理请求。由于这是一项服务,因此我们将使其永远循环,以等待新工作完成。这也是循环信号起作用的地方:
func(s * Service)循环(ctx上下文。Context){日志。为{select {case<-s}的Println("正在启动服务循环")。 loopSignal:s。 tryDequeue()case<-ctx。完成():日志。 Printf("取消循环上下文")返回}}}
我们的循环方法将具有服务循环,直到在我们传入的上下文中实现Done()通道为止。这将使我们在程序结束时或发生错误的情况下正常关闭服务。更重要的是,我们等待loopSignal告诉我们重新考虑队列。该信号在以下两种情况之一中发送:
您可以说服自己,这是我们服务周期中仅有的两个必须考虑排队的事件。如果服务的队列为空,则使新请求入队将通知服务查看该新请求。如果请求已入队,但由于达到了我们配置的并行度的限制而无法立即处理,则唯一会改变的时间是请求完成处理后,即信号量由一个令牌补充。
现在让我们看一下tryDequeue,它在循环“发痒”时调用。其不确定的命名源于以下事实:调用tryDequeue时,我们:
根本不知道队列中是否有任何请求,因为该信号可能是由于完成了对上一个飞行中请求的处理,
如果队列中有请求,不知道我们是否有足够的资源。
func(s * Service)tryDequeue(){s。亩Lock()延迟s。亩如果s解锁()。排队。 Len()== 0 {return} select {case s。 sema<-1:请求:= s。出队()log。 Printf("出队请求%v \ n",request)进入s。处理(请求)默认值:log。 Printf("接收到循环信号,但已达到请求限制")}} func(s * Service)出队()Request {element:= s。排队。前()s。排队。删除(元素)返回元素。价值。 (请求)}
我们通过测试队列的长度来检查不确定性的第一种情况。如果我们知道我们有工作要做,我们需要检查信号量是否具有足够的插槽以允许处理请求。因为信号量信号量将是一个缓冲通道,所以我们知道,如果信号量中没有剩余的插槽,则发送新令牌(整数1)将阻止该信号量,或者在这种情况下触发默认情况。如果我们达到了并行性的极限,我们只需处理该请求,直到重新标记该循环。出队功能是一个小助手,它可以处理从队列中弹出元素的烦人事务,由于Go缺乏泛型,因此涉及类型转换。
如果一个请求通过了tryDequeue函数的严格测试并被选中进行处理,我们将在处理方法中处理它:
func(s * Service)流程(请求Request){延迟s。补充()日志。 Printf("处理请求%v \ n",request)//模拟工作<-time。之后(时间。持续时间(rand。Intn(500))*时间。毫秒)}函数(s *服务)补充(){<-s。信号日志。 Printf("补充信号量,现在%d /%d个插槽正在使用中\ n&#34 ;、 len(s.sema),cap(s.sema))s。 tickleLoop()}
真正有趣的是补充功能。此功能有两个职责。首先,它在信号量中释放了一个插槽,以便可以处理新的请求。其次,它会打勾循环,告知您是时候重新考虑请求队列,以防万一有其他先完成的请求被阻止的请求。如果您还记得的话,这是必须重新考虑请求队列的两种情况中的第二种。
让我们也揭开tickleLoop函数的神秘面纱。它只是使非阻塞发送到loopSignal通道:
func NewService(ctx上下文。Context,requestLimit int)*服务{service:=&服务{队列:清单。 New(),sema:make(chan int,requestLimit),loopSignal:make(chan struct {},1),} go service。循环(ctx)返回服务}
Warning: Can only detect less than 5000 characters