PySpark是一种包装器语言,允许用户与Apache Spark后端交互以快速处理数据。Spark可以在分布式服务器网络中的海量数据集上运行,如果使用得当,可提供重大的性能和可靠性优势。它带来了挑战,即使对于经验丰富的Python开发人员也是如此,因为PySpark语法利用了Spark的JVM传统,因此实现了可能不熟悉的代码模式。
这本关于PySpark代码风格的固执己见的指南介绍了我们遇到的常见情况,以及基于PySpark Repos中最频繁出现的主题的相关最佳实践。
除了PySpark的细节之外,干净代码的一般实践在PySpark存储库中也很重要-Google PyGuide是了解更多这些实践的有力起点。
首选选项更复杂、更长、更污染-而且是正确的。虽然通常最好避免完全使用F.ol(),但在某些情况下,使用它或另一种显式选择是不可避免的。然而,我们有充分的理由更喜欢第二个例子而不是第一个例子。
与第一种情况一样使用显式列时,DataFrame名称和架构都显式绑定到dataframe变量。这意味着如果df1被删除或重命名,引用df1.colA将中断。
相比之下,F.ol(';Cola&39;)将始终引用正在操作的数据帧中指定的列,在本例中名为df。它根本不需要跟踪其他数据帧状态,因此代码变得更加本地化,不太容易出现“远距离的诡异交互”,这通常是调试上的挑战。
如果列名包含需要括号操作符访问的空格或其他不受支持的字符,则df1[';Cola&39;]与F.ol(';Cola&39;);一样难以写入。
将像F.ol(';PROD_STATUS';)==';Delivered';这样的抽象表达式赋给一个变量,使其可用于多个数据帧,而df.prod_status==';Delivered';总是绑定到DF
幸运的是,通常不需要使用F.ol()复杂的表达式。在Spark3.0之前,这对于一些函数来说是必需的,比如F.Upper(),但是从那以后API变得更加统一了。
在某些上下文中,可能可以访问多个数据帧中的列,并且名称可能会重叠。常见的示例是df.join(df2,on=(df.key==df2.key),HOW=';LEFT';)这样的匹配表达式。在这种情况下,可以通过数据帧直接引用列。您还可以使用DataFrame别名消除连接的歧义(请参阅本指南的连接一节中的更多内容)。
逻辑操作通常驻留在.filter()或F.When()中,需要具有可读性。我们应用与链接函数相同的规则,将同一代码块内的逻辑表达式最多保留为三(3)个表达式。如果它们变长了,通常是代码可以简化或提取出来的迹象。将复杂的逻辑运算提取到变量中使代码更易于阅读和推理,这也减少了错误。
#Bad F.When((F.ol(';Prod_Status';)==';Delivered';)|(F.datediff(';deliveryDate_Actual&39;,';Current_Date';)<;0)&;((F.ol(';CurrentRegistry&39;)!=';';)|((F.datediff(';DeliveryDate_Actual';,';Current_Date';)<;0)&;((F.ol(';OriginalOperator';)!=';';)|(F.ol(';currentOperator';)!=';';),';正在使用';)。
上面的代码可以用不同的方式简化。首先,将重点放在将逻辑步骤分组到几个命名变量中。PySpark要求表达式用括号括起来。这与用于分组逻辑操作的实际括号混合在一起,可能会影响可读性。例如,上面的代码有一个原始作者没有注意到的冗余(F.datediff(df.deliveryDate_Actual,df.current_date)<;0),因为它很难发现。
#较好的HAS_OPERATOR=((F.ol(';OriginalOperator';)!=';';)|(F.ol(';currentOperator';)!=';';)Delivery_Date_Passed=(F.datediff(';deliveryDate_Actual';,';Current_Date';)<;0)HAS_REGISTION=(F.ol(';CurrentRegistry';)。)。RLIKE(';.+';)is_delivered=(F.ol(';Prod_Status';)==';Delivered';)F.When(IS_Delivery|(Delivery_Date_Passed&;(HAS_REGISTION|HAS_OPERATOR)),';服务中';)。
上面的示例去掉了冗余表达式,更易于阅读。我们可以通过减少手术次数来进一步改善它。
#Good Has_Operator=((F.ol(';OriginalOperator';)!=';';)|(F.ol(';currentOperator';)!=';';)Delivery_Date_Passed=(F.datediff(';deliveryDate_Actual';,';Current_Date';)<;0)HAS_REGISTION=(F.ol(';currentRegister';);)。RLIKE(';.+';)IS_DEVERED=(F.ol(';PROD_STATUS';)==';Delivered';)IS_ACTIVE=(HAS_REGISTION|HAS_OPERATOR)F.When(IS_Delivery|(Delivery_Date_Passed&;is_active),';在服务中';)
注意F.When表达式现在是如何简洁和可读的,并且所需的行为对于任何审阅这段代码的人来说都是显而易见的。如果读者怀疑有错误,则只需查看各个表达式。如果您的代码中有单元测试,并且希望将它们抽象为函数,它还可以使每个逻辑块易于测试。
在最后一个示例中仍然有一些代码重复:如何消除重复是读者的练习。
在PySpark转换开始时或在返回之前执行SELECT被认为是很好的做法。此SELECT语句指定与读取器的约定和有关输入和输出的预期数据帧模式的代码。任何选择都应被视为正在准备数据帧以供转换的下一步使用的清理操作。
使SELECT语句尽可能简单。由于常见的SQL习惯用法,对于每个选定的列,只允许使用spk.sql.function中的一个函数,外加一个可选的.alias()以赋予它一个有意义的名称。请记住,这应该谨慎使用。如果在同一SELECT中有三个以上这样的用法,则将其重构为一个单独的函数,如CLEAN_<;dataframe name>;()来封装操作。
除非出于性能原因需要,否则不鼓励在SELECT中使用涉及多个数据帧的表达式或条件操作(如.When())。
SELECT()语句重新定义数据帧的模式,因此它自然支持包含或排除旧列和新列,以及重新定义预先存在的列。通过将所有此类操作集中在一条语句中,识别最终模式变得容易得多,从而使代码更具可读性。它还使代码更加简洁。
#错误的df。SELECT((F.coalesce(F.unix_Timestamp(';CLOSED_AT';,F.unix_Timestamp())-F.unix_Timestamp(';Created_at';))/86400)。别名(';Days_OPEN';))#Good DF。With Column(';Days_OPEN';,(F.coalesce(F.UNIX_TIMESTAMP(';CLOSED_AT';),F.UNIX_TIMESTAMP())-F.Unix_TIMESTAMP(';CREATED_AT';))/86400)。
如果SELECT语句中的列将保持未使用状态,请避免在其中包含这些列,而是选择一组显式的列-这是使用.drop()的首选替代方法,因为它可以保证模式突变不会导致意外的列膨胀数据帧。然而,并不是所有情况下都天生不鼓励删除列;例如,通常在联接之后删除列是合适的,因为联接通常会引入冗余列。
最后,不建议通过SELECT语句添加新列,而是建议对单个列使用.with Column()。在添加或操作数十或数百列时,出于性能原因,请使用单个.select()。
如果需要添加空列以满足模式,请始终使用F.litt(无)填充该列。切勿使用空字符串或表示空值的其他字符串(如NA)。
除了语义上正确之外,使用F.litt(None)的一个实际原因是保留使用isNull等实用工具的能力,而不必验证空字符串、NULL和';na';na等。
虽然注释可以提供对代码的有用见解,但重构代码以提高其可读性通常更有价值。代码本身应该是可读的。如果您使用注释一步一步地解释逻辑,则应该对其进行重构。
#BAD#为COLS:df=df中的c转换时间戳列COLS=[';start_date';,';delivery_date';]。With Column(c,f.from_unixtime(f.ol(C)/1000))。CAST(TimestampType()。
在上面的示例中,我们可以看到这些列被强制转换为时间戳。这条评论并没有增加多少价值。此外,如果更冗长的注释只提供代码中已经存在的信息,那么它可能仍然没有帮助。例如:
#BAD#遍历每一列,除以1000,因为MILIS并转换为COLS中的时间戳COLS=[';START_DATE';,';Delivery_Date';]:df=df。With Column(c,f.from_unixtime(f.ol(C)/1000))。CAST(TimestampType()。
与其留下只描述您编写的逻辑的注释,不如留下给出上下文的注释,解释您在编写代码时做出的决定的原因。这对于PySpark来说尤其重要,因为读者可以理解您的代码,但是通常没有关于馈送到PySpark转换的数据的上下文。小的逻辑片段可能需要数小时的数据挖掘才能理解正确的行为,在这种情况下,解释其基本原理的注释尤其有价值。
#Good#此数据集的使用者需要的是时间戳而不是日期,我们需要#将时间调整1000,因为原始数据源将这些数据存储为Millis#,尽管文档中说它实际上是一个日期。COLS=[';START_DATE';,';Delivery_Date';]COLS中的c:df=df。With Column(c,f.from_unixtime(f.ol(C)/1000))。CAST(TimestampType()。
强烈建议在所有情况下避免使用UDF,因为它们的性能明显不如原生PySpark。在大多数情况下,似乎需要UDF的逻辑可以重构为只使用本机PySpark函数。
使用连接时要小心!如果您执行左联接,并且右侧对一个键有多个匹配项,则该行将与匹配项一样多次重复。这被称为加入爆炸,可以极大地增加变换作业的输出。一定要仔细检查您的假设,以确保您要连接的密钥是唯一的,除非您希望进行乘法运算。
错误的联接是许多难以调试的问题的根源。即使您使用的是默认值(内部),也有一些内容可以帮助您明确指定方式:
#不良航班=航班。加入(飞机,飞机)#也是不好的航班=航班。加入(飞机,#39;飞机_id';,';内部#)#好的飞行=飞行。加入(Aircraft,';Aircraft_id';,HOW=';INTERNAL';)。
避免右连接。如果您要使用右联接,请切换数据帧的顺序,改为使用左联接。这更加直观,因为您正在对其执行操作的数据帧就是您以连接为中心的数据帧。
避免重命名所有列以避免冲突。取而代之的是,为整个数据帧指定一个别名,并使用该别名来选择在末尾需要哪些列。
列:航班=航班的#BAD列=[';START_TIME';,';END_TIME';,';IDLE_TIME';,';TOTAL_TIME';]。With ColumnRename(COL,';FISTING_+COL)停车=停车。WITH COLUN RENAMED(COL,';PARKING_##+COL)航班=航班。加入(Parking,on=';Flight_code';,How=##39;Left&39;)航班=航班。选择(F.ol(';FLASS_START_TIME';)。别名(';FIRST_START_TIME';),F.COL(';FIRTS_END_TIME';)。别名(';FIRST_END_TIME';),F.COL(';PARKING_TOTAL_TIME';)。别名(';CLIENT_PARKING_TOTAL_TIME';))#好的航班=航班。别名(航班)停车=停车。别名(停车)航班=航班。加入(Parking,on=';Flight_code';,How=##39;Left&39;)航班=航班。选择(F.ol(';flights.start_time';))。别名(';FIRST_START_TIME';),F.COL(';flights.end_Time';)。别名(';FIRST_END_TIME';),F.COL(';PARKIN.TOTAL_TIME';)。别名(';CLIENT_PARKING_TOTAL_TIME';)
如果两个列都不需要,最好在连接之前删除重叠的列;
如果您确实需要这两个名称,最好在加入之前将它们中的一个重命名;
在输出数据集之前,应始终解析不明确的列。运行完转换后,您将无法再区分它们。
关于联接的最后一句话是,不要使用.dropDuplates()或.Distant()作为拐杖。如果观察到意外的重复行,那么出现这些重复行几乎总是有一个根本原因。添加.dropDuplates()只会屏蔽此问题,并增加运行时的开销。
链接表达式是一个有争议的话题,但是,由于这是一个固执己见的指南,我们选择建议对链接的使用进行一些限制。有关此建议背后的基本原理的讨论,请参阅本节的结论。
避免将表达式链接到不同类型的多行表达式中,特别是当它们具有不同的行为或上下文时。例如,混合柱创建或加入选择和过滤。
#BAD df=(df.。选择(';a';,';b';,';c';,';键';)。过滤器(F.ol(';a';)==';真实性';)。With Column(';boverc&39;,F.ol(';b&39;)/F.coln(';c';))。加入(df2,';键';,HOW=#39;内部';)。加入(DF3,';KEY';,HOW=';LEFT';)。DROP(';c';)#Better(分步骤)#第一:我们选择并裁剪我们需要的数据#第二:我们创建需要的列#第三:连接其他数据帧df=(Df)。选择(';a';,';b';,';c';,';键';)。过滤器(F.ol(';a';)==';真实性';)df=df。使用Column(';boverc&39;,F.ol(';b&39;)/F.coln(';c';))df=(df。加入(df2,';键';,HOW=#39;内部';)。加入(DF3,';KEY';,HOW=';LEFT';)。丢弃(';c';)。
将每组表达式隔离到其自己的逻辑代码块中提高了易读性,并且更容易找到相关逻辑。例如,下面代码的读者可能会跳到他们看到数据帧被分配df=df的位置。
#BAD df=(df.。选择(';foo';,';bar';,';foobar';,';abc';)。过滤器(F.ol(';abc';)==123)。JOIN(ANTHER_TABLE,';SOME_FIELD';)#Better df=(df.。选择(';foo';,';bar';,';foobar';,';abc';)。过滤器(F.ol(';ABC';)==123)df=df。JOIN(ANTHER_TABLE,';SOME_FIELD';,HOW=';INTERNAL';)。
将表达式链接在一起是有正当理由的。这些通常表示原子逻辑步骤,并且是可以接受的。在同一块中应用最多包含数字链接表达式的规则,以保持代码的可读性。我们建议使用不超过5条语句链。
如果您发现您正在制造更长的链条,或者因为变量的大小而遇到麻烦,请考虑将逻辑提取到一个单独的函数中:
#Bad Customers_with_Shipping_Address=(Customers_with_Shipping_Address。选择(';a';,';b';,';c';,';键';)。过滤器(F.ol(';a';)==';真实性';)。With Column(';boverc&39;,F.ol(';b&39;)/F.coln(';c';))。Join(df2,';key';,How=';Internal';))#Also Bad Customers_with_Shipping_Address=Customers_with_Shipping_Address。选择(';a';,';b';,';c';,';key';)Customers_With_Shipping_Address=Customers_With_Shipping_Address。筛选器(F.ol(';a';)==';true&39;)Customers_With_Shipping_Address=Customers_With_Shipping_Address。With Column(';boverc&39;,F.ol(';b';)/F.ol(';c';))Customers_With_Shipping_Address=Customers_With_Shipping_Address。Join(df2,';key';,HOW=';Internal';)#Better def Join_Customers_with_Shipping_Address(Customers,df_to_Join):Customers=(Customers.。选择(';a';,';b';,';c';,';键';)。筛选器(F.ol(';a';)==';真实性';))Customers=Customers。With Column(';boverc&39;,F.ol(';b&39;)/F.ol(';c&39;))Customers=Customers。JOIN(DF_TO_JOIN,';KEY';,HOW=';INTERNAL&39;)退货客户。
3条以上语句链是分解成单独的、命名良好的函数的主要候选,因为它们已经封装了隔离的逻辑块。
PySpark代码和SQL代码的区别。链接是与大多数(如果不是全部)其他Python样式背道而驰的。您不是在Python中链接,而是赋值。
不鼓励创建大型单代码块。这些通常作为命名函数提取会更有意义。
它不需要全部或不需要,但最多五行链接就可以在实用性和易读性之间取得平衡。
如果您使用的是IDE,则可以更轻松地使用自动提取或执行代码移动(即:pycharm中的cmd+Shift+Up)。
可以链接表达式的原因是因为PySpark是从Spark开发的,Spark来自JVM语言。这意味着传输了一些设计模式,特别是链式模式。但是,Python不能优雅地支持多行表达式,唯一的选择是提供显式换行符,或者将表达式括在圆括号中。如果链发生在根节点,则只需要提供显式换行符。例如:
#需要`\`df=df。筛选器(F.ol(';event';)==';正在执行';)\。过滤器(F.ol(';HAS_TESTS';)==True)\。删除(';has_tests';)#链不在根节点中,因此它不需要`\`df=df。With Column(';安全';,F.When(F.ol(';has_test&39;)==True,';is safe';)。当(F.ol(';Has_Executed';)==True,';没有测试但运行';)。否则(';不安全';)。
为保持一致,请将整个表达式放入单个括号块中,并避免使用\:
#BAD DF=DF。筛选器(F.ol(';event';)==';正在执行';)\。过滤器(F.ol(';HAS_TESTS';)==True)\。Drop(';has_tests';)#Good df=(df.。筛选器(F.ol(';event';)==';正在执行';)。筛选器(F.ol(';HAS_TESTS';)==True)。丢弃(';HAS_TESTS';)。
要小心函数变得太大。一般来说,一个文件不应该超过250行,一个函数不应该超过70行。
尽量将您的代码保存在逻辑块中。例如,如果你有多行引用相同的东西,试着把它们放在一起。将它们分开会降低上下文和可读性。
测试您的代码!如果可以运行本地测试,请这样做,并确保您的新代码是c++。
.