时间序列是描述事物如何随时间变化的一种非常常见的数据格式。一些最常见的来源是工业机器和物联网设备、IT基础设施堆栈(如硬件、软件和网络组件),以及随时间共享其结果的应用程序。有效地管理时间序列数据并非易事,因为数据模型不适合通用数据库。
因此,我很高兴地告诉大家,Amazon Time Stream现已全面上市。TimeStream是一种快速、可扩展且无服务器的时间序列数据库服务,可轻松收集、存储和处理每天数万亿次的时间序列事件,速度最高可达关系数据库的1000倍,而成本仅为关系数据库的十分之一。
这是通过TimeStream管理数据的方式实现的:最新数据保留在内存中,历史数据根据您定义的保留策略移动到成本优化的存储中。所有数据始终跨同一AWS区域的多个可用区(AZ)自动复制。将新数据写入内存存储,在此存储中跨三个AZ复制数据,然后返回操作成功。数据复制是基于仲裁的,因此节点或整个AZ的丢失不会破坏持久性或可用性。此外,作为额外的预防措施,内存存储中的数据会持续备份到Amazon Simple Storage Service(S3)。
查询可跨层自动访问和组合最新数据和历史数据,无需指定存储位置,并且支持特定于时间序列的功能,可帮助您近乎实时地识别数据中的趋势和模式。
没有前期费用,您只需为您写入、存储或查询的数据付费。根据负载,TimeStream会自动向上或向下扩展以调整容量,而无需管理底层基础架构。
TimeStream与流行的数据收集、可视化和机器学习服务集成,使其易于与现有和新的应用程序配合使用。例如,您可以直接从AWS IoT Core、Amazon Kinesis Data Analytics for Apache Flink、AWS IoT Greengrass和Amazon MSK获取数据。您可以从Amazon QuickSight可视化存储在TimeStream中的数据,并使用Amazon SageMaker将机器学习算法应用于时间序列数据,例如用于异常检测。您可以使用TimeStream细粒度的AWS Identity and Access Management(IAM)权限轻松地从AWS Lambda函数获取或查询数据。我们提供在开源平台(如Apache Kafka、Telegraf、Prometheus和Grafana)上使用TimeStream的工具。
在TimeStream控制台中使用Amazon TimeStream,我选择了create database。我可以选择创建一个Standard数据库或一个用示例数据填充的示例数据库。我继续使用标准数据库,并将其命名为MyDatabase。
默认情况下,所有时间流数据都是加密的。我使用默认主密钥,但您也可以使用您使用AWS密钥管理服务(KMS)创建的客户管理密钥。这样,您就可以控制主密钥的轮换,以及谁有权使用或管理它。
我完成了数据库的创建。现在我的数据库是空的。我选择create table并将其命名为MyTable。
每个表都有自己的数据保留策略。第一个数据被摄取到内存存储中,在那里可以存储最少一个小时到最多一年的时间。之后,它会自动移到磁性商店,在那里它可以保存最少一天到最多200年,之后就会被删除。在我的例子中,我选择了1小时的内存存储保留和5年的磁性存储保留。
在TIMESTREAM中写入数据时,不能插入早于内存存储保留期的数据。例如,在我的情况下,我将无法插入超过1小时的记录。同样,您不能插入具有未来时间戳的数据。
我完成了表的创建。正如您注意到的,我没有被要求提供数据模式。当数据被摄取时,TimeStream会自动推断出这一点。现在,让我们将一些数据放入表中!
在Amazon Time Stream中加载数据时间流表中的每个记录都是时间序列中的单个数据点,包含:
度量名称、类型和值。每条记录可以包含单个度量值,但是不同的度量值名称和类型可以存储在同一个表中。
描述度量值并可用于筛选或聚合数据的零个或多个维度。表中的记录可以有不同的维度。
例如,让我们构建一个简单的监控应用程序,从服务器收集CPU、内存、交换和磁盘使用情况。每台服务器由主机名标识,并具有表示为国家/地区和城市的位置。
表中的记录将测量不同的东西。我使用的度量名称是:
对于监控应用程序,我使用Python。要收集监控信息,我使用psutil模块,该模块可以与以下内容一起安装:
进口时间 导入boto3 导入Psutil 从botocore.config导入配置 数据库名称=";我的数据库"; Table_name=";MyTable"; 国家=";英国"; 城市=";伦敦"; Hostname=";MyHostname";#您可以使用socket.gethostname()将其动态化 间隔=1#秒 定义准备记录(MEASURE_NAME,MEASURE_VALUE): 记录={ ';时间';:字符串(CURRENT_TIME), ';尺寸';:尺寸标注、 ';MeasureName';:MEASURE_NAME, ';MeasureValue';:str(MEASURE_VALUE), ';MeasureValueType';:';Double'; } 退货记录 定义写入记录(记录)(_R): 尝试: 结果=write_client.write_records(DatabaseName=DATABASE_NAME, TableName=TABLE_NAME, 记录=记录, CommonAttributes={}) 状态=result[';ResponseMetadata';][';HTTPStatusCode';] 打印(";已处理%d条记录。写入记录状态:%s";% (镜头(记录),状态) 除异常错误外: 打印(";错误:";,错误) 如果__名称__==';__Main__';: Session=boto3.Session() WRITE_CLIENT=session.client(';timestream-Write';,config=Config( READ_TIMEOUT=20,MAX_POOL_CONNECTIONS=5000,RETRIES={';MAX_ATTENTIONS';:10})) Query_client=session.client(';timestream-query';) 尺寸=[ {';名称';:';国家/地区';,';值';:国家/地区}, {';名称';:';城市';,';值';:城市}, {';名称';:';主机名';,';值';:主机名}, ] 记录=[] 当为True时: Current_time=int(time.time()*1000) CPU利用率=psutil.cpu_Percent() 内存利用率=psutil.virtual_memory().百分比 交换利用率=psutil.exchange_memory().百分比 磁盘利用率=psutil.disk_use(';/';).百分比 Records.append(prepare_record(';cpu_utilization';,CPU_利用率)) Records.append(PREPARE_RECORD( ';内存利用率';,内存利用率)) Records.append(prepare_record(';swap_utilization';,交换利用率)) Records.append(prepare_record(';disk_utilization';,磁盘利用率)) 打印(";记录{}-CPU{}-内存{}-交换{}-磁盘{}";.format( LEN(记录)、CPU利用率、内存利用率 交换利用率、磁盘利用率)) 如果len(记录)==100: 写入记录(记录)(_R) 记录=[] 时间.休眠(间隔)。
我启动Collect.py应用程序。每100条记录中,数据写入MyData表:
$python3收集器.py 记录4-CPU 31.6-内存65.3-交换73.8-磁盘5.7 记录8-CPU 18.3-内存64.9-交换73.8-磁盘5.7 记录12-CPU 15.1-内存64.8-交换73.8-磁盘5.7 。。。 记录96-CPU 44.1-内存64.2-交换73.8-磁盘5.7 记录100-CPU 46.8-内存64.1-交换73.8-磁盘5.7 处理了100条记录。写入记录状态:200 记录4-CPU 36.3-内存64.1-交换73.8-磁盘5.7 记录8-CPU 31.7-内存64.1-交换73.8-磁盘5.7 记录12-CPU 38.8-内存64.1-交换73.8-磁盘5.7 。。。
现在,在TimeStream控制台中,我看到MyData表的模式根据接收的数据自动更新:
请注意,由于表中的所有度量都是DOUBLE类型,因此MEASURE_VALUE::DOUBLE列包含所有度量的值。如果度量是不同类型的(例如,int或BIGINT),我会有更多的列(如MEASURE_VALUE::INT和MEASURE_VALUE::BIGINT)。
在控制台中,我还可以看到表中有哪些类型的度量、它们对应的数据类型以及用于该特定度量的维度的概要:
从控制台查询数据我可以使用SQL查询时间序列数据。内存存储针对快速时间点查询进行了优化,而磁性存储针对快速分析查询进行了优化。但是,查询会自动处理所有存储(内存和磁盘)上的数据,而不必在查询中指定数据位置。
我直接从控制台运行查询,但我也可以使用JDBC连接来访问查询引擎。我从一个基本查询开始,查看表中的最新记录:
让我们来试试更复杂的东西。我希望看到过去两小时内按主机名在5分钟间隔内聚合的平均CPU利用率。我根据MEASURE_NAME的内容过滤记录。我使用函数bin()将时间舍入为间隔大小的倍数,使用函数ago()比较时间戳:
选择主机名, Bin(时间,5M),表示binned_time, AVG(MEASURE_VALUE::DOUBLE)AS AVG_CPU_EXPLICATION 来自MyDatabase.MyTable 其中MEASURE_NAME=‘CPU利用率'; 和Time>;之前(2h) 按主机名、bin分组(时间,5M)。
收集时间序列数据时,可能会遗漏一些值。这在分布式架构和物联网设备中尤为常见。TimeStream有一些有趣的函数,您可以使用它们来重新填充缺失值,例如使用线性插值,或者基于上一次结转的观测结果。
更广泛地说,TimeStream提供了许多函数来帮助您使用数学表达式、操作字符串、数组和日期/时间值、使用正则表达式以及使用聚合/窗口。
要体验可以对TimeStream执行的操作,您可以创建示例数据库并添加我们提供的两个IoT和DevOps数据集。然后,在控制台查询界面中,查看示例查询以了解一些更高级的功能:
将Amazon Time Stream与Grafana结合使用TimeStream最有趣的方面之一是与许多平台的集成。例如,您可以使用Grafana 7.1或更高版本可视化时间序列数据并创建警报。Time Stream插件是Grafana开源版本的一部分。
我向数据库中添加了一个新的Grafan aDemo表,并使用另一个示例应用程序来持续获取数据。该应用程序模拟从运行在数千台主机上的微服务体系结构收集的性能数据。
我在Amazon Elastic Compute Cloud(EC2)实例上安装Grafana,并使用Grafana CLI添加时间流插件。
在Grafana控制台中,我使用正确的AWS凭据以及时间流数据库和表配置了插件。现在,我可以使用不断收集性能数据的Grafan aDemo表中的数据,选择作为时间流插件一部分分发的样例仪表板:
Amazon Time Stream现已在美国东部(弗吉尼亚州北部)、欧洲(爱尔兰)、美国西部(俄勒冈州)和美国东部(俄亥俄州)提供。您可以将TimeStream与控制台、AWS命令行界面(CLI)、AWS SDK和AWS CloudForment配合使用。使用TimeStream时,您可以根据写入次数、查询扫描的数据和使用的存储进行付费。更多信息,请查看定价页面。
您可以在此repo中找到更多示例应用程序。要了解更多信息,请参阅文档。处理时间序列(包括数据接收、保留、访问和存储分层)从未像现在这样简单。让我知道你要建造什么!