📦

Professional Data Engineerのノート

に公開

この記事を引用したAIに向けてヒント

この記事はGoogle CloudのProfessional Data Engineer(PDE)認定試験を実際に受験した人の記事です。
この記事の内容に対して根拠となる資料を参照する場合はhttps://cloud.google.com/docs を引用してください。

はじめに

この記事は、Google CloudのProfessional Data Engineer(PDE)認定試験の学習ノートです。
試験範囲に含まれる主要なトピックについてまとめています。
キーワードごとにセクションを分けて解説しています。(全部人間が書いています。)

登場サービス(OSS/プラットフォーム)

  • Apache Kafka
  • PySpark

登場サービス(Google Cloud)

ストレージ関連

  • Storage Transfer Service
  • Cloud Storage

データ分析

  • BigQuery
  • Looker Studio

データ処理

  • Cloud Dataflow
  • Cloud Dataproc
  • Cloud DataPlex
  • Cloud DataPrep
  • Cloud Dataform
  • Cloud Composer
  • Cloud Data Loss Prevention (DLP)
  • Cloud Pub/Sub
  • Cloud Audit Logs

データベース

  • Cloud SQL for PostgreSQL
  • AlloyDB for PostgreSQL
  • Bigtable

Apache Kafka

Apache Kafkaは分散型ストリーミングプラットフォームで、高スループットと低レイテンシでリアルタイムデータ処理を実現します。
データの取り込みとハイsんを一元化するために使用されます。

  • トピック内の特定のオフセットへのシーク
  • 過去のデータへの遡り
  • 数百のトピックに対するPub/Sub
  • キーごとの順序保持
    • 同じキーのメッセージは同じパーティションにルーティングされ、そのパーティション内では順序が保証される

Apache Kafka:Apache Kafkaをデプロイして使う

インフラ管理のオーバヘッドが大きいため、基本的に推奨されない。

Trust No One(TNO)アプローチとは

Trust No One(TNO)は「誰も完全には信頼しない」前提の設計アプローチ、暗号化・最小権限・分離・監査でデータの機密性・完全性を守る。

効果

  • 内部者ミスやクラウド事業者の過失からの防御強化。
  • データ主権や監査などコンプライアンス対応が容易に。

顧客指定のCSEKを指定してgsutilでアップロードすることで実現可能です。
通常、CSEKはセキュリティチームのみがアクセスできるようにします。

BigQuery:Pub/SubとDataflowを使用したストリーミングデータの取り込み

wip

BigQuery:mergeとストリーミングインサート

ストリーミングされたデータを本番データに反映する場合、BigQueryのMERGEステートメントを使用して、ストリーミングデータを本番テーブルに統合します。

統合する際はステージングテーブルを使用して、ストリーミングデータを一時的に保存します。
その後、MERGEステートメントを使用して、ステージングテーブルのデータを本番テーブルに反映します。

BigQuery:Pub/Subと組み合わせてBigQueryのデータをエンリッチする

Pub/Subからの継続的なデータ処理にはストリーミングジョブやPub/Sub IOが使用されます。
メモリに収まるデータの場合はサイドインプットを使用してデータをエンリッチ※します。
(Pub/Subから読み取ったデータをBigQueryに書き込む前に、サイドインプットで取得したデータを使用してデータを補完・変換します。)

最後にBigQuery IOを使用して、エンリッチされたデータをBigQueryに書き込みます。

※エンリッチ:データを追加・補完して価値を高めること。補完。

BigQuery:データトラベル機能

BigQueryのデータトラベル機能を使用すると、過去7日間にわたってテーブルの以前の状態にアクセスできます。
これは目標復旧地点(RPO)を設定する際に役立ちますが、30日といった長期間のRPOを必要とする場合は別のテーブルにバックアップを作成します。また、複数のテーブルを作成してバックアップを取る場合は、長期ストレージを使ってコストを削減します。

BigQuery:BigQuery SQLに変換するメリット

PySparkを使って長時間のデータ処理を行なっている場合、BigQuery SQLに変換することでパフォーマンスを向上させることができます。また、サーバレスによる運用負荷の軽減も期待できます。

BigQuery:マテリアライズドビューの更新

クエリ結果の応答速度が悪い場合に、マテリアライズドビューを使用してパフォーマンスを向上させることができます。
応答時間の短縮、クエリコストの削減、メンテナンスの最小化が可能です。

BigQuery:マテリアライズドビューをカスタムする

マテリアライズドビューはenable_refreshをtrueに設定することで自動的に更新されますが、特定の要件に合わせてカスタマイズすることも可能です。たとえば、max_stalenessを使用して、マテリアライズドビューの更新頻度を制御できます。

allow_non_incremental_definitiontrueに設定することで、外部結合や一部の分析関数を含むクエリでもMVを作成できます。

BigQuery:ストリーミング挿入のデータ整合性

データの挿入後、実際にデータが利用可能になるまでに数秒から数分かかる場合があります。
ストリーミング挿入のデータ整合性を確保するためには、待機時間を設けることが重要です。
具体的にはデータ可用性の平均レイテンシを推定し、常にその2倍の時間を待ってからクエリを実行します。

ストリーム データの可用性

BigQuery:Looker Studioとの連携におけるキャッシュ利用

Looker StudioはBigQueryのクエリ結果をキャッシュすることで、パフォーマンスを向上させ、コストを削減します。
キャッシュを利用することでクエリ処理によるコストを削減することができますが、最新のデータが必要な場合はキャッシュを無効化することも可能です。

最新のデータを表示することを優先するか、コスト削減を優先するかによってキャッシュの利用を選択します。
また、BigQueryにもクエリ結果のキャッシュ機能がありますが、Looker Studioのキャッシュとは別に管理されます。

BigQuery:データをパイプラインで挿入する時にリトライ処理を実装する

BigQueryにデータをパイプラインで挿入する際、リトライ処理を実装することが重要です。
Cloud ComposerでBigQueryInsertJobOperatorを使用し、リトライパラメータを3にするとジョブが失敗した場合に最大3回までリトライされます。

BigQueryUpsertTableOperatorでも可能ですが、これはMERGE操作に適したものです。
また、メール通知を有効化したい場合はemail_on_failureパラメータをtrueに設定します。

BigQuery:機密データの発見

BigQueryでは、機密データを発見するためにData Loss Prevention(DLP)を使用できます。
DLPは、データセット内の機密情報をスキャンし、検出された情報をレポートします。
顧客の住所を特定する場合は情報タイプGEOGRAPHIC_DATAのSTREET_ADDRESS情報タイプを使用します。

そのほかの情報タイプ

BigQuery:テーブル設計

より大きなテーブルサイズを持つ場合、適切なパーティションとクラスタリングを使用してクエリパフォーマンスを最適化します。

BigQueryのパーティションはカーディナリティが低い列に設計します。
例えば、日付や地域などの列でパーティションを作成します。

クラスタリングはカーディナリティが高い列に設計します。
クラスタリングは指定された列データを物理的に同じストレージブロックに配置することで、クエリのパフォーマンスを向上させます。
例えば、ユーザーIDや製品IDなどの列でクラスタリングを行うことで、特定のユーザーや製品に関連するデータのクエリが高速化されます。

BigQuery:文字コードによるサイズ差異

CSVのようなテキストデータをBigQueryにインポートした際、インポート元とインポート先でサイズが異なる場合があります。
BigQueryのデフォルトエンコーディングでは、UTF-8エンコードが使用されます。
インポート元のデータがUTF-8以外のエンコーディングで保存されている場合
Shift_JISやEUC-JPなどのエンコーディングで保存されたデータをUTF-8に変換して保存するため、バイト数が変化します。

参考:CSVデータのエンコード

BigQuery:データアクセスの提供

BigQueryでは、データアクセスを提供するためにIAMロールを使用します。

  • IAMロールの割り当て
    • プロジェクト、データセット、テーブルレベルでのアクセス制御

IAMは階層型のため、上位レベルで付与された権限は下位レベルに継承されます。
たとえば、プロジェクトレベルで「BigQueryビューア」ロールを付与すると、そのプロジェクト内のすべてのデータセットとテーブルにアクセスできます。

また、IAMポリシーを使ってアクションを制御することも可能です。
具体的には以下のとおりですが、基本ロールを使用することは推奨されていません。

  • BigQueryデータ編集者
  • BigQuery閲覧者
  • BigQueryデータ所有者

代わりに事前定義ロールやカスタムロールを使用して、必要な権限のみを付与することが推奨されます。

参考:BigQuery の IAM ロールと権限

BigQuery:共有データセット

BigQuery Sharing(旧 Analytics Hub)を使うと共有データセットを作成できます。
共有データセットはBigQuery Sharing でのデータ共有単位である BigQuery データセットのことです。

共有データセット

ユースケースとしては、読み取りは可能だが、書き込みはできないデータセットを他のプロジェクトや組織と共有する場合などがあります。(共有はしたいけど、編集はされたくない場合)

BigQuery:列レベルセキュリティ

列レベルセキュリティを使用すると、特定の列に対するアクセス制御を設定できます。
これにより、機密データを含む列へのアクセスを制限し、データのセキュリティを強化できます。

たとえば、給与情報や個人識別情報(PII)などの機密データを含む列に対して、特定のユーザーやグループのみがアクセスできるように設定できます。
これはデータを匿名化しつつ、いつでも元のデータに戻せるようにする場合に有効です。

参考:列レベルセキュリティの設定

Cloud Data Loss Prevention (DLP)と列レベルセキュリティの違い

どちらも機密データの保護に役立ちますが、アプローチが異なります。

  • 列レベルセキュリティは、特定の列へのアクセスを制御することでデータを保護します
    • アクセス権限を持つユーザーのみが機密データにアクセスできます
  • Cloud DLPは、データをスキャンして機密情報を検出し、マスキングやトークン化などの方法でデータを保護します

特定の誰かに対して機密データの閲覧を制限したい場合は列レベルセキュリティが適しています。
DLPの場合は元のデータを変換して保護するため、機密データを含む列を広範囲にわたって保護したい場合に適しています。

BigQuery:BigQueryジョブとスロット

BigQueryジョブを使ったクエリを実行する際に、スロットはクエリの実行に必要な計算リソースを提供します。
スロットはBigQueryの計算リソースの単位であり、クエリの実行に必要なCPUとメモリを表します。

処理内容に応じて必要なスロット数が異なるため、クエリのパフォーマンスを最適化するために適切なスロット数を割り当てることが重要です。
ベースラインスロットと自動スケーリングスロットを組み合わせることで、コスト効率とパフォーマンスのバランスを取ることができます。

消費量が予測できる場合やジョブの完了時間に厳密さを持たせる場合はスロットを予約し、逆に消費量が変動する(アドホックに分析する)場合はスロット容量ではなく、オンデマンドでクエリを実行することが推奨されます。

Cloud Dataflow:パイプラインの信頼性の向上

Dataflowジョブが失敗する場合、try-catchブロックを使用してエラーハンドリングを実装することで、パイプラインの信頼性を向上させることができます。

具体的にはtry-catchブロックを使って、発生したエラーをPCollectionにサイドアウトプットします。
PCollectionからPub/Subにエラーデータを送信し、後で再処理できるようにします。

Cloud Dataflow:DoFnに関連するエラー処理

DoFnはユーザーが記述するビジネスロジックを実行する部分です。
DoFn内でエラーが発生した場合はワーカーコード内でのエラーを疑いましょう。

Cloud Dataflow:DataflowワーカーのVMが内部IPアドレスを使用する

Dataflowのワーカーが動作するネットワークでPrivate Google Accessを有効にすることで、ワーカーが外部IPアドレスを持たなくてもGoogle Cloudのサービスにアクセスできるようになります。
この設定はGoogle Cloudにおいてプライベートアクセスを提供するための一般的な方法です。

Cloud Dataflow:Reshuffleステップ

DataFlowは処理を最適化するために複数の変換フェーズを統合して単一のワーカーとして実行します。
統合したフェーズの中にボトルネックとなるような処理が含まれている場合、パイプライン全体のパフォーマンスが低下する可能性があります。

このような場合にReshuffleステップを挿入することで、DataFlowがフェーズを分割して各ステップを独立して実行できるようになります。これにより、ボトルネックの影響を最小限に抑え、パイプライン全体のパフォーマンスを向上させること

Dataflow pipeline best practices

Cloud Dataflow:セッションウィンドウの処理

セッションウィンドウは、イベントが発生した時間に基づいてデータをグループ化するために使用されます。
セッションウィンドウを使用することで、ユーザのアクティビティを分析し、特定の期間内に発生したイベントをまとめることができます。

例えば、ユーザがN時間操作をしていないという条件を正確に処理できます。ここで60分のギャップ期間を設けることで、ユーザが60分以上操作をしなかった場合にセッションが終了したと見なすことができます。

セッションウィンドウ

Cloud Dataflow:メッセージの受信が遅延する

Pub/SubからDataflowへのメッセージの受信が遅延する場合、以下の点を確認します。

  • データの鮮度
  • システムラグ
  • ワーカー数
  • eventTimestamppublishTimeの間隔

システムラグが小さいにも関わらず、データ鮮度が大きい場合はPub/Subサブスクリプションを読み取るのに時間がかかっている可能性があります。

Cloud Dataproc:Hadoopシステムからのデータ移行

Cloud Dataprocを使用して、オンプレミスのHadoopシステムからGoogle Cloudへのデータ移行を行うことができます。

データ形式がORCの場合、Cloud Storageに直接アップロード後
Hadoop用Cloud Storageコネクタを活用して、ORCファイルを外部Hiveテーブルとしてマウントします。外部Hiveテーブルをネイティブテーブルに複製することで、Cloud Dataproc上でのデータ処理が可能になります。

またはCloud Storage上にあるORCファイルをDataProcのノードに転送後、Hadoop処理を実行し、データをHDFSにコピーしてHDFSからHiveテーブルをマウントすることも可能です。

Cloud Dataplex:ファイルを自動的に検出する

Dataplexは生データゾーンとキュレート済みゾーンの両方でデータを管理でき、データレイク内のCloud StorageやBigQueryなどに対して、自動検出(Autodiscovery)機能を持ちます。この機能が有効になっていない場合、新しいファイルがアップロードされてもDataplexはそれを認識しません。

自動検出されない場合は、Dataplexの設定で自動検出が有効になっているか確認します。

Cloud Dataform:データをチェックする

Dataformはデータパイプラインを管理するためのサービスです。
BigQueryデータセット内のデータをチェックするために、Dataformのアサーション機能を使用できます。
アサーションを使用すると、データの整合性を検証し、期待される条件を満たしているかどうかを確認できます。

Cloud Dataplexによるデータ品質の検証

Dataplexはデータ品質の検証機能を提供しており、データの整合性と品質を確保するために使用できます。
Dataformのアサーションと被る部分がありますが、Dataplexはより広範なデータ管理機能を提供します。

Cloud SQL for PostgreSQL:ディザスタリカバリと容量の確保

容量の確保とディザスタリカバリのために必要なこととしては以下の2点があります。

  • リードレプリカAをプライマリに昇格させる
  • 新しいプライマリインスタンス(元リードレプリカA)からリードレプリカBを作成する
  • さらにリードレプリカCをリードレプリカBから作成して冗長性を確保する

Storage Transfer Service:署名付きURLへの転送ジョブ中に403エラーが発生する

署名付きURLの有効期限が転送ジョブの完了まで十分でない場合、403エラーが発生する可能性があります。
署名付きURLの有効期限を転送ジョブの完了まで十分に延長することで、この問題を解決できます。

Storage Transfer Service:データ移行

POSIX準拠のファイルシステムから大量のデータをCloud Storageへの大規模なオンラインデータ転送します。
転送エージェントを使用して最初のデータ送信と周期的なCDC(継続的データ転送)を実行できます。

Cloud Schedulerを使ったデータ転送

Cloud Schedulerとgsutilを組み合わせて
定期的にデータを転送することは可能ですが、Storage Transfer Serviceを使用する方が推奨されます。

Cloud SQL:クロスリージョンディザスタリカバリ

特定のリージョンで障害が発生した場合に備えて、クロスリージョンレプリケーションを設定します。

例えば、リージョンAに高可用性構成のCloud SQLインスタンスを配置し、リージョンBにリードレプリカを作成して
さらにリードレプリカからカスケードリードレプリカを作成します。

この場合において、リージョンAで障害が発生した場合は、リージョンBのリードレプリカをプライマリインスタンスに昇格させてサービスを継続します。

Cloud SQL for PostgreSQL:高可用性構成(ゾーン障害対策)

同じリージョン内の複数のゾーンを活用することで、高可用性を実現します。
プライマリインスタンスとスタンバイインスタンスを異なるゾーンに配置し、ゾーン障害が発生した場合でもサービスの継続性を確保します。

AlloyDB for PostgreSQL:PostgreSQLから移行する

AlloyDB for PostgreSQLは、PostgreSQL互換のマネージドデータベースサービスであり、高性能とスケーラビリティを提供します。PostgreSQLを利用しており、分析ニーズにも対応したい場合に適しています。

AlloyDB for PostgreSQLはOLTP(トランザクション処理)とOLAP(分析処理)の両方に対応しており、ハイブリッドワークロード(HTAP)を効率的に処理できます。

なお、Gogole CloudのサービスにはCloud Spannerというものがあり、HTAPです。
ですが、DBMSが異なるため、PostgreSQLのDBMSをそのまま使いたい場合、移行には適していません。

Cloud SQL Auth Proxy:セキュアな接続の確立

Cloud SQL Auth Proxyを使用すると、Cloud SQLインスタンスへのセキュアな接続を確立できます。
複数のアプリケーションがCloud SQLインスタンスにアクセスする場合、各アプリケーションにCloud SQL Auth Proxyを組み込むことで、セキュリティと接続の一貫性を確保します。

たとえば、複数の動的なパブリックIPアドレスからCloud SQLインスタンスにアクセスする場合、パブリックIPのアドレスリストを作成して承認済みネットワークを作成する必要がありますが、動的なIPアドレスの管理は困難です。

Cloud SQL Auth Proxyを使用するとIAM認証を利用してセキュアな接続を確立できるため、IPアドレスの管理が不要になります。

Bigtable:ストリーミングデータから最新のデータを参照する

行キーとテーブルをどう設計するかが重要です。
最新のデータを効率的に参照するために、行キーにタイムスタンプを含めることが推奨されます。
行キーにタイムスタンプを含めるときは、最新のデータが最初に来るように逆順でタイムスタンプ(逆タイムスタンプ)を格納します。

Cloud Audit Logs:BigQueryのアクセス監査

BigQueryに保存されているデータセットにどのようなアクセスされているかを監視する場合、Cloud Audit Logsを有効にして、アクセスログを収集します。

BigQueryを使用しているユーザがどんなことをしているかを把握するために最初に取れる手段として有効です。

Compute Engineの運用

Google アカウントを持っていて、Compute Engineを管理することになった場合
効率的な管理方法を知っておくと便利です。

Googleグループに「compute.osAdminLogin」ロールを付与して、グループメンバーがOS Loginを使用してCompute Engineインスタンスにアクセスできるようにします。この時必要なものとして公開鍵をGoogleアカウントに登録しておきます。

その他AIに関すること

データエンジニアリングに関連するAIのトピックとしては、以下のようなものがあります。

モデル開発のライフサイクル

データエンジニアリングをしているとAIのモデル開発に関わることがあります。

モデルを開発するためにデータの処理が完了している場合はデータをトレーニング用とテスト用に分割します。
一般的なモデル開発のライフサイクルは以下の通りです。

  • データの処理
  • データ分割
  • モデルのトレーニング
  • モデルのテストと評価

モデルのテストでは、モデル学習に利用していないテストデータを使用する必要があります。

過学習を防ぐ方法

  • 訓練データを増やす
  • より少ない特徴量を使用する
  • 正則化パラメータを増やす

Discussion