データパイプラインについて w/Airflow
前書き
1年間にわたりデータパイプラインやデータアーキテクチャについて考え、学んできました。この文章では、その振り返りとともに、私の考えをまとめています。多くの資料を参考にしていますが、私なりの解釈を加えていますので、皆さんのご意見や感想をお待ちしています。
全体構成
全体的な構成は3層に分かれており、一時的な保存層を合わせると5層になります。
受領層
上流のシステムからデータを受け取る層です。
この層のデータには、個人情報などの機微な情報が含まれる可能性があるため、ライフサイクルポリシーを用いて定期的にデータを消去することが重要です。
データの受け取り方は、「push型(上流から送られる)」と「pull 型(自ら取得する)」の2種類があります。pull型の場合、データ取得のための処理が必要です。
Datalake 層
受領層で受け取ったデータに対し、個人情報の除去やフォーマットの統一などの処理を行った後のデータを保存する層です。
企業の規定にもよりますが、基本的には(パスに日付を設けながら)長期保存します。データは積み上げ方式で管理します。
取り込み層 (stg)
Datalake 層に保存されたデータを一時的に取り込む層です。
この層では、主キーの重複や区分値などのデータバリデーションを行います。
(後述しますが、全件データの場合、前回と今回のデータの両方を読み込みます。)
データウェアハウス(DWH)層
取り込み層に保存されたデータを、DWH に保存する層です。
取込層のデータに以下のような処理を行います。
- カラム名の標準化
- 値の標準化
- システムカラム付与(連携日(airflowの論理日付)、ファイル連携日、論理削除日付)
- (全件連携の場合) 前回のデータとの比較し、差分のみをDWHに保存する。(物理削除などがある場合は論理削除フラグをつけたレコードを追加する。)
データマート層
個別の目的に応じて、DWHからデータを抽出し、加工や集計を行う層です。
データマート層が更新された際に、変更に迅速に対応できるように依存関係を構築することが重要です。
具体的なAirflow pipeline
今回作ったpipeline を紹介します。
全体像
(見づらくてすみません。)
前提 : if ごとに config が有り、読みこむとdag がいい感じにデータを読みこんでくれる形となっています。
受領層
最終的なゴール : 所定のディレクトリに上流のデータが保存されている。
- start_task
- DAG の開始タスクです。
- branch_pull_or_push_task
- 上流からデータを受け取るか、自ら取得するかを判断するタスクです。
- dummy_push_task
- 特に何もしません。(push なので)
- pull_sensor_task
- 上流からデータを取得するdagを別に用意しております。
- そのdagが正常終了している場合のみ、次のタスクに進みます。
- join_task
- push と pull のどちらかが正常終了している場合のみ、次のタスクに進みます。 ("none_failed_min_one_success")
- initialize_task
- データレイクのパスなどの様々な情報を初期化します。
Datalake 層
最終的なゴール : 受領層から受け取ったデータを所定の処理を行いdatalake に保存する。
- preprocessing_task
- 個人情報の除去を行う (名前 : マスク、hash化 , 電話番号 : 市外局番抜き出し...)
- ファイルフォーマットの統一を行う (tsv,csv,... -> parquet)
- db の要件に合わせて桁や文字数を変更する。 (varchar が 655535 文字までの場合、それ以上の文字数は切り捨てる。)
- ファイルのサイズによって処理するリソースをairflow local -> spark -> emr の様に変更する。またファイルが分割されている場合もあるので並列処理に対応する。
取り込み層 (stg)
- cleaning_up_stg_table_task
- 前回のデータを削除する。
- create_stg_table_task
- stg テーブルを作成する。
- load_datalake_to_stg_task
- datalake から stg にデータを読み込む。 (全件の場合には前日分も読みこむ)
- categoryvalue_validation_task
- カテゴリ値のバリデーションを行う。
- pk_not_null_validation_task
- 主キーのバリデーションを行う。
- unique_key_validation_task
- 主キーのユニーク制約のバリデーションを行う。
データウェアハウス(DWH)層
- insert_stg_to_dwh_task
- stg から dwh にデータを読み込む。 (全件の場合はここで前回のデータとの差分を抽出し insert する。)
- datalake_dwh_row_cout_validation_task
- datalake と dwh のレコード数が一致しているかを確認する。(差分は対象の日付のみだが、全件の場合は、datalake(対象日付)と積み上げのdwh を比較するため、かなり信用性は高い)
データマート層 及び その他
データマート
- trigger_datamart_aggregate_1
- aggregate_1 という datamart の dag を trigger する。 (datalake rerun 時にも対応できるようにするため)
その他
- clean_up_stg_table_task
- 今回のデータを削除する。
- update_catalog_task
- データカタログを更新する。(データカタログには最新の更新日を記載し、テーブルが更新されているのかを利用者が判定できるようにする。)
終わりに
データだけで長年ご飯を食べている方がたくさんいる中、1年ほどの若輩者が恐縮ですが、私なりに考えたデータパイプラインについてまとめてみました。
意見などありましたら、是非教えてください。
Discussion