🦀

KafkaストリームとRust Tokioの非同期処理

2023/12/18に公開

この記事はRust Advent Calendar 2023 - Qiitaの18日目の記事です。

Kafkaとは?

Kafkaとは分散型のイベント・ストリームのプラットフォームです。メッセージ・キューと似ていますが、Kafkaではメッセージは消費された後も永続化されますのでデータベースのように使用することも可能です。

Kafkaのクラウド・サービスがupstashconfluentから提供されています。本ブログではDockerイメージを使用して、ローカル開発環境でkafkaサーバーを起動して検証します。

ちなみに昨年のアドベントではこちらの記事を書きました。

https://zenn.dev/tfutada/articles/5e87d6e7131e8e

Kafkaサーバーのインストール

KafkaサーバのDockerイメージがありますのでローカルで起動します。ポート29092で起動します。

docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
起動する
docker compose up -d

RustクライアントSDK

Rustのプロジェクト作成とライブラリのインストールをします。tokioとの相性の良いrust-rdkafkaを使用します。

Rustプロジェクトの作成
cargo new rust-kafka
cargo add rdkafka

コードを書く

2つのプログラムを書きます。メッセージ・キューではお馴染みのプロデューサー&コンシューマーのパターンで書きます。両者ともスタンド・アローンのプログラムとして作成します。

ソースコードをこちらにおきます。

まずはプロデューサー(メッセージの送信側)から実装しましょう。見たままですので特に説明は不要かと思います。

producer.send().awaitのように非同期的にメッセージを送信します。メッセージはFutureRecordを使用して作成します。

プロデューサー
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

#[tokio::main]
async fn main() {
    // プロデューサーを作成します。
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:29092") // kafkaサーバー
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // メッセージをトピック(my-topic-1)に送信します。
    producer.send(FutureRecord::<(), _>::to("my-topic-1")
                      .payload("Hello123"), Timeout::Never) // メッセージ
        .await // 非同期呼び出しです。
        .expect("Failed to produce");

    println!("Message sent");
}

トピック(Topic)とはTokioの非同期チャネルのようなものです。トピックが存在しない場合は、自動作成されますので前もって作成する必要はありません。

次にメッセージの受信側を実装しましょう。
consumer.subscribe(トピック名)して、consumer.stream()でメッセージをストリーム受信します。msg.payload_view()でメッセージの本文を取り出します。

コンシューマー
use rdkafka::{ClientConfig, Message};
use rdkafka::consumer::{Consumer, StreamConsumer};

use futures::{TryStreamExt};

#[tokio::main]
async fn main() {
    let consumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:29092")
        .set("enable.partition.eof", "false")
        .set("group.id", "my-group-1")
        .create::<StreamConsumer>()
        .expect("Failed to create client");

    consumer.subscribe(&["my-topic-1"]).unwrap();

    let stream_processor = consumer.stream().try_for_each(|msg| {
        async move {
            println!("Received message from topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                     msg.topic(), msg.partition(), msg.offset(), msg.timestamp());

            tokio::time::sleep(tokio::time::Duration::from_millis(3_000)).await;
            let r = match msg.payload_view::<str>() {
                Some(Ok(payload)) => format!("Payload len for {} is {}", payload, payload.len()),
                Some(Err(_)) => "Message payload is not a string".to_owned(),
                None => "No payload".to_owned(),
            };
            println!("{}", r);
            Ok(())
        }
    });

    stream_processor.await.expect("stream processing failed");
}

グループID(group.id)とは任意に設定できる名前で複数のコンシューマをグルーピングするために使用します。
複数のコンシューマーが同一のトピックをサブスクライブしたとします。その場合、同一グループ内のコンシューマーでは、誰か一人だけが受信します。グループ名が異なる場合は、その両者のコンシューマーが同じメッセージを受信(マルチキャスト)します。

ユースケースとしては、group-Aは機械学習のLLM呼び出し、group-Bはオブジェクトストアへの保存というように同一メッセージに対して異なる処理をしたい場合に使用します。

メッセージのメタデータ

Kafkaではメッセージのペイロードと一緒にメタデータも渡されます。HTTPプロトコルに例えるならば、HTTPボディがペイロード、HTTPヘッダーがメタデータになります。

メタデータにはタイムスタンプやオフセットがセットされます。メタデータにアクセスする方法を以下に示します。

メタデータ
use rdkafka::Timestamp::{CreateTime, LogAppendTime, NotAvailable};
...
        match message {
            Ok(msg) => {
                // Accessing the metadata
                let topic = msg.topic();
                let partition = msg.partition();
                let offset = msg.offset();
                let timestamp = msg.timestamp();
                let key = msg.key_view::<str>().unwrap();

                // Example of using metadata
                println!("Received message from topic: {}, partition: {}, offset: {}", topic, partition, offset);
                if let Ok(key) = key {
                    println!("Key: {}", key);
                }

                match timestamp {
                    CreateTime(t) => println!("Created at: {}", t),
                    LogAppendTime(t) => println!("Logged at: {}", t),
                    NotAvailable => println!("Timestamp not available"),
                }

実行結果です。

tokioを使用した非同期処理

さて次に本題のtokioを使用した非同期処理をやってみましょう。

イメージしやすいように実際のユースケースを考えてみましょう。
例えば、Webアプリケーションのサーバ側から、アクセス・ログやイベント・ログを機械学習のインプットデータに利用したいとします。

このような場合、Webサーバ側からログをKafkaに非同期に送信しHTTPレスポンスを即時にブラウザに返し、非同期的にバックエンド・サーバ(k8s等)でkafkaのトピックからログをストリーミング受信する解決策が考えられます。

まずはプロデューサーです。tokio::spawnを使用してメッセージを一度に送信しています。
私のローカルのmac M2で試しましたが10万メッセージを一瞬で送信完了しました。CPUもメモリもピクリともしませんでした。

プロデューサー
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use tokio;

#[tokio::main]
async fn main() {
    let producer: FutureProducer = // (同じなので略)

    let mut tasks = Vec::new();

    for i in 0..100_000 { // 10万メッセージ送信する。
        let message = "a".repeat(100_000);
        let producer_clone = producer.clone();

        // Spawn a new task for sending each message
        let task = tokio::spawn(async move {
            let record = FutureRecord::to("my-topic-1")
                .key("my-key-1")
                .payload(&message);

            match producer_clone.send(record, Timeout::Never).await {
                Ok(delivery) => println!("Sent: {:?}", delivery),
                Err((e, _)) => eprintln!("Error: {:?}", e),
            }
        });

        tasks.push(task);
    }

    // Wait for all tasks to complete
    for task in tasks {
        task.await.expect("Task failed to complete");
    }

    println!("All messages sent");
}

次にコンシューマー側を実装しましょう。

msg_stream.next()でメッセージをストリーム受信します。このwhileループは無限ループになります。受信するメッセージが無くなってもループを抜けません。tokio::spawn()を使用し、別スレッドで重たい処理をさせます。

処理パターン
while let Some(msg) = msg_stream.next().await{
    tokio::spawn(重たい処理) // I/O呼び出し。サンプルではスリープ。
}

全体のコードになります。

コンシューマー
use rdkafka::{ClientConfig, Message};
use rdkafka::consumer::{Consumer, StreamConsumer};
use futures::{StreamExt, TryStreamExt};
use tokio;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let consumer:StreamConsumer = // (同じなので略)
    consumer.subscribe(&["my-topic-1"]).unwrap();

    while let Some(message) = consumer.stream().next().await {
        match message {
            Ok(msg) => {
                // Extract a payload from the message.
                let tailored_msg = match msg.payload_view::<str>() {
                    Some(Ok(payload)) => {
                        format!("Prepared payload: {}, len: {}", payload, payload.len())
                    }
                    Some(Err(_)) => "Message payload is not a string".to_owned(),
                    None => "No payload".to_owned(),
                };

                // Do a heavy task with a tokio thread.
                tokio::spawn(async move {
                    println!("process the msg: {}", &tailored_msg[..30]);
                    // For example, run a batch processing, making API calls, etc.
                    tokio::time::sleep(tokio::time::Duration::from_millis(10_000)).await;
                    println!("Done!");
                });
            }
            Err(e) => eprintln!("Error receiving message: {:?}", e),
        }
    }
    // while-end. ここに到達することはない。無限に受信し続ける。
}

tokio::spawnのスレッドから直接msgを参照するのはRust所有権の処理でコードが複雑になるため、一旦tailored_msgに取り出し、それをスレッドに渡すようにしています。

念の為、RustRoverのスクリーンショットを貼っておきます。型情報が表示されて分かりやすいです。

私のmac M2で実行したところ、10万メッセージすべてを同時に完了することができました。

TypeScript版

最後におまけでTypeScript版を載せておきます。

TS版
TS版
import {Kafka} from 'kafkajs';

// Kafka configuration
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:29092']
});

// Creating the Producer
const producer = kafka.producer();

const produce = async () => {
    await producer.connect();
    try {
        await producer.send({
            topic: 'test-topic',
            messages: [
                {value: 'ABCDEFG!'},
            ],
        })
        console.log('Message sent successfully');
    } catch (err) {
        console.error("Error in sending message", err);
    } finally {
        await producer.disconnect();
    }
};

// Creating the Consumer
const consumer = kafka.consumer({groupId: 'test-group'});

const consume = async () => {
    await consumer.connect();
    await consumer.subscribe({topic: 'my-topic-1', fromBeginning: true});

    await consumer.run({
        eachMessage: async ({topic, partition, message}) => {
            console.log({
                value: message.value.toString(),
            });
        },
    });
};

// Invoke produce and consume functions
produce().catch(e => console.error(`[producer] ${e.message}`, e));
consume().catch(e => console.error(`[consumer] ${e.message}`, e));

Discussion