你还记得安格斯·麦凯弗(Angus&34;Mac&34;MacGyver)吗?这位80年代/90年代颇受欢迎的电视剧的主人公安格斯·麦盖弗(Angus&34;Mac&34;MacGyver)总是富有创造力,他只需一把瑞士军刀、胶带、鞋带和回形针就能解决任何问题。
Kafka Connect的单消息转换(SMT)几乎和MacGyver的瑞士军刀一样多才多艺:
SMT可以是Kafka Connection上下文中出现的这些和许多其他问题的答案。应用于源连接器或宿连接器,SMT允许分别在将Kafka记录发送到Kafka之前或在从主题中使用它们之后修改Kafka记录。
在这篇文章中,我想重点介绍一下SMT的一些有趣的用法(希望不管怎样)。这些用例大多基于我使用Kafka Connect和Debezium的经验,Debezium是一个开源的变更数据捕获(CDC)平台。不久前,当我在Twitter上询问社区有关这方面的问题时,我也得到了一些关于SMT用法的很好的建议:
我绝对推荐去看看这个帖子;非常感谢所有回复的人!要了解更多关于SMT的一般知识,如何配置它们等,请参考这篇文章最后给出的资源。
对于每一类用例,我还询问了我们富有同情心的电视英雄对SMT对HAND任务的有用性的看法。你可以在每个部分的末尾找到他的评级,从📎(不适合)到📎(非常适合)。
SMTS最常见的应用可能是格式转换,即调整数据的类型、格式和表示形式。这可能适用于整个消息,也可能适用于特定的消息属性。让我们首先看几个转换单个消息属性格式的示例:
时间戳:不同的系统往往对如何键入和格式化时间戳有不同的假设。例如,Debezium将大多数时间列类型表示为自纪元以来的毫秒。另一方面,Change事件使用者可能希望使用Kafka Connect的Date类型或ISO-8601格式的字符串(可能使用特定时区)获得这样的日期和时间值
值屏蔽:敏感数据可能需要屏蔽或截断,或者甚至应该完全删除特定字段;随Kafka Connect一起提供的org.apache.kafka.connect.transs.MaskField和ReplaceField SMT在这方面非常有用。
数字类型:与时间戳类似,不同系统对(小数)数字表示的要求可能不同;例如,Kafka Connect的Decimal类型允许传递任意精度的小数,但其数字的二进制表示可能不受所有接收器连接器和消费者的支持。
名称调整:根据所选的序列化格式,可能不支持特定的字段名称;例如,在使用Apache Avro时,字段名称不能以数字开头。
在所有这些情况下,都可以使用现有的现成SMT或定制实现来应用所需的属性类型和/或格式转换。
当使用Kafka Connect将遗留服务和数据库与新建的微服务集成时,这样的格式转换可以在创建反腐败层方面发挥重要作用:通过使用更好的字段名称、选择更合适的数据类型或删除不需要的字段,SMT可以帮助保护新服务的模型不受遗留世界的古怪和怪癖的影响。
但是SMT不能只修改单个字段的表示,还可以调整整个消息的格式和结构。Kafka Connect的ExtractField转换允许从消息中提取单个字段并传播该字段。一个相关的SMT是Debezium用于更改事件扁平化的SMT。它可用于将复杂的Debezium更改事件结构(具有新旧行状态、元数据等)转换为可由许多现有接收器连接器使用的平面行表示形式。
SMT还允许微调模式命名空间;当使用模式注册表来管理模式及其版本时,这可能会很有意义,并且应该为给定主题的消息强制使用特定的模式命名空间。这一类别中的另外两个非常有用的SMT示例是Jeremy Cutenborde的Kafka-connect-Transform-xml和kafka-connect-json-schema,它将获取XML或文本,并根据给定的XML模式或JSON生成类型化的Kafka Connect Struct。
最后,作为一种特殊的格式转换,SMT可以用来修改或设置Kafka记录的键。如果源连接器没有产生任何有意义的键,但可以从记录值中提取一个键,这可能是理想的。此外,在考虑后续流处理时,更改消息键也很有用。例如,在源端选择匹配的键允许通过Kafka流连接多个主题,而不需要重新设置记录的键。
Mac评级:*📎+SMT是进行卡夫卡连接记录格式转换的完美工具。
对Kafka记录模式的更改可能会对消费者造成潜在的破坏。例如,如果记录字段被重命名,消费者必须相应地进行调整,使用新的字段名称读取值。如果字段被完全删除,消费者不能再期待此字段。
消息转换可以帮助实现从一个模式版本到下一个模式版本的这种转换,从而减少消息生产者和消费者生命周期的耦合。在重命名字段的情况下,SMT可以使用原始名称再次添加该字段。这将允许消费者继续使用旧名称读取该字段,并且可以按照自己的速度升级为使用新名称。在一段时间之后,一旦所有消费者都已被调整,则SMT可以再次被移除,只暴露出新的字段名称。类似地,在一段时间之后,当所有消费者都被调整之后,SMT可以再次被移除,只暴露新的字段名称。类似地,一个。例如使用某种恒定的占位符值。在其他情况下,可能从其他仍然存在的字段导出该字段值。然后,消费者可以按照他们自己的速度被更新以不再期望和访问该字段。
但是应该说,这种使用是有限制的:例如,当改变字段的类型时,事情很快变得棘手。一种选择可以是多步骤方法,其中首先添加具有新类型的单独字段,然后如上所述再次重命名它。
Mac的评级:📎表示,SMT主要可以帮助解决围绕模式演变的基本兼容性问题。
在源端应用时,SMT允许过滤出连接器生成的特定记录。它们还可用于控制记录发送到的Kafka主题。当过滤和路由基于实际记录内容时,这尤其有趣。例如,在使用Kafka Connect从某种传感器获取数据的物联网方案中,SMT可用于过滤掉低于特定阈值的所有传感器测量结果,或将高于阈值的测量事件路由到特定主题。
逻辑主题Routing SMT允许将源自多个表的更改事件发送到同一Kafka主题,这在处理Postgres中的分区表或处理分片到多个表中的数据时非常有用。
Filter和ContentBasedRouter SMT允许您使用Groovy或JavaScript等语言中的脚本表达式,根据更改事件的内容过滤和路由更改事件;这种基于脚本的方法可以是易用性(没有Java代码必须编译和部署到Kafka Connect)和表现力之间有趣的中间地带;例如,路由SMT如何与GraalVM的JavaScript引擎一起使用,根据订单类型将更改事件从包含采购订单的表路由到Kafka中的不同主题:
发件箱事件路由器在实现微服务之间数据传播的事务发件箱模式时非常方便:它可以用于将源自单个发件箱表的事件发送到每个聚合(当考虑域驱动设计时)或事件类型的特定Kafka主题。
在Kafka Connect本身中还有两个用于路由的SMT:RegexRouter,它允许基于正则表达式重新路由两个不同的主题;以及TimestampRouter,用于根据记录的时间戳确定主题名称。
虽然路由SMT通常应用于源连接器(定义记录发送到的Kafka主题),但将它们与接收器连接器一起使用也是有意义的。当接收器连接器从主题名称派生下游表名、索引名或类似名称时就是这种情况。
Mac的评级:📎表示,消息过滤和主题路由 - 对SMT没有问题。
墓碑记录是空值的Kafka记录,它们在处理压缩主题时具有特殊的语义:在日志压缩过程中,所有与墓碑记录具有相同关键字的记录都将从主题中移除。
在压缩发生之前,墓碑将在主题上保留一段可配置的时间(通过delete.retention.ms主题设置进行控制),这意味着Kafka Connect接收器连接器也需要处理它们。不幸的是,并不是所有的连接器都为null值的记录做好了准备,通常会导致NullPointerException和类似的情况。在这种情况下,可以使用上面这样的过滤SMT来删除墓碑记录。
现在,当使用像Debezium这样的 - 连接器来从使用软删除的数据库捕获更改时(即,不物理地删除记录,但是在删除记录时将逻辑删除标志设置为真),这些更改事件将被导出为更新事件(从技术上讲是这样的)。现在,当使用诸如Debezium的 - 连接器来从使用软删除的数据库捕获更改时,这些更改事件将被导出为更新事件(从技术上讲是这样的)。例如,当使用诸如Debezium这样的CDC连接器来从使用软删除的数据库捕获更改时(即,没有物理删除记录,但是在删除记录时将逻辑删除标志设置为真),则这些更改事件将被导出为更新事件(从技术上讲是这样的。
Mac的评级:📎表示,SMT在丢弃墓碑或将软删除事件转换为墓碑方面做得很好。不过,不可能在保留原始事件的同时生成额外的墓碑记录
甚至一些高级的企业应用程序模式也可以在SMT的帮助下实现,索赔检查模式就是一个例子。此模式在如下情况下非常有用:
消息可能包含消息流稍后可能需要的一组数据项,但并非所有中间处理步骤都需要这些数据项。我们可能不想在每个处理步骤中携带所有这些信息,因为这可能会导致性能下降,并且因为我们携带了如此多的额外数据而使调试变得更加困难。
一个具体的例子也可以是从数据库表USERS捕获更改的CDC连接器,它具有一个包含用户个人资料图片的BLOB列(当然不是最佳实践,在现实…中仍然不是那么少见。)。
Apache Kafka不适用于大消息。默认情况下,最大消息大小为1 MB,虽然这个大小可以增加,但基准测试显示,小得多的消息具有最佳吞吐量。因此,分块和外部化大有效负载等策略对于确保令人满意的性能至关重要。
将更改数据事件从该表传播到Apache Kafka时,将图片数据添加到每个事件会带来很大的开销,特别是如果图片blob在两个事件之间根本没有更改。
使用SMT,BLOB数据可以外部化到其他存储。在源端,SMT可以从原始记录提取图像数据,例如将其写入网络文件系统或Amazon S3存储桶。记录中的相应字段将更新,以便仅包含外部化有效负载的唯一地址,如S3存储桶名称和文件路径:
作为优化,可以通过比较外部化文件的早期散列和当前散列来避免再次重新上传未改变的文件内容。
应用于接收器连接器的相应SMT实例将从传入记录中检索外部化文件的标识符,从外部存储器获取内容,并在将其传递到连接器之前将其放回记录中。
Mac的评级:📎认为SMT可以帮助将有效载荷外部化,避免大量的卡夫卡记录。不过,依赖另一项服务会增加整体复杂性。
正如我们已经看到的,单一消息转换可以帮助解决Kafka Connection用户通常提出的相当多的要求。但也有局限性;就像MacGyver一样,除了他心爱的瑞士军刀之外,有时还必须使用其他工具,您不应该一直认为SMT是完美的解决方案。
最大的缺点已经在它们的名字中被暗示了:SMT只能用来处理单个记录,一次只能处理一个记录。您不能使用SMT将一条记录分割为多条记录,因为它们最多只能返回一条记录。此外,任何类型的状态处理(如聚合多条记录中的数据或关联多个主题中的记录)都是SMT的禁区。对于此类用例,您应该考虑Kafka Streams和Apache Flink等流处理技术;此外,Apache Camel等集成技术在此也非常有用。
在使用SMT时需要注意的一件事是配置的复杂性;当使用通用的、高度可配置的SMT时,您可能最终会得到很难掌握和调试的冗长配置。您最好实现一个专注于一个特定任务的定制SMT,充分利用Java编程语言的全部功能。
无论您是通过配置使用现成的SMT,还是在Java中实现自定义SMT,测试您的工作都是必不可少的。
虽然单元测试是定制SMT实现的基本测试的可行选择,但是建议使用针对Kafka Connect连接器运行的集成测试来测试SMT配置。这样,您可以确保SMT可以处理实际消息,并且已经按照您想要的方式进行了配置。
TestContainers和对TestContainers的Debezium支持是设置所有必需组件(如Apache Kafka、Kafka Connect、连接器和要测试的SMT)的重要基础。
我时不时希望的一个特定功能是,能够将SMT仅应用于连接器创建或使用的主题的特定子集。尤其是,如果连接器创建了不同类型的主题(如实际的数据主题和另一个具有元数据的主题),则可能希望只将SMT应用于一个组的主题,而不应用于另一个组的主题。KIP-585(筛选和条件SMT)中包含了这一要求,如果您有需求或需要,请加入到关于该主题的讨论中。如果您有需求,请加入到关于该主题的讨论中,如果您有需求或需求,请加入关于该主题的讨论,如果您有需求或需求,请加入关于该主题的讨论,如果您有需求或需求,请加入关于该主题的讨论,如果您有需求或
有几个很棒的演示文稿和博客文章深入描述了什么是SMT,您可以如何实现您自己的SMT,它们是如何配置的,等等。
单一消息转换不是您正在寻找的转换:关于SMT、它们的功能以及限制的很好的概述,Ewen Cheslake-Postava著。
Kafka Connect SMT的实践体验:关于SMT使用案例、注意事项等的深入博客文章,作者:Gian D‘Uia。
现在,考虑到SMT的广泛使用案例,MacGyver会喜欢并使用它们来实施Kafka Connect周围的各种任务吗?我当然会这样认为。但一如既往,必须选择适合这项工作的正确工具:有时SMT可能非常合适,而有时更灵活(更复杂)的流处理解决方案可能更可取。
就像MacGyver一样,当你使用瑞士军刀、管道胶带或回形针时,你必须打个电话。
由Disqus提供支持的评论