仕事:Python + Kafka + Writer Framework + multithread + queueで制御プログラム開発

なんかstreamsyncからwriter-frameworkになったらしいので

Ubuntu 24.04.1 LTSにDockerをインストールする
を参考に作業をすすめる
本当に自分のマシンがUbuntu 24.04.1 LTSか確認する
lsb_release -a
競合するパッケージをアンインストールする
for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done
Dockerインストール可能なようにapt関連の設定を行う
aptでインストールするために必要なdocker公式のGPG keyを配置する
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
aptでDockerをインストールするためにリポジトリ情報を追加する
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
aptに追加したリポジトリ情報を認識させる
sudo apt update
Dockerをインストールする
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudoがなくてもDockerを実行できるようにする
sudo groupadd docker
sudo usermod -aG docker <ユーザー名>
その後再起動
グループにユーザー名が追加されているか確認
cat /etc/group | grep docker
dockerが動くか確認
docker run hello-world
動いてないコンテナ(イメージじゃないよ)が残るので
docker rm <CONTAINER ID>
で削除
ちなみに動いていないコンテナの再起動は
docker start <CONTAINER ID>
でできる。

Writer FrameworkをDockerから起動する
まずはDockerfileの置き場を作る
cd ~/
mkdir dockers
cd dockers
mkdir writer_demo
cd writer_demo
vim Dockerfile
以下のDockerfileを作成する
FROM python:3.9
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["writer", "hello", "--port","3006","--host","0.0.0.0","--enable-remote-edit"]
requirements.txtも作る
writer
作成したDockerfileに基づいてDockerイメージをビルドする
docker build -t writer-demo -f Dockerfile .
Dockerコンテナを作成・実行する
docker run --rm --name writer-demo -t -p 3006:3006 writer-demo
webブラウザで127.0.0.1:3006にアクセスすると動いていることが確認できた

Kafka(KRaft)の環境をdocker-composeで構築していく
とりあえず、
あたりを参考にして、自分なりの環境を構築していくservices:
kafka_broker:
image: confluentinc/cp-kafka:latest
hostname: kafka_broker
container_name: kafka_broker
user: root
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_KRAFT_MODE: "true"
KAFKA_PROCESS_ROLES: controller,broker
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: "NIKU"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./data:/var/lib/kafka/data
```bash
docker compose up -d
このままでは色々エラーがでるので、深堀りせねば

以前構築したkafkaの環境が思ったとおりに動作してくれないので、今度は
を参考に構築してみるあれ?
よく見たらconfluent 、Apacheじゃない。????なにか間違えてる可能性大である

もっぺんApache Kafkaについて勉強
DockerでApache Kafkaを実行
docker run -d --name broker apache/kafka:latest
Apache Kafkaを実行しているコンテナでシェル実行
docker exec --workdir /opt/kafka/bin/ -it broker sh
test-topicという名前のトピックを作成する
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
test-topicにイベントを送信
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
test-topicからイベントを受信
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
Overriding the default broker configurationを読む
docker run -d \
--name broker \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker, controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest
docker rm -f broker
Docker Composeを使う
mkdir ~/apachekafka
cd ~/apachekafka
vim docker-compose.yaml
services:
broker:
image: apache/kafka:latest
container_name: broker
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
docker compose up -d
docker compose down
Multiple nodes
vim docker-compose.yaml
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-2:
image: apache/kafka:latest
container_name: controller-2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-3:
image: apache/kafka:latest
container_name: controller-3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- 29092:9092
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-2:
image: apache/kafka:latest
container_name: broker-2
ports:
- 39092:9092
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-3:
image: apache/kafka:latest
container_name: broker-3
ports:
- 49092:9092
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
外部から接続できない
他の情報源を探す

なぜ外部から接続できないのかわかった
下のサイトを読めばわかった
KAFKA_ADVERTISED_LISTENERSは
broker使いたい人、ここにアクセスしてねっていうことなので
docker のホスト(コンテナじゃない)のIPアドレスかホスト名を入力しなくちゃだめ

pythonからKafka Client使いたいなら
jupyter labをインストールして、そこでkafkaを使っていく

とりあえず外部から通信可能にしたKafka構成
DockerコンテナとしてKafkaを起動、ホストマシンのネットワークを192.168.0.0/24とした場合
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- 9093:9093
- 19092:19092
- 9092:9092
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://192.168.0.10:19092,PLAINTEXT_HOST://localhost:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
depends_on:
- controller-1

Kafkaを理解する
を参考に勉強する
まずはKafkaクライアントの目線で理解する
- クライアント起動
- 指定されたブローカー(ここではAと呼ぶ)にアクセス(このブローカーはイベントやりとりのためのでない、どのブローカーとイベントをやり取りすればよいか聞くためのブローカー)
- どのブローカーAでイベントを読み書きするためのブローカー(ここではBと呼ぶ)を取得
- 実際にブローカーBとイベントをやり取りする
基本的には、Kafkaクライアントは以上の流れでイベントのやり取りを開始するようだ
Kafka Brokerにおけるlistener(リスナー)とは何か理解する
KafkaのBrokerにおけるlistenerとは
- ホスト/IP
- Kafkaの通信に使うポート番号
- プロトコル
の組み合わせで指定されるBrokerが持つインターフェースのこと
listenerを表す記述方法として
<リスナー名>://<BrokerのNICが持つIP>:<ポート番号>
という形になる。<リスナー名>と<ポート番号> は一対一対応する必要がある(ポート番号の重複は許さない)
KAFKA_ADVERTISED_LISTENERS(advertised.listeners)とは
クライアント起動時にアクセスするブローカーAからもらうブローカーBのリスナー情報
要は実際にイベントのやり取りをするリスナーを記述する
node.id (KAFKA_NODE_ID)
process.roles(KAFKA_PROCESS_ROLES)が空でなければ、必要らしい(推測:どのNode IDがどのロールを果たしているか関連付けるために使ってる?)。KRaft modeで動かすときに必要な設定。
process.roles (KAFKA_PROCESS_ROLES)
これの値は'broker', 'controller' , 'broker, controller'の3つの値を取る。(これが空出ない場合KRaft modeで動く=暗にZooKeeperを使わないことを意味する)
listeners(KAFKA_LISTENERS)
上で説明済み
advertised.listeners(KAFKA_ADVERTISED_LISTENERES)
上で説明済み