Apache Nifi是目前可用的最流行的开源数据流引擎之一。NIFI几乎支持所有主要的企业数据系统,并允许用户创建有效、快速和可扩展的信息流系统。使用NIFI创建数据流系统很简单,而且有一条明确的途径来添加对尚未作为NIFI处理器提供的系统的支持。所有这些都推动了Nifi的大规模采用。
有几个Minio客户在他们的使用案例中使用Nifi。这些客户正在利用Minio实现高性能数据湖,通常会合成多个不同的数据集。Nifi允许这些客户将此数据路由到相关的最终消费者。
常见模式之一是使用Nifi中的Minio对象元数据创建自定义流。
具体的用例可能会有所不同,例如,有些人可能想要区别对待上传到Minio的csv和json文件,有些人可能想要分离jpg、png和pdf文件,有些人可能只想将json文件转换为拼图并将其存储回Minio-以此类推。
在这篇文章中,我将解释如何设置Nifi来监听Minio事件通知。然后,我们将了解如何通过Nifi处理器解析Minio事件json。然后,我们将从事件json中筛选用户定义的元数据头。最后,我们将看看如何根据事件json中是否存在头来采取后续步骤。
我们将使用Minio WebHook事件通知将Apache Nifi配置为事件目标。为此,首先在Nifi GUI中创建一个ListenHTTP处理器。然后将其配置为侦听某个端口。请参阅下面的处理器详细信息:
创建处理器后,为我们刚刚创建的WebHook服务器配置Minio事件通知。
mc mb myminio/source emc管理配置设置myminio NOTIFY_WEBHOOK:nifi endpoint=http://localhost:8086/contentListenermc管理服务重新启动myMiniomc事件添加myminio/source arn:mino:sqs::nifi:webhoke--event put
这里,我们将MINIO配置为每当存储桶源上有PUT事件时向http://localhost:8086/contentListener发送通知。
既然Minio和Nifi之间已经建立了通信,下一步就是使用EvaluateJsonPath Nifi处理器。我们使用它来解析Minio事件通知json有效负载,并识别它是否包含某个用户定义的元数据头。
如果报头X-AMZ-META-KEY1存在,我们进行下一步,否则我们在这里丢弃流。我们还获取对象和存储桶名称,以便可以将其传递到下一步。
在我的示例中,我查找头X-AMZ-META-KEY1。您可以在此处调整元数据字段以适合您的使用情形:
如果EvaluateJsonPath Nifi处理器找到了我们要查找的标头,我们将进入下一步。在本例中,我选择从Minio获取对象。我们使用FetchS3Object处理器来实现这一点。
为了完成操作,如果对象获取通过,我们将文件保存在本地驱动器上,否则我们会记录一个错误。完整的流程如下所示:
客户不可避免地面临数据流挑战,Apache Nifi已成为解决这些挑战的热门选择。在Minio,我们越来越多地看到使用Nifi作为数据流协调器来构建快速、可扩展和有效的管道的领域中的用例。
在这篇文章中,我们了解了如何使用事件通知和内置Nifi处理器构建基于Minio和Nifi的数据流系统。我们了解了数据流如何检查事件通知有效负载,并识别是否有特定的头可用。根据此筛选器事件的结果,我们添加了另一个从Minio获取对象并将其保存到本地的步骤。
你自己试试吧。如果你还没有Minio,你可以在这里下载。如果您需要帮助,请查看我们的文档。您也可以看看我们的公共松弛频道。