📖

re:Invent 2024: NetflixのApache IcebergとMaestroによる遅延データ処理効率化

2024/01/01に公開

はじめに

海外の様々な講演を日本語記事に書き起こすことで、隠れた良質な情報をもっと身近なものに。そんなコンセプトで進める本企画で今回取り上げるプレゼンテーションはこちら!

📖 AWS re:Invent 2024 - Efficient incremental processing with Apache Iceberg at Netflix (NFX303)

この動画では、NetflixのStaff Software EngineerであるJun Heが、遅延データの効率的な処理方法について解説しています。NetflixがApache IcebergとMaestroを組み合わせて開発したIncremental Processing Support (IPS)により、データの正確性、鮮度、コスト効率を大幅に改善した事例を紹介しています。特に、年間1億5000万ドルを費やすコンピューティングとストレージのコストを、IPSの導入により従来の10%未満に削減できた具体例は注目に値します。また、Netflixが開発したワークフローオーケストレーターMaestroと、オープンテーブルフォーマットのApache Icebergを組み合わせることで、最小限のコード変更で遅延データの効率的な処理を実現した技術的な詳細も説明しています。
https://www.youtube.com/watch?v=s1ySnxVg5rk
※ 動画から自動生成した記事になります。誤字脱字や誤った内容が記載される可能性がありますので、正確な情報は動画本編をご覧ください。
※ 画像をクリックすると、動画中の該当シーンに遷移します。

re:Invent 2024関連の書き起こし記事については、こちらのSpreadsheet に情報をまとめています。合わせてご確認ください!

本編

Netflixのデータ処理課題:Jun Heによる講演の導入

Thumbnail 0

re:Inventの月曜日の朝早くからご参加いただき、ありがとうございます。私はVaidyと申しまして、AWSのStrategic Account Managerを務めております。5年以上にわたってAWSに在籍し、Netflixのデータプラットフォームをサポートするチームを率いる栄誉と喜びを得ております。本日最初の講演者であるJun Heをご紹介させていただきます。JunはNetflixのStaff Software Engineerで、本日は計算コストを削減し、実行時間を改善し、手動での介入や障害を減らす方法で、Netflixが遅延データを段階的に処理する方法についてお話しいただきます。

ビジネスの観点から見ると、これらすべてのワークフロー、タスク、トランザクションが特定の順序で実行されることは、Netflixの社内分析の鮮度と正確性を確保する上で極めて重要です。これには、エンゲージメント、新規登録、広告インプレッション、その他の類似指標の測定方法が含まれます。本日は、Junが説明するサービスの開発を必要とした問題点を含め、いくつかの側面について学ぶことを楽しみにしています。ワークフロー・オーケストレーション・プラットフォームであるMaestroや、Netflixのために、そしてNetflixによって発明されたオープンテーブルフォーマットであるApache Icebergについてお聞きすることになります。Apache Icebergは、ビジネスやスキーマが進化する中でデータレイクのトランザクション一貫性を保つのに役立っています。最後に、これらの取り組みの影響と、皆様が自身の組織でこれらのレシピを活用して実装する方法について学ぶことを楽しみにしています。

Thumbnail 170

Thumbnail 180

Thumbnail 190

Thumbnail 200

Thumbnail 210

Thumbnail 220

Thumbnail 230

   ありがとうございます。皆様、おはようございます。Jun Heと申します。本日は、NetflixにおけるApache IcebergとMaestroを活用した効率的な増分処理について、お話しさせていただくことを大変嬉しく思います。朝8時30分ということで、少しウォーミングアップの演習から始めましょう。遅延データについて聞いたことがある、または扱ったことがある方は手を挙げていただけますか?ビッグデータについてはどうでしょうか?データの正確性の問題について聞いたことはありますか?データの鮮度についてはどうですか?コスト効率の問題に取り組んだことはありますか?Apache Icebergについて聞いたことはありますか?Netflixを視聴したことがある、または聞いたことがありますか?Netflix Maestroについてはどうでしょうか?映画の方でも、ワークフロー・オーケストレーターの方でも結構です。素晴らしいですね。ウォーミングアップの演習にご参加いただき、ありがとうございます。皆様が関連する背景をお持ちだということがわかりました。

Netflixのデータインサイトと遅延データ処理の課題

Thumbnail 260

Thumbnail 280

Thumbnail 300

本講演では、IcebergとMaestroを組み合わせて効率的な増分処理のサポートを構築する方法についてご紹介します。私はNetflixのデータプラットフォームでStaff Software Engineerを務めるJun Heです。ビッグデータ・オーケストレーション・チームのテクニカルリードとして、主にワークフロー・オーケストレーションに注力しています。このプロジェクトのリーダーであり、Apache Iceberg、Kaa、Netflix Maestroなど、複数のオープンソースプロジェクトにコントリビュートしています。講演の概要は以下の通りです:まず問題領域について説明し、次にソリューションのアーキテクチャ設計について説明します。その後、ユースケースと例を示し、最後に重要なポイントと今後の課題についてまとめます。では、まずNetflixのデータインサイトの全体像から見ていきましょう。

Thumbnail 310

Netflixの加入者の方は手を挙げていただけますでしょうか?Netflixをご利用いただき、ありがとうございます。netflix.comでの閲覧や番組視聴を楽しんでいただけていることを願っています。パーソナライズされた番組のレコメンデーションは非常に優れていますが、必ずしも皆様が望むものを完全に提供できているわけではないかもしれません。これらのパーソナライズされた番組のレコメンデーションは、複数の機械学習パイプラインとデータパイプラインから得られるデータインサイトによって支えられています。

データドリブンな企業であるNetflixでは、多くの意思決定が完全にデータインサイトによって導かれています。Netflixのデータプラクティショナー(データエンジニア、データサイエンティスト、Machine Learningエンジニア、ソフトウェアエンジニア、さらにはコンテンツプロデューサーなどの非エンジニアも含む)は、必要なインサイトを得るためにデータパイプラインを活用しています。これは、netflix.comにアクセスした際のランディングページの色使いといったシンプルなものから、パーソナライズされた番組レコメンデーション、あるいはコンテンツプロデューサーがデータインサイトを使用して次のシーズンの更新を決定するといったものまで多岐にわたります。また、セキュリティチームは潜在的なセキュリティの問題を特定するためにデータをスキャンすることもあります。

Thumbnail 410

誰もが必要なインサイトを得るためにデータパイプラインを中心に作業を行っており、ストリーミング、ゲーム、広告、そして最近では生配信コンテンツなど、新しい領域やエリアへとビジネスが継続的に拡大する中で、データへの需要は増え続けています。これらの新しいビジネスイニシアチブには、セキュリティ要件、プライバシー要件、レイテンシー要件などの特定の要件があり、データに関する幅広いユースケースをもたらしています。

Thumbnail 440

データを扱う際、データプラクティショナーは通常、データの正確性、鮮度、コスト効率という3つの共通の課題に直面します。この膨大なデータ量において、すべてのビジネス上の意思決定は高品質なデータに基づいて行われる必要があるため、正確性は非常に重要です。データウェアハウスの規模を考えると、正確なデータを確保するのは非常にコストがかかり、時間もかかります。また、プラクティショナーがより迅速なビジネス判断を可能にするためにデータウェアハウスからのデータを素早く処理する必要があるため、データの鮮度も重要です。そしてNetflixはコンピューティングとストレージだけで年間1億5000万ドル以上を費やしているため、ビジネスを運営する上でコスト効率は常に重要な要素となっています。

Thumbnail 520

これらの3つの問題は一般的な課題です。データ量の多寡に関わらず(もちろん、データ量が膨大な場合はより大きな影響がありますが)、これらの3つの問題を解決することはゲームチェンジャーとなり、新しいパターンを可能にし、ETLパイプラインの再考を促すことになります。これらの問題を解決する際、遅延到着データは大きな課題となります。視覚的な例を使って説明させていただきましょう。昨夜、私は携帯電話でNetflixを視聴していましたが、午後10時20分にバッテリーが切れてしまいました。その時点で生成されたイベントは携帯電話にバッファリングされました。その後、携帯電話を充電器に接続して就寝しました。今日、アプリを開いた時に、昨夜バッファリングされたイベントと今日のイベントが処理のためにサーバーに送信されました。

昨夜生成されたイベントは数時間遅れて処理され、異なるパーティションテーブルに書き込まれることになります。重要なのは、ビジネスにとって重要なのは処理時間ではなく、イベント発生時間だということです。ストリーミングパイプラインでは、多くの場合、ステージングテーブルや一時テーブルにデータを素早く追加するために、処理時間を使用してテーブルをパーティション化します。これによりストリーミングパイプラインやリアルタイムストリーミングパイプラインは大幅に簡素化されますが、遅延到着データの処理は後で分析バッチパイプラインが対処することになります。

Apache IcebergとMaestroによる効率的な増分処理

Thumbnail 610

Netflixのデータ実務者がデータインサイトを得られるよう支援するため、私たちは最高のユーザーエクスペリエンスで整備された道筋を提供するBig Data Analytics Platformを構築しました。ユーザーはMaestroを使用してワークフローを開発し、各種コンピュートエンジン向けのビジネスロジックを記述します。そしてMaestroがすべてをオーケストレーションして簡単にアクセスできる形で実行し、最終的にデータはIcebergテーブルに格納されます。

Thumbnail 650

Netflixのユーザーが遅延データを扱う際の一般的な3つのアプローチを観察してきました。1つ目は「Lookback Window」と呼ばれるものです。この場合、ワークフローのオーナーは専門家としてのドメイン知識を活用します。例えば、3日遅れのデータにはビジネス価値がないことを知っているため、3日間のLookback Windowを設定し、データパイプラインで毎日データを再処理します。これによりデータの正確性の問題に対処できますが、データの鮮度とコスト効率性が犠牲になってしまいます。なぜなら、データを繰り返し再処理する必要があるからです。

2つ目のアプローチは、遅延データを含めて処理する方法です。これは特定のケース、特にその時点でビジネス上の判断を下す必要があり、待つことができない場合に有効です。データの品質に関係なく判断を下さなければならない状況です。この場合、データの鮮度は確保できますが、コスト効率性とデータの正確性が失われます。3つ目のアプローチは「Wait then Process(待ってから処理)」と呼ばれ、3日間待ってからデータを処理する方法です。これによりデータの正確性とコスト効率性は確保できますが、データの鮮度に問題が生じます。

Thumbnail 750

Thumbnail 770

この遅延データの問題を解決するための一般的な手法として、新規データや変更されたデータのみを処理する「Incremental Processing」があります。ここでは分析ETLのユースケースに焦点を当てています。これを解決・サポートするためには、変更の捕捉と変更状態の追跡という2つの課題に対処する必要があります。 このトークでは、IcebergとMaestroを組み合わせて効率的なIncremental Processingのサポートを構築し、多くの新しいパターンやユースケースを実現する方法をご紹介します。

Thumbnail 790

まずはIcebergについて簡単にお話ししましょう。Icebergは大規模な分析テーブル向けの高性能フォーマットです。約8年前にNetflixで開発が始まり、現在ではApacheのトップレベルプロジェクトとなり、最も人気のあるオープンテーブルフォーマットの1つとなっています。私たちのプロジェクトでは、ユーザーデータを読み取ることなく変更を捕捉するためにIcebergの豊富な機能を活用しています。

Thumbnail 850

まず、今回のソリューションで使用するIcebergの基本的な概念についてご説明します。Icebergはテーブルをカタログに配置します。このカタログには、Hive Catalog、Glue Catalog、JDBC Catalog、REST Catalogなどがあり、Netflixでは独自の実装を持っています。 テーブルはメタデータファイルにスナップショットのリストを保存し、各スナップショットにはストレージ内のデータファイルを指すマニフェストファイルのリストがあります。これらのデータファイルは不変であり、Icebergは基本値を取得し、必要に応じて別の値に変換することで値を生成します。Icebergはこれらの関係性の変化をチェックし、テーブルのパーティショニングは設定を使用するようになり、もはやテーブルの物理的なレイアウトに依存しなくなりました。これは私たちが活用する非常に優れた特性です。

Thumbnail 900

Netflixでのデータ生成では、 Iceberg内に数百万のテーブルが作成され、さらにテーブル内のデータの書き込み、読み取り、変換を行うために数十万のワークフローが作成されています。

これらのワークフローを管理するために、私たちはMaestroを開発しました。これは水平方向にスケーラブルなワークフローオーケストレーターです。Maestroはワークフローのライフサイクル全体を管理し、ユーザーにサーバーレスな体験を提供し、サブワークフローや条件分岐などの多くのパターンやコンポーネントを標準で提供します。MaestroはIcebergを含む他のシステムとの統合を念頭に設計されています。

Thumbnail 950

Maestroプロジェクトは約4年前に開始しました。Airflowのような一般的なソリューションを使用する代わりに、Netflixが直面するスケーラビリティ、ユーザビリティ、アクセシビリティなどの特有の課題に対応するため、独自のシステムを開発しました。2021年に最初の社内リリース、2022年にベータリリース、2022年後半に社内GAを実施しました。ユーザーに最小限の変更しか求めない完全管理型の移行を通じて、すべてのユーザーワークフローを旧システムからMaestroに移行するのに1年を費やしました。今年7月には、Maestroのソースコードを一般公開しました。

Maestroを活用したワークフロー管理とIPSの実装

Thumbnail 1010

Thumbnail 1030

実際の例を見て、NetflixのユーザーがMaestroを使用してどのようにワークフローを構築しているかを理解しましょう。ワークフローの定義は設定ベースで、YAMLを使用してワークフローを定義します。ユーザーはマークダウンをサポートする説明を含めることができ、そこにオンコール手順やその他の情報を記載できます。何か問題が発生してメールアラートやPagerDutyアラートを送信する際、オンコールチーム向けにこれらの説明を含めることができます。

Thumbnail 1050

Thumbnail 1060

トリガーの設定には、Cronトリガーや、テーブルの準備が完了した時にワークフローを実行するようなイベント駆動型トリガーを定義することができます。Maestroワークフローはパラメータ化されており、他のパラメータを参照して評価を実行できるパラメータを定義することができます。ユーザーは、実行したいアクションをSQLクエリで記述したり、Sparkジョブの実行などのジョブを指定したりすることができます。メモリ割り当てやクラスターの選択について心配する必要はありません。プラットフォームが自動的に処理してくれますが、必要に応じて設定オプションも用意されています。まずはテストクエリを実行して正常に動作することを確認し、確認後にスケジュールを設定して、UIを通じて実行状況を監視することができます。

Thumbnail 1110

Thumbnail 1140

Maestroは、Netflixの数千人の社内ユーザーをサポートする、誰もが使えるワークフローオーケストレーターです。複数のインターフェースと柔軟な統合機能、動的なワークフローエンジン、拡張可能な実行機能を提供しています。私たちは、データの正確性、鮮度、コスト効率に重点を置いた効率的な実装をサポートする、シンプルで導入しやすいソリューションを提供することを目指しています。

Thumbnail 1150

アーキテクチャ設計について説明しましょう。設計には2つの主要な目標があります。1つ目は、ユーザーデータを読み取ることなく変更を効率的にキャプチャすることです。これは効率性だけでなく、セキュリティとプライバシーの要件にとっても重要です。2つ目は、最小限のコード変更で新規ユーザーに最高のユーザー体験を提供することです。ポイントは、変更のキャプチャをビジネスロジックから切り離し、ユーザーの視点から見て実装エンジンや言語に依存しないようにすることです。ユーザーは、SQLやPythonなど好みの言語でビジネスロジックを記述し、IPSのインクリメンタル処理サポートを有効にすることができ、Maestroが必要なサポートをすべて提供します。

Thumbnail 1210

全体的な設計目標について説明したので、インクリメンタルチェーンのキャプチャをどのように設計しているか見ていきましょう。先ほど述べたように、Apache Icebergはメタデータレイヤーで豊富な情報を提供します。スナップショットではデータファイルへのリンクを取得でき、各データファイルのメタデータにはそのデータファイルに関する情報が表示されます。そのデータファイルの各カラムについて、上限と下限が表示されます。この情報を使用して、データをコピーすることなく、すべての変更データを含むインクリメンタル変更キャプチャテーブルを構築できます。元のテーブルから読み取る代わりに、このテーブル名をユーザーのロジックにパラメータとして渡すことで、すべての変更データを取得することができます。ユーザーは元のテーブルから消費する場合と同様のロジックを書くことができますが、変更データのみを含むこの変更テーブルから消費することになります。データのコピーが不要なため、非常に効率的です。また、ゼロデータコピーテーブルは、元のテーブルと同じスキーマとセキュリティアクセスを持っています。

Thumbnail 1280

Thumbnail 1310

この仕組みを視覚的なグラフで説明します。ここでは、時刻T1におけるIcebergテーブルDB1.table1があります。テーブルの状態はこのようになっています。1つのスナップショットと5つのデータファイルがあり、これらの5つのデータファイルはP0とP1という2つのパーティションに振り分けられています。パーティションは仮想的なもので、不変のデータファイルは異なるパーティションにマッピングされることに注意してください。T2では、ユーザーが変更をコミットしてスナップショットS1を作成し、S1には3つの新しいデータファイルが含まれています。これらの3つの新しいデータファイルは、データ量が少ないため、P0、P1、P2の3つのパーティションに振り分けられています。パーティション情報に基づいて処理を行う場合、P0、P1、P2のパーティションを再処理する必要があり、これはS0スナップショットのデータファイルを再処理することを意味します。これは再処理のようなもので、ターゲットテーブルにすべての書き込みを再度行う必要があります。

Thumbnail 1360

そうではなく、私たちはIPS tablesと呼ばれる新しいテーブルを作成します。これは元のテーブルと同じスキーマとセキュアアクセスを持っています。新しいスナップショットS2を追加する際、元のテーブルからスナップショットS1も読み取ります。そして、例えば3つのリンクなどのデータファイル参照を見つけ、それらのリンクを新しいスナップショットのマニフェストファイルに追加してコミットします。このようにして、この増分変更テーブルの仮想的なパーティションはP0、P1、P2となりますが、変更データのみが存在し、この仮想パーティションには古いS0データファイルは含まれません。プラットフォーム層は、一定期間後にこの一時的な変更テーブルを削除する責任を持ちます。

Thumbnail 1410

この問題を解決する他の方法として、Apache Iceberg + FlinkやSpark Structure Streamingを使用する方法もあります。私たちがこれらのアプローチを採用しなかった主な理由は、コンピュートエンジンと処理や変更データのキャプチャを結合させたくなかったからです。その場合、ユーザーは彼らのライブラリを使用し、ビジネスロジックの中でそれらのAPIを呼び出す必要が出てきてしまいます。ユーザーのビジネスロジックのコード変更量を抑えるため、これは避けたいと考えました。また、SparkやFlinkを使用していないユーザーでも増分処理を利用したい場合があるため、彼らのための解決策も必要でした。ただし、基盤では、これらは全て同様のIcebergの低レベルAPIを使用しています。

Thumbnail 1460

ここでは、その仕組みを示すためにコードスニペットを使用しています。基本的に、まず元のテーブルを読み取り、スナップショットを取得し、タイムスタンプに基づいて特定の条件でフィルタリングします。次に変更テーブルを作成し、元のテーブルからそれらのスナップショット(メタデータのみ)を読み取り、データファイルのリンクを取得して、これらのスナップショットに追加してコミットします。適用可能な場合は、皆さんの業務でも同様のアプローチを検討してみてください。これは仕組みを示すデモンストレーションです。シンプルではありますが本番用のコードではありません。ただし、私たちがどのように実装しているかを明確に示しています。

IPSの実装例と自動修復パターンの紹介

Thumbnail 1510

Thumbnail 1520

このような新しい機能により、多くの新しいパターンが可能になります。 ここでは、Netflixで発見した3つの新しいパターンをご紹介します。1つ目は増分処理で、データの変更を連鎖させてターゲットテーブルに追加していきます。 この連鎖には実際のデータファイルへの参照のみが含まれるため、ユーザーのETLは単にこの連鎖テーブルからすべてを処理し、変換されたデータをターゲットテーブルにマージするだけで済みます。元のテーブルからP0、P1、P2全体を再処理する必要はありません。将来的には、このパターンはIceberg SQLエクステンションで直接サポートされる予定です。

Thumbnail 1550

Thumbnail 1600

2つ目のパターンは、不要な変換を削減するためのフィルターとして変更データを捕捉することです。データパイプラインでは、変更データだけではデータの全体像が把握できない場合や、集計を行う際に変更データだけでは不十分な場合が多々あります。例えば、Netflixで各ユーザーの総視聴時間を集計したい場合、最後のウィンドウからの変更データだけでは不十分です。しかし、変更データはそのウィンドウ内で視聴したユーザーIDを教えてくれます。この情報を使用することで、テーブルからのデータを効率的にフィルタリングし、より小さなデータセットを取得することができます。 変更データを読み取ると、丸と菱形のキーだけが変更されていることがわかります。そうすると、ETLは元のテーブルから菱形と丸のデータだけを読み取り、集計を再処理してターゲットテーブルに挿入するだけで済みます。ここでは単純な合計集計を例として使用していますが、実際のビジネスロジックはもっと複雑になる可能性があります。

Thumbnail 1640

Thumbnail 1660

3番目のパターンは、ロジックの中でキャプチャした範囲パラメータを使用する方法です。このアプローチでは、チェーンテーブルを作成する必要はありません。代わりに、ユーザーが関心を持つ特定のカラムの下限値と上限値を取得するためにデータファイルを読み取ります。この例では、ユーザーはパーティションキーのcolorに関心があります。そして、全てのソーステーブルから、最小の変更がP1で最大がP2であることがわかります。ETLパイプラインでこれらの値を持っていれば、テーブルを結合する際に、P1からP2までの範囲に基づいて全てのテーブルから読み取り、そこで結合や複雑な処理を行うことができます。このパターンを使用することで、範囲を推定する必要がなく、例えば過去3日分を再結合する必要がある場合でも、必要な正確な範囲を知ることができます。

Thumbnail 1700

しかし、まだ課題が残っています。オンボーディングコストは別の大きな懸念事項です。先ほど述べたように、Netflixには何千人ものデータ実務者がいて、何十万ものWorkflowを開発しています。これらのWorkflowは連携して動作し、ユーザーが利用するマルチステージのWorkflowを形成しています。すべてのパイプラインを一度に変更するようユーザーに求めることはできないため、ユーザーが段階的にWorkflowを移行できる、最小限のコード変更で済む解決策が必要です。また、移行したくない場合はそのままでも構いません。このような場合、Incremental ProcessingのWorkflowと通常のWorkflowが共存して動作する必要があります。開発コスト、運用コスト、保守コストなどの実際のコストが、Incremental Processingのメリットを相殺してしまうこともあります。

Thumbnail 1790

では、最小限の変更で魔法のように動作させる方法はあるのでしょうか?答えはイエスです。これらの懸念に対処するため、私たちはテーブルインターフェースに加えて、さらに2つのインターフェースを提供してこれらの問題に対処しています。MaestroはIpCaptureという2つの新しいステップタイプを提供しており、これにはデータプラットフォームによって管理されるIncremental ProcessingやChange Capturingの実装が組み込まれています。パイプラインはデータプラットフォームによって管理されます。

最後のチェックポイントからの変更をキャプチャし、その変更を範囲パラメータまたはテーブルパラメータにパックしてユーザーのWorkflowに渡すことができます。2番目のユーザージョブでそれを利用することができます。最後のインターフェースはIpCommitステップタイプで、IpCaptureステップからの情報に基づいて変更をコミットできます。この設計により、ユーザーは非常に少ない労力で自分のWorkflowをオンボードでき、これは標準的なWorkflowなのでWorkflowを混在させることができます。

Thumbnail 1850

これらのインターフェースは、既存のWorkflowと完全な互換性があります。ユーザーは好みの言語やCompute Engineを使用して作業を続けることができ、2つのステップを追加するだけでIncremental Processingのサポートを有効にできます。Maestroのステップは、先ほどの例で示したように設定ベースなので、ユーザーは簡単にオンボードできます。

Thumbnail 1880

別の例を見てみましょう。ここでは、Auto Remediationと呼ばれる複雑なパターンを使用しています。このAutomationパターンは、Netflixで非常に強力で人気のあるものです。オンコールチームが抱える大きな問題を解決します。通常、ETLパイプラインを実行し、その後、データ品質が良好かどうか、何か問題がないかを確認するための監査ジョブを実行します。問題が見つかると、オンコール担当者が呼び出され、真夜中に起きて確認を行い、問題を修正するスクリプトを実行し、ジョブを再起動して、また寝る、という流れになります。これにより多くの電話呼び出しが発生しますが、彼らの経験では、通常、修正スクリプトを実行して最初にリトライするだけでジョブが成功することが分かっていました。

Thumbnail 1930

Thumbnail 1940

Thumbnail 1950

Thumbnail 1960

そこで、ユーザーのETLとジョブを実行するこのSubworkflowジョブを作成できるパターンを開発しました。 その後、パイプラインを失敗させる代わりに、 Workflowジョブが失敗したかどうかを確認するCheck Statusジョブを実行します。失敗した場合は、Remediationジョブを実行し、このETLパイプラインを 再度実行します。これが失敗した場合に初めて、ユーザーに通知が行きます。

Thumbnail 1970

Thumbnail 1990

Maestroのワークフロー定義では、このように記述します。失敗を無視するというフラグを付けてこのWorkflowを定義します。DAGセクションでは、先ほど説明した条件分岐を定義し、ジョブのステータスに基づいて異なるパスにロジックを振り分けます。 Incremental Processing Supportを有効にするには、前にIpCaptureジョブを1つ追加し、最後にIpCommitを追加するだけです。ETL側でもインクリメンタルにデータを処理するための小さなコード変更が必要ですが、ユーザーから見た変更は最小限で済みます。

Thumbnail 2020

Thumbnail 2040

Thumbnail 2050

Maestroのワークフロー定義では、このように記述します。このWorkflowには、先ほどのスライドで示したワークフロー定義であるWorkflow IDがあり、次にソーステーブルであるmembership_table_01、そしてWorkflowへの新しい呼び出しがあります。 IPCaptureステップを使用してインクリメンタルな変更データのキャプチャを有効にし、ユーザージョブの後にチェックポイントをコミットします。 ステップIDを知っているだけで、ユーザーが状態をコミットするために必要なすべてを取得できます。

IPSの実際の適用例とその効果、今後の展望

Thumbnail 2060

Thumbnail 2090

重要なポイントとして、IPSは効率的にインクリメンタルな変更をキャプチャし、遅延データを処理できます。これらのクリーンなインターフェースにより、このソリューションは言語やエンジンに依存せず、既存のすべてのWorkflowとも互換性があります。また、導入コストが低いことも示しました。次に、実際の例を使用して その仕組みを説明します。これは2段階のパイプラインで、このソーステーブルはPlaybackテーブルと呼ばれ、大量のデータを持っています。

Playback tableは、Stream pipelineが継続的に更新を行う実際のテーブルです。ビジネスオーナーは、データの重要性から2週間分のデータを確認する必要があると判断しました。毎日過去2週間分のデータを再処理し、対象テーブルに対して繰り返しInsert Overwriteを行っているため、非常にコストがかかり時間もかかっています。データの処理に1時間以上かかるため、1時間ごとの実行さえできない状況です。イベント日付でパーティション化されたこの日次テーブルに書き込みを行うと、下流の分析パイプライン(数百に及ぶ可能性があります)のソースとなります。これらの集計パイプラインでも、データの正確性を維持するために過去14日分のデータを再処理する必要があります。

Thumbnail 2170

Thumbnail 2180

Thumbnail 2200

では、最初のステージをどのように書き直すか見ていきましょう。 この場合、元のテーブルからデータを取得する代わりに、変更データのみを含むICDCテーブルからデータを取得し、それをターゲットテーブルにマージします。IPSを使用することで非常に効率的になり、実行の頻度を1時間ごとに短縮することができます。 また、このIPSパターンを使用することで、非常に重要なLookback windowを排除することができます。トラフィックパターンやビジネスロジックが変更された場合、このLookback windowを14日から7日や30日に調整する必要がありますが、その度に下流の全ての利用者にコードの変更を依頼しなければなりません。しかし、Incremental Processing Supportを使用すれば、そのような心配は不要になります。

Thumbnail 2240

このSQLクエリは、ユーザーが行う必要のあるコード変更を示しています。Insert Overwriteの代わりに、Mergingを使用してターゲットテーブルにデータを書き込みます。小さなデータファイルのコンパクト化や重複排除を定期的に行うETLラインが別にある場合は、Insert Intoを使用することもできます。ステージングテーブルにすでに重複データが存在する可能性があるため、ユーザーはすでにそのようなワークフローを持っているかもしれません。彼らはPlayback ICDCテーブルから選択し、重複排除ロジックを含むMergingツールを使用するだけです。この比較から、元の列からターゲット列への複雑な変換というビジネスロジックは全く変更されないため、導入コストが非常に低いことがわかります。

Thumbnail 2300

Thumbnail 2340

次に、IPSを使用して元のパイプラインの第2ステージを更新してみましょう。ここでも変更を捕捉するためにICDCテーブルを作成する必要があります。この集計の変更だけでは不十分なため、元のソーステーブルと結合して必要な変更をすべて取得し、再集計してターゲットテーブルにマージする必要があります。14日間のLookbackが不要になるため、これも非常に効率的です。 このワークフローも日次から時間単位に更新することができます。

Thumbnail 2350

Thumbnail 2360

Thumbnail 2370

Thumbnail 2390

これは私たちが行った変更を示しています。 Overwriteの代わりに、Mergingツールを使用し、集計キーに基づいて元のICDCテーブルと結合します。 これはビジネスロジックの要件に応じて、ユーザーIDやMovie IDなどになります。そして重複キーを削除するための重複排除ロジックを実行します。 このように、2段階のパイプラインをIPS対応の新しいマルチステージパイプラインにアップグレードしたことがわかります。

Thumbnail 2430

IPSを有効にすることで、大幅なコスト削減を実現できます。この例では、14日間の期間を見ると、新しいパイプラインのコストは元のパイプラインの10%未満となっており、大きな削減効果が得られていることがわかります。

Thumbnail 2460

次に、メンバーシップチームの例をご紹介します。彼らは重要なメンバーシップ情報の抽出と集計を行う必要がありました。これは複数のテーブルの結合を伴うため、アナリティクスの分野でよく見られるパターンです。先ほど説明したように、まずデータをRawテーブルに配置し、その後、遅延データが到着する可能性があります。例えば、テーブル1は3時間目にデータがあり、テーブル2は6時間目、最後のテーブルは5時間目と6時間目にデータがあります。最初のパイプラインではPythonを使用してターゲットテーブルにデータをインクリメンタルに追加します。2番目のパイプラインは、IPSを必要としない通常のETLプロセスとして機能し、これらが協調してテーブル1、2、3にデータを格納します。IPSをサポートする最後のパイプラインは、すべてのテーブルからの変更に対して、3時間目から6時間目(最小値と最大値の範囲)を読み取る必要があると判断します。

Thumbnail 2520

彼らはすべてのデータを読み取り、それらのテーブルデータを結合してからターゲットテーブルに書き込みます。このアプローチは、特にユーザーが遅延データの範囲を把握できない場合に価値があります。これは、サインアップやサインオフなどのメンバーシップイベントで特に重要です。この情報がなければ、すべてを確実に捕捉するために30日分の履歴データを読み取る必要があるかもしれません。インクリメンタルな変更のサポートにより、必要な変更だけを捕捉できます。この場合の削減効果は、元のパイプラインがどれだけ過去のデータを読み取る必要があったかによって異なりますが、それでも意味のある改善が見られます。さらに、ユーザーはルックバックウィンドウを管理する必要がなくなり、単にIPS統合アプローチを使用するだけでよいため、メンテナンスの負担も軽減されます。

Thumbnail 2590

Thumbnail 2600

これらの例で、ユーザーがMaestroを使用してIPSを扱うことがいかにシンプルでパワフルであるかをご理解いただけたと思います。簡単に振り返ってみましょう。IPSは新しいパターンを可能にし、以前はストリーミングパイプラインに置き換え可能と考えられていたバッチETLの再考を促します。しかし、ストリーミングパイプラインには独自の課題があり、特にアナリティクスの分野ではバッチパイプラインは依然として非常に人気があります。IPSサポートにより、データの正確性、鮮度、コスト効率を実現し、これまでバッチ処理では不可能だった多くのギャップを埋め、新しい機能を実現できます。

Icebergメタデータを使用することで、ユーザーデータを読み取ることなく効率的に変更を捕捉できます。これにより、データをコピーすることなく、セキュリティとプライバシーのコンプライアンスを維持しながら、高い効率性を実現できます。データの捕捉とビジネスロジックを分離することで、ユーザーは最小限の変更でIPSを実装できます。これらのクリーンなインターフェースは、例で示したように、優れたユーザーエクスペリエンスを提供します。私たちが発見したいくつかのIPSパターンをご紹介しましたが、皆さんの業務に適用できる他のパターンもあるかもしれません。

Thumbnail 2690

今後の展開として、IcebergのSQL拡張機能にView対応を追加することを計画しています。テーブルを構築する代わりに、2つのSnapshotやタイムスタンプ間の変更データを取得するためのViewを作成するだけで済むようになります。この方法では、一時的な変更テーブルを維持管理する必要がなくなるため、プラットフォームのメンテナンスコストを削減しながら、ゼロデータコピーを維持することができます。また、OverrideやDeleteなど、他のタイプのSnapshotもサポートする予定です。このアプローチをCookbookでコミュニティと共有するべく取り組んでおり、IPSを使用したマルチステージの自動カスケードデータバックフィルのサポートも開発中です。

Thumbnail 2750

Thumbnail 2790

最後に、この素晴らしいIncremental Processing Supportを作り上げたNetflixのチームとリーダーの皆様の素晴らしいチームワークに感謝の意を表したいと思います。また、このイベントを主催してくださったAWSにも感謝申し上げます。今朝は最後までご清聴いただき、誠にありがとうございました。これで私のプレゼンテーションを終わらせていただきます。ありがとうございました。


※ こちらの記事は Amazon Bedrock を利用することで全て自動で作成しています。
※ 生成AI記事によるインターネット汚染の懸念を踏まえ、本記事ではセッション動画を情報量をほぼ変化させずに文字と画像に変換することで、できるだけオリジナルコンテンツそのものの価値を維持しつつ、多言語でのAccessibilityやGooglabilityを高められればと考えています。

Discussion