🙆

Snowpipe Streaming X Kafkaでデータ取り込みやってみた

2023/10/28に公開

23年7月のアップデートでSnowpipe Streaming + Kafka Connectorがサポートされたので遅くなりましたが、どんな感じなのか試してみました。

Snowpipe Streaming + Kafka Connectorのポイント

公式ドキュメントより抜粋
KafkaからのデータのロードチェーンでSnowpipeを Snowpipe Streaming に置き換えることができます。
指定されたフラッシュバッファーのしきい値(時間、メモリ、またはメッセージ数)に達すると、仮のステージングされたファイルにデータを書き込むSnowpipeとは異なり、コネクタはSnowpipe Streaming API (「API」)を呼び出して、データの行をSnowflakeテーブルから書き込みます。
このアーキテクチャにより、ロード遅延が短縮され、同様の量のデータをロードするためのコストが削減されます。

Snowpipe Streamingで使用するには、Kafkaコネクタのバージョン1.9.1(またはそれ以上)が必要です。
Snowpipe Streamingで使用するKafkaコネクタには、Snowflake Ingest SDK が含まれており、Apache Kafkaトピックからターゲットテーブルに直接行をストリーミングできます。

なるほど〜直接テーブルの書き込むスタイルなのね!
では、早速検証していきたいと思います!

注意事項

kafkaコネクタのバージョン

  • Snowpipe StreamingをサポートするKafkaコネクタの最小必要バージョンは1.9.1です。
  • Snowflakeは、Kafkaコネクタのバージョン2.0.0以降の使用をお勧めします。

Kafka構成プロパティの必須項目

  • snowflake.ingestion.method
    • デフォルトはSnowpipe
    • snowpipe_streaming
  • snowflake.role.name
    • テーブルに行を挿入するときに使用するアクセス制御ロールを指定

コンバーター

Snowpipe Streamingを使用するKafkaコネクタは、次の key.converter または value.converter 値をサポートしていません。

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter
  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter
  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

環境準備

kafkaとsnowflakeの環境構築は以下記事を参考に進めました。
https://zenn.dev/kyami/articles/546d0701f137c7

Snowpipe + kafkaの動作のおさらい

比較のためにSnowpipe + kafkaの動作を確認してみました。
公式ドキュメントの図の通り、StageとPipeが作成されています。

kafkaコネクタ2.1.0をダウンロードする

root@ec1ba8625c65:/usr/local/kafka/libs# wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.1.0/snowflake-kafka-connector-2.1.0.jar

Kafka構成プロパティを変更

Kafka構成プロパティを以下のように修正しました。

name=kafka_test
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming-schematization
snowflake.topic2table.map=streaming-schematization:streaming_schematization_tbl
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=*****.ap-northeast-1.aws.snowflakecomputing.com
snowflake.user.name=tony
snowflake.private.key='MIIEowIBAAKCAQEAlG9FHwsdJVzL03/VO8JVB8AFtVYzsdgJdUJoC5DNfuUPJij8\
9MiBz17hk5bZ5rbrxtVavw+acph7l9Mqr0DhKZFcWBv78jWwvPL+nT/x20ph84QM\
XGsud8ZcUa7n2QZdCntuVeipkvPDE2Ia0+L7ouahWi/QrrwqRTb6h9ncDT315ZII\
UVHFCP5pHgfLSadjUBT3ygwT0/ysVwKAvfbSzHi4YXx21f6FdFOAgdSmTMddjyod\
/QscpFLpRJWaKUD3U12wYjQUJzQlcT/n7bg0D7MZEkMQApYxPB6Iz/epf3AZHj6X\
eAYEt2rklVd8jcvaeyu2ZbIj+BTKMc+iCj8dUQIDAQABAoIBABWeUxnfW4JajpEi\
OCM4/XtgezRa9rJ7PX1ni5HLqDJIGDm9lIHuRgG21vHQ8bGJiLNv1YSoOB1imfWV\
〜略〜
GqrIZLn36P7+BKs5jEwf9fz7j/8vD+P3vbxudUUVhTxN/l56fI3EidBk+YNE+yqv\
aAwA2QKBgHwB2CdDWm1T4X82PwwhVgomKJXchAtR3O+GhKXkcageV0MrIdoCut19\
1IdkVnn+iiZtbQY+1csKSI5/sLO/O+YaHu8KWe0PQUU0iv+Xg5YWzv+woODPrF1o\
SHk4jTxG8IIgMLnpBSOuKhlyA0EODXZW1AQ+OFEZ8gdEppm1yHop'
snowflake.database.name=KAFKA_DEMO
snowflake.schema.name=SNOWPIPE_KAFKA
snowflake.role.name=KAFKA_ROLE
snowflake.ingestion.method=snowpipe_streaming ★ここを追加
jmx=true
errors.tolerance=all

Snowpipe Streaming + Kafkaの動作

おお!確かにPipeとStageが作成されずに直接テーブルに書き込まれていますね!

スキーマ検出およびスキーマ進化

Snowpipe Streamingを使用したKafkaコネクタは、スキーマ検出および進化をサポートしています。
Snowflakeのテーブルの構造を自動的に定義して進化させ、Kafkaコネクタによりロードされた新しいSnowpipe Streamingデータの構造をサポートすることができます。
Snowpipe Streamingを使用したKafkaコネクタのスキーマ検出と進化を有効にするには、以下のKafkaプロパティを構成する必要があります。

  • snowflake.ingestion.method
  • snowflake.enable.schematization
  • schema.registry.url

詳細については公式ドキュメントをご参照ください。

公式ドキュメントを参考にスキーマ検出/進化の設定を試してみましたが、下図の通りRECORD_METADATAしか取り込まれなくなり、行き詰まってしまいました。
Snowflake側の設定不備なのか、kafka側の設定不備なのか、はたまたJSONデータの不備なのかまだ分からないです。
もう少し調査をしたいと思います。

Discussion