🛤️

データ分析基盤のヘルスダッシュボードで稼働状況を可視化する

に公開

はじめに

AWSで障害が発生した際にAWS Health Dashboardを確認するように、データ分析基盤もプラットフォームとして稼働状況を分かりやすくお知らせする方法が必要だと感じました。

例えば、データ分析基盤でエラーが発生し対応中に、ビジネスサイドの各部門から「前日のデータが反映されていない」という問い合わせが多数寄せられるケースがあり、個別に返信していくのはかなり手間です。このような状況では、関係者全員が稼働状況を把握できる環境が有効です。

データ分析基盤のワークフローは、TROCCOを利用しているため、TROCCOの転送設定で転送元 - TROCCO(ワークフロー定義)を利用して構築します。

ダッシュボードイメージ

データフローイメージ

データ分析基盤の日次バッチ結果をBigQueryに転送

転送元 - TROCCOの設定

https://documents.trocco.io/docs/data-source-trocco
転送元 - TROCCOの設定 のワークフロー定義で取得できる項目

転送設定の新規作成

転送元 TROCCOの設定

下記の設定で、実行日(実行日の00:00:00〜実行日時)のワークフローの実行状況が更新される設定になります。あとは、どのくらいの頻度で更新するか設定してください。
毎時実行で実施すれば一旦問題ないかなと思っています。


ワークフローの実行状況データの整形

転送してきたデータを加工するクエリです。

  1. スケジュール設定されたメインのワークフローのみを可視化するため
where executor_type != 'workflow'
  1. 1日に1回実行した成功結果を採用する
        qualify
            row_number() over (
                partition by created_date, pipeline_definition_id
                order by
                    -- 1. succeeded を優先
                    case when default_status = 'succeeded' then 1 else 0 end desc,
                    -- 2. 最新の created_at を優先
                    created_at desc
            )
            = 1
  1. BIダッシュボードで視認しやすいように絵文字を採用
            case
                when status in ('error', 'canceled', 'interrupting', 'retry_waiting')
                then '🚨 障害の可能性あり'
                else '✅ 正常稼働中'
            end as status_summary,
  1. 全体
with
    /* Import ====================
    =========================== */
    audit_log as (
        select
            created_at,
            finished_at,
            pipeline_definition_id,
            pipeline_definition_name,
            executor_type,
            status
        from `audit_log`.`workflow_definition_logs`
        where executor_type != 'workflow'  -- ワークフローのタスクとして実行を除外
    ),

    /* Logic =====================
    =========================== */
    status_list as (
        select
            date(datetime(created_at, "Asia/Tokyo")) as created_date,
            pipeline_definition_id,
            pipeline_definition_name,
            datetime(created_at, "Asia/Tokyo") as created_at,
            datetime(finished_at, "Asia/Tokyo") as finished_at,
            timestamp_diff(finished_at, created_at, minute) as execution_minutes,

            -- executor_type の日本語化
            case
                executor_type
                when 'manual'
                then '🖐 手動実行'
                when 'scheduler'
                then '⏰ スケジュール実行'
                when 'workflow'
                then '🔗 ワークフローのタスクとして実行'
                when 'api'
                then '🌐 APIによる実行'
                when 'retry'
                then '🔄 リトライ実行(停止位置から再実行、またはリトライ設定による再実行)'
                else executor_type
            end as executor_type,

            -- status の日本語化
            case
                status
                when 'queued'
                then '⏳ 実行待ち'
                when 'setting_up'
                then '⚙️ 実行準備中'
                when 'executing'
                then '🚀 実行中'
                when 'interrupting'
                then '✋ 実行中断中'
                when 'succeeded'
                then '✅ 実行完了(成功)'
                when 'error'
                then '❌ 実行完了(エラー)'
                when 'canceled'
                then '🛑 実行完了(キャンセル)'
                when 'skipped'
                then '⏭️ 実行完了(スキップ)'
                when 'retry_waiting'
                then '🔄 リトライ待ち(エラー後、再実行までの待機状態)'
                else status
            end as status,
            status as default_status,
            case
                when status in ('error', 'canceled', 'interrupting', 'retry_waiting')
                then '🚨 障害の可能性あり'
                else '✅ 正常稼働中'
            end as status_summary,
            case
                when status in ('error', 'canceled', 'interrupting', 'retry_waiting')
                then 'データ分析基盤上での前日までのデータ集計が未完了です。復旧対応中です。しばらくお待ちください。'
                else 'データ分析基盤上での前日までのデータ集計は正常に完了しています。'
            end as status_description
        from audit_log
    ),

    final as (
        select
            created_date,
            created_at,
            finished_at,
            execution_minutes,
            pipeline_definition_id,
            pipeline_definition_name,
            executor_type,
            status_summary,
            status,
            status_description
        from status_list
        qualify
            row_number() over (
                partition by created_date, pipeline_definition_id
                order by
                    -- 1. succeeded を優先
                    case when default_status = 'succeeded' then 1 else 0 end desc,
                    -- 2. 最新の created_at を優先
                    created_at desc
            )
            = 1
    )

select *
from final
  1. 実行結果


6. 集計データを元にいい感じにダッシュボード作成

おわりに

発展版としては、エラーやダウンが発生した際に、どのBIダッシュボードなどの出力先へ影響が及ぶかを可視化できると、保守・運営の効率化に大いに役立つと感じます。
また、鉄道の遅延情報のように、どの路線(データフロー)に影響が生じているかをビジュアルで表現できれば、状況把握がさらに容易になる気がします。

加えて、副次的な効果として、データ量に応じた実行時間の変動もモニタリング可能となり、パフォーマンス管理にも活用できるでしょう。

参考

https://documents.trocco.io/docs/data-source-trocco
https://health.aws.amazon.com/health/status

Discussion