近两年来,气流一直是Curoology数据堆栈中不可或缺的一部分。在此过程中,我们采用了气流开发模式,极大地加快了我们的工作流程。在这篇文章中,我们想与大家分享一下我们在Curoology发展和操作气流的旅程中的一些经验教训。
我们将假设您熟悉ETL和气流及其核心概念-本文不会深入讨论这些概念,但有许多其他资源可以做到这一点。
在我们深入研究之前,了解气流在治疗数据故事中的作用可能会有所帮助。当Airflow被引入我们的堆栈时,有几个第三方服务本质上提供了某种形式的“ETL-as-a-Service”-这些服务为交易或广告活动元数据等关键数据提供了入口。随着我们需求的增长,最终这些即插即用的ETL服务开始不能满足我们的需求,我们开始将这些管道引入内部。
在内部移动管道的另一个原因是为了确保可靠性:既要保证数据完整性,也要保证管道本身的可靠性。与现成的解决方案相比,SLA往往更容易实现;当您的团队拥有故障管道和停机时间时,可以直接采取行动。因此,随着我们的堆栈的发展,气流在这个故事中变得越来越重要,取代了ETL即服务提供商。
简而言之,Airflow充当我们最重要的数据的主要入口平面,从供应商那里拉入数据,并将这些数据持久保存在我们的数据仓库系统中。需要注意的是,气流只负责将数据移动到我们的仓库或数据湖中,并没有更多:这是我们发现气流真正闪耀的地方,特别是在“ELT”范例的上下文中。
随着我们将管道转移到内部,我们已经开始建立创作这类管道的标准化方法。我们的许多管道本质上都是与供应商的类似REST的API对话的连接器。这样的集成并不是特别有挑战性,特别是在气流的通用语言Python中。
然而,当我们开始构建这些集成时,前几个过程很笨拙,不适合模块化。为了解决这个问题,我们将目光投向了Airflow的插件系统。
诚然,这不是灵丹妙药,最初的传球也同样混乱。我们的第一次尝试将所有东西都集中到一个单一的插件中,这使得一些代码可以重用,但是使得将来的添加变得不必要的耦合和困难。
随着项目的进展,我们最终后退了一步,更多地从战术上思考如何有效地利用气流框架。我们偶然发现了一些由Astronomer提供的现有技术,Feel Inspiration将我们的整体重构为面向平台的“连接器”:封装与供应商API交互所需逻辑的气流插件。
连接器是围绕特定供应商或平台定向的气流插件。例如,我们有一个Facebook连接器。这些连接器通常至少由两件东西组成:
钩子往往尽可能地轻量级:它们毕竟只是一个用于与API对话的通用接口。而操作符往往更复杂,不那么通用,封装了特定任务的专门化业务逻辑。
为了说明这一点,我们来看一个简化版本的Facebook连接器。我们的插件目录可能如下所示:
插件/facebook_plugin/├──__init__.py├──Hooks│├──__init__.py││├──__init__.py│└──facebook_init__.py└──运算符├──__init__.py└──facebook_Campaign_to_S3_Operator.py。
这里我们有一个钩子来管理与Facebook的Graph API的通信,还有一个操作符负责在S3中持久化Facebook广告活动元数据。我们可以想象这些实现会是什么样子。为了利用它们做一些有用的事情,我们需要将它们放入某种DAG中。例如:
气流开发的一个挑战是知道特定的DAG做了您期望它做的事情。虽然Airflow本身不一定提供很多开箱即用的解决方案,但Python为自动化测试提供了特殊的工具。然而,应用这些技术并不总是像使用其他框架那样简单。
上面描述的插件模式适合相当标准的单元式测试,但有一些注意事项。例如,我们为所有挂钩和操作符提供测试。我们结合使用pytest和pytest-vcr,以允许我们针对真实的API响应创建具有可重现结果的测试。这里需要注意的是,这些测试需要通过unittest.mock进行相当数量的深度模拟,这可能是不透明的,并且很难推理。
这是我们的测试相对于连接器的布局方式的简化视图:
Plugins/facebook_plugin/├──__init__.py├──Hooks│├──__init__.py│和├──Cassettes│和│├──测试_facebook_graph_│_get.yaml│和└──测试_facebook_graph_hook.post.yaml│和├──facebook_graph_hook.py│和└──测试_facebook_graph_hook.py└─。─Operators├──__init__.py├──Cassettes│└──├──_facebook_Campaign_to_S3_Operator.yaml Testfacebook_Campaign_to_S3_Operator.py└──TEST_facebook_Campaign_to_S3_Operator.py。
盒式磁带由pytest-vcr包提供,包含测试消耗的可重放请求和响应。
这种测试风格的目标是确保执行关键代码路径,并确保我们从调用API方法或任务执行方法中检索到的数据符合预期的形状。这不能替代在线数据完整性检查(尽管它们也是DAG原创性的一部分),但它们是有用的健全性检查。
摆脱ETL供应商的一个目标是增加我们对系统中数据质量的信心。为此,我们建立了一个小型约定库,对数据的外观进行在线断言。这些断言可能在数据从线路上传出时发生,或者在数据到达我们的仓库或数据湖之后发生。
未来工作的一个主要焦点将是为集成风格的测试构建更好的测试工具,这目前是一个相对的盲点。为了实现这一点,我们已经开始在一个系统上工作,以便轻松地建立独立的环境。这将在不久的将来形成端到端测试的基础。
气流仍然是我们数据堆栈中的重要一层。也就是说,气流是一种具有许多功能和可调参数的复杂工具。Curoology平台团队发现,与我们早期使用该框架的一些尝试相比,通过采用一些关键模式,我们能够有效地利用气流。
我们正在继续发展我们对有效气流模式的理解。特别是,随着我们转向基于S3的数据湖,我们发现了进一步简化连接器的机会。我们还在探索通过Docker和Kubernetes创作机器学习工作流的模式。毫无疑问,在这个过程中还会有更多的发现。
平台团队正在寻找充满激情的数据工程师来帮助构建支持我们内部业务合作伙伴的数据堆栈-从营销自动化到统计建模、ML和NLP的一切都依赖于我们团队正在构建的基础设施。如果您对此感兴趣,请与我们联系!