Docker 環境で Kafka を起動し Ruby で Producer/Consumer を実装してみる
はじめに
Docker 環境で Kafka 起動し、 Ruby で実装した Producer/Consumer も一緒に動かしてみます。
送受信するメッセージのデータフォーマットとしては JSON や Avro などあるようですが、
今回は ProtocolBuffers を使用してみます。
今回書いたコードは下記リポジトリにおいています。
(この記事で紹介するコードに少し+αしたものになってますがほぼ同じです。)
Kafka とは
Apache Kafka は、大規模なストリーム処理、リアルタイムデータパイプライン、データ統合に使用されるオープンソースの分散型ストリーミングシステムです。
Kafka を docker compose で動かす
(引用: Apache Kafkaの概要とアーキテクチャ)
Kafka を起動させるには上記の図でいう、Kafka Broker と Zookeeper が必要になります。
どちらも Confluent 社が用意した DockerImage があるためそれを使います。
version: "3"
services:
kafka:
image: confluentinc/cp-kafka
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 29092:29092
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 32181
# 以下省略(Producer と Consumer も docker compose で動かす場合はそれぞれ追記する)
# ...
上記の docker-compose.yml の構成ではローカルからは localhost:29092
、docker compose 内からは kafka:9092
で Kafka にアクセス出るように環境変数を指定しています。
環境変数から色々な設定ができるので、詳しくは公式サイトを参考にしてください。
後は docker compose コマンドで起動させ、エラーなく立ち上がってそうならOKです。
$ docker compose up
(任意) kcat で Kafka にアクセスしてみる。
kcat(旧kafkacat) は Kafka の Producer/Consumer の CLI ツールです。
ローカルからサクッと Kafka と疎通ができるのでインストールしましょう。
Mac でのインストール
brew install kcat
コンテナ内の Kafka と疎通してみる
上記の docker compose up
で立ち上げた Kafka コンテナにアクセスします。
$ kcat -L -b localhost:29092
# 下記のような出力がされていれば疎通OKです。
Metadata for all topics (from broker 1001: localhost:29092/1001):
1 brokers:
broker 1001 at localhost:29092 (controller)
kcat コマンドの詳しい使い方は公式サイトを確認してみてください。
Producer/Consumer を Ruby で作る
使う gem はこちらです。
Kafka のクライアントライブラリとして ruby-kafka
、
ProtoBuf のシリアライズ/デシリアライズのライブラリとして google-protobuf
を入れています。
gem 'ruby-kafka'
gem 'google-protobuf'
proto の作成
送受信するメッセージのスキーマを定義します。
syntax = "proto3";
package sample;
import "google/protobuf/timestamp.proto";
# Kafka で送受信するメッセージの型です。
message TestMessage {
string content = 1;
google.protobuf.Timestamp created_at = 2;
}
上記の proto ファイルのスキーマからRubyのコードを自動生成してリポジトリ内に置いてください。
色々なやり方があると思いますが、 'grpc-tools' という gem をインストールしてローカルでコマンドを叩いてください。
# 自動生成コマンドの例です。出力位置などはよしなに指定してください。
$ grpc_tools_ruby_protoc -I proto --ruby_out=pb proto/sample.proto
下記のようなファイルが自動生成されればOKです。
これをProducer/Consumerでrequireして使います。
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: sample.proto
require 'google/protobuf/timestamp_pb'
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
add_file("sample.proto", :syntax => :proto3) do
add_message "sample.TestMessage" do
optional :content, :string, 1
optional :created_at, :message, 2, "google.protobuf.Timestamp"
end
end
end
module Sample
TestMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("sample.TestMessage").msgclass
end
Producer の実装
# frozen_string_literal: true
require 'date'
require 'ruby-kafka'
require 'path/to/pb' # protoから自動生成したコードをrequireします
# docker compose で立ち上げるときは kafka:9092、
# ローカルで立ち上げるときは localhost:29092 とします
# client_id は任意です
kafka = Kafka.new(['kafka:9092'], client_id: 'producer1')
# topic名は任意、Consumer側ではこのtopic名を指定することで subscribeできる
topic = 'topic1'
producer = kafka.producer
# ProtoBufでシリアライズしたメッセージを生成します。
message = Sample::TestMessage.new(content: 'Hello World!', created_at: Time.now).to_proto
producer.produce(message, topic: topic)
producer.deliver_messages
puts 'メッセージを送信しました'
docker compose run などで上記ファイルを実行すれば topic1
という topic にメッセージが送信されます。
$ bundle exec ruby path/to/producer.rb
実行できたら kcat -b localhost:29092 -t topic1
で topic のメッセージを確認してみてください。
Consumer の実装
# frozen_string_literal: true
require 'date'
require 'ruby-kafka'
require 'path/to/pb' # protoから自動生成したコードをrequireします
# docker compose で立ち上げるときは kafka:9092、
# ローカルで立ち上げるときは localhost:29092 とします
# client_id は任意です
kafka = Kafka.new(['kafka:9092'], client_id: 'consumer1')
# consumer は無限ループするのでシグナルを送って止められるようにします
trap('TERM') { consumer.stop }
# group_id は任意です。
consumer = kafka.consumer(group_id: 'group1')
consumer.subscribe('topic1')
# topic に対してポーリングを行うので無限ループで動きます。
# automatically_mark_as_processed を true にすると自動でメッセージがコミットされます。
consumer.each_message(automatically_mark_as_processed: true) do |message|
# ProtoBufのメッセージをデシリアライズします。
sample_message = Sample::TestMessage.decode(message.value)
content = sample_message.content
created_at = Time.at(sample_message.created_at.seconds).strftime('%Y/%m/%d %H:%M')
puts "メッセージを受信しました => 内容: #{content}, 日時: #{created_at}"
end
docker compose run などで上記ファイルを実行すれば topic1
という topic に入っているメッセージをサブスクライブすることができます。
$ bundle exec ruby path/to/consumer.rb
Producer でメッセージを送信してから、Consumer を実行すると標準出力に受信したメッセージが表示されると思います。
補足
Kafka が正常に起動していないと Kafka::ConnectionError
などが出ます。
また、topic が作成されていない時に Consumer が subscribe を行うとエラーが出るので注意してください。
(上記のProducerでメッセージを送信すると一緒にtopicもつくられるので一回はメッセージを送った状態でConsumerを起動させてみてください。)
group_id
, client_id
などについては詳しくは公式ドキュメントなどを読んでみてください。
Discussion