流式网桥--以Kafka、RabbitMQ、MQTT和CoAP为例

2020-05-15 06:45:10

如今,我们可以依靠不同的流媒体系统来传输和收集数据。我们使用的一些系统需要极高的可用性。另一方面,对于其他系统,高可用性可能是拥有可靠系统之间的权衡。有时使系统变慢,可以留出空间来增加传输数据的可靠性。令人着迷的是,选择是无穷无尽的。例如,要流式传输我们的数据,我们可以使用推或拉机制。我们意识到它们之间的区别是非常重要的。Pull机制是一种进程不断循环通过通道或缓冲区以查看其中内容的机制。推送机制将数据发送到进程正在等待它的位置。这两种截然不同的机制正是分别使用Kafka和RabbitMQ之间的主要区别。Kafka起源于2010年的LinkedIn,RabbitMQ是在2007年2月1日由一家同名的公司开发的:RabbitMQ Ltd。值得注意的是,RabbitMQ实现了AMQP(高级消息队列协议)。RabbitMQ也被称为AMQP经纪人。后来,卡夫卡选择开发流动巴士。这两个协议是本文的重点。同时,我们还将介绍物联网(IoT)中主要使用的其他不同的数据传输协议。一种是MQTT(消息队列遥测传输),另一种是CoAP(受限应用协议)。让我们从查看本文将要研究的协议的历史开始。

非常重要的是要注意到,这篇文章将会被修改。考虑到它的复杂性和其中描述的运动部件的数量,要解释代码的每一个比特是非常不可能的。无论如何,我们的重点都是MQTT、CoAP、RabbitMQ和Kafka。该代码是一个附加值,将在此过程中添加到本文中。在此期间,让我们使用我们的分析技能来处理这一部分。

1999年,IBM的安迪·斯坦福-克拉克和欧洲技术学院的阿伦·尼珀两位工程师设计了这个协议。他们试图解决的问题是不同输油管道通过卫星连接的不可靠。我们的想法是创建一种占用非常窄带宽的轻量级协议。该软件必须支持任何类型的数据,并支持多个级别的QoS(服务质量)。该协议被设计为M2M(机器对机器)方式,目前广泛用于物联网协议中。

RabbitMQ主要用于通过AMQP(高级消息队列协议)发送消息。该协议是由伦敦摩根大通的John O‘Hara于2003年发明的。其目标是为企业消息传递创建语言中立的网络协议。通过实现这一点,John O‘Hara为不同AMQP实现的激增创造了一个市场。最广为人知的是ActiveMQ、Apache QPID和RabbitMQ。

RabbitMQ是由Rabbit Technologies Ltd于2007年2月1日开发的。RabbitMQ是AMQP协议的代理实现。

该协议也应该作为M2M在物联网中使用。该议定书的初稿于2009年12月首次出现在RFC7252中。几年后的2014年,它获得了C.Bormann的批准并撰写。尽管本文使用CoAP,但请注意,CoAP是为受限设备设计的Internet应用程序协议。受限设备是物联网架构中服务于特定目的的终端节点。它们通常与传感器相连。同时,它也是一种服务层协议,允许在资源受限的设备之间进行连接。COAP代表:受限应用协议。

2010年,Jay Kreps、Jun Rao和Neha Narkhede在LinkedIn总部创建了Kafka。他们试图解决的问题是不断增长的信息摄取率。实现AMPQ的系统(如RabbitMQ和ActiveMQ)在事务支持、消息跟踪和协议中介方面提供了扩展的可能性。这些都是提供高可用性和可靠性的繁重操作。然而,这些实时系统并不适合从LinkedIn产生的过度增长的消息交换中保留所有数据。同时,它们都没有存储数据。相反,他们只是在传递数据。对于每次读取,都会从队列中删除每条消息。为了保存数据,需要额外的设计时间、精力和处理来将数据传输到持久存储机制。LinkedIn的开发人员意识到,这些消息中的大多数实际上并不需要RabbitQ提供的所有消息处理,因此产生的开销也太多,理想情况下应该有一个系统提供开箱即用的持久化机制,并避免即时删除消息。

为了完成我们的研究,我们将玩一个侦探游戏,我们可以检查数据库,这样就可以找到我们调查所需的线索。

要继续分析本文,我们首先需要了解,在本练习中,我们将介绍相当广泛的技术。以下是我们需要的物品清单:

多个JDK版本的切换机制(如SDK Man、自定义Bash脚本、Update-Alternative)。只选你最喜欢的)。

如果运行良好,那么我们准备继续。现在我们来看一下我们的案子。

我们将实施四个主要参与者。一个是火车,另一个是卡车,然后我们有一座火车的桥,最后是一座卡车的桥。卡车的桥架具有开启和关闭的能力。火车的桥是固定的,没有开放时间。

主要目标是监督进出桥梁的内容和时间。有可能为其他重要的调查提供数据。在这个项目中,我们将找到一个调查游戏,其中有一起谋杀案被报告,玩家必须找出凶手是谁。没什么太复杂的。这只是了解这些体系结构的所有基本参与者以及它们如何协同工作的一种方式。

我们把这事分开吧。现在我们将简要描述这些服务应该做些什么。

中央服务部负责保存我们所有的数据。在这种情况下,我们有几个顾虑。中央服务需要有某种持久化机制来保存数据。此外,它还需要能够以某种方式接收来自外部服务的大量数据。

我们的列车服务负责3项重要功能。它需要定期发送有关列车上的商品和人员的数据。它还需要在过桥时发送检入和检出消息。

因为我们的运输车辆本身并不载客,所以我们不需要客运服务。然而,它确实有一名司机,可能还有一名货运人员。在我们的示例中,我们将忽略这一点,并将重点放在商品上。

在我们的桥接服务器中,我们需要实现3个重要功能。我们将提供传感器服务。它将检测火车的登机和退票。它不会登记任何与列车身份有关的信息,只会登记桥上有一列火车的事实。此信息将与来自列车服务器的传感器服务模块的数据进行三角测量。还将实施两个物联网相关元素。一个读取温度,另一个读取湿度值。

列车和车辆服务器将共享相同的Bridge_01联盟。它们通过不同的桥运行,理论上我们应该为两个桥都有两个不同的联邦。但是,为了简化本文,我们将只使用相同的联邦。尽管如此,我们仍将使用不同的消息内容来区分它们。

让我们从思考坚持需要什么开始吧。首先,我们查看要从物联网接收的数据,这将是温度和湿度。在真实场景中,这些将代表巨大的数据量。在这种情况下,我们将看到某种大数据机制。然而,大数据本身就是整个范式。在本例中,我们的大数据机制将是Cassandra。我们需要考虑的另一件事是乘客信息。我们想定期发送整个乘客数据。也就是说,我们有兴趣登记他们的重量。如上所述,准确监控这一点很重要。这也代表了很多信息。与仪表的信息一样,它也是时间序列数据。如果在其中一段时间内某些数据丢失,我们真的不会那么担心。我们定期重复发送所有乘客的数据,频率相对较高,信息应该不会有太大变化。为此,我们也会以大数据的方式对乘客信息进行登记。

现在是时候考虑我们将以更低的频率接收的其他数据,以及我们实际上不想错过任何消息的地方。或者至少,我们只是想要更多的可靠性。对于这种情况,我们确实希望受益,主要是从事务处理和消息处理中获益。协议中介无论如何都会发生,因为我们的消息需要通过AMQP,但这也确保了它们的可靠性。在本文中,我们不会讨论网站的创建,但考虑到这将是真实案例场景中的最终目标,我们将牢记这一点。对于网站,最好实现ER数据库模型。为此,我们选择PostgreSQL。

为了将数据传送到Cassandra,我们将使用Apache Spark。这样我们就能把Spark和我们的Kafka流连接起来。我们将在中央流服务中运行Spark进程,我们将在火车和桥梁中运行Kafka进程。我们将让RabbitMQ经纪人在桥上、火车上和车辆上运行,以获得我们需要的商品数据。同样,我们将在最后三个玩家上运行RabbitMQ代理。

在接下来的子章中,我们将介绍所有相关的实现。我会尽量不在描述中重复我自己。这本身就是一个挑战,因为在不同的容器中分布着非常相似的配置。我们将通过自上而下的解释,换句话说,这将给我们一种剥离该体系结构不同组件的不同层的感觉。通过这种方式,我认为我们可以更好地理解和发现系统是如何工作的,以及所有移动部件是如何协同工作的。

需要注意的是,尽管实现和项目概述看起来非常广泛和复杂,但实际的编码并不复杂。现在我们将深入到代码中,逐一介绍每个主题。

此架构中的主要技术参与者之一是RabbitMQ。我们正在使用这项技术来获取商品信息和传感器数据。商品信息至关重要,传播率很低。在传感器中,数据的传输速率甚至更低,因为只有在火车过桥时才会发生这种情况。我们还知道,我们将不需要重播这些消息中的任何一条。原因是,虽然我们需要确保我们在火车上装载的货物完全到达目的地,但防止盗窃的机制已经到位。集装箱是密封的,乘客不能登上或穿过商品车厢。我们只需要商品信息就能知道它的重量对桥有什么影响。关于传感器,我们也不想存储检入/检出数据。一次丢失的签入/签出数据在一天中不会对数百个数据造成影响。因此,这些信息实际上都不是VITA,其吞吐量也不是很高。

在配置这个之前,重要的是要知道我们不会深入到RabbitMQ的配置中。尽管有多种可能的配置,但我们将尽可能保持1:1的比例。

我们开始吧。在bl-center-server/bl-center-stream中,我们找到一个具有以下定义的Docker:

FROM jesperancinha/je-Streams:0.0.1 WORKDIR/root ENV lang=C.UTF-8 run rabbitmq-plugins enable-offline rabbitmq_management run rabbitmq-plugins enable rabbitmq_Federation run rabbitmq-plugins enable rabbitmq_management run rabbitmq-plugins enable rabbitmq_shovel rabbitmq_management copy entrypoint.sh/root entry。

我们首先使用我创建的一个映像,其中包含RabbitMQ的原始安装。如果您有兴趣了解更多,请在这里查看。在我们继续之前,配置管理页面很重要。我们将离线配置它,我们将能够通过网页进行配置。之后,我们将启用称为联合的功能。简单地说,这就是将两个RabbitMQ代理连接在一起的方式。它们可以靠得很近,也可以物理上位于很远的地方。这背后的想法是,他们最终以单一经纪人的身份工作。我们需要它来将列车、桥梁和车辆中运行的RabbitMQ代理连接到中央服务器。我们还增加了铁铲管理。这一点非常重要,因为我们希望通过我们的联盟自动交换我们的消息。如果不这样做,消息将滞留在源RabbitMQ代理上,并且将需要另一个外部交互才能将它们移动到其目的地RabbitMQ代理。最后,为了能够可视化我们的网页并允许其他容器在中心流服务中找到我们的代理,我们需要提供两个重要的端口。这是端口5672和15672。它们分别是RabbitMQ服务器端口和Web GUI(图形用户界面)。RabbitMQ中的约定是这些端口之间的关系相差10000。在我们的示例中,如果我们说我们的服务器端口是5672,那么我们已经暗示我们的图形用户界面端口是10000+5672=15672端口。

现在让我们来看一下我们的entrypoint.sh。我们的入口点文件非常大,因此最好看一下各个部分。首先,我们启动服务器:

在启动我们的服务之后,我们仍然需要下载一个名为rabbitmqadmin的模块。此模块允许我们通过命令行配置虚拟主机、队列和交换等功能。因为rabbitmqadmin在python3上运行,但是在/usr/bin/python中查找它,所以我们需要在这个位置创建一个指向python3的符号链接。

现在我们需要考虑RabbitMQ服务器的一般配置。我们只需要创建我们的用户。我们添加一个密码为test的测试用户。然后,我们为用户提供管理员配置文件。最后,我们将权限设置为从根到新创建的用户的所有权限。

rabbitmqctl add_user测试测试rabbitmqctl set_user_tag测试管理员rabbitmqctl set_permission-p/test“.*”

我们将需要创建联合队列。我们创建它们的顺序并不重要。但是,它们确实需要同时运行才能访问。正如我们在概述中看到的,我们需要创建5个联邦。我们将以相同的方式创建它们。通过这种方式,我们可以从创建bash函数中受益,以帮助我们创建不同的联邦。我们要做的是创建一个虚拟主机。然后,我们将在测试用户上设置对其的所有权限。RabbitMQ需要交换来接收消息。RabbitMQ中的交换是消息路由器,它通过绑定和路由键将消息分发到队列。交流的类型很多。在我们的示例中,我们将使用扇出。这种类型的交换只意味着实际上没有使用路由密钥。这意味着如果更多的队列绑定到此交换,则每个队列都将获得传递到该交换的每条消息的副本。现在我们可以创建我们的队列了。然后,我们将队列与交换绑定。然后,我们将此队列与远程服务中的上游联合起来。最后,我们设置了联邦策略。

联合成员(){rabbitmqctl add_vhost bl_$1_vh rabbitmqctl set_permission-p bl_$1_vh test“.*”“.*”.*“*”rabbitmqadmin-u test-p test-V bl_$1_vh声明交换名称=bl_$1_exchange type=fan out rabbitmqadmin-u test-p test-V bl_$1_vh声明队列名称=。-上游bl_$1_upstream‘{“uri”:“amqp://test:test@bl_’$2';_SERVER:5672/bl_‘$1';_vh“,”Expires“:3600000}’rabbitmqctl set_policy-p bl_$1_vh-Apply-to all bl_$1_policy”.*$1.*“‘{”联合上游-设置“:”所有“}’}。

最后,我们为要创建的每个联邦调用我们的函数。在本例中,我们创建了5个虚拟主机、5个交换、5个队列、5个联盟和5个联盟策略。

虚拟主机、交换机和队列的创建与列车、车辆和桥梁服务器相同。唯一不同的是,在后3个版本中,没有创建任何联盟。因此,也不需要创建联盟策略。

这就是其中一个联合会的外观。在本例中,这是网桥服务从在线状态传感器获取数据以注册签入和签出的联合:

在我们的Kafka设置中,我们已经讨论了我们想要获得所有的仪表数据和乘客数据。这些是嗨

..