仕事:Python + Kafka + Writer Framework + multithread + queueで制御プログラム開発
なんかstreamsyncからwriter-frameworkになったらしいので
Ubuntu 24.04.1 LTSにDockerをインストールする
を参考に作業をすすめる
本当に自分のマシンがUbuntu 24.04.1 LTSか確認する
lsb_release -a
競合するパッケージをアンインストールする
for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done
Dockerインストール可能なようにapt関連の設定を行う
aptでインストールするために必要なdocker公式のGPG keyを配置する
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc
aptでDockerをインストールするためにリポジトリ情報を追加する
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
aptに追加したリポジトリ情報を認識させる
sudo apt update
Dockerをインストールする
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
sudoがなくてもDockerを実行できるようにする
sudo groupadd docker
sudo usermod -aG docker <ユーザー名>
その後再起動
グループにユーザー名が追加されているか確認
cat /etc/group | grep docker
dockerが動くか確認
docker run hello-world
動いてないコンテナ(イメージじゃないよ)が残るので
docker rm <CONTAINER ID>
で削除
ちなみに動いていないコンテナの再起動は
docker start <CONTAINER ID>
でできる。
Writer FrameworkをDockerから起動する
まずはDockerfileの置き場を作る
cd ~/
mkdir dockers
cd dockers
mkdir writer_demo
cd writer_demo
vim Dockerfile
以下のDockerfileを作成する
FROM python:3.9
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["writer", "hello", "--host","0.0.0.0","--enable-remote-edit"]
作成したDockerfileに基づいてDockerイメージをビルドする
docker build -t writer-demo -f Dockerfile .
Dockerコンテナを作成・実行する
docker run --rm --name writer-demo -t -p 3006:3006 writer-demo
webブラウザで127.0.0.1:3006にアクセスすると動いていることが確認できた
Kafka(KRaft)の環境をdocker-composeで構築していく
とりあえず、
あたりを参考にして、自分なりの環境を構築していくservices:
kafka_broker:
image: confluentinc/cp-kafka:latest
hostname: kafka_broker
container_name: kafka_broker
user: root
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_KRAFT_MODE: "true"
KAFKA_PROCESS_ROLES: controller,broker
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LOG_DIRS: /var/lib/kafka/data
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: "NIKU"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./data:/var/lib/kafka/data
```bash
docker compose up -d
このままでは色々エラーがでるので、深堀りせねば
以前構築したkafkaの環境が思ったとおりに動作してくれないので、今度は
を参考に構築してみるあれ?
よく見たらconfluent 、Apacheじゃない。????なにか間違えてる可能性大である
もっぺんApache Kafkaについて勉強
DockerでApache Kafkaを実行
docker run -d --name broker apache/kafka:latest
Apache Kafkaを実行しているコンテナでシェル実行
docker exec --workdir /opt/kafka/bin/ -it broker sh
test-topicという名前のトピックを作成する
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic
test-topicにイベントを送信
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
test-topicからイベントを受信
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
Overriding the default broker configurationを読む
docker run -d \
--name broker \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker, controller \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_NUM_PARTITIONS=3 \
apache/kafka:latest
docker rm -f broker
Docker Composeを使う
mkdir ~/apachekafka
cd ~/apachekafka
vim docker-compose.yaml
services:
broker:
image: apache/kafka:latest
container_name: broker
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
docker compose up -d
docker compose down
Multiple nodes
vim docker-compose.yaml
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-2:
image: apache/kafka:latest
container_name: controller-2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-3:
image: apache/kafka:latest
container_name: controller-3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- 29092:9092
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-2:
image: apache/kafka:latest
container_name: broker-2
ports:
- 39092:9092
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-3:
image: apache/kafka:latest
container_name: broker-3
ports:
- 49092:9092
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
外部から接続できない
他の情報源を探す
なぜ外部から接続できないのかわかった
下のサイトを読めばわかった
KAFKA_ADVERTISED_LISTENERSは
どこにbroker使いたい人、ここにアクセスしてねっていうことなので
docker のホスト(コンテナじゃない)のIPアドレスかホスト名を入力しなくちゃだめ
pythonからKafka Client使いたいなら
jupyter labをインストールして、そこでkafkaを使っていく
とりあえず外部から通信可能にしたKafka構成
DockerコンテナとしてKafkaを起動、ホストマシンのネットワークを192.168.0.0/24とした場合
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- 9093:9093
- 19092:19092
- 9092:9092
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://192.168.0.10:19092,PLAINTEXT_HOST://localhost:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
depends_on:
- controller-1