🍣

Apache kafkaとSnowflakeで遊んでみた

2023/03/26に公開

ようやくApache kafkaを使ってSnowflakeにデータをリアルタイムに入れ込むことができたので手順を残したいと思います。

環境

mac book 上のDocker Ubuntu20.04

  • Dockerのメモリ8GB
  • Dockerのディスク50GB

事前準備

  • Snowflakeのアカウントを持っていること
  • Snowflake上にKAFKA用のデータベースとスキーマを作成ずみであること
  • macにDocker環境を構築済みであること

手順

Docker上にUbuntu20.04を立ち上げる

ポート番号、nameは適当です。

$ docker run -d -it -p 8080:8080 --name kafka0 ubuntu:20.04
$ docker ps
CONTAINER ID   IMAGE          COMMAND       CREATED          STATUS          PORTS                    NAMES
92fec308d008   ubuntu:20.04   "/bin/bash"   16 seconds ago   Up 15 seconds   0.0.0.0:8080->8080/tcp   kafka0
$ docker exec -it 92fec308d008 /bin/bash
root@92fec308d008:/#

sudoのインストールとkafkaユーザの作成

root@92fec308d008:/# apt install sudo -y
root@92fec308d008:/# sudo adduser kafka
root@92fec308d008:/# sudo adduser kafka sudo

ユーザの切り替えとパッケージのアップデート

root@92fec308d008:/# su - kafka
kafka@92fec308d008:~$
kafka@92fec308d008:~$ sudo apt update

OpenJDKのインストール

kafka@92fec308d008:~$ sudo apt install default-jdk
kafka@92fec308d008:~$ java --version
openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)

Kafkaのパッケージをダウンロード

kafka@92fec308d008:~$ wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ tar xzf kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ ls
kafka_2.13-2.7.0  kafka_2.13-2.7.0.tgz
kafka@92fec308d008:~$ sudo mv kafka_2.13-2.7.0 /usr/local/kafka

Systemdユニットファイルの作成

zookeeper

kafka@92fec308d008:~$ cat /etc/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

kafka

kafka@92fec308d008:~$ cat /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

systemctlのインストールとサービスの起動

kafka@92fec308d008:~$ sudo apt install systemctl
kafka@92fec308d008:~$ sudo systemctl daemon-reload
kafka@92fec308d008:~$ sudo systemctl start zookeeper
kafka@92fec308d008:~$ sudo systemctl start kafka

Snowflakeに入れるデータの作成準備

kafka@92fec308d008:/usr/local/kafka$ sudo apt install python3-pip
kafka@92fec308d008:/usr/local/kafka$ pip3 install pandas
kafka@92fec308d008:/usr/local/kafka$ pip3 install kafka-python
kafka@92fec308d008:~$ pip3 install faker

フェイクデータ作成Pythonプログラムの用意

fake_kafka.py
import pandas as pd
import random
from faker import Faker
from random import randrange
from datetime import datetime
from time import sleep
from json import dumps
from kafka import KafkaProducer

nr_of_customers = 100

fake = Faker('de_DE')

customers = dict()

topic_name='sales-data'

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: dumps(x).encode('utf-8'))

for customers_id in range(nr_of_customers):

    # Create transaction date
    d1 = datetime.strptime(f'1/1/2021', '%m/%d/%Y')
    d2 = datetime.strptime(f'8/10/2021', '%m/%d/%Y')
    transaction_date = fake.date_between(d1, d2)

    #create customer's name
    name = fake.name()

    # Create gender
    gender = random.choice(["M", "F"])

    # Create email
    email = fake.ascii_email()

    #Create city
    city = fake.city()

    #create product ID in 8-digit barcode
    product_id = fake.ean(length=8)

    #create amount spent
    amount_spent = fake.pyfloat(right_digits=2, positive=True, min_value=1, max_value=100)

    customers={
            'transaction_date':str(transaction_date) ,
            'name':name,'gender':gender,'city':city,
            'email':email,'product_id':product_id,
            'amount_spent':amount_spent
            }
    print(customers)

    producer.send(topic_name, value=dumps(customers))
    sleep(5)

プログラムを実行し、フェイクデータを生成できていることを確認

kafka@92fec308d008:~$ python3 fake_kafka.py
{'transaction_date': '2021-02-13', 'name': 'Louise Ladeck', 'gender': 'F', 'city': 'Nauen', 'email': 'hornichmirco@bauer.de', 'product_id': '92141735', 'amount_spent': 73.5}
{'transaction_date': '2021-06-07', 'name': 'Antonie Wilmsen-Weller', 'gender': 'F', 'city': 'Tecklenburg', 'email': 'yhethur@yahoo.de', 'product_id': '08297778', 'amount_spent': 11.46}
^Z
[2]+  Stopped                 python3 fake_kafka.py

snowflake-kafka-connectorのJARファイルのダウンロード

lib配下に配置します。

kafka@92fec308d008:/usr/local/kafka/libs$ sudo wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.0/snowflake-kafka-connector-1.5.0.jar

Snowflake用の接続情報の作成

秘密鍵は後で作成するので、それ以外を埋めます。
詳細はSnowflakeの公式ドキュメントを参照ください。

kafka@92fec308d008:/usr/local/kafka/config$ cat SF_connect.properties
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
snowflake.topic2table.map=sales-data:sales_data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=*****.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=******
snowflake.private.key=''
snowflake.database.name=KAFKA_DB
snowflake.schema.name=KAFKA_SCHEMA
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=sakes_data_kafka_test

connect-standalone.propertiesの修正

connect-standalone.propertiesの末尾にlibのフォルダパスを記入します。

kafka@92fec308d008:/usr/local/kafka/config$ cat connect-standalone.properties | tail -1
plugin.path=/usr/local/kafka/libs

秘密鍵の生成

kafka@92fec308d008:~$ openssl genrsa -out rsa_key.pem 2048
kafka@92fec308d008:~$ openssl rsa -in rsa_key.pem -pubout -out rsa_key.pub

Snowflake Web UIでユーザに公開鍵を割り当てる

alter user ***** set RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6oUUbWp7ZmTBJRfhY7CB
WEiin90JRqBwXADbXIQ/LbqeHpeZVT4HNTfOMLKRyXeJyfChVOkNRn2FimLs6r/AlPk+ASl2JBcSGx********1i1gtGjOlTJj/ygtLCWHnqE8Y77KqK74My9jcG********B0csNRv9zz+0UsvYKrvu4LM1bUdDa
CQIDAQAB';

connect-standalone.propertiesの修正

秘密鍵を設定します。

kafka@92fec308d008:~$ cat /usr/local/kafka/config/SF_connect.properties
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=sales-data
snowflake.topic2table.map=sales-data:sales_data
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=******.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=*****
snowflake.private.key='MIIEogIBAAKCAQEA6oUUbWp7ZmTBJRfhY7CBWEiin90JRqBwXADbXIQ/LbqeHpeZ\
VT4HNTfOMLKRyXeJyfChVOkNRn2FimLs6r/AlPk+ASl2JBcSGxMMvITARP2nOObU\
gPfCFNAIT9ygupTjcpj7*********************EBtpdGDLrmLpEkwnXEsWXEc\
YX2Wa9C8Fb4IKI3U/U4X12omQIxLrN6WRnOXi5OKHOrdQNGO5Mn+fULA1iUuHaie\
UtpvmPdN2NEig0Gk1i1gtGjOlTJj/ygtLCWHnqE8Y77KqK74My9jcGWLzrhRqk5O\
rTd8vHRB0csNRv9zz+0UsvYKrvu4LM1bUdDaCQIDAQABAoIBAElkyJXNgzzyPzf6\
                            ~略~
tQ7cuutCNgOD0HXuxJsFjnSYehlelyURRsJF09+JsMt2Xfk3oy87P6p20klE+ltr\
nDmLjG5eikxBrDfwEz8oI5htfJcJ2/qGELLPt8SqbUulGyZB2hysGKiJWSODzaQw\
D7whAoGAOPSlDhvulc8teI7gSW//3Spl2WGjaVGgc38b18bsobyeiwgKbcgvlTdX\
/v02cWaHB6PWP2GBk9JDhV2GU96UxzZ9GxOM9SEKJZNSUd+4MIu6/45ln6MztrAM\
lrrTt+370XEGl/K+Vtp9YfOFZ3AWcdXw0wfMAQdOgFaSiiXzTkQ='
snowflake.database.name=KAFKA_DB
snowflake.schema.name=KAFKA_SCHEMA
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
name=sakes_data_kafka_test

sales-dataというトピックを作成

kafka@92fec308d008:/usr/local/kafka$ ./bin/kafka-topics.sh --create --topic sales-data --bootstrap-server localhost:9092
Created topic sales-data.

別々のウィンドウでfake_kafak.pyとkafkaスタンドアローンを起動します

kafka@92fec308d008:~$ python3 fake_kafka.py
kafka@92fec308d008:/usr/local/kafka$ ./bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties

Snowflakeにデータが取り込まれることを確認する

スタンドアローン実行から数十秒待つと、以下のイメージの通りSnowflakeでデータが取り込まれます。

今回の手順はUdemyや公式ドキュメント、海外ブログを参考にしています。
AWSでお金を掛けずに無料でSnowflakeのストリーミングを試してみたい方はぜひ参考にしてみてください。

Discussion