八卦传播

2021-06-20 13:14:43

使用随机选择节点来传递信息,以确保它在不泛滥网络的情况下达到群集中的所有节点

在一组节点中,每个节点需要将元数据信息传递给群集中的所有其他节点,而无需取决于共享存储。在一个大群集中,如果所有服务器与所有其他服务器通信,则可以消耗大量的网络带宽。即使某些网络链接遇到问题,信息也应该达到所有节点。

群集节点使用Gossip样式通信来传播状态更新。每个节点选择一个随机节点来传递它的信息。这是经常间隔完成的,每1秒钟说。每次,选择随机节点以传递信息。

消息不应消耗大量的网络带宽。应该有几百kBs的上限,确保应用程序'数据传输不会受到群集中过多的消息。

元数据传播应该容忍网络和一些服务器故障。即使少量网络链路已关闭,也应该达到所有群集节点,或者一些服务器失败。

每个群集节点将元数据存储为与群集中的每个节点相关联的键值对列表:

在启动时,每个群集节点会添加关于自身的元数据,需要传播到其他节点。元数据的一个例子可以是节点侦听的IP地址和端口,&#39负责等。八卦实例需要了解至少一个其他节点以启动八字通信。众所周知的群集节点用于初始化八足实例称为种子节点或介绍者。任何节点都可以充当介绍者。

public gossip(inetaddressandport listendaddress,list< inetaddressandport> seednodes,string nodeid)抛出ioException {this.listenaddress = listenaddress; //在其Seed节点的一部分中筛选此节点本身.SeedNodes = RemoveSfaddress(Seednodes); this.nodeId = new nodeid(nodeid); addlocalstate(gossipkeys.address,listendaddress.tostring()); this..socketserver = new nioscoptlistener(newgossiprequestconsumer(),lisisidaddress);私有void addLocalstate(String键,字符串值){nodestate nodeState = clustermetadata.get(listenaddress); if(nodestate == null){nodestate = new nodestate(); clustermetadata.put(nodeid,nodestate); nodeState.Add(key,new serveoryedvalue(值,IncremenetVersion())); }

每个群集节点调度作业以定期将其传输到其他节点的元数据。

Private ScheduledThreadPoolExecutor Gossipexecutor =新的ScheduledThreadPoolExecutor(1);私人长gossipintervalms = 1000;私人调度<?>任务费; public void start(){socketserver.start();任务文件= gossipexecutor.scheduleatfixedrate(() - > dogossip(),gossipintervalms,gossipintervalms,timeunit.milliseconds); }

调用计划任务时,它会从元数据映射的服务器列表中拾取一小组随机节点。定义为Gossip Fanout的小常量数,确定要作为八卦目标拾取的节点。如果已知任何内容,它会拾取一个随机种子节点,并将其发送到该节点的元数据映射。

public void dogossip(){list< inetaddressandport> nextedclusternodes = liveodes(); if(nexinnclusterternode.isusptey()){sendgossip(Seednodes,Gossipfanout); } else {sendgossip(joadingclusternodes,gossipfanout);私人列表< inetaddressandport> liveodes(){set< inetaddressandport> nodes = clustermetadata.values().stream().map(n - > inetaddressandport.parse(n.get(gossipkeys.address).getvalue())).collect(collectors.toset()); return removeselfaddress(节点); }

私有void sendgossip(list< inetaddressandport> nearyClUsternodes,int gossipfanout){if(nextedClusterternode.Isempty()){return; for(int i = 0; i< gossipfanout; i ++){inetaddressandport nodeaddress = pickrandomnode(nearyClusternodes); sendgossipto(nodeaddress);私有void sendgossipto(inetaddressandport nodeaddress){try {getlogger()。信息("向&#34发送八卦状态); socketClient< RequestorResponse> socketClient =新的侧面(Nodeaddress); gossipstatemessage gossipstatemessage = new gossipstatemessage(this.clustermetadata); RequestorResponse Request = creategossipstateRequest(gossipstatemessage); byte [] responseBytes = socketClient.Blocksend(请求); gossipstatemessage rendalestate = deserialize(responseBytes);合并(ReplaceState.getNodeStates()); catch(ioException e){getlogger()。错误(" io错误,同时将八卦状态发送到" + nodeaddress,e); Private RequestorResponse CreateGossipstateRequest(GossipStateMessage GossipStateMessage){return new RequestorResponse(requestId.pushpullgossipstate.getId(),jsonserdes.serialize(gossipstatemessage),correlitationId ++);}

接收GOSSIP消息的群集节点检查它拥有的元数据并找到三件事。

在此节点中的传入消息中的值' s状态图

当节点具有输入消息中存在的值时,选择更高版本值

然后它将缺失值添加到自己的状态映射。无论从传入消息中缺少什么值,都将作为响应返回。

发送Gossip消息的群集节点会添加其从八卦响应到自己状态的值。

私有void handlegossiprequest(org.distrib.patterns.common.message< RequestorResponse>请求){gossipstatemessage gossipstatemessage = deserialize(request.getrequest());地图< nodeid,nodestate> gossipedstate = gossipstatemessage.getnodeStates(); getLogger()。信息("来自&#34的合并状态; + Request.getClientsocket());合并(八卦斯特泰特);地图< nodeid,nodestate> diff = delta(this.clustermetadata,gossedstate); gossipstatemessage diffresponse = new gossipstatemessage(diff); getlogger()。信息("发送差异响应" +差异); Request.getClientsocket()。写(new RequestorResponse(requestId.pushpullgossipstate.getId(),jsonserdes.serialize(diffresponse),Request.getRequest()。getcorrelationId())); }

公共图< nodeid,nodestate> delta(map< nodeid,nodestate> frommap,map< nodeid,nodestate> tomap){map< nodeid,nodestate> delta = new hashmap<(); for(nodeid密钥:frommap.keyset()){if(!tomap.containskey(key)){delta.put(key,frommap.get(key));继续;从states = frommap.get(key); nodestate tostates = tomap.get(key); nodestate差异= fromstates.diff(tostates); if(!diffstates.isusemaly()){delta.put(key,差错); }}返回delta;}

公共void合并(Map< nodeid,nodestate> Otherstate){map< nodeid,nodestate> diff = delta(Otherstate,this.clustermetadata); for(nodeid diffkey:diff.keyset()){if(!thice.clustermetadata.containskey(diffkey)){this.clustermetadata.put(diffkey,diff.get(diffey)); } else {nodestate statemap = this.clustermetadata.get(diffkey); statemap.putlal(diff.get(diffkey)); }}}

每个群集节点在每个群集节点处每隔一秒一次发生此过程,每次选择不同的节点以交换状态。

上面的代码示例显示了节点的完整状态在GOSSIP消息中发送。这对新加入的节点很好,但是一旦状态是最新的,它就不必发送了完整状态。群集节点只需要自上次八卦以来发送状态更改。为了实现这一目标,每个节点都维护每次在本地添加新的元数据条目时递增的版本号。

群集元数据中的每个值都以版本号维护。这是模式版本为值的示例。

int版本;字符串值; public supmentedValue(String值,int版){this.version =版本; this.value = value; public int getversion(){返回版本; public string getValue(){返回值; }

私有void sendknversions(inetaddressandport gossipto)抛出IoException {Map< nodeId,Integer> maxknnownnodeversions = getmaxknownnodeversions(); RequestorResponse mepardversionRequest = new RequestorResponse(requestId.gossipversions.getId(),jsonserdes.serialize(新的gossipstateversions(maxknownnodeversionsions),0); socketClient< RequestorResponse> socketClient =新侧面套接字(Gossipto); byte [] hypernversionResponseBytes = SocketClient.BlockingsEnd(ketapversversionRequest);私人地图< nodeid,整数> getmaxknnodeversions(){return clustermetadata.entryset().stream().collect(collectors.tomap(e-> e.getkey(),e-> e.getvalue()。maxversion()); }

然后,当版本大于请求中的版本时,接收节点只能发送值。

地图< nodeid,nodestate> getmissingandnodestateshigherthan(map< nodeid,整数> nodemaxversions){map< nodeid,nodestate> delta = new hashmap<(); delta.cputall(legervacessednodeStates(nodemaxversions)); delta.putlal(漏洞(nodemaxversions));返回三角洲;私人地图< nodeid,nodestate>漏洞(Map< nodeid,整数> nodemaxversions){map< nodeid,nodestate> delta = new hashmap<();列表< nodeid> missingkeys = clustermetadata.keyset()。stream()。过滤器(key - >!nodemaxversions.containskey(key))。收集(收集器.Tolist()); for(nodeid missingkey:missingkyys){delta.put(lisskey,clustermetadata.get(忘掉)); }返回三角洲;私人地图< nodeid,nodestate>高reserversedNodeStates(Map< nodeid,整数> nodemaxversions){map< nodeid,nodestate> delta = new hashmap<();设置< nodeid> keyset = nodemaxversions.keyset(); for(nodeId节点:keyset){整数maxversion = nodemaxversions.get(节点); nodestate nodestate = clustermetadata.get(节点); if(nodestate == null){继续; nodestate deltastate = nodestate.statesgreaterthan(maxversion); if(!deltastate.isusemal()){delta.put(node,deltastate); }}返回delta; }

Gossip在[Cassandra]中的实现优化了一种用三方握手优化状态交换,其中接收Gossip消息的节点也将从发件人提供所需的版本以及它返回的元数据。然后,发件人可以立即使用所请求的元数据响应。这避免了额外的消息,否则是必需的。

[蟑螂]中使用的八卦协议为每个连接的节点维护状态。对于每个连接,它将发送到该节点的最后一个版本,并从该节点接收的版本。这是可以发送'自上次发送的版本以来的状态。并询问'来自上次收到的版本和#39的状态;

也可以使用一些其他有效的替代方案,发送整个地图的哈希,如果哈希相同,则无效。

群集节点随机选择要发送八卦消息的节点。 Java中的示例实现可以使用java.util.random如下:

私有随机= new random();私有inetaddressandport pickrandomnode(list< inetaddressandport> nearyClusternodes){intrandnnodeindex = wandom.nextint(nexindclusternodes.size()); inetaddressandport gossipto = nugindclusterternode.get(randomnodeindex);返回gossipto; }

可以存在其他考虑因素,例如最少联系的节点。例如,蟑螂中的八卦协议以这种方式选择节点。

维护群集中的可用节点列表是Gossip协议最常见的使用之一。使用两种方法。

[Swim-Gossip]使用一个单独的探测组件,该组件在群集中连续探测群集中的不同节点以检测它们是否可用。如果检测到节点是活着的或死亡,则该结果与八卦通信传播到整个群集。探测器随机选择一个节点以发送八卦消息。如果接收节点检测到这是新信息,则会立即将消息发送到随机选择的节点。这样,整个群集将在群集中快速已知群集中的节点或新连接的节点的故障。

群集节点可以定期更新自己的状态以反映其心跳。然后通过交换的八字消息传播到整个群集。然后,每个群集节点可以检查它是否已在固定的时间内接收到特定群集节点的任何更新,或者将节点的标记为下降。在这种情况下,每个群集节点独立地确定节点是否上或向下。

如果节点崩溃或重新启动,则版本后的值不起作用,因为所有内存状态都丢失。更重要的是,节点可以对相同键具有不同的值。例如,群集节点可以以不同的IP地址和端口启动,或者可以以不同的配置开始。生成时钟可用于标记每个值的生成,从而当元数据状态被发送到随机簇节点时,接收节点可以检测不仅仅是由版本号的变化,还可以与生成一起检测。

值得注意的是,这种机制不是必需的核心八卦协议工作。但它在实践中实施了它&#39,以确保正确跟踪状态更改。

[Cassandra]使用Gossip协议进行组成员身份和群集节点的故障检测。还使用GOSSIP协议发送诸如分配给每个群集节点的令牌的每个群集节点的元数据。

BlockChain实现,如Hyperlowger Fabric,使用Gossip协议进行组成员身份和发送分类帐元数据。