AresDB:优步GPU驱动的开源实时分析引擎

2020-09-14 02:47:09

在优步,实时分析使我们能够获得业务洞察力和运营效率,使我们能够做出数据驱动的决策,以改善优步平台上的体验。例如,我们的运营团队依靠数据来监控市场健康状况,并发现我们平台上的潜在问题;由机器学习模型支持的软件利用数据来预测乘客供应和司机需求;数据科学家使用数据来改进机器学习模型,以实现更好的预测。

过去,我们利用许多第三方数据库解决方案进行实时分析,但没有一个解决方案能够同时满足我们所有的功能、可扩展性、性能、成本和运营要求。

AresDB于2018年11月发布,是一个开源的实时分析引擎,它利用非传统的电源-图形处理单元(GPU),使我们的分析能够大规模增长。作为一种新兴的实时分析工具,GPU技术在过去几年中取得了显著的进步,使其非常适合实时计算和并行数据处理。

在接下来的几节中,我们将介绍AresDB的设计,以及这一功能强大的实时分析解决方案如何让我们更高效地统一、简化和改进Uber的实时分析数据库解决方案。阅读完本文后,我们希望您在自己的项目中尝试使用AresDB,并发现该工具对您自己的分析需求也很有用!

数据分析对优步业务的成功至关重要。除其他功能外,这些分析还用于:

根据我们收集的聚合指标做出自动决策(如行程定价和欺诈检测)。

仪表板和决策系统利用实时分析系统,以高QPS和低延迟对相对较小但价值很高的数据子集(具有最大的数据新鲜度)进行类似的查询。

在优步,实时分析解决的最常见问题是如何计算时间序列聚合,这些计算让我们深入了解用户体验,从而相应地改善我们的服务。通过这些计算,我们可以在任意过滤(或有时连接)的数据上按特定维度(如天、小时、城市ID和旅行状态)请求某个时间范围内的指标。多年来,优步以不同的方式部署了多种解决方案来解决这一问题。

我们用来解决这类问题的一些第三方解决方案包括:

Apache Pinot是一个用Java编写的开源分布式分析数据库,可用于大规模数据分析。Pinot在内部采用lambda架构查询列存储中的批量和实时数据,使用倒排位图索引进行过滤,并依赖星形树进行聚合结果缓存。但是,它不支持基于键的重复数据删除、向上插入、连接和地理空间过滤等高级查询功能。此外,作为基于JVM的数据库,在Pinot上执行查询在内存使用方面的成本较高。

优步使用ElasticSearch来满足各种流媒体分析需求。它构建在Apache Lucene之上,用于存储文档和倒排索引的全文关键字搜索。它已被广泛采用和扩展,也支持聚合。倒排索引支持过滤,但没有针对基于时间范围的存储和过滤进行优化。它将记录存储为JSON文档,增加了额外的存储和查询访问开销。与Pinot一样,Elasticsearch是基于JVM的数据库,因此不支持联接,其查询执行需要较高的内存成本。

虽然这些技术有其自身的优势,但它们缺乏我们的用例所需的关键功能。我们需要一个统一、简化和优化的解决方案,并且开箱即用(或者更确切地说,在GPU内部)来实现解决方案。

为了以高帧率渲染图像的真实视图,GPU高速并行处理大量几何图形和像素。虽然处理单元的时钟频率在过去几年里一直处于平稳状态,但芯片上的晶体管数量只会根据摩尔定律增加。因此,以千兆次/秒(GFLOP/s)为单位衡量的GPU计算速度正在迅速提高。下面的图1描述了NVIDIA GPU和英特尔CPU多年来的理论GFLOP/s趋势:

在设计我们的实时分析查询引擎时,集成GPU处理是很自然的选择。在优步,典型的实时分析查询处理几天的数据,其中有数百万到数十亿条记录,然后在很短的时间内对它们进行过滤和聚合。此计算任务非常适合通用GPU的并行处理模型,因为它们:

提供更高的计算吞吐量(GFLOPS/s),使其非常适合可以并行的繁重计算任务(单位数据)。

与中央处理器(CPU)相比,提供更高的计算到存储(ALU到GPU全局内存)数据访问吞吐量(而不是延迟),使其成为处理需要大量数据的I/O(内存)受限并行任务的理想选择。

一旦我们决定使用基于GPU的分析数据库,我们就评估了几个利用GPU满足我们需求的现有分析解决方案:

Kinetica是一款基于GPU的分析引擎,最初于2009年面向美国军事和情报应用程序进行营销。虽然它展示了GPU技术在分析方面的巨大潜力,但我们发现我们的用例缺少许多关键功能,包括架构更改、部分插入或更新、数据压缩、列级内存/磁盘保留配置以及通过地理空间关系连接。

OmniSci,一个开源的、基于SQL的查询引擎,看起来是一个很有前途的选择,但当我们评估该产品时,我们意识到它没有Uber用例的关键功能,比如重复数据删除。虽然OMiniSci在2017年开源了他们的项目,但在对他们基于C++的解决方案进行了一些分析后,我们得出的结论是,无论是回馈还是派生他们的代码库都是不可行的。

基于GPU的实时分析引擎,包括GPUQP、CoGaDB、GPUDB、Ocelot、OmniDB和Virgian,经常被学术机构使用。然而,考虑到它们的学术目的,这些解决方案侧重于开发算法和设计概念证明,而不是处理现实世界的生产场景。出于这个原因,我们根据我们的范围和规模对它们进行了折扣。

总体而言,这些引擎展示了使用GPU技术进行数据处理的巨大优势和潜力,它们激励我们构建自己的基于GPU的实时分析解决方案,以满足Uber的需求。考虑到这些概念,我们构建并开源了AresDB。

在较高级别上,AresDB将其大部分数据存储在主机内存(连接到CPU的RAM)中,使用CPU处理数据摄取,并通过磁盘进行数据恢复。在查询时,AresDB将数据从主机内存传输到GPU内存,以便在GPU上进行并行处理。如下图2所示,AresDB由内存存储、元数据存储和磁盘存储组成:

与大多数关系数据库管理系统(RDBMS)不同,AresDB中没有数据库或模式作用域。所有表都属于同一AresDB集群/实例中的同一作用域,用户可以直接引用。用户将其数据存储为事实数据表和维度表。

事实表存储无限的时间序列事件流。用户使用事实表来存储实时发生的事件/事实,每个事件都与一个事件时间相关联,该表通常按事件时间查询。事实表存储的信息类型的一个示例是Trips,其中每个Trip是一个事件,并且Trip请求时间通常被指定为事件时间。如果一个事件有多个时间戳与其关联,则只有一个时间戳被指定为事实数据表中显示的事件的时间。

维度表存储实体(包括城市、客户和驱动程序)的当前属性。例如,用户可以将城市信息(如城市名称、时区和国家/地区)存储在维度表中。与随时间无限增长的事实表相比,维度表始终受大小限制(例如,对于Uber,Cities表受世界上实际城市数量的限制)。维度表不需要特殊的时间列。

字符串将自动转换为枚举。SmallEnum可以保存基数高达256的字符串类型。

使用AresDB,字符串在进入数据库之前会自动转换为枚举类型(枚举),以提高存储和查询效率。这允许进行区分大小写的相等检查,但不支持连接、子字符串、GLOB和正则表达式匹配等高级操作。我们打算在将来添加全字符串支持。

基于列的存储,可压缩以提高存储效率(以存储数据的字节为单位,减少内存使用量)和查询效率(查询期间从CPU内存到GPU内存的数据传输减少)。

使用主键重复数据删除功能进行实时更新,可在数秒内实现高数据准确性和近乎实时的数据新鲜度。

GPU支持的查询处理,可实现由GPU支持的高度并行化数据处理,提供低查询延迟(亚秒级到秒级)。

AresDB以列式格式存储所有数据。每列的值存储为列值向量。每列中的值的有效性/空性存储在单独的空向量中,每个值的有效性由一位表示。

AresDB将未压缩和未排序的列数据(活动矢量)存储在实时存储中。实时存储中的数据记录被分区为配置容量的(实时)批次。新批次在摄取时创建,而旧批次在记录存档后清除。主键索引用于定位重复数据删除和更新的记录。下面的图3演示了如何组织实时记录并使用主键值来定位它们:

批次中每列的值存储为列向量。每个值向量中的值的有效性/空度被存储为单独的空向量,每个值的有效性由一个比特表示。在下面的图4中,我们为City_id列提供了一个具有五个值的示例:

AresDB还通过事实表将成熟、排序和压缩的列数据(归档矢量)存储在归档存储中。存档存储中的记录也被分区为批处理。与活动批不同,归档批包含特定世界时协调(UTC)日的记录。存档批次使用Unix纪元以来的天数作为其批次ID。

记录将根据用户配置的列排序顺序保持排序。如下面的图5所示,我们首先按City_id列排序,然后按Status列排序:

通过提早对低基数列进行排序,最大限度地提高压缩效果。最大化压缩提高了存储效率(存储数据所需的字节数更少)和查询效率(从CPU传输到GPU内存的字节数更少)。

允许对常见等值过滤器(如City_id=12)进行廉价的基于范围的预过滤。预过滤使我们能够最大限度地减少需要从CPU内存传输到GPU内存的字节数,从而最大限度地提高查询效率。

仅当列以用户配置的排序顺序显示时,才会压缩该列。我们不尝试压缩高基数列,因为通过压缩高基数列节省的存储量可以忽略不计。

排序后,使用游程长度编码的变体压缩每个限定列的数据。除了值向量和空向量之外,我们还引入计数向量来表示相同值的重复。

客户端通过张贴upsert批处理来通过摄取HTTP API摄取数据。Upsert批处理是一种自定义的序列化二进制格式,它最大限度地减少了空间开销,同时仍然保持数据可随机访问。

当AresDB接收到要摄取的upsert批时,它首先将upsert批写入重做日志以进行恢复。在将upsert批次附加到重做日志的末尾之后,AresDB随后识别并跳过事实数据表上的较晚记录,以便将其摄取到实时存储中。如果记录的事件时间早于存档的截止事件时间,则该记录被视为“延迟”。对于不被认为是“延迟”的记录,AresDB使用主键索引来定位实时存储中应该应用它们的批次。如下图6所示,全新的记录(基于主键值以前从未见过)将应用于空白空间,而现有记录将直接更新:

在接收时,记录要么附加/更新到实时存储中,要么附加到等待放置在存档存储中的回填队列中。

我们定期对实时存储记录运行计划流程(称为归档),以将新记录(以前从未归档的记录)合并到归档存储中。存档将只处理实时存储中事件时间落在旧截止时间(距离上次存档过程的截止时间)和新截止时间(基于表架构中的存档延迟设置的新截止时间)范围内的记录。

记录的事件时间将用于确定当我们将归档数据批处理为日常批处理时,记录应该合并到哪个归档批次中。归档在合并期间不需要主键值索引重复数据消除,因为只会归档旧截止范围和新截止范围之间的记录。

下面的图7描述了基于给定记录的事件时间的时间线:

在此方案中,归档间隔是两次归档运行之间的时间,而归档延迟是事件时间之后但事件可以归档之前的持续时间。两者都在AresDB的表模式配置中定义。

如上图7所示,事实表的旧记录(事件时间早于存档截止时间)被附加到回填队列,并最终由回填过程处理。一旦回填队列达到其阈值,该过程也由回填队列的时间或大小触发。与实时存储的摄取相比,回填是异步的,并且在CPU和内存资源方面相对更昂贵。回填在以下场景中使用:

与归档不同,回填是幂等的,需要基于主键值的重复数据删除。正在回填的数据最终将对查询可见。

回填队列以预先配置的大小维护在内存中,并且在大规模回填加载期间,在队列被回填运行清除之前,客户端将被阻止继续进行。

在目前的实现中,用户将需要使用优步创建的Ares查询语言(AQL)来对AresDB运行查询。AQL是一种有效的时间序列分析查询语言,与其他类似SQL的语言一样,它不遵循SELECT FROM WHERE GROUP BY的标准SQL语法。相反,AQL是在结构化字段中指定的,可以与JSON、YAML和GO对象一起携带。例如,代替SELECT COUNT(*)FROM TRIPS GROUP BY CITY_ID WHERE STATUS=‘COMPLETED’和REQUEST_AT>;=1512000000,JSON中等效的aql写为:

{*“table”:“Trips”,a“Dimension”:[{“sqlExpression”:“City_id”}],“Measures”:[{“sqlExpression”:“count(*)”}],*“rowFilters”:[“status=‘Completed';“*],*”timeFilter“:{**”Column“:”Request_at“,*”From“:”2天前“*}}。

在JSON格式中,AQL为仪表板和决策系统开发人员提供了比SQL更好的编程查询体验,因为它允许他们使用代码轻松地组合和操作查询,而无需担心SQL注入等问题。它作为典型体系结构上的通用查询格式,从Web浏览器、前端服务器和后端服务器一直到数据库(AresDB)。此外,AQL为时间过滤和区块化提供了方便的语法糖,并具有本地时区支持。该语言还支持隐式子查询等功能,以避免常见的查询错误,并使后端开发人员可以轻松地进行查询分析和重写。

尽管AQL提供了各种好处,但我们充分意识到,大多数工程师更熟悉SQL。公开用于查询的SQL接口是我们将研究的增强AresDB用户体验的后续步骤之一。

AQL查询被编译成内部查询上下文。过滤器、维度和度量中的表达式被解析为抽象语法树(AST),以便稍后通过GPU进行处理。

AresDB利用预过滤器在将存档数据发送到GPU进行并行处理之前对其进行廉价过滤。由于归档数据根据配置的列顺序进行排序,因此一些过滤器可能能够通过应用二进制搜索来定位相应的匹配范围来利用该排序顺序。具体地说,所有第一个X排序列上的等值过滤器和可选的排序X+1列上的范围过滤器可以作为预过滤器处理,如图9所示,如下所示:

预滤波后,只需要将满足过滤条件的绿色值推送到GPU进行并行处理。输入数据被馈送到GPU,并在那里一次执行一批。这既包括活动批处理,也包括归档批处理。

AresDB利用CUDA流进行流水线数据馈送和执行。在每个查询上交替使用两个流,以便在两个重叠的阶段中进行处理。在下面的图10中,我们提供了此过程的时间线图示:

为简单起见,AresDB利用推力库实现查询执行过程,该库提供微调的并行算法构建块,以便在当前查询引擎中快速实现。

在推力方面,使用随机存取迭代器来存取输入和输出矢量数据。每个GPU线程在其工作负载位置查找输入迭代器,读取值并执行计算,然后将结果写入输出迭代器上的相应位置。

下面的图11在一个示例AST上演示了此过程,该示例AST在查询编译阶段从维度表达式request_at-request_at%86400 0生成:

在OOPK模型中,AresDB查询引擎遍历AST树的每个叶节点,并返回其父节点的迭代器。在根节点也是叶的情况下,根操作直接在输入迭代器上执行。

在每个非根非叶节点(在本例中为模运算),分配临时临时空间向量来存储从REQUEST_AT%86400表达式产生的中间结果。利用推力,在GPU上启动内核函数来计算该操作符的输出。结果存储在临时空间迭代器中。

在根节点,内核函数以与非根、非叶节点相同的方式启动。根据表达式类型采取不同的输出操作,具体如下:

表达式求值后,执行排序和归约,进行最终聚合。在排序和约简操作中,我们使用维度向量的值作为排序和约简的关键值,将度量向量值作为聚合的值。这样,具有相同维度值的行将被分组并聚合在一起。下面的图12描述了此排序和还原过程:

归档/回填过程临时存储(在归档和回填过程中分配的临时内存)