ArkFlow – 高性能 Rust 流处理引擎

chenquan:

github: https://github.com/chenquan/arkflow

高性能 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入输出源和处理器。

特性

  • 高性能:基于 Rust 和 Tokio 异步运行时构建,提供卓越的性能和低延迟
  • 多种数据源:支持 Kafka 、MQTT 、HTTP 、文件等多种输入输出源
  • 强大的处理能力:内置 SQL 查询、JSON 处理、Protobuf 编解码、批处理等多种处理器
  • 可扩展:模块化设计,易于扩展新的输入、输出和处理器组件

安装

从源码构建

1
2
3
4
5
6
7
8
9
# 克隆仓库
git clone https://github.com/chenquan/arkflow.git
cd arkflow

# 构建项目
cargo build --release

# 运行测试
cargo test

快速开始

  1. 创建配置文件 config.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10

pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
- type: "arrow_to_json"

output:
type: "stdout"
  1. 运行 ArkFlow:
1
./target/release/arkflow --config config.yaml

配置说明

ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:

顶级配置

1
2
3
4
5
6
7
8
9
10
logging:
level: info # 日志级别:debug, info, warn, error

streams: # 流定义列表
- input: # 输入配置
# ...
pipeline: # 处理管道配置
# ...
output: # 输出配置
# ...

输入组件

ArkFlow 支持多种输入源:

  • Kafka:从 Kafka 主题读取数据
  • MQTT:从 MQTT 主题订阅消息
  • HTTP:通过 HTTP 接收数据
  • 文件:从文件读取数据
  • 生成器:生成测试数据
  • SQL:从数据库查询数据

示例:

1
2
3
4
5
6
7
8
9
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true

处理器

ArkFlow 提供多种数据处理器:

  • JSON:JSON 数据处理和转换
  • SQL:使用 SQL 查询处理数据
  • Protobuf:Protobuf 编解码
  • 批处理:将消息批量处理

示例:

1
2
3
4
5
6
7
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
- type: arrow_to_json

输出组件

ArkFlow 支持多种输出目标:

  • Kafka:将数据写入 Kafka 主题
  • MQTT:将消息发布到 MQTT 主题
  • HTTP:通过 HTTP 发送数据
  • 文件:将数据写入文件
  • 标准输出:将数据输出到控制台

示例:

1
2
3
4
5
6
output:
type: kafka
brokers:
- localhost:9092
topic: output-topic
client_id: arkflow-producer

示例

Kafka 到 Kafka 的数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group

pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
- type: arrow_to_json

output:
type: kafka
brokers:
- localhost:9092
topic: processed-topic

生成测试数据并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000

pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
- type: "arrow_to_json"

output:
type: "stdout"