🔥

Docker 環境で Kafka を起動し Ruby で Producer/Consumer を実装してみる

2022/09/11に公開

はじめに

Docker 環境で Kafka 起動し、 Ruby で実装した Producer/Consumer も一緒に動かしてみます。

送受信するメッセージのデータフォーマットとしては JSON や Avro などあるようですが、
今回は ProtocolBuffers を使用してみます。

今回書いたコードは下記リポジトリにおいています。
(この記事で紹介するコードに少し+αしたものになってますがほぼ同じです。)
https://github.com/ydammatsu/kafka-proto-sample

Kafka とは

Apache Kafka は、大規模なストリーム処理、リアルタイムデータパイプライン、データ統合に使用されるオープンソースの分散型ストリーミングシステムです。

https://www.confluent.io/ja-jp/what-is-apache-kafka/

Kafka を docker compose で動かす


(引用: Apache Kafkaの概要とアーキテクチャ)

Kafka を起動させるには上記の図でいう、Kafka Broker と Zookeeper が必要になります。
どちらも Confluent 社が用意した DockerImage があるためそれを使います。

docker-compose.yml
version: "3"
services:
  kafka:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,INTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 29092:29092
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
  # 以下省略(Producer と Consumer も docker compose で動かす場合はそれぞれ追記する)
  # ...

上記の docker-compose.yml の構成ではローカルからは localhost:29092、docker compose 内からは kafka:9092 で Kafka にアクセス出るように環境変数を指定しています。

環境変数から色々な設定ができるので、詳しくは公式サイトを参考にしてください。

後は docker compose コマンドで起動させ、エラーなく立ち上がってそうならOKです。

$ docker compose up

(任意) kcat で Kafka にアクセスしてみる。

kcat(旧kafkacat) は Kafka の Producer/Consumer の CLI ツールです。
ローカルからサクッと Kafka と疎通ができるのでインストールしましょう。

Mac でのインストール

brew install kcat

コンテナ内の Kafka と疎通してみる

上記の docker compose up で立ち上げた Kafka コンテナにアクセスします。

$ kcat -L -b localhost:29092

# 下記のような出力がされていれば疎通OKです。
Metadata for all topics (from broker 1001: localhost:29092/1001):
 1 brokers:
  broker 1001 at localhost:29092 (controller)

kcat コマンドの詳しい使い方は公式サイトを確認してみてください。

https://docs.confluent.io/ja-jp/platform/7.1.1/tutorials/examples/clients/docs/kcat.html

Producer/Consumer を Ruby で作る

使う gem はこちらです。
Kafka のクライアントライブラリとして ruby-kafka
ProtoBuf のシリアライズ/デシリアライズのライブラリとして google-protobuf を入れています。

Gemfile
gem 'ruby-kafka'
gem 'google-protobuf'

proto の作成

送受信するメッセージのスキーマを定義します。

sample.proto
syntax = "proto3";

package sample;

import "google/protobuf/timestamp.proto";

# Kafka で送受信するメッセージの型です。
message TestMessage {
  string content = 1;
  google.protobuf.Timestamp created_at = 2;
}

上記の proto ファイルのスキーマからRubyのコードを自動生成してリポジトリ内に置いてください。
色々なやり方があると思いますが、 'grpc-tools' という gem をインストールしてローカルでコマンドを叩いてください。

# 自動生成コマンドの例です。出力位置などはよしなに指定してください。
$ grpc_tools_ruby_protoc -I proto --ruby_out=pb  proto/sample.proto

下記のようなファイルが自動生成されればOKです。
これをProducer/Consumerでrequireして使います。

sample_pb.rb
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: sample.proto

require 'google/protobuf/timestamp_pb'
require 'google/protobuf'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_file("sample.proto", :syntax => :proto3) do
    add_message "sample.TestMessage" do
      optional :content, :string, 1
      optional :created_at, :message, 2, "google.protobuf.Timestamp"
    end
  end
end

module Sample
  TestMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("sample.TestMessage").msgclass
end

Producer の実装

producer.rb
# frozen_string_literal: true

require 'date'
require 'ruby-kafka'
require 'path/to/pb' # protoから自動生成したコードをrequireします

# docker compose で立ち上げるときは kafka:9092、
# ローカルで立ち上げるときは localhost:29092 とします
# client_id は任意です
kafka = Kafka.new(['kafka:9092'], client_id: 'producer1')

# topic名は任意、Consumer側ではこのtopic名を指定することで subscribeできる
topic = 'topic1' 

producer = kafka.producer

# ProtoBufでシリアライズしたメッセージを生成します。
message = Sample::TestMessage.new(content: 'Hello World!', created_at: Time.now).to_proto

producer.produce(message, topic: topic)
producer.deliver_messages
puts 'メッセージを送信しました'

docker compose run などで上記ファイルを実行すれば topic1 という topic にメッセージが送信されます。

$ bundle exec ruby path/to/producer.rb

実行できたら kcat -b localhost:29092 -t topic1 で topic のメッセージを確認してみてください。

Consumer の実装

consumer.rb
# frozen_string_literal: true

require 'date'
require 'ruby-kafka'
require 'path/to/pb' # protoから自動生成したコードをrequireします

# docker compose で立ち上げるときは kafka:9092、
# ローカルで立ち上げるときは localhost:29092 とします
# client_id は任意です
kafka = Kafka.new(['kafka:9092'], client_id: 'consumer1')

# consumer は無限ループするのでシグナルを送って止められるようにします
trap('TERM') { consumer.stop }

# group_id は任意です。
consumer = kafka.consumer(group_id: 'group1')

consumer.subscribe('topic1')

# topic に対してポーリングを行うので無限ループで動きます。
# automatically_mark_as_processed を true にすると自動でメッセージがコミットされます。
consumer.each_message(automatically_mark_as_processed: true) do |message|
  # ProtoBufのメッセージをデシリアライズします。
  sample_message = Sample::TestMessage.decode(message.value)

  content = sample_message.content
  created_at = Time.at(sample_message.created_at.seconds).strftime('%Y/%m/%d %H:%M')
  puts "メッセージを受信しました => 内容: #{content}, 日時: #{created_at}"
end

docker compose run などで上記ファイルを実行すれば topic1 という topic に入っているメッセージをサブスクライブすることができます。

$ bundle exec ruby path/to/consumer.rb

Producer でメッセージを送信してから、Consumer を実行すると標準出力に受信したメッセージが表示されると思います。

補足

Kafka が正常に起動していないと Kafka::ConnectionError などが出ます。

また、topic が作成されていない時に Consumer が subscribe を行うとエラーが出るので注意してください。
(上記のProducerでメッセージを送信すると一緒にtopicもつくられるので一回はメッセージを送った状態でConsumerを起動させてみてください。)

group_id, client_id などについては詳しくは公式ドキュメントなどを読んでみてください。

https://docs.confluent.io/ja-jp/platform/6.0.1/clients/consumer.html

Discussion

ログインするとコメントできます