Spark中可重现的分布式随机数生成

2020-06-11 15:24:01

在这篇文章中,我们将使用Spark以一种完全独立于数据分区方式的方式生成随机数。也就是说,给定一个固定的种子,我们的Spark程序将在所有硬件和设置上产生相同的结果。为此,我们引入了一个新的PRNG,并使用TestU01和PractRand测试套件来评估其质量。

当我们希望能够对不同值的非随机参数重复实验时,可重现的伪随机序列特别有用。例如,假设我们正在运行一项需要从数据集中随机抽取样本的实验。然后,我们希望能够针对完全相同的随机样本改变实验参数,同时也能够轻松地对其他样本运行实验。

大多数编程语言都附带一个或多个内置伪随机数生成器(PRNG)。如果你正在读这篇文章,你可能已经用过几次了。

PRNG的类型很多,而且大部分都是以同样的方式运作的。您设置一个开始状态(种子),然后对于每个调用,您将获得固定的位数(通常一次32位或64位,因此我们将其称为数字)。如果您知道种子,则可以重复生成相同的位序列。

‌这种PRNG的基本设计不适合在分布式计算中使用。如果我们想要并行生成随机序列,我们通常会为每个并行进程使用相同的PRNG和不同的种子。这种方法的问题是,当并行度级别改变时,产生的序列也会改变。

‌Spark SQL提供了rand()函数,该函数就存在这个问题。为每个分区创建PRNG的实例,并由全局种子加上分区索引作为种子(参见eval和codegen)。如果您的Spark程序在不同的群集拓扑上、以不同的模式、使用不同的设置执行,或者如果Spark的新版本改变了默认值,则分区可能会改变。

为了解决上述问题,我们希望使用Spark为DataSet中的行生成一个随机数序列,以便该方法与数据如何分区以及Spark作业在其上运行的底层堆栈完全无关。

该算法基本上枚举行,然后将哈希函数应用于行索引。散列值是伪随机数。

我怀疑这是一个新的想法,但当我研究这个问题时,我没有发现任何使用它的例子。因此,在进入细节之前,我想分享一下我使用这种方法背后的直觉。

‌线性同余生成族(LCG)与教科书上的多重mod素数散列函数有相似之处(您知道,h(X)=ax+bm od  p h(X)=ax+b\mod p h(X)=ax+bm o d p,对于随机选择的a a,b b和素数p p,h(X)=ax+b m o d p)。基本上,对LCG的状态进行散列以生成下一个数字,然后该数字就是新状态。乘法模素数期望将输入数字序列的散列值分散在[0,p−1][0,p-1][0,p-1][0,p−1]中,但是由于其线性,将数字从1111散列到nnn将得到一个均匀间隔的数字序列(Modp)。这不符合随机数序列的要求。然而,众所周知,广泛使用的散列函数(如Murmu3和xxHash)具有良好的雪崩特性,即,输入的小变化会导致输出和色散的大变化,尽管还没有证明它们的形式性质。

导入org.apache.partk.sql.{DataFrame,Row}导入org.apache.spark.sql.catalyst.expressions.XXH64import org.apache.partk.sql.type.{LongType.StructField,StructType}def addRNGColumn(df:DataFrame,colName:String,Seed:Long=42L):DataFrame={val rddWithIndex=df.rdd.zipWithIndex()df.spokSession.sqlContext。Row.fromSeq(row.toSeq:+XXH64.hashLong(index,Seed))},StructType(df.schema.field:+StructField(colName,LongType,false))}。

该函数接受一个DataFrame并生成一个新的DataFrame,其中包含一个包含随机数的附加列。我们在RDDS上使用zipWithIndex()函数为DataFrame的行生成从1111到nnn的索引。请注意,调用此函数将触发一个操作(请参见代码)。

对于散列,我们使用xxHash,因为它是带有Spark的64位版本,并且正如我们将在下一节中看到的那样,对于这项任务执行得很好。

众所周知,zipWithIndex()速度很慢-尤其是在与DataFrames一起使用时,转换为RDD和返回需要进一步序列化/反序列化行。

用于添加连续索引的Spark SQL函数不存在。这很可能是因为向分布式数据集添加连续索引本身就需要两次遍历数据:一次用于计算偏移本地索引所需的分区大小,另一次用于添加索引。这与Spark SQL、查询优化器等不能很好地配合使用。

zipWithIndex()正好采用上述偏移量方法。同样的想法可以基于Spark SQL函数monotally_increating_id()轻松实现。这对于DataFrames来说肯定会更快(我试过了),但附带了其他警告,我不想在这篇文章中提及。

我们声称有一种算法可以解决这个问题,但还有一个重要的问题有待回答:使用xxHash散列从1111到nnn的数字真的能得到一个好的随机序列吗?

在本节中,我们将讨论如何评估PRNG,并在我们的“xxHash the number from 1111 to nn n”-PRNG上运行一些实验。

首先,假设随机比特序列是这样的序列,其中每个比特以相等的概率取值0 0 0或11,而与其他比特无关。关于随机性还有其他定义,但这一定义既实用又切中要害。

然而,请注意,如果根据定义给我们一种确定序列是否随机的方法,则任何PRNG都将无法通过此测试。对于任意固定的t,具有b位状态的PRNG只能生成2b 2^b 2b长度为t的不同序列,对于任意t>;b>;b t>;提示冯·诺依曼参考。

约翰·冯·诺伊曼(John Von Neumann)的必备照片,他以提出第一个PRNG而闻名,他说:“任何考虑产生随机数字的算术方法的人,当然都是处于罪恶状态。”这张照片不是我拍的,所以它不是我的。

我们不是试图证明一个序列是随机的,而是可以确定一个序列是否在统计上与真正的随机序列是不可区分的。我们通过计算一些测试统计量来做到这一点,我们知道随机序列的分布(或其近似值)。

例如,满足我们定义的长度为nn的序列中零(或1)的个数服从正态分布,平均值为1 2n\FRAC 1 2 n 2 1​n,方差为1 4 n\FRAC 1 4 n 4 1​n。我们可以计算由我们的PRNG生成的序列的平均值,并将其提交给统计检验,以确定它是否与预期分布显著不同。

显然,此测试不会帮助我们拒绝像0n/2 1n/2 0^{n/2}1^{n/2}0 n/2 1n/2这样的序列,因此需要更多的测试。

测试的另一个示例是计算序列的Lempel-Ziv解析。Lempel-Ziv解析在当今使用的许多压缩方案中都是完整的,随机性和可压缩性之间的反向关系非常明显(随机数据不能很好地压缩-另请参阅Kolmogorov复杂性)。经验证明,随机序列的Lempel-Ziv分析中的短语数服从正态分布,均值为n/loglogn/\lognn/logn,方差为0.266 n/(⁡⁡n)3 0.266 n/(\logn)^30。2 6 6n/(Log N)3.。

同样,一些序列不能基于该测试被宣布为非随机序列,因为在Lempel-Ziv方案中存在压缩不好的非随机序列。

文献中已经提出了具有不同强度的有限数量的测试(请参阅参考资料以了解更多信息),并且其中许多已经在两个用于评估PRNG的软件库中实现:TestU01和PractRand。一个好的PRNG应该通过这些测试。

这篇博客文章很好地描述了如何下载、构建和开始使用TestU01。

TestU01设计为使用PRNG为每次调用生成32位。我们坚持使用64位版本的xxHash,因此在下面的实现中,我们将哈希值分为高32位和低32位。PRNG的状态由计数器和指示我们应该返回值的高32位还是低32位的二进制值组成。我们在每隔一秒的调用中递增计数器。

#include<;stdbool.h>;#include";TestU01.h";#include";xxhash.h";static unsign long y=0U;static uint64_t hash;static bool LOWER=TRUE;UNSIGNED INT xxhash(Void){IF(LOWER){HASH=XXH64(&;y,8,42);LOWER=FALSE;RETURN(INT)哈希&;0。}}int main(){unif01_Gen*gen=unif01_CreateExternGenBits(";xxhash";,xxhash);bcell_SmallCrush(Gen);bcell_Crush(Gen);bcell_bigcrush(Gen);unif01_DeleteExternGenBits(Gen);return 0;}。

TestU01由三个测试集合组成:SmallCrush、Crush和Big Crush。上面的程序运行这三个程序。在我的笔记本电脑上花了大约12个小时。下面显示所有三个测试都通过了,没有任何注释。

=SmallCrush的汇总结果=版本:TestU01 1.2.3生成器:xxhash统计数:15 CPU总时间:00:00:05.56所有测试均已通过=版本:TestU01 1.2.3生成器:xxhash统计数:144总CPU时间:00:25:07.50所有测试均已通过=

PractRand允许您从STDIN读取要测试的序列。下面的C程序实现我们的算法并将散列值写入STDOUT。

#include<;stdio.h>;#include";xxhash.h";static unsign long y=1u;unsign long xxhash(Void){uint64_t hash=XXH64(&;y,8,42);y+=1;return hash;}int main(){while(1){uint64_t h=xxhash();fwrite((void*)&;h,si.。

这篇博客文章描述了如何设置PractRand。我们运行上面的程序并通过管道将输出传递给PractRand。

PractRand将对不断增大的序列运行其测试。我让它在我的笔记本电脑上运行了大约30个小时,最后测试了一个4TB的序列。这是一个由5个⋅1 0 11 5\cot10^{11}5⋅1 0 1 1 64位数字组成的序列,对于我的用例来说已经足够了。运行的输出如下所示。

RNG_TEST Using PractRand Version 0.93RNG=rng_stdin64,Seed=0xdf55786dtest set=Normal,Folding=Standard(64 Bit)RNG=RNG_stdin64,Seed=0xdf55786dlength=256 MB(2^28字节),时间=3.5秒测试名称原始处理评估品牌(12):384(1)R=+14.7 p~=1.8e-5异常.和158个测试结果(。时间=7.5秒测试名称原始处理评估品牌(12):384(1)R=+14.7 p~=1.8e-5异常.和168个无异常的测试结果rng=rng_stdin64,种子=0xdf55786dlength=1 GB(2^30字节),时间=14.9秒测试名称原始处理评估品牌(12):384(1)R=+14.7 p=1.8e-5异常.和179。时间=56.5秒201个测试结果中无异常rng=rng_stdin64,种子=0xdf55786dlength=8 GB(2^33字节),时间=113秒212个测试结果中无异常rng=rng_stdin64,种子=0xdf55786dlength=16 GB(2^34字节),时间=226秒223个测试结果中无异常rng=rng_stdin。时间=1783秒255个测试结果中无异常rng=rng_stdin64,种子=0xdf55786dlength=256 GB(2^38字节),时间=3488秒265个测试结果中无异常rng=rng_stdin64,种子=0xdf55786dlength=512 GB(2^39字节),时间=6948秒276个测试结果中无异常。t)R=-7.1p=1-1.3e-3异常.和296个无异常的测试结果rng=rng_stdin64,种子=0xdf55786dlength=4TB(2^42字节),时间=56881秒测试名称原始处理评估[Low1/64]bcfn(2+0,13-0,T)R=+8.2p=6.2e-4异常.和307个无异常的测试结果。

然而,对于较短的序列,PractRand报告说Brank测试结果是“不寻常的”。PractRand的测试文档说,“Brank上的[f]病症表明,在某种程度上,PRNG的输出,或者至少部分,是非常线性的,可以严格地通过对以前的PRNG输出的位进行异或来产生。”情况可能是,对于带有许多零的小输入值,xxHash的雪崩并不像我们希望的那样好。

对于长序列,BCFN测试也报告了同样的情况。文档称“列出的第一个参数为低的BCFN故障通常与DC6参数化故障(这是混合/雪崩不足的小型混乱PRNG的典型故障)的含义相似。”同样,我们散列的数字序列按1递增,这一事实可能会反映在结果序列中。

我们的算法通过了TestU01和PractRand中的所有测试。一些测试被PractRand标记为“异常”,这可能表明算法中存在弱点的症状。

为了进行比较,Java/Scala中使用的LCG和Spark使用的XorShift PRNG在TestU01(请参见本文的结果)和PractRand(我自己运行此测试)中都没有通过不可忽略数量的测试。

我们的算法是否足够强大取决于用例。对于将数据随机划分为用于机器学习的训练和验证子集之类的事情,我会毫不犹豫地使用该算法。

这篇文章使用的标题图像是从我们的算法生成的随机比特中提取的像素噪声。‌它除了可视化随机性之外没有任何作用。通常,不应该使用可视化而不是统计测试来确定序列是否是随机的。但是我想要这个帖子的标题图片。

使用以下Python脚本读取文件并将其转换为画布上的像素。

从tkinter import*master=TK()canvas_width=1400canvas_width=600w=canvas(master,width=canvas_width,Height=canvas_high)w.pack()with open(";Random_Binary";,";Rb";)as f:byte=f.read(1)bit=0 for x in range(0,canvas_width,2):for y in range(0,canvas_width,2)。7:位=0字节=f.read(1)r=(int.from_bytes(byte,';Big';)>;>;bit)&;1位+=1,如果r==1:w.create_Rectangle((x,y),(x+2,y+2),Outline=";";,Fill=";#616161";)ELSE:w.create_Rectangle((x,y),(x+2,y+2),Outline=";";,Fill=";#cccccc";)mainloop()。

整数和字符串的高速散列(Mikkel Thorup)。讲课内容包括描述模-模素数哈希函数族等内容。

“什么是随机序列?”(塞尔吉奥·B·沃尔坎)。关于随机性定义的讨论。

“用于密码应用的随机和伪随机数发生器的统计测试套件”(A.Ruhkin等人)。技术报告,详细说明评估PRNG的统计测试。

“TestU01:随机数生成器经验测试的C库”(Pierre L‘厄瓜多尔和Richard Simard)。TestU01中使用的统计测试说明和多个PRNG的测试结果。