Apache Avroとはなんなのか?
今回は、Apache Avro(以下、Avro)について調べてみました。今回も以下のツールを使って対象プロジェクトを決めました!
※ 本企画に関する記事の目的は、それぞれのプロジェクトを本格的に深ぼるのではなく、プロジェクト名⇆どんな内容かをパッと思い出せるようにすることを目指します!
※ とはいえ深ぼってみたいプロジェクトがあればどんどん複数連載になると思います。
Avroとは?
公式サイトによると、
Apache Avro™ is the leading serialization format for record data, and first choice for streaming data pipelines. It offers excellent schema evolution, and has implementations for the JVM (Java, Kotlin, Scala, …), Python, C/C++/C#, PHP, Ruby, Rust, JavaScript, and even Perl.
ということで、データのシリアル化の形式ということです。ストリーミングデータパイプラインの選択肢として、優れたスキーマ、JVMやPythonなど言語ごとの実装も用意されているとのことです。私自身、Google Cloudの資格であるProfessional Data Engineerの学習をしているときにAvro形式がデータ形式として優れているということをみていたので、今回本格的に調べてみるいい機会になりました。
Avroが提供するもの
Avroは以下の機能を提供します。
- リッチなデータ構造
- コンパクトかつ高速なバイナリーフォーマット
- 永続的なデータを格納するためのコンテナファイル
- RPCの対応
- 動的言語とのシンプルな統合
- データファイルの読み書きやRPCプロトコルの使用・実装にはコード生成は必要ない
- コード生成はオプションの最適化であり、静的型付け言語にのみ実装する価値がある
Avroのスキーマについて
- Avroはスキーマ依存のフォーマットということです。Avroデータの読み取り時は、書き込み時に使用されたスキーマが常に存在します。これにより、各データは値ごとのオーバーヘッドなしで書き込むことができ、シリアル化は高速かつ小型化されるようです。データとそのスキーマは完全に自己記述的であるため、動的なスクリプト言語での使用も容易になります。
- Avroデータがファイルに保存されると、そのスキーマも一緒に保存されるため、ファイルは後で任意のプログラムで処理できます。データを読み取るプログラムが異なるスキーマを期待する場合でも、両方のスキーマが存在するため簡単に解決できます。
- AvroをRPCで使用する場合、クライアントとサーバーは接続ハンドシェイクでスキーマを交換します。クライアントとサーバーはどちらも相手の完全なスキーマを持っているため、同じ名前のフィールド、欠落したフィールド、余分なフィールドなどの対応関係を簡単に解決できます。
- AvroのスキーマはJSONで定義されます。これにより、JSONライブラリが既に存在する言語での実装が容易になります。
使ってみた
今回はPython SDKを用いてAvroを使ってみました。以下を参考に進めます。なお、こちらで実装されているコードではPython2系の記述(print hoge)が実装されているので、こちらの記事ではPython3系の記述に変更しています。
インストール
uv
を使って環境を構築しました。
uv init avro_test -p 3.12
cd avro_test
uv add avro
スキーマの定義
スキーマをJSON形式で定義します。ファイル名はuser.avsc
というファイル名で定義します。
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
書き込みコードの実装
それではuser.avsc
に定義したスキーマを元にAvro形式でデータを保存するコードを実装してみます。
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
def main():
schema = avro.schema.parse(open("user.avsc", "rb").read())
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256, "favorite_color": "red"})
writer.append({"name": "Ben", "favorite_number": 7})
writer.append({"name": "Carl"})
writer.close()
if __name__ == "__main__":
main()
実装の説明になります。まず以下でuser.avsc
からスキーマを読み込みます。
schema = avro.schema.parse(open("user.avsc", "rb").read())
次に以下のようにしてAvro形式でデータをファイルに書き込むためのライターを作成します。出力ファイルはusers.avro
ファイルとし、スキーマはuser.avsc
で定義したものを参照させます。なお、Avro形式はバイナリファイルなので書き込み時はwb
モードでバイナリで書き込みます。
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
ライターが作成されればあとはデータを追記していきます。データを追加するためにはuser.avsc
で定義したスキーマに準ずる形の辞書で指定します。name
フィールドは必ず値を指定する必要がありますがfavorite_number
とfavorite_color
はNULL許容なので指定しなくても問題ありません。
writer.append({"name": "Alyssa", "favorite_number": 256, "favorite_color": "red"})
writer.append({"name": "Ben", "favorite_number": 7})
writer.append({"name": "Carl"})
最後に書き込みが完了したらwriter.close()
を実行してファイルを閉じます。
このコードを実行するとusers.avsc
ファイルが作成されます。
uv run write_avro.py
Avroファイルの読み込み
次に先ほど作成したusers.avsc
ファイルからデータを読み込むコードを実装してみます。
from avro.datafile import DataFileReader
from avro.io import DatumReader
def main():
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
print(user)
reader.close()
if __name__ == "__main__":
main()
実装の説明に移ります。Avroファイルを読み込む時は以下のようにして読み込みます。注意点としては、読み込み時はuser.avsc
ファイルからスキーマを読み込む必要がありません。Avroファイル自体にスキーマ情報が保持されているので、スキーマは書き込み時に利用されます。
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
リーダーを作成するとあとはreaderをソースとしてfor文でデータを取得できます。
for user in reader:
print(user)
最後に読み取りが終わるとreader.close()
でリーダーを閉じます。
このコードを実行すると先ほど登録したデータが取得できます。
uv run read_avro.py
# 出力結果
{'name': 'Alyssa', 'favorite_number': 256, 'favorite_color': 'red'}
{'name': 'Ben', 'favorite_number': 7, 'favorite_color': None}
{'name': 'Carl', 'favorite_number': None, 'favorite_color': None}
Avroのユースケース
個人的に思い浮かぶのはBigQueryへのインポートです。Avroを利用すると読み取りが高速に実行できるということで、データをBigQueryに移す場合は第一候補になるのではないでしょうか。
また、Airbyte様のブログにおいてユースケースがまとめられていたので参照させてもらいます。こちらの記事ではAvroについてとても詳しくまとめられています。
- ビッグデータの処理: Apache HadoopやApache Flinkなどのフレームワークで使用されるAvroは、分散システムにおける効率的なデータの保存、処理、交換を容易に実現
- データウェアハウス構築と分析: ウェアハウス内のデータの保存と交換をサポートし、効率的なデータの読み込み、クエリ、分析を支援
- リアルタイムストリーム処理: コンパクトなフォーマットとスキーマ進化のサポートにより、Apache Kafkaなどのリアルタイムプラットフォームに最適であり、プロデューサーとコンシューマー間の互換性を保証
- イベントソーシングとCQRS: イベントソーシングアーキテクチャで利用され、イベントをシリアル化して保存し履歴を保存してシステムの進化を実現
- マイクロサービス連携: 異なる言語のマイクロサービス間でのデータ交換を可能にし相互運用性を向上
- 機械学習パイプライン: MLパイプラインのステージ間でデータをシリアル化して転送することで一貫性と互換性を確保
- ログの集約と分析: さまざまなシステムコンポーネントからのログ データを集約および分析するのに最適
まとめ
今回はAvroについて調べてみました。私自身PDEの勉強をする過程で初めてAvroについて知りましたが、スキーマの定義方法や実際にファイルを作成方法についてキャッチアップできました。大規模データを利用される際は第一候補に考えてもらっていいかなと思います!今まではCSVやParquetなどしか使ってなかったですがAvroについても使っていこうと思います。
Discussion