随着数据处理管道的数量和复杂性的增加,您可以通过将其分解为一系列较小的任务并协调这些任务作为工作流一部分的执行来简化整个过程。为此,许多开发人员和数据工程师使用由社区创建的Apache Airflow这个平台,以编程方式编写,安排和监视工作流。借助Airflow,您可以将工作流程作为脚本进行管理,通过用户界面(UI)对其进行监视,并通过一组功能强大的插件来扩展其功能。但是,手动安装,维护和扩展Airflow并同时为其用户处理安全性,身份验证和授权需要花费大量时间,而您宁愿专注于解决实际的业务问题。
由于这些原因,我很高兴宣布适用于Apache Airflow的Amazon Managed Workflows(MWAA),这是一项完全托管的服务,可让您轻松在AWS上运行Apache Airflow的开源版本,并构建工作流程来执行摘录-transform-load(ETL)作业和数据管道。
气流工作流程使用Amazon Athena查询从Amazon Simple Storage Service(S3)等来源检索输入,在Amazon EMR集群上执行转换,并可以使用所得数据在Amazon SageMaker上训练机器学习模型。使用Python编程语言,将Airflow中的工作流编写为有向非循环图(DAG)。
Airflow的主要优势在于其通过插件的开放可扩展性,使您可以创建与AWS交互的任务或工作流所需的本地资源,包括AWS Batch,Amazon CloudWatch,Amazon DynamoDB,AWS DataSync,Amazon ECS和AWS Fargate,Amazon弹性Kubernetes服务(EKS),Amazon Kinesis Firehose,AWS Glue,AWS Lambda,Amazon Redshift,Amazon Simple Queue Service(SQS)和Amazon Simple Notification Service(SNS)。
为了提高可观察性,可以将Airflow指标发布为CloudWatch指标,并将日志发送到CloudWatch Logs。默认情况下,Amazon MWAA提供自动的次要版本升级和补丁程序,并带有一个选项来指定在其中执行这些升级的维护时段。
创建环境–每个环境都包含Airflow集群,包括调度程序,工作程序和Web服务器。
将您的DAG和插件上传到S3 – Amazon MWAA将代码自动加载到Airflow中。
在Airflow中运行DAG –从Airflow UI或命令行界面(CLI)运行DAG,并使用CloudWatch监控环境。
如何使用Amazon MWAA创建气流环境在Amazon MWAA控制台中,单击“创建环境”。我给环境命名,然后选择要使用的Airflow版本。
然后,选择S3存储桶和文件夹以加载我的DAG代码。桶名称必须以airflow-开头。
对于插件和要求,我可以选择要使用的S3对象版本。如果我使用的插件或要求在我的环境中创建了不可恢复的错误,Amazon MWAA将自动回滚到以前的工作版本。
我从网络开始单击“下一步”以配置高级设置。每个环境都使用两个可用区中的私有子网在Amazon Virtual Private Cloud中运行。 Web服务器对Airflow UI的访问始终受使用AWS Identity and Access Management(IAM)的安全登录保护。但是,您可以选择在公共网络上访问Web服务器,以便可以通过Internet或VPC中的专用网络登录。为简单起见,我选择一个公共网络。我让Amazon MWAA使用正确的入站和出站规则创建一个新的安全组。 (可选)我可以添加一个或多个现有安全组,以针对您的环境微调对入站和出站流量的控制。
现在,我配置我的环境类。每个环境都包括一个调度程序,一个Web服务器和一个工作程序。工人们会根据我的工作量自动扩大和缩小。我们会根据DAG的数量为您提供关于使用哪个类的建议,但是您可以随时监视环境的负载并修改其类。
始终为静态数据启用加密,尽管我可以选择由AWS Key Management Service(KMS)管理的自定义密钥,但我将保留AWS代表我拥有和管理的默认密钥。
为了进行监控,我将环境性能发布到CloudWatch Metrics。默认情况下启用此功能,但是启动后我可以禁用CloudWatch指标。对于日志,我可以指定日志级别以及哪些Airflow组件应将其日志发送到CloudWatch Logs。我保留默认设置,仅发送任务日志并使用日志级别的INFO。
我可以修改Airflow配置选项的默认设置,例如default_task_retries或worker_concurrency。目前,我不会更改这些值。
最后,但最重要的是,我配置了环境将用来访问我的DAG,编写日志以及运行DAG来访问其他AWS资源的权限。我选择“创建新角色”,然后单击“创建环境”。几分钟后,即可使用新的Airflow环境。
使用Airflow UI在Amazon MWAA控制台中,我寻找刚创建的新环境,然后单击Open Airflow UI。创建一个新的浏览器窗口,并通过AWS IAM通过安全登录进行身份验证。
在那里,我在movie_list_dag.py文件中寻找放在S3上的DAG。 DAG正在下载MovieLens数据集,使用Amazon Athena在S3上处理文件,并将结果加载到Redshift集群中,如果缺少该表,则创建该表。
从气流进口DAG 从airflow.operators.python_operator导入PythonOperator 从airflow.operators导入HttpSensor,S3KeySensor 从airflow.contrib.operators.aws_athena_operator导入AWSAthenaOperator 从airflow.utils.dates导入days_ago 从datetime导入datetime,timedelta 从io导入StringIO 从io导入BytesIO 从进口睡眠开始 导入csv 汇入要求 导入json 导入boto3 导入压缩文件 进口io s3_bucket_name ='我的桶' s3_key ='文件/' redshift_cluster ='my-redshift-cluster' redshift_db ='dev' redshift_dbuser ='awsuser' redshift_table_name ='电影_演示' test_http ='https://grouplens.org/datasets/movielens/latest/' download_http ='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip' athena_db ='demo_athena_db' athena_results ='雅典娜结果/' create_athena_movie_table_query =“”“ 如果不存在,则创建外部表Demo_Athena_DB.ML_Latest_Small_Movies( `movieId` int, `title`字符串, 类型字符串 ) 行格式SERDE'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 与SERDEPROPERTIES( 'serialization.format'=',', 'field.delim'=',' )位置's3://my-bucket/files/ml-latest-small/movies.csv/ml-latest-small/' TBLPROPERTIES( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); “” create_athena_ratings_table_query =“”“ 如果不存在,则创建外部表Demo_Athena_DB.ML_Latest_Small_Ratings( `userId` int, `movieId` int, `rating` int, 时间戳记bigint ) 行格式SERDE'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 与SERDEPROPERTIES( 'serialization.format'=',', 'field.delim'=',' )位置's3://my-bucket/files/ml-latest-small/ratings.csv/ml-latest-small/' TBLPROPERTIES( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); “” create_athena_tags_table_query =“”“ 如果不存在,则创建外部表Demo_Athena_DB.ML_Latest_Small_Tags( `userId` int, `movieId` int, `tag` int, 时间戳记bigint ) 行格式SERDE'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 与SERDEPROPERTIES( 'serialization.format'=',', 'field.delim'=',' )位置's3://my-bucket/files/ml-latest-small/tags.csv/ml-latest-small/' TBLPROPERTIES( 'has_encrypted_data'='false', 'skip.header.line.count'='1' ); “” join_tables_athena_query =“”“ 选择替换(m.title,'“','')作为标题,等级 从demo_athena_db.ML_Latest_Small_Movies m 内联接(SELECT等级,来自demo_athena_db.ML_Latest_Small_Ratings WHERE等级> 4的movieId)r on m.movieId = r.movieId “” def download_zip(): s3c = boto3.client('s3') indata = requests.get(download_http) n = 0 zipfile.ZipFile(io.BytesIO(indata.content))为z: zList = z.namelist() 打印(zList) 对于zList中的i: 打印(i) zfiledata = BytesIO(z.read(i)) n + = 1 s3c.put_object(Bucket = s3_bucket_name,Key = s3_key + i +'/'+ i,Body = zfiledata) def clean_up_csv_fn(** kwargs): ti = kwargs ['task_instance'] queryId = ti.xcom_pull(key ='return_value',task_ids ='join_athena_tables') 打印(查询ID) athenaKey = athena_results +“ join_athena_tables /” + queryId +“。csv” 打印(athenaKey) cleanKey = athena_results +“ join_athena_tables /” + queryId +“ _ clean.csv” s3c = boto3.client('s3') obj = s3c.get_object(Bucket = s3_bucket_name,Key = athenaKey) infileStr = obj ['Body']。read()。decode('utf-8') outfileStr = infileStr.replace('“ e”','') outfile = StringIO(outfileStr) s3c.put_object(Bucket = s3_bucket_name,Key = cleanKey,Body = outfile.getvalue()) def s3_to_redshift(** kwargs): ti = kwargs ['task_instance'] queryId = ti.xcom_pull(key ='return_value',task_ids ='join_athena_tables') 打印(查询ID) athenaKey ='s3://'+ s3_bucket_name +“ /” + athena_results +“ join_athena_tables /” + queryId +“ _clean.csv” 打印(athenaKey) sqlQuery =“ copy” + redshift_table_name +“ from'” + athenaKey +“'iam_role'arn:aws:iam :: 163919838948:role / myRedshiftRole'CSV IGNOREHEADER 1;” 打印(sqlQuery) rsd = boto3.client('redshift-data') resp = rsd.execute_statement( ClusterIdentifier = redshift_cluster, 数据库= redshift_db, DbUser = redshift_dbuser, Sql = sqlQuery ) 打印(resp) 返回“确定” def create_redshift_table(): rsd = boto3.client('redshift-data') resp = rsd.execute_statement( ClusterIdentifier = redshift_cluster, 数据库= redshift_db, DbUser = redshift_dbuser, Sql =“如果不存在则创建表” + redshift_table_name +“(标题字符不同,等级为int);” ) 打印(resp) 返回“确定” DEFAULT_ARGS = { '所有者':'气流', 'depends_on_past':错误, '电子邮件':['[email protected]'], 'email_on_failure':错误, 'email_on_retry':错误 } 与DAG( dag_id ='电影列表-dag', default_args = DEFAULT_ARGS, dagrun_timeout = timedelta(小时= 2), start_date = days_ago(2), schedule_interval ='* / 10 * * * *', 标签= ['雅典娜','红移'], )作为dag: check_s3_for_key = S3KeySensor( task_id ='check_s3_for_key', bucket_key = s3_key, wildcard_match =真, bucket_name = s3_bucket_name, s3_conn_id ='aws_default', 超时= 20, poke_interval = 5, dag = dag ) files_to_s3 = PythonOperator( task_id =“ files_to_s3”, python_callable = download_zip ) create_athena_movie_table = AWSAthenaOperator(task_id =“ create_athena_movie_table”,query = create_athena_movie_table_query,database = athena_db,output_location ='s3://'+ s3_bucket_name +“ /” +“ athena_results +'create_athena_movie_table') create_athena_ratings_table = AWSAthenaOperator(task_id =“ create_athena_ratings_table”,query = create_athena_ratings_table_query,database = athena_db,output_location ='s3://'+ s3_bucket_name +“ /” + athena_results +'create_athena_ratings_table') create_athena_tags_table = AWSAthenaOperator(task_id =“ create_athena_tags_table”,query = create_athena_tags_table_query,database = athena_db,output_location ='s3://'+ s3_bucket_name +“ /” + athena_results +'create_athena_tags_table') join_athena_tables = AWSAthenaOperator(task_id =“ join_athena_tables”,query = join_tables_athena_query,database = athena_db,output_location ='s3://'+ s3_bucket_name +“ /” + athena_results +'join_athena_tables') create_redshift_table_if_not_exists = PythonOperator( task_id =“ create_redshift_table_if_not_exists”, python_callable = create_redshift_table ) clean_up_csv = PythonOperator( task_id =“ clean_up_csv”, python_callable = clean_up_csv_fn, Provide_context =真 ) transfer_to_redshift = PythonOperator( task_id =“ transfer_to_redshift”, python_callable = s3_to_redshift, Provide_context =真 ) check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift files_to_s3 >> create_athena_ratings_table >> join_athena_tables files_to_s3 >> create_athena_tags_table >> join_athena_tables files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift
在代码中,使用诸如PythonOperator之类的运算符(针对通用Python代码)或AWSAthenaOperator来创建不同的任务,以使用与Amazon Athena的集成。要查看这些任务如何在工作流中进行连接,您可以看到最新的几行,为简单起见,在此重复(不缩进):
check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift files_to_s3 >> create_athena_ratings_table >> join_athena_tables files_to_s3 >> create_athena_tags_table >> join_athena_tables files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift
Airflow代码在Python中重载了右移>>运算符以创建依赖关系,这意味着应首先执行左侧的任务,并将输出传递到右侧的任务。查看代码,这很容易阅读。上面的四行中的每一行都添加了依赖关系,并且将它们一起评估以按正确的顺序执行任务。
在Airflow控制台中,我可以看到DAG的图形视图,以清楚地表示任务的执行方式:
现在可用美国东部(北弗吉尼亚州),美国西部(俄勒冈州),美国东部(俄亥俄州),亚太地区(新加坡),亚太地区(Toyko),亚太地区(悉尼)提供适用于Apache Airflow(MWAA)的Amazon托管工作流。 ),欧洲(爱尔兰),欧洲(法兰克福)和欧洲(斯德哥尔摩)。您可以从控制台,AWS命令行界面(CLI)或AWS开发工具包启动新的Amazon MWAA环境。然后,您可以使用Airflow的集成生态系统在Python中开发工作流程。
使用Amazon MWAA,您可以根据环境等级和所使用的工人付费。有关更多信息,请参见定价页面。
上游兼容性是Amazon MWAA的核心宗旨。我们对AirFlow平台的代码更改已发布回开源。
借助Amazon MWAA,您可以将更多的时间花费在为工程和数据科学任务构建工作流上,而无需花费更多时间来管理和扩展Airflow平台的基础架构。