以NVMe存储的速度限制加载CSV文件

2020-10-11 15:53:08

无可奉告,我计划写一系列文章来讨论一些简单但不令人尴尬的并行算法。这些将具有实际用途,并且很可能位于多核CPU或CUDA GPU上。今天的是第一个讨论CSV文件解析器的并行算法实现的文章。

在过去,当我们的旋转磁盘速度达到100MiB/s的峰值时,我们只有两个选择:要么我们根本不关心文件加载时间,将其视为生命成本,要么我们有一种与底层内存表示纠缠在一起的文件格式,以挤出数据加载的最后一点性能。

那个世界早已一去不复返了。我目前的工作站使用软件RAID0(Mdadm),使用两个1TB的Samsung 970EVO NVMe存储进行数据存储。此设置通常提供2GiB/s左右的读/写速度(您可以在此处阅读有关我的工作站的更多信息)。

CSV文件格式完全属于这两种格式中的前一类。交换CSV文件的人最关心的是互操作性。真正关心速度和效率的人会转而使用其他格式,如Apache Parquet或Apache Arrow。但是CSV文件仍然存在。到目前为止,它仍然是卡格尔比赛中最常见的形式。

存在许多CSV文件解析器的实现。其中,csv2和Vince的CSV解析器将是两个常见的实现。这还没有考虑到标准实现,比如来自Python的实现。

这些实现中的大多数都避免使用多核。这是一个合理的选择。在许多可能的场景中,您将加载许多小的CSV文件,并且这些文件可以在任务级别并行完成。这是一个不错的选择,直到最近,当我不得不处理一些许多Gibs CSV文件。即使是从tmpfs加载这些文件也可能需要几秒钟的时间。这表明CPU解析时存在性能瓶颈。

克服CPU解析瓶颈的最明显的方法是充分利用ThreadRipper 3970x的32个内核。如果我们可以可靠地按行分解解析,这可能会简单得令人尴尬。不幸的是,RFC4180阻止我们简单地使用换行符作为行分隔符。特别地,当单元格内容被引用时,它可以包含换行符,并且这些换行符不会被识别为行分隔符。

Paratext首先实现了并行CSV解析的两遍方法。随后,在针对大数据分析的推测性分布式CSV数据解析中对其进行了记录。除了两遍方法之外,本文还讨论了一种更复杂的、适用于高延迟分布式环境的投机方法。

在过去的几天里,我实现了两遍方法的一个变体,它可以最大限度地利用NVMe存储带宽。这是一个有趣的旅程,因为我有很长一段时间没有用C语言编写任何严肃的解析器了。

CSV文件表示具有行和列的简单表格数据。因此,要解析CSV文件,就意味着要将文本文件划分为可以用行和列索引唯一标识的单元。

在C++中,这可以使用string_view以零复制方式完成。在C中,每个字符串都必须以NULL结尾。因此,您需要操作原始缓冲区,或者将其复制过来。我选了后者。

为了简化解析器实现,假设给了我们一个内存块,它是CSV文件的内容。这可以在C中使用以下命令来完成:

FILE*FILE=fopen(";文件路径";,";r";);const int fd=fileno(FILE);fSeek(FILE,0,SEEK_END);const size_t FILE_SIZE=ftell(FILE);fSeek(FILE,0,SEEK_SET);void*const data=mmap((Caddr_T)0,file_size,prot_read,map_share,fd,0);

我们将使用OpenMP的并行for循环来实现核心算法。如今,Clang已经对OpenMP提供了相当全面的支持。但尽管如此,我们将只使用OpenMP提供的非常微不足道的部分。

要并行解析CSV文件,我们首先需要将其分解成块。我们可以将文件分成1MiB字节序列作为我们的块。在每个块中,我们可以开始找到正确的换行符。

RFC 4180中的双引号可以引用换行符,这使我们更难找到正确的换行符。但与此同时,RFC定义了通过背靠背使用两个双引号来转义双引号的方法。这样,如果我们从文件的开头算起双引号,那么到目前为止,如果我们遇到奇数个双引号,我们就知道换行符在带引号的单元格中。如果在换行之前遇到偶数个双引号,我们就知道这是新行的开始。

我们可以从每个块的开头开始计算双引号。但是,因为我们不知道此块之前的双引号是奇数还是偶数,所以无法区分换行符是新行的起点,还是仅在带引号的单元格内。不过,我们所知道的是,块内奇数个双引号后的换行符是同一类换行符。在这一点上,我们根本不知道那是哪一类。我们可以把这两个班分开数。

#定义CSV_QUOTE_BR(c,n)\do{\if(c##n==QUOTE)\+QUOTES;\ELSE IF(c##n==';\n';){\++COUNT[QUOTES&;1];\IF(STARTER[QUOTES&;1]==-1)\STARTER[QUOTES&;1]=(Int)(p-p_start)+n;\}\}WHILE(0)PARALLEL_FOR(i,aligned_chunks){const uint64_t*pd=(const uint64_t*)(data+i*chunk_size);const char*const p_start=(const char*)pd;const uint64_t*const pd_end=pd+chunk_size/sizeof(Uint64_T);int引号=0;int starter[2]={-1,-1};Int count[2]={0,0};for(;pd<;pd_end;pd++){//批量加载8字节。Const char*const p=(const char*)pd;char c0,c1,c2,c3,c4,c5,c6,c7;c0=p[0],c1=p[1],c2=p[2],c3=p[3],c4=p[4],c5=p[5],c6=p[6],c7=p[7];csv_QUOTE_BR(c,0);CSV_QUOTE_BR(c,1);CSV_QUOTE_BR(c,2);CSV_QUOTE_BR(c,3);CSV_QUOTE_BR(c,4);CSV_QUOTE_BR(c,5);CSV_QUOTE_BR(c,6);CSV_QUOTE_BR(c,7);}crlf[i]。EVEN=COUNT[0];crlf[i]。奇数=计数[1];crlf[i]。EVEN_STARTER=STARTER[0];crlf[i]。ODD_STARTER=STARTER[1];crlf[i]。QUOTES=QUOTES;}PARALLEL_END for。

在第一次遍历之后,我们可以按顺序检查每个块的统计信息,以计算给定CSV文件中的行数和列数。

在偶数个双引号之后的第一个块中的换行符将是第一个块中的行数。因为我们知道第一个块中双引号的数量,所以我们现在知道第二个块中的哪类换行符是行的起始点。这些换行符的总和是行数。

对于列数,我们可以遍历第一行并计算双引号之外的列分隔符的数量。

第二遍将复制块,空终止每个单元格,如果可能,转义双引号。我们可以在分配给第一次传递的块的基础上利用我们的逻辑。但是,与第一遍不同的是,解析逻辑并不是从每个块的最开始的。它从该块中一行的第一个起始点开始,到下一个块中一行的第一个起始点结束。

事实证明,第二遍占用了我们的大部分解析时间,原因很简单,因为它完成了此遍中的大部分字符串操作和复制。

第一遍和第二遍都展开为8字节批处理解析,而不是逐字节解析。对于第二遍,我们做了一些改动,以快速检查是否有需要处理的分隔符、双引号或换行符,或者我们可以简单地将其复制过来。

Const uint64_t delim_ask=(Uint64_T)0x0101010101010101*(Uint64_T)delim;const uint64_t delim_v=v^delim_ask;if((delim_v-(Uint64_T)0x010101010101)&;((~delim_v)&;(Uint64_T)0x808080808080)){//有分隔符。}。

工作站使用AMD ThreadRipper 3970x,内存为128GiB,运行频率为2666 MHz。它有2个三星1TB 970 EVO和基于mdadm的RAID0。

副文本在过去的两年里一直没有得到积极的发展。我在修补paratext/python/paratext/core.py之后,通过删除plitunc方法构建了它。简单的基准测试Python脚本如下所示:

我选择DOHUI NOH数据集,它包含一个496,782行和3213列的16GiB CSV文件。

首先,为了测试原始性能,我将下载的文件移到/tmp,该文件作为内存中的tmpfs挂载。

如果不考虑文件IO,以上性能将是您所能获得的最佳性能。有了上述970EVO RAID0,我们可以针对实际磁盘IO运行另一轮基准测试。请注意,对于此轮基准测试,我们需要在每次运行之前使用:sudo bash-c";echo 3>;/proc/sys/vm/drop_caches";删除系统文件缓存。

CSv2是单线程的。只有一个线程,我们的实现还合理吗?我将PARALLEL_FOR移回串行FOR-LOOP,并在tmpfs上再次运行实验。

它大约比csv2慢2倍。这是意料之中的,因为我们需要空终止字符串并将它们复制到新的缓冲区。

如果不做模糊处理,您就不能真正用C语言发布解析器。幸运的是,在过去的几年里,用C语言编写fuzz程序非常容易,这一次,我选择了LLVM的libFuzzer,并在此过程中打开了AddressSaniizer。

#include<;ccv.h>;#include<;NNC/ccv_nnc.h>;#include<;NNC/CCV_NNC_easy.h>;#include<;stdint.h>;#include<;stdio.h>;#include<;stdlib.h>;#include<;string.h>;int LVMFuzzerInitialize(int*argc,char*argv){CCV_NNC_init();return 0;}int LLVMFuzzerTestOneInput(const uint8_t*data,size_t size){if(size==0)return 0;int column_size=0;ccv_CNNP_dataframe_t*dataframe=CCV_CNNP_dataframe_from_CSV_NEW((void*)data,CCV_CNNP_DATAFRAME_CSV_MEMORY,SIZE,';';,';";';,0,&;column_size);If(Dataframe){if(column_size>;0)//迭代第一列。{CCV_CNNP_DATAFRAME_ITER_t*const ITER=CCV_CNNP_DATAFRAME_ITER_NEW(DATAFRAME,COLUMN_ID_LIST(0));CONST INT ROW_COUNT=CCV_CNNP_DATAFRAME_ROW_COUNT(DATAFRAME);int i;size_t total=0;for(i=0;i<;row_count;i++){void*data=0;CCV_CNNP_DataFrame_ITER_NEXT(ITER,&;data,1,0);TOTAL+=strlen(数据);}CCV_CNNP_DATAFRAME_ITER_FREE(ITER);}CCV_CNNP_DATAFRAME_FREE(数据帧);}返回0;}。

./csv_fuzz-run=10000000-max_len=2097152大约需要2个小时才能完成。在最后一次成功运行之前,我修复了一些问题。

随着多核系统变得越来越普遍,我们应该预计会有更多的程序在传统上认为是单核的级别(如解析)使用这些核。也不一定要很难!有了良好的OpenMP支持,在算法端进行一些简单的调整,我们就可以很容易地利用改进的硬件来完成更多的事情。

接下来,我很高兴能与大家分享我在现代GPU上的并行算法之旅。敬请关注!