KafkaストリームとRust Tokioの非同期処理
この記事はRust Advent Calendar 2023 - Qiitaの18日目の記事です。
Kafkaとは?
Kafka
とは分散型のイベント・ストリームのプラットフォームです。メッセージ・キューと似ていますが、Kafkaではメッセージは消費された後も永続化されますのでデータベースのように使用することも可能です。
Kafkaのクラウド・サービスがupstashやconfluentから提供されています。本ブログではDockerイメージを使用して、ローカル開発環境でkafkaサーバーを起動して検証します。
ちなみに昨年のアドベントではこちらの記事を書きました。
Kafkaサーバーのインストール
KafkaサーバのDockerイメージがありますのでローカルで起動します。ポート29092
で起動します。
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
docker compose up -d
RustクライアントSDK
Rustのプロジェクト作成とライブラリのインストールをします。tokio
との相性の良いrust-rdkafkaを使用します。
cargo new rust-kafka
cargo add rdkafka
コードを書く
2つのプログラムを書きます。メッセージ・キューではお馴染みのプロデューサー&コンシューマーのパターンで書きます。両者ともスタンド・アローンのプログラムとして作成します。
ソースコードをこちらにおきます。
まずはプロデューサー(メッセージの送信側)から実装しましょう。見たままですので特に説明は不要かと思います。
producer.send().await
のように非同期的にメッセージを送信します。メッセージはFutureRecord
を使用して作成します。
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
#[tokio::main]
async fn main() {
// プロデューサーを作成します。
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:29092") // kafkaサーバー
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
// メッセージをトピック(my-topic-1)に送信します。
producer.send(FutureRecord::<(), _>::to("my-topic-1")
.payload("Hello123"), Timeout::Never) // メッセージ
.await // 非同期呼び出しです。
.expect("Failed to produce");
println!("Message sent");
}
トピック(Topic)とはTokioの非同期チャネルのようなものです。トピックが存在しない場合は、自動作成されますので前もって作成する必要はありません。
次にメッセージの受信側を実装しましょう。
consumer.subscribe(トピック名)
して、consumer.stream()
でメッセージをストリーム受信します。msg.payload_view()
でメッセージの本文を取り出します。
use rdkafka::{ClientConfig, Message};
use rdkafka::consumer::{Consumer, StreamConsumer};
use futures::{TryStreamExt};
#[tokio::main]
async fn main() {
let consumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:29092")
.set("enable.partition.eof", "false")
.set("group.id", "my-group-1")
.create::<StreamConsumer>()
.expect("Failed to create client");
consumer.subscribe(&["my-topic-1"]).unwrap();
let stream_processor = consumer.stream().try_for_each(|msg| {
async move {
println!("Received message from topic: {}, partition: {}, offset: {}, timestamp: {:?}",
msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
tokio::time::sleep(tokio::time::Duration::from_millis(3_000)).await;
let r = match msg.payload_view::<str>() {
Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
Some(Err(_)) => "Message payload is not a string".to_owned(),
None => "No payload".to_owned(),
};
println!("{}", r);
Ok(())
}
});
stream_processor.await.expect("stream processing failed");
}
グループID(group.id)とは任意に設定できる名前で複数のコンシューマをグルーピングするために使用します。
複数のコンシューマーが同一のトピックをサブスクライブしたとします。その場合、同一グループ内のコンシューマーでは、誰か一人だけが受信します。グループ名が異なる場合は、その両者のコンシューマーが同じメッセージを受信(マルチキャスト)します。
ユースケースとしては、group-A
は機械学習のLLM呼び出し、group-B
はオブジェクトストアへの保存というように同一メッセージに対して異なる処理をしたい場合に使用します。
メッセージのメタデータ
Kafkaではメッセージのペイロードと一緒にメタデータも渡されます。HTTPプロトコルに例えるならば、HTTPボディがペイロード、HTTPヘッダーがメタデータになります。
メタデータにはタイムスタンプやオフセットがセットされます。メタデータにアクセスする方法を以下に示します。
use rdkafka::Timestamp::{CreateTime, LogAppendTime, NotAvailable};
...
match message {
Ok(msg) => {
// Accessing the metadata
let topic = msg.topic();
let partition = msg.partition();
let offset = msg.offset();
let timestamp = msg.timestamp();
let key = msg.key_view::<str>().unwrap();
// Example of using metadata
println!("Received message from topic: {}, partition: {}, offset: {}", topic, partition, offset);
if let Ok(key) = key {
println!("Key: {}", key);
}
match timestamp {
CreateTime(t) => println!("Created at: {}", t),
LogAppendTime(t) => println!("Logged at: {}", t),
NotAvailable => println!("Timestamp not available"),
}
実行結果です。
tokioを使用した非同期処理
さて次に本題のtokioを使用した非同期処理をやってみましょう。
イメージしやすいように実際のユースケースを考えてみましょう。
例えば、Webアプリケーションのサーバ側から、アクセス・ログやイベント・ログを機械学習のインプットデータに利用したいとします。
このような場合、Webサーバ側からログをKafkaに非同期に送信しHTTPレスポンスを即時にブラウザに返し、非同期的にバックエンド・サーバ(k8s等)でkafkaのトピックからログをストリーミング受信する解決策が考えられます。
まずはプロデューサーです。tokio::spawn
を使用してメッセージを一度に送信しています。
私のローカルのmac M2で試しましたが10万メッセージを一瞬で送信完了しました。CPUもメモリもピクリともしませんでした。
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use tokio;
#[tokio::main]
async fn main() {
let producer: FutureProducer = // (同じなので略)
let mut tasks = Vec::new();
for i in 0..100_000 { // 10万メッセージ送信する。
let message = "a".repeat(100_000);
let producer_clone = producer.clone();
// Spawn a new task for sending each message
let task = tokio::spawn(async move {
let record = FutureRecord::to("my-topic-1")
.key("my-key-1")
.payload(&message);
match producer_clone.send(record, Timeout::Never).await {
Ok(delivery) => println!("Sent: {:?}", delivery),
Err((e, _)) => eprintln!("Error: {:?}", e),
}
});
tasks.push(task);
}
// Wait for all tasks to complete
for task in tasks {
task.await.expect("Task failed to complete");
}
println!("All messages sent");
}
次にコンシューマー側を実装しましょう。
msg_stream.next()
でメッセージをストリーム受信します。このwhileループは無限ループになります。受信するメッセージが無くなってもループを抜けません。tokio::spawn()
を使用し、別スレッドで重たい処理をさせます。
while let Some(msg) = msg_stream.next().await{
tokio::spawn(重たい処理) // I/O呼び出し。サンプルではスリープ。
}
全体のコードになります。
use rdkafka::{ClientConfig, Message};
use rdkafka::consumer::{Consumer, StreamConsumer};
use futures::{StreamExt, TryStreamExt};
use tokio;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let consumer:StreamConsumer = // (同じなので略)
consumer.subscribe(&["my-topic-1"]).unwrap();
while let Some(message) = consumer.stream().next().await {
match message {
Ok(msg) => {
// Extract a payload from the message.
let tailored_msg = match msg.payload_view::<str>() {
Some(Ok(payload)) => {
format!("Prepared payload: {}, len: {}", payload, payload.len())
}
Some(Err(_)) => "Message payload is not a string".to_owned(),
None => "No payload".to_owned(),
};
// Do a heavy task with a tokio thread.
tokio::spawn(async move {
println!("process the msg: {}", &tailored_msg[..30]);
// For example, run a batch processing, making API calls, etc.
tokio::time::sleep(tokio::time::Duration::from_millis(10_000)).await;
println!("Done!");
});
}
Err(e) => eprintln!("Error receiving message: {:?}", e),
}
}
// while-end. ここに到達することはない。無限に受信し続ける。
}
tokio::spawnのスレッドから直接msgを参照するのはRust所有権の処理でコードが複雑になるため、一旦tailored_msg
に取り出し、それをスレッドに渡すようにしています。
念の為、RustRover
のスクリーンショットを貼っておきます。型情報が表示されて分かりやすいです。
私のmac M2で実行したところ、10万メッセージすべてを同時に完了することができました。
TypeScript版
最後におまけでTypeScript
版を載せておきます。
TS版
import {Kafka} from 'kafkajs';
// Kafka configuration
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092']
});
// Creating the Producer
const producer = kafka.producer();
const produce = async () => {
await producer.connect();
try {
await producer.send({
topic: 'test-topic',
messages: [
{value: 'ABCDEFG!'},
],
})
console.log('Message sent successfully');
} catch (err) {
console.error("Error in sending message", err);
} finally {
await producer.disconnect();
}
};
// Creating the Consumer
const consumer = kafka.consumer({groupId: 'test-group'});
const consume = async () => {
await consumer.connect();
await consumer.subscribe({topic: 'my-topic-1', fromBeginning: true});
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
console.log({
value: message.value.toString(),
});
},
});
};
// Invoke produce and consume functions
produce().catch(e => console.error(`[producer] ${e.message}`, e));
consume().catch(e => console.error(`[consumer] ${e.message}`, e));
Discussion