🍣
Apache kafkaとSnowflakeで遊んでみた
ようやくApache kafkaを使ってSnowflakeにデータをリアルタイムに入れ込むことができたので手順を残したいと思います。
環境
mac book 上のDocker Ubuntu20.04
- Dockerのメモリ8GB
- Dockerのディスク50GB
事前準備
- Snowflakeのアカウントを持っていること
- Snowflake上にKAFKA用のデータベースとスキーマを作成ずみであること
- macにDocker環境を構築済みであること
手順
Docker上にUbuntu20.04を立ち上げる
ポート番号、nameは適当です。
$ docker run -d -it -p 8080:8080 --name kafka0 ubuntu:20.04
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
92fec308d008 ubuntu:20.04 "/bin/bash" 16 seconds ago Up 15 seconds 0.0.0.0:8080->8080/tcp kafka0
$ docker exec -it 92fec308d008 /bin/bash
root@92fec308d008:/#
sudoのインストールとkafkaユーザの作成
root@92fec308d008:/# apt install sudo -y
root@92fec308d008:/# sudo adduser kafka
root@92fec308d008:/# sudo adduser kafka sudo
ユーザの切り替えとパッケージのアップデート
root@92fec308d008:/# su - kafka
kafka@92fec308d008:~$
kafka@92fec308d008:~$ sudo apt update
OpenJDKのインストール
kafka@92fec308d008:~$ sudo apt install default-jdk
kafka@92fec308d008:~$ java --version
openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Kafkaのパッケージをダウンロード
kafka@92fec308d008:~$ wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ tar xzf kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ ls
kafka_2.13-2.7.0 kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ sudo mv kafka_2.13-2.7.0 /usr/local/kafka
Systemdユニットファイルの作成
zookeeper
kafka@92fec308d008:~$ cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
kafka
kafka@92fec308d008:~$ cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
systemctlのインストールとサービスの起動
kafka@92fec308d008:~$ sudo apt install systemctl
kafka@92fec308d008:~$ sudo systemctl daemon-reload
kafka@92fec308d008:~$ sudo systemctl start zookeeper
kafka@92fec308d008:~$ sudo systemctl start kafka
Snowflakeに入れるデータの作成準備
kafka@92fec308d008:/usr/local/kafka$ sudo apt install python3-pip
kafka@92fec308d008:/usr/local/kafka$ pip3 install pandas
kafka@92fec308d008:/usr/local/kafka$ pip3 install kafka-python
kafka@92fec308d008:~$ pip3 install faker
フェイクデータ作成Pythonプログラムの用意
fake_kafka.py
import pandas as pd
import random
from faker import Faker
from random import randrange
from datetime import datetime
from time import sleep
from json import dumps
from kafka import KafkaProducer
nr_of_customers = 100
fake = Faker('de_DE')
customers = dict()
topic_name='sales-data'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
for customers_id in range(nr_of_customers):
# Create transaction date
d1 = datetime.strptime(f'1/1/2021', '%m/%d/%Y')
d2 = datetime.strptime(f'8/10/2021', '%m/%d/%Y')
transaction_date = fake.date_between(d1, d2)
#create customer's name
name = fake.name()
# Create gender
gender = random.choice(["M", "F"])
# Create email
email = fake.ascii_email()
#Create city
city = fake.city()
#create product ID in 8-digit barcode
product_id = fake.ean(length=8)
#create amount spent
amount_spent = fake.pyfloat(right_digits=2, positive=True, min_value=1, max_value=100)
customers={
'transaction_date':str(transaction_date) ,
'name':name,'gender':gender,'city':city,
'email':email,'product_id':product_id,
'amount_spent':amount_spent
}
print(customers)
producer.send(topic_name, value=dumps(customers))
sleep(5)
プログラムを実行し、フェイクデータを生成できていることを確認
kafka@92fec308d008:~$ python3 fake_kafka.py
{'transaction_date': '2021-02-13', 'name': 'Louise Ladeck', 'gender': 'F', 'city': 'Nauen', 'email': 'hornichmirco@bauer.de', 'product_id': '92141735', 'amount_spent': 73.5}
{'transaction_date': '2021-06-07', 'name': 'Antonie Wilmsen-Weller', 'gender': 'F', 'city': 'Tecklenburg', 'email': 'yhethur@yahoo.de', 'product_id': '08297778', 'amount_spent': 11.46}
^Z
[2]+ Stopped python3 fake_kafka.py
snowflake-kafka-connectorのJARファイルのダウンロード
lib配下に配置します。
kafka@92fec308d008:/usr/local/kafka/libs$ sudo wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.0/snowflake-kafka-connector-1.5.0.jar
Snowflake用の接続情報の作成
秘密鍵は後で作成するので、それ以外を埋めます。
詳細はSnowflakeの公式ドキュメントを参照ください。
kafka@92fec308d008:/usr/local/kafka/config$ cat SF_connect.properties
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
snowflake.topic2table.map=sales-data:sales_data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=*****.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=******
snowflake.private.key=''
snowflake.database.name=KAFKA_DB
snowflake.schema.name=KAFKA_SCHEMA
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=sakes_data_kafka_test
connect-standalone.propertiesの修正
connect-standalone.propertiesの末尾にlibのフォルダパスを記入します。
kafka@92fec308d008:/usr/local/kafka/config$ cat connect-standalone.properties | tail -1
plugin.path=/usr/local/kafka/libs
秘密鍵の生成
kafka@92fec308d008:~$ openssl genrsa -out rsa_key.pem 2048
kafka@92fec308d008:~$ openssl rsa -in rsa_key.pem -pubout -out rsa_key.pub
Snowflake Web UIでユーザに公開鍵を割り当てる
alter user ***** set RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6oUUbWp7ZmTBJRfhY7CB
WEiin90JRqBwXADbXIQ/LbqeHpeZVT4HNTfOMLKRyXeJyfChVOkNRn2FimLs6r/AlPk+ASl2JBcSGx********1i1gtGjOlTJj/ygtLCWHnqE8Y77KqK74My9jcG********B0csNRv9zz+0UsvYKrvu4LM1bUdDa
CQIDAQAB';
connect-standalone.propertiesの修正
秘密鍵を設定します。
kafka@92fec308d008:~$ cat /usr/local/kafka/config/SF_connect.properties
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
snowflake.topic2table.map=sales-data:sales_data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=******.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=*****
snowflake.private.key='MIIEogIBAAKCAQEA6oUUbWp7ZmTBJRfhY7CBWEiin90JRqBwXADbXIQ/LbqeHpeZ\
VT4HNTfOMLKRyXeJyfChVOkNRn2FimLs6r/AlPk+ASl2JBcSGxMMvITARP2nOObU\
gPfCFNAIT9ygupTjcpj7*********************EBtpdGDLrmLpEkwnXEsWXEc\
YX2Wa9C8Fb4IKI3U/U4X12omQIxLrN6WRnOXi5OKHOrdQNGO5Mn+fULA1iUuHaie\
UtpvmPdN2NEig0Gk1i1gtGjOlTJj/ygtLCWHnqE8Y77KqK74My9jcGWLzrhRqk5O\
rTd8vHRB0csNRv9zz+0UsvYKrvu4LM1bUdDaCQIDAQABAoIBAElkyJXNgzzyPzf6\
~略~
tQ7cuutCNgOD0HXuxJsFjnSYehlelyURRsJF09+JsMt2Xfk3oy87P6p20klE+ltr\
nDmLjG5eikxBrDfwEz8oI5htfJcJ2/qGELLPt8SqbUulGyZB2hysGKiJWSODzaQw\
D7whAoGAOPSlDhvulc8teI7gSW//3Spl2WGjaVGgc38b18bsobyeiwgKbcgvlTdX\
/v02cWaHB6PWP2GBk9JDhV2GU96UxzZ9GxOM9SEKJZNSUd+4MIu6/45ln6MztrAM\
lrrTt+370XEGl/K+Vtp9YfOFZ3AWcdXw0wfMAQdOgFaSiiXzTkQ='
snowflake.database.name=KAFKA_DB
snowflake.schema.name=KAFKA_SCHEMA
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=sakes_data_kafka_test
sales-dataというトピックを作成
kafka@92fec308d008:/usr/local/kafka$ ./bin/kafka-topics.sh --create --topic sales-data --bootstrap-server localhost:9092
Created topic sales-data.
別々のウィンドウでfake_kafak.pyとkafkaスタンドアローンを起動します
kafka@92fec308d008:~$ python3 fake_kafka.py
kafka@92fec308d008:/usr/local/kafka$ ./bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties
Snowflakeにデータが取り込まれることを確認する
スタンドアローン実行から数十秒待つと、以下のイメージの通りSnowflakeでデータが取り込まれます。
今回の手順はUdemyや公式ドキュメント、海外ブログを参考にしています。
AWSでお金を掛けずに無料でSnowflakeのストリーミングを試してみたい方はぜひ参考にしてみてください。
Discussion