0
241
0
0
首页/专栏/ 技术分享/ 查看内容

基于 ClickHouse、Apache Arrow 和 Perspective 的流式实时可视化

 admin   发表于  2024-10-15 15:10
专栏 技术分享



简介

作为一家由开源驱动的公司,我们热衷于推广那些令人印象深刻的开源项目,不论是因为它们在技术上极具吸引力,还是因为它们满足了我们对性能的极致追求,或是真正能为我们的用户带来帮助。我们在发现 UI 库 Perspective 后,意识到它正好满足了这些要求,能够让用户基于 ClickHouse 数据构建真正的实时可视化!为了验证该库是否能够轻松集成到 ClickHouse 中,我们开发了一个简单的演示应用,通过 Apache Arrow 流式传输外汇(Forex)数据到浏览器中,整个过程仅使用了不到 100 行代码!

该示例非常灵活,用户可以轻松调整,用于可视化任何流入 ClickHouse 的数据集。欢迎分享您的想法,同时向 Perspective 致敬,感谢他们打造了如此优秀的库!

如果您希望运行此 Perspective 示例应用,我们已经为您提供了一个可用的 ClickHouse 实例。您也可以在此尝试在线托管版本【https://perspective-clickhouse.vercel.app/】。最后,我们将探讨数据流式传输的速度极限,以及当前方法的不足之处,未来 ClickHouse 的新功能有望改善这些问题。


Perspective 简介

Perspective 库是一款高性能的数据分析和可视化工具,专为高效管理实时和流式数据集而设计。它提供了丰富的交互式和可定制的可视化图表,包括热图、折线图和树图。和 ClickHouse 一样,Perspective 的设计核心也围绕性能展开。它的底层使用 Rust 和 C++ 编写,并编译为 WebAssembly,使其能够在浏览器中处理数百万数据点,并实时响应持续的数据流。

除了基本的图表渲染,Perspective 还支持快速的数据透视、筛选和聚合操作,这些操作既可以在浏览器中执行,也可以在服务器端完成。同时,它还支持使用 ExprTK 进行表达式计算。虽然 Perspective 并非为 ClickHouse 的 PB 级大规模数据而设计,但它能够对客户端接收到的行数据进行二次转换。如果数据已经可用,仅需简单转换即可生成所需的可视化,从而减少了进一步查询的必要性。

这使得 Perspective 成为 ClickHouse 驱动的应用程序中不可或缺的一部分,尤其是在需要实时分析和顺畅交互的场景中。它支持 Python 和 JavaScript,可以轻松集成到后端的分析流程或基于 Web 的界面中。

虽然 Perspective 能够完美满足 ClickHouse 在标准可视化方面的需求,但我们对它处理流式数据的能力尤为关注。通过只保留最新的 N 行数据,它能够保持稳定的内存消耗。我们尝试验证在流式数据的场景下,如何仅加载最新的增量数据到浏览器,并只保留最相关的数据点进行总结。

我们主要探讨了 Perspective 与 JavaScript 的集成,用户还可以通过 JupyterLab 小部件和客户端库,在 Python 中使用 Perspective 进行交互式数据分析。


ClickHouse 适用于流数据吗?

尽管 ClickHouse 不是传统的流处理引擎,而是一种 OLAP 数据库,它具备增量物化视图等特性,能够实现与 Apache Flink 等流处理技术类似的功能。物化视图通过触发器机制,在数据插入时执行查询(包括聚合操作),并将结果存储到另一个表中,以便后续使用。

尽管一些简单的流处理功能可以通过 ClickHouse 实现,从而简化架构,但我们也认识到 Flink 等流处理引擎在高级场景下提供了更强的功能。ClickHouse 的优势在于可以有效地存储所有数据,允许对历史数据进行查询和分析。

在我们的案例中,我们希望将 ClickHouse 中的最新数据行流式传输到 Perspective 进行可视化展示。我们通过模拟外汇交易数据到 ClickHouse 的实时传输,展示了这一应用场景。这样的功能对交易员尤为有用,同时这些数据行可以在需要时持久化,供未来的历史分析使用。


数据集 - 外汇(Forex)

在本示例中,我们使用了一个外汇数据集。外汇交易是指买卖不同国家的货币对,交易者可以用报价货币从经纪商处购买基础货币(按买入价),或卖出基础货币并获得报价货币(按卖出价)。该数据集记录了各货币对的价格变化,且这些价格变化速度极快,这是该数据集的一个关键特点。

对于不太了解外汇交易的用户,我们建议参考我们之前发布的一篇文章【https://clickhouse.com/blog/getting-data-into-clickhouse-part-3-s3#a-little-bit-about-forex】,其中简要总结了相关概念。

完整数据集存储在一个公共的 S3 存储桶中,可从 www.histdata.com 下载,涵盖了 2000 年至 2022 年的数据,总计 115 亿行和 66 对货币数据(解压后大约 600GB)。

虽然数据集结构简单,但非常适合作为我们的示例。每行代表一次“tick”事件,时间戳精确到毫秒,并且包含基础货币、报价货币以及买入价和卖出价。

CREATE TABLE forex(   `datetime` DateTime64(3),   `bid` Decimal(11, 5),   `ask` Decimal(11, 5),   `base` LowCardinality(String),   `quote` LowCardinality(String))ENGINE = MergeTreeORDER BY (datetime, base, quote)
tick 事件记录的是当股票或商品价格发生预定幅度或小数单位变化时的价格波动。在外汇市场中,当买入价或卖出价发生变化时,即会生成一个 tick 事件。

由于外汇市场没有实时流数据源,我们将通过加载一年份的 parquet 格式数据,并将其时间调整为当前时间,模拟流数据过程。如果您想在本地实例中尝试此应用,可以使用此数据集进行复制——点击此处查看详细信息【https://github.com/ClickHouse/perspective-forex?tab=readme-ov-file#dataset】

需要注意的是,tick 数据并不代表实际的交易。实际交易的每秒发生频率远高于 tick 事件!此外,它也不记录成交价格或交易量(源数据中这些信息为 0,因此被忽略)。tick 数据仅用于标记当价格按‘点’(pip)单位发生变化时的情况。


通过 Arrow 将 Perspective 与 ClickHouse 连接

基础代码

在设置和配置 Perspective 时,需要导入一些必要的包并编写基础代码。官方示例非常详尽,但简单来说,我们会创建一个 worker 和一个 table。worker 代表一个 Web Worker 进程,用来处理耗时的操作(如数据更新),从而减轻浏览器主线程的负担,确保即使在流式传输大量实时数据时,界面仍能保持响应速度。table 则是主要的数据结构,可以动态更新新数据。

import perspective from "https://cdn.jsdelivr.net/npm/@finos/perspective@3.0.0/dist/cdn/perspective.js";const forex_worker = await perspective.worker();// fetch some rows... const forex_table = await market_worker.table(rows, { limit: 20000 })

为了保持示例简洁,我们通过 CDN 导入了 Perspective 包,且没有添加其他依赖。对于那些希望将 Perspective 集成到现有应用或构建生产应用的用户,我们建议参考常见的 JS 框架和构建工具的示例【https://perspective.finos.org/docs/js/#installation】

Perspective 提供了多种数据加载和绑定方式,每种方式都有其优缺点。在本示例中,我们使用的是仅客户端模式,数据流式传输到浏览器中,并通过几行 JavaScript 代码从 ClickHouse 获取数据,WebAssembly 库负责执行所有的计算和 UI 交互。


流式传输最新交易数据

创建 table 时,如下图所示,我们限制了保留的行数,以减少内存消耗。

const forex_table = await market_worker.table(rows, { limit: 20000 });

在本示例中,每当有新交易数据到达时,我们会不断向 table 添加新行,并依靠 Perspective 只保留最新的 20,000 行数据。

截至目前,ClickHouse 尚不支持 WebSockets,也无法将更改后的数据行流式传输到客户端。因此,我们采用轮询方式,通过 HTTP 获取最新的数据行。由于 Perspective 偏好以 Apache Arrow 格式接收数据,我们利用 ClickHouse 返回该格式数据的功能,这还能减少数据传输量。

外汇 tick 事件发生得非常快,交易量最大的货币对每秒可产生多达 35 次 tick。我们希望尽可能快速地获取这些数据,理想的获取频率为每 30-50 毫秒一次,确保所有的价格变化都能可视化。因此,我们的查询需要能够快速执行,每个连接的客户端每秒大约会发出 10 次查询。对于所有连接的客户端来说,预计每秒将发出上百次查询——虽然有些人可能对 ClickHouse 的性能有疑问,但 ClickHouse 能轻松应对这种负载。

我们的查询主要基于事件的时间戳进行过滤,时间戳是主键的第一列,因此过滤性能得到优化。由于所有客户端的查询时间几乎相同(即当前时间),并且时间是单调递增的,这使得查询更容易被缓存。测试表明,即使在 115 亿行数据集上,查询执行时间也不到 10 毫秒,而 HTTP 请求往返 ClickHouse 的延迟(同一区域)约为 50 毫秒。因此,我们使用一个简单的滑动窗口,从当前时间到上一次查询的时间,不断执行查询以尽快获取 ClickHouse 返回的数据行。

上图中的简化示例假设每次查询执行时间为 50 毫秒。我们的查询获取所有列并计算价差(即买入价和卖出价之间的差额)。此外,我们还希望显示当前买入价的变化,这对于交易者来说非常有帮助。为了确保每对货币的初始值具有正确的变化值,我们需要查询时间窗口之外的上一价格。因此,如上图和最终查询所示,我们检索的数据量略多于返回的数据。

SELECT *FROM(   SELECT       concat(base, '.', quote) AS base_quote,       datetime AS last_update,       bid,       ask,       ask - bid AS spread,       ask - any(ask) OVER (PARTITION BY base_quote ORDER BY base_quote ASC, datetime ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS chg   FROM forex   WHERE datetime > {prev_lower_bound:DateTime64(3)} AND datetime <= {upper_bound:DateTime64(3)}   ORDER BY       base_quote ASC,       datetime ASC)WHERE datetime > {lower_bound:DateTime64(3)} AND datetime <= {upper_bound:DateTime64(3)}ORDER BY last_update ASC
┌─base_quote─┬─────────────last_update─┬─────bid─┬────────ask─┬──spread─┬──────chg─┐│ AUD.CAD │ 2024-09-19 13:25:30.8400.979220.979720.0005-0.00002│ XAG.USD │ 2024-09-19 13:25:30.84017.85817.902990.044990.00499│ AUD.JPY │ 2024-09-19 13:25:30.84097.2897.310.03-0.001│ AUD.NZD │ 2024-09-19 13:25:30.8401.098861.099460.00060.00004...│ EUR.AUD │ 2024-09-19 13:25:30.8401.437341.437740.0004-0.00002└────────────┴─────────────────────────┴─────────┴────────────┴─────────┴──────────┘
25 rows in set. Elapsed: 0.012 sec. Processed 24.57 thousand rows, 638.82 KB (2.11 million rows/s., 54.98 MB/s.)Peak memory usage: 5.10 MiB.
请注意,我们如何应用时间范围过滤到 argMax,从而避免对所有 time < lower bound 的行进行完整扫描(而是仅扫描 lower bound - 5 分钟 < time < lower bound 的行)。

我们用于获取下一个数据行的最终函数可以在此处找到【https://github.com/ClickHouse/perspective-forex/blob/ffc48caf4b7395f6d02b12f9920b8754f8035d86/index.js#L47-L51】。该函数使用了上述查询,并以 Arrow 格式请求数据,然后将响应读取为 Perspective 所需的 ArrayBuffer。

我们没有使用 ClickHouse JS 库,主要是为了尽量减少依赖。我们的代码非常简单,但如果构建复杂应用程序,建议使用该库。


初始应用程序展示

我们的应用程序如下图所示,通过调用前述函数,在一个循环中不断获取数据行:

在循环中,我们还会计算最近 10 次请求的平均获取时间。性能表现主要取决于您与 ClickHouse 集群的距离,延迟通常由 HTTP 请求的响应时间决定。如果与 ClickHouse 服务距离较近,我们可以将延迟降低至大约 30 毫秒。

虽然默认展示的是 datagrid 数据表格,但用户可以轻松切换可视化类型并进行数据转换。在下面的示例中,我们将其切换为散点图,用于展示 EUR-GBP 货币对的买入价和卖出价。

为了评估系统的 CPU 负载,我们启动了 20 个并发客户端,总共每秒执行了接近 200 次查询。在这种负载下,ClickHouse 仅使用了不到 2 个 CPU 核心。


尚未完全实现流式传输

在实际应用中,由于 ClickHouse 采用的是最终一致性模型,直接依赖当前时间进行跟踪可能会错过部分数据行。尽管通过谨慎的配置可以缓解这种情况,但并不是最佳方案。此外,数据行的插入可能会有一定延迟,因此我们倾向于将查询时间与当前时间错开,而不是简单地依赖 now() 函数。

我们已认识到这些不足,并开始探索流式查询的概念,该方法能够可靠地返回符合条件的新数据行,从而无需依赖轮询。客户端只需保持与服务器的 HTTP 连接,数据行一旦生成即可直接接收。


使用 Arrow Stream 进行速度测试

虽然实时跟踪 tick 数据非常实用,但我们也想测试 Perspective 处理数据的速度有多快。为此,我们决定按日期升序将整个数据集流式传输到 Perspective,同时只保留最新的 N 个数据点。此时,我们的查询变为:

SELECT concat(base, '.', quote) AS base_quote,  datetime AS last_update,  CAST(bid, 'Float32') AS bid,  CAST(ask, 'Float32') AS ask,  ask - bid AS spreadFROM forexORDER BY datetime ASCFORMAT ArrowStream SETTINGS output_format_arrow_compression_method='none'
请注意,我们在此没有计算每对货币的价格变化,因为这需要使用窗口函数,而窗口函数无法利用 optimize_read_in_order 优化,也无法立即返回流式结果。

你会发现我们不再单纯使用 Arrow 格式,因为这意味着我们需要将压缩后的 60GB 数据集下载到本地并转换为 Perspective 表格。即使使用 Arrow 格式,这对浏览器来说依然过大!

因此,我们利用了 ClickHouse 对 Arrow Stream 格式的支持,分批读取数据并传递给 Perspective。之前的流程中,我们无需使用任何外部库,但为了支持流式传输,我们这次需要用到 Arrow js 库。尽管这个库让处理 Arrow 文件变得非常简单,但为了支持流式传输,我们需要编写更多的 JavaScript 代码。下方展示了我们按批次流式传输整个数据集的最终代码。

async function get_all_rows(table, lower_bound) {   const view = await table.view({ // Create a view with aggregation to get the maximum datetime value       columns: ["last_update"], // Column you're interested in       aggregates: { last_update: "max" } // Aggregate by the maximum of datetime   });   const response = await fetch(clickhouse_url, {       method: 'POST',       body: `SELECT concat(base, '.', quote) AS base_quote, datetime AS last_update, bid::Float32 as bid,  ask::Float32 as ask, ask - bid AS spread              FROM forex WHERE datetime > ${lower_bound}::DateTime64(3) ORDER BY datetime ASC FORMAT ArrowStream SETTINGS output_format_arrow_compression_method='none'`,       headers: { 'Authorization': `Basic ${credentials}` }   });   const reader = await RecordBatchReader.from(response);   await reader.open();   for await (const recordBatch of reader) {  // Continuously read from the stream       if (real_time) { // set to false if we want to stop the stream           await view.delete();           return;       }       const batchTable = new Table(recordBatch); // currently required, see https://github.com/finos/perspective/issues/1157       const ipcStream = tableToIPC(batchTable, 'stream');       const bytes = new Uint8Array(ipcStream);       table.update(bytes);       const result = await view.to_columns();       const maxDateTime = result["last_update"][0];       document.getElementById("last_updated").textContent = `Last updated: ${new Date(maxDateTime).toISOString()}`;       total_size += (bytes.length);       document.getElementById("total_download").textContent = `Total Downloaded: ${prettyPrintSize(total_size,2)}`;   }}

虽然这段代码可以更高效一些,但由于 Perspective 目前要求 update 调用中的数据以字节数组形式传入,我们需要将批次数据转换为表格再传输到数组中。同时,我们使用了 Perspective 的视图,类似于 ClickHouse 的物化视图,它可以在数据加载时执行聚合操作。在这个例子中,我们使用视图来计算流式数据的最大日期,并将其展示在最终的 UI 中。

通过一些额外的代码,我们实现了从“实时”轮询模式切换到“流式传输模式”。切换到流式传输模式后,Perspective 的性能得以展示:

在货币对的折线图中,我们每秒可以渲染数千个数据点,数据传输速度至少达 25MiB/秒。

你可以在这里体验最终的应用程序【https://perspective-clickhouse.vercel.app/】


结论

通过这篇博客文章,我们探索了一个流行的开源可视化库 Perspective,它与 ClickHouse 在处理大规模、快速到达的数据方面表现出了极佳的协同效应!借助 Apache Arrow,我们只需要几行 JavaScript 代码就能将 ClickHouse 与 Perspective 集成。在这个过程中,我们还发现了 ClickHouse 在处理流式数据方面的某些局限性,并指出了目前正在进行的一些改进工作,希望能够解决这些问题。



试用阿里云 ClickHouse企业版


轻松节省30%云资源成本?阿里云数据库ClickHouse架构全新升级,推出和原厂独家合作的ClickHouse企业版,在存储和计算成本上带来双重优势,现诚邀您参与100元指定规格测一个月的活动,了解详情:https://t.aliyun.com/Kz5Z0q9G



路过

雷人

握手

鲜花

鸡蛋

版权声明:本文为 clickhouse 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论