Snowpipe Streaming X Kafkaでデータ取り込みやってみた
23年7月のアップデートでSnowpipe Streaming + Kafka Connectorがサポートされたので遅くなりましたが、どんな感じなのか試してみました。
Snowpipe Streaming + Kafka Connectorのポイント
公式ドキュメントより抜粋
KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えることができます。
指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「API」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。
このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。
Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン1.9.1(またはそれ以上)が必要です。
Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。
なるほど〜直接テーブルの書き込むスタイルなのね!
では、早速検証していきたいと思います!
注意事項
kafkaコネクタのバージョン
- Snowpipe StreamingをサポートするKafkaコネクタの最小必要バージョンは1.9.1です。
- Snowflakeは、Kafkaコネクタのバージョン2.0.0以降の使用をお勧めします。
Kafka構成プロパティの必須項目
- snowflake.ingestion.method
- デフォルトはSnowpipe
- snowpipe_streaming
- snowflake.role.name
- テーブルに行を挿入するときに使用するアクセス制御ロールを指定
コンバーター
Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter または value.converter 値をサポートしていません。
- com.snowflake.kafka.connector.records.SnowflakeJsonConverter
- com.snowflake.kafka.connector.records.SnowflakeAvroConverter
- com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry
環境準備
kafkaとsnowflakeの環境構築は以下記事を参考に進めました。
Snowpipe + kafkaの動作のおさらい
比較のためにSnowpipe + kafkaの動作を確認してみました。
公式ドキュメントの図の通り、StageとPipeが作成されています。
kafkaコネクタ2.1.0をダウンロードする
root@ec1ba8625c65:/usr/local/kafka/libs# wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.1.0/snowflake-kafka-connector-2.1.0.jar
Kafka構成プロパティを変更
Kafka構成プロパティを以下のように修正しました。
name=kafka_test
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming-schematization
snowflake.topic2table.map=streaming-schematization:streaming_schematization_tbl
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=*****.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=tony
snowflake.private.key='MIIEowIBAAKCAQEAlG9FHwsdJVzL03/VO8JVB8AFtVYzsdgJdUJoC5DNfuUPJij8\
9MiBz17hk5bZ5rbrxtVavw+acph7l9Mqr0DhKZFcWBv78jWwvPL+nT/x20ph84QM\
XGsud8ZcUa7n2QZdCntuVeipkvPDE2Ia0+L7ouahWi/QrrwqRTb6h9ncDT315ZII\
UVHFCP5pHgfLSadjUBT3ygwT0/ysVwKAvfbSzHi4YXx21f6FdFOAgdSmTMddjyod\
/QscpFLpRJWaKUD3U12wYjQUJzQlcT/n7bg0D7MZEkMQApYxPB6Iz/epf3AZHj6X\
eAYEt2rklVd8jcvaeyu2ZbIj+BTKMc+iCj8dUQIDAQABAoIBABWeUxnfW4JajpEi\
OCM4/XtgezRa9rJ7PX1ni5HLqDJIGDm9lIHuRgG21vHQ8bGJiLNv1YSoOB1imfWV\
〜略〜
GqrIZLn36P7+BKs5jEwf9fz7j/8vD+P3vbxudUUVhTxN/l56fI3EidBk+YNE+yqv\
aAwA2QKBgHwB2CdDWm1T4X82PwwhVgomKJXchAtR3O+GhKXkcageV0MrIdoCut19\
1IdkVnn+iiZtbQY+1csKSI5/sLO/O+YaHu8KWe0PQUU0iv+Xg5YWzv+woODPrF1o\
SHk4jTxG8IIgMLnpBSOuKhlyA0EODXZW1AQ+OFEZ8gdEppm1yHop'
snowflake.database.name=KAFKA_DEMO
snowflake.schema.name=SNOWPIPE_KAFKA
snowflake.role.name=KAFKA_ROLE
snowflake.ingestion.method=snowpipe_streaming ★ここを追加
jmx=true
errors.tolerance=all
Snowpipe Streaming + Kafkaの動作
おお!確かにPipeとStageが作成されずに直接テーブルに書き込まれていますね!
スキーマ検出およびスキーマ進化
Snowpipe Streamingを使用したKafkaコネクタは、スキーマ検出および進化をサポートしています。
Snowflakeのテーブルの構造を自動的に定義して進化させ、Kafkaコネクタによりロードされた新しいSnowpipe Streamingデータの構造をサポートすることができます。
Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出と進化を有効にするには、以下のKafkaプロパティを構成する必要があります。
- snowflake.ingestion.method
- snowflake.enable.schematization
- schema.registry.url
詳細については公式ドキュメントをご参照ください。
公式ドキュメントを参考にスキーマ検出/進化の設定を試してみましたが、下図の通りRECORD_METADATAしか取り込まれなくなり、行き詰まってしまいました。
Snowflake側の設定不備なのか、kafka側の設定不備なのか、はたまたJSONデータの不備なのかまだ分からないです。
もう少し調査をしたいと思います。
Discussion