📈

BigQueryによるリアルタイムなアクセスログ解析のアーキテクチャ

2022/12/29に公開約19,200字

はじめに

私事ですが、秋に Google Cloud の Professional Cloud Architect という認定資格を取得しました。

https://www.credential.net/64a4db66-3325-49ef-9fb9-2d6f9e7668e3#gs.gbw19p

業務で5年間ほど Google Cloud Platform (GCP) を利用してきましたが、個人的に BigQuery とは簡単なクエリを実行する程度の付き合いで、真剣に向き合えていませんでした。
資格試験の勉強をきっかけに BigQuery のストレージ費用が意外と安い事や、最近より高機能になっている事に今更ながら気付きました。

例えば、今年の夏 には Cloud Pub/Sub に BigQuery Subscriptions が追加され、Dataflow 等を使わずとも Cloud Pub/Sub と BigQuery がリアルタイムに繋がるようになりました。

この記事では、リアルタイムなアクセスログ解析のためのアーキテクチャを提案し、パフォーマンス、運用、費用の3つの観点から検討します。

対象読者

  • GCP で新規にアクセスログを管理する方法について調べている方
  • 既に稼働しているアクセスログ管理のシステムに課題を感じ、改修を検討している方
  • MaxMind GeoLite2 (または GeoIP2) を BigQuery に取り込んで活用したい方
  • Cloud Pub/Sub や BigQuery といった GCP のマネージドサービスの活用例を知りたい方

TL;DR

  • BigQuery Subscription を使用して、アプリケーションから Cloud Pub/Sub に送られたアクセスログを直接 BigQuery に書き込みます。
  • リクエスト元 IP アドレスに基づく位置情報を利用するために、MaxMind GeoLite2 Database を BigQuery に取り込み、アクセスログに対して結合できるようにします。

低コストで始められる、スケーラブルでリアルタイムにアクセスログの解析ができる環境を構築しました。

実際にこのアーキテクチャでアクセスログ部分を実装したアプリケーションはこちらです。
個人的に開発しているチャットボットになります。

https://github.com/ww24/linebot

アーキテクチャ

全体像

先にアーキテクチャの全体像を紹介します。

この図は Introducing a Google Cloud architecture diagramming tool | Google Cloud Blog で紹介されているツールで描きました。

  • クライアントから呼び出される API Server が居て、リクエストによってアクセスログが生成されます
  • アクセスログは Apache Avro 形式で Cloud Pub/Sub Topic に送信されます
    • Topic にはスキーマが設定されており、メッセージの検証が行われます
    • BigQuery Subscription によってスキーマのマッピングが行われ BigQuery にリアルタイムに書き込まれます

下半分 (オレンジ背景) は、リクエスト元 IP アドレスからおおよその位置情報を特定するための仕組みです。

アクセスログのリアルタイム書き込み

この構成では、アプリケーションで生成されたアクセスログはリアルタイムに BigQuery に書き込まれ、即座にクエリできるようになります。

リソースの作成

BigQuery の データセットとテーブルを作成します。

テーブルはパーティションの有効期間を365日間に設定しているため、1年経つと自動的に削除されます。

terraform (BigQuery)
resource "google_bigquery_dataset" "access_log" {
  dataset_id    = "${local.name}_access_log"
  friendly_name = "${local.name} access log"
  description   = "${local.name} access log dataset"
  location      = "US"
}

resource "google_bigquery_table" "access_log" {
  dataset_id = google_bigquery_dataset.access_log.dataset_id
  table_id   = "access_log"
  clustering = ["timestamp"]
  schema     = file("access_log_schema/v1.json")

  time_partitioning {
    expiration_ms            = 31536000000 # 1 year
    type                     = "DAY"
    field                    = "timestamp"
    require_partition_filter = true
  }
}

次に、Cloud Pub/Sub のリソースを作成します。

terraform (Cloud Pub/Sub)
resource "google_project_service_identity" "pubsub" {
  provider = google-beta
  service  = "pubsub.googleapis.com"
}

resource "google_bigquery_table_iam_member" "pubsub_sa_bigquery" {
  dataset_id = google_bigquery_table.access_log.dataset_id
  table_id   = google_bigquery_table.access_log.table_id
  for_each   = toset(["roles/bigquery.metadataViewer", "roles/bigquery.dataEditor"])
  role       = each.key
  member     = "serviceAccount:${google_project_service_identity.pubsub.email}"
}

resource "google_pubsub_schema" "access_log_schema_v1" {
  name       = "${local.name}-access-log-v1"
  type       = "AVRO"
  definition = file("access_log_schema/v1.avsc")
}

resource "google_pubsub_topic" "access_log_v1" {
  name = "${local.name}-access-log-v1"

  schema_settings {
    schema   = google_pubsub_schema.access_log_schema_v1.id
    encoding = "BINARY"
  }
}

resource "google_pubsub_subscription" "access_log_v1_bq" {
  name                       = "${local.name}-access-log-v1-bq"
  topic                      = google_pubsub_topic.access_log_v1.name
  ack_deadline_seconds       = 10
  message_retention_duration = "604800s"

  bigquery_config {
    table               = "${var.project}:${google_bigquery_table.access_log.dataset_id}.${google_bigquery_table.access_log.table_id}"
    use_topic_schema    = true
    write_metadata      = false
    drop_unknown_fields = true
  }

  depends_on = [google_bigquery_table_iam_member.pubsub_sa_bigquery]
}

パフォーマンス

Cloud Pub/Sub も BigQuery もサーバーレスで、自動的にスケールアウトします。
リクエスト数が増加しアクセスログの流量が増えても耐えうる、スケーラブルな構成です。

しかし、実際には割り当ての上限があるので、大規模なリクエストが想定されるサービスでは API Quota を考慮する必要があります。

Cloud Pub/Sub

https://cloud.google.com/pubsub/quotas

  • リージョン毎の Publisher のスループット (pubsub.googleapis.com/regionalpublisher)
    • 中規模リージョン: 48,000,000 kB/min (800 MB/s)
  • リージョン毎の BigQuery Subscriber のスループット (pubsub.googleapis.com/regionalpushsubscriber)
    • 大規模リージョン: 8,400,000 kB/min (140MB/s)

大規模リージョン: europe-west1, europe-west4, us-central1, us-east1, us-east4, us-west1, us-west2
中規模リージョン: asia-east1, asia-northeast1, asia-southeast1, europe-west2, europe-west3

BigQuery

https://cloud.google.com/bigquery/quotas#write-api-limits

  • Storage Write API のスループット (AppendBytesThroughputPerProject, AppendBytesThroughputPerProjectEU)
    • US or EU (multi-region): 3GB/s

理論上のパフォーマンス上限

デフォルトの Quota を引き上げない場合で試算します。

BigQuery Subscriber のスループットが最も小さいため、ここがボトルネックになります。
アクセスログ1件あたりのサイズが 0.5KB とすると、280,000req/s まではリアルタイムに BigQuery に書き込める計算になります。これを超えた分は遅延して書き込まれる可能性があります。

つまり、Quota の緩和申請をせずに、理論上は毎秒28万リクエストまでスケール可能な構成という事になります。
リアルタイム性を一時的に犠牲にできる(一時的な遅延が許容できる)要件では、リージョン毎の Publisher のスループットを超えない範囲でより大規模なリクエストにも恐らく対応できるでしょう。

リクエスト元 IP アドレスに基づく位置情報

BigQuery Subscription を使用する場合、Dataflow のように途中で処理を挟む事はできないため、例えば IP アドレスに基づく位置情報が必要な場合、BigQuery に取り込んだ後で処理する必要があります。

ここでは、MaxMind 社によって無償提供されている MaxMind GeoLite2 という GeoIP Database を使用します。IP アドレスからおおよその位置情報 (国、都市) を特定することができます。

より精度の高い位置情報を必要とする場合には有償の MaxMind GeoIP2 という上位版が利用できます。

Cloud Scheduler と Cloud Run Job によって定期的に MaxMind から CSV 形式でこの Database をダウンロードし BigQuery に取り込んでおく事で、必要になった際にアクセスログと結合して利用します。

Cloud Run Job による定期的なダウンロード

Cloud Run Job を使用して、定期的に MaxMind のサイトから Zip ファイルをダウンロードして展開、CSV ファイルを GCS にアップロードします。

具体的には、次の shell script を container image us.gcr.io/google.com/cloudsdktool/google-cloud-cli:412.0.0-alpine 上で定期実行します。

script
curl -sL "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key=${MAXMIND_LICENSE_KEY}&suffix=zip" -o City.zip
curl -sL "https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key=${MAXMIND_LICENSE_KEY}&suffix=zip.sha256" | awk '{print $1"  City.zip"}' > shasum.txt
sha256sum -c shasum.txt
unzip -p City.zip "GeoLite2-**/GeoLite2-City-Blocks-IPv4.csv" > GeoLite2-City-Blocks-IPv4.csv
unzip -p City.zip "GeoLite2-**/GeoLite2-City-Blocks-IPv6.csv" > GeoLite2-City-Blocks-IPv6.csv
unzip -p City.zip "GeoLite2-**/GeoLite2-City-Locations-en.csv" > GeoLite2-City-Locations-en.csv
gsutil cp GeoLite2-City-Blocks-IPv*.csv GeoLite2-City-Locations-en.csv "${DESTINATION_URL}"

外部テーブルを用いた BigQuery から GCS の参照

BigQuery の外部テーブルを用いると直接 GCS 上の構造化データ (CSV 等) をクエリすることができます。今回は、今年の夏 に GA になった BigLake テーブル という、アクセスコントロール周りがよりネイティブのテーブルに近い機能を利用します。

これにより、先程 GCS にアップロードした3つの CSV ファイルをデータソースとする2つの BigLake テーブルを作成することで、BigQuery からクエリできるようになります。

terraform
resource "google_bigquery_table" "geolite2-city-blocks" {
  dataset_id = google_bigquery_dataset.geolite2.dataset_id
  table_id   = "GeoLite2-City-Blocks"
  schema     = file("geolite2/geolite2_city_blocks_schema.json")
  external_data_configuration {
    connection_id         = google_bigquery_connection.geolite2.name
    autodetect            = false
    ignore_unknown_values = true
    source_uris           = ["gs://${var.geolite2_bucket}/GeoLite2-City-Blocks-IPv*.csv"]
    source_format         = "CSV"
    csv_options {
      quote             = ""
      skip_leading_rows = 1
    }
  }

  lifecycle {
    ignore_changes = [external_data_configuration[0].connection_id]
  }
}

resource "google_bigquery_table" "geolite2-city-locations" {
  dataset_id = google_bigquery_dataset.geolite2.dataset_id
  table_id   = "GeoLite2-City-Locations"
  schema     = file("geolite2/geolite2_city_locations_schema.json")
  external_data_configuration {
    connection_id         = google_bigquery_connection.geolite2.name
    autodetect            = false
    ignore_unknown_values = true
    source_uris           = ["gs://${var.geolite2_bucket}/GeoLite2-City-Locations-en.csv"]
    source_format         = "CSV"
    csv_options {
      quote             = ""
      skip_leading_rows = 1
    }
  }

  lifecycle {
    ignore_changes = [external_data_configuration[0].connection_id]
  }
}

定期的なデータ変換

BigQuery の機能で、定期的にクエリを実行することができます。
一般的には、日次の集計などに使われますが、今回はクエリしやすい形にデータ変換するために利用します。

BigQuery は (この記事を執筆している時点で) CIDR を直接扱うことに対応していないため、CIDR をネットワーク(プレフィックス)とプレフィックス長に分離します。
加えて、クエリのパフォーマンスを向上させるために、国 (country)、都市 (city) の情報を予め結合しておきます。

このクエリの結果を GeoLite2-City テーブルに出力します。

Transform geolite2.GeoLite2-City
SELECT
  country_iso_code AS country,
  city_name AS city,
  NET.IP_FROM_STRING(REGEXP_EXTRACT(network, r'(.*)/')) network,
  CAST(REGEXP_EXTRACT(network, r'/(.*)') AS INT64) mask
FROM `geolite2.GeoLite2-City-Blocks`
JOIN `geolite2.GeoLite2-City-Locations`
USING(geoname_id)

定期的なスナップショットの作成

IP アドレスと位置情報のマッピングは変更される事があるので、リクエストを受信した時にどの位置情報と判定されていたかという、その時のマッピングが重要になります。
そこで、テーブルのスナップショットを日次で管理することで、その日のマッピング情報を最小限のストレージコストで永続化します。

BigQuery の定期的なクエリでは、クエリの結果を通常のテーブルに出力することはできますが、スナップショットであったり、テーブルの作成にオプションを指定したいというようなケースには対応していないため、DDL を書く必要があります。

更に、クエリの結果をテーブルに出力する場合は、出力先のテーブル名にテンプレート構文が利用できるため日次のテーブルが作りやすいのですが、DDL を書く場合は直接テンプレート構文が利用できないため、次のように手続き型言語を使用する必要があります。

次の SQL は、GeoLite2-City テーブルを基に、有効期間が365日間のスナップショットテーブルを日次で作成します。
作成されるテーブル名は、例えば GeoLite2_City_20221230 のようになります。

Snapshot geolite2.GeoLite2_City_YYYYMMDD
DECLARE snapshot_name STRING;
DECLARE expiration TIMESTAMP;
DECLARE query STRING;

SET expiration = TIMESTAMP_ADD(@run_time, INTERVAL 365 DAY);
SET snapshot_name = CONCAT(
  "`geolite2.GeoLite2_City_",
  FORMAT_DATE('%Y%m%d', EXTRACT(DATE FROM @run_time AT TIME ZONE "Asia/Tokyo")),
  "`"
);

SET query = CONCAT(
  "CREATE SNAPSHOT TABLE IF NOT EXISTS ",
  snapshot_name,
  " CLONE `geolite2.GeoLite2-City` OPTIONS(expiration_timestamp = TIMESTAMP '",
  expiration,
  "');"
);

EXECUTE IMMEDIATE query;

クエリ

アクセスログをクエリする際に、リクエスト元 IP アドレスに基づく位置情報を結合します。

前述の通り、BigQuery は CIDR に標準で対応していません。
まず思い浮かぶのは、MaxMind のドキュメント にあるように、geoip2-csv-converter を使用して CIDR で表現されるネットワークの先頭と末尾の IP アドレスへ変換し、BETWEEN 演算子を用いて結合するという方法ですが、これは BigQuery では非効率です。

次の記事を参考に、CIDR をネットワークとプレフィックス長に分離しておきます。
これは前処理として既に行いました。

https://cloud.google.com/blog/products/data-analytics/geolocation-with-bigquery-de-identify-76-million-ip-addresses-in-20-seconds/

IP アドレスとプレフィックス長からネットワークアドレスが算出できます。
プレフィックス長を IPv4 と IPv6 それぞれ取りうる範囲で生成し、ネットワークアドレスとプレフィックス長が一致する場合に、そのネットワークに含まれる IP アドレスと判定できるため、そのように結合します。
この方法は BigQuery においては非常に効率的で、BETWEEN 演算子を用いて結合するよりも飛躍的にパフォーマンスが向上します。

これを毎回 SQL で記述したり、どこかにメモしておいてコピペするのは非効率なので、テーブル関数 を定義しておきます。

with geolocation tvf
-- with_geolocation function
CREATE OR REPLACE TABLE FUNCTION `linebot_access_log.with_geolocation`(since TIMESTAMP, until TIMESTAMP) AS
WITH
  access_logs AS (SELECT *
    FROM `linebot_access_log.access_log`
    WHERE `timestamp` BETWEEN since AND until),
  geolocations AS (SELECT *
    FROM `geolite2.GeoLite2_City_*`
    WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE(since)))
SELECT * FROM access_logs
LEFT JOIN (
  WITH ips AS (SELECT DISTINCT ip FROM access_logs)
  -- IPv4 address => country, city
  SELECT ip, country, city FROM (
    SELECT NET.IP_TRUNC(NET.SAFE_IP_FROM_STRING(ip), mask) network, *
    FROM ips, UNNEST(GENERATE_ARRAY(8,32)) mask
    WHERE ip LIKE '%.%'
  )
  JOIN geolocations USING (network, mask)
  UNION ALL
  -- IPv6 address => country, city
  SELECT ip, country, city FROM (
    SELECT NET.IP_TRUNC(NET.SAFE_IP_FROM_STRING(ip), mask) network, *
    FROM ips, UNNEST(GENERATE_ARRAY(19,64)) mask
    WHERE ip LIKE '%:%'
  )
  JOIN geolocations USING (network, mask)
) USING (ip)

これは次のように使います。
例なので全てのカラムを含めていますが、実際には必要のあるカラムだけを指定することでコスト最適化に繋がります。

example
SELECT * REPLACE (STRING(`timestamp`, "Asia/Tokyo") AS `timestamp`)
FROM `linebot_access_log.with_geolocation`("2022-12-29 0:00:00 Asia/Tokyo", "2022-12-30 0:00:00 Asia/Tokyo")
ORDER BY `timestamp`

運用

アクセスログの解析

位置情報が不要なケース

日本時間で 12/30 のアクセスログを時系列順にクエリしたい場合は次のようになります。

SELECT * REPLACE (STRING(`timestamp`, "Asia/Tokyo") AS `timestamp`)
FROM `linebot_access_log.access_log`
WHERE TIMESTAMP_TRUNC(`timestamp`, DAY, "Asia/Tokyo") = "2022-12-30 0:00:00 Asia/Tokyo"
ORDER BY `timestamp`

IP アドレスごとのリクエスト数をクエリしたい場合は次のようになります。

SELECT `ip`, COUNT(*) AS `count`
FROM `linebot_access_log.access_log`
WHERE TIMESTAMP_TRUNC(`timestamp`, DAY, "Asia/Tokyo") = "2022-12-30 0:00:00 Asia/Tokyo"
GROUP BY 1
ORDER BY `count` DESC

位置情報が必要なケース

日本時間で 12/30 のアクセスログを時系列順にクエリしたい場合は次のようになります。

SELECT * REPLACE (STRING(`timestamp`, "Asia/Tokyo") AS `timestamp`)
FROM `linebot_access_log.with_geolocation`("2022-12-30 0:00:00 Asia/Tokyo", "2022-12-31 0:00:00 Asia/Tokyo")
ORDER BY `timestamp`

国ごとのリクエスト数をクエリしたい場合は次のようになります。

SELECT `country`, COUNT(*) AS `count`
FROM `linebot_access_log.with_geolocation`("2022-12-30 0:00:00 Asia/Tokyo", "2022-12-31 0:00:00 Asia/Tokyo")
GROUP BY 1
ORDER BY `count` DESC

スキーマの変更

滅多に無いと思いますが、アクセスログに項目を追加する場合、つまりスキーマを変更する場合、反映手順を誤らなければダウンタイムなしで移行できます。

  1. Avro Schema にフィールドを追加する
  2. アプリケーションが参照する Avro Schema を更新し、Cloud Pub/Sub Topic へ新しいスキーマのアクセスログが送信されるようにする
  3. 新しい Avro Schema を基に Cloud Pub/Sub Schema, Topic, Subscription を新しく作成する
  4. BigQuery Table Schema にフィールドを追加する

4 は bigquery_config[0].drop_unknown_fieldstrue であればいつ実行しても問題ありません。

荒削りですが、Avro Schema から BigQuery Table Schema へ変換する Go 製のコマンドラインツールを作成しました。こちらを使うと、BigQuery Table Schema (JSON) を人間が書かなくて済むので少し楽ができます。

https://github.com/go-oss/avro-bq-schema

ランニングコスト

Cloud Pub/Sub

https://cloud.google.com/pubsub/pricing#bigquery

BigQuery Subscription を使用する場合、スループットに対して $50/TiB の料金が設定されています。無料枠はありません。
その代わり、BigQuery へのデータ取り込み費用は追加で発生しません。

例えば、BigQuery Subscription を使わずに、Basic Subscription と独自に実装した worker から BigQuery Storage Write API を呼び出すよりも安価に利用することができます。

BigQuery

https://cloud.google.com/bigquery/pricing

BigQuery はクエリによって処理されたデータ量と、データをテーブルに永続化するためのストレージの使用量に対して費用が発生する料金モデルとなっています。

また、データ取り込みに使用する API にそれぞれ料金が設定されています。
BigQuery Subscription で使用される API は BigQuery Storage Write API ですが、前述の通り BigQuery Subscription を使う場合はデータ取り込み費用が発生しません。

ストレージ費用

US (multi-region) の場合のストレージ費用 (2022/12 現在)

オペレーション 料金 詳細
アクティブストレージ $0.02/GB 毎月10GBまで無料
長期保存 $0.01/GB 毎月10GBまで無料

90日間連続して変更されていないテーブルまたはパーティションには長期保存の料金が適用され、ストレージ費用が半額になります。

これは Cloud Storage (GCS) の us-central1 の Standard Storage に格納して、ライフサイクルを設定し 90日後に Nearline Storage にストレージクラスを変更するイメージに近いです。 (GCS のストレージクラスの変更には費用が掛かるため、正確には異なります)
そのため、基本的にはデータを BigQuery に置いたままの方がメリットが多いと思います。

クエリ費用

クエリによって処理されたデータ量に対して US (multi-region) では $5.00/TiB の料金が設定されています。毎月 1TiB までの無料枠があります。

これは古い記事ですが、BigQuery のコスト最適化に関して今でも有用な事が書かれていたので参考になります。

https://cloud.google.com/blog/ja/products/data-analytics/cost-optimization-best-practices-for-bigquery

トータルコストの試算

  • リクエストが毎秒平均 1,000
  • アクセスログのサイズが1件あたり 0.5KB
  • アプリケーションは asia-northeast1 で稼働させる
    • アプリケーションのランニングコストは試算に含めない
  • BigQuery
    • ロケーションは US (multi-region)
    • ストレージ費用は、アクセスログが1年間分蓄積されていると仮定して計算
    • クエリで処理されるデータ量は1ヶ月で合計 15TB と仮定 (1年間分のデータ量相当)
  • MaxMind の Database を格納する Cloud Storage と BigQuery の費用は考慮しない (誤差程度)
    • Cloud Run のランニングコストは無料枠に収まる

1日あたりのアクセスログのデータ量: 43,200,000 KB (41.20 GB)
1ヶ月あたりのアクセスログのデータ量: 1,296,000,000 KB (1.206 TB)

試算結果は次の通り、2022/12 現在の価格で毎月 $463 となりました。


引用元: https://cloud.google.com/products/calculator/#id=a24cbf4f-93a0-4948-bd59-a56f0520a00f

今回、スループットの高さと BigQuery のストレージ費用の安さを優先して、ロケーションに US (multi-region) を選んだ為、Cloud Pub/Sub から BigQuery の間でリージョン間通信による費用が発生しますが、仮に全て asia-northeast1 で完結させた場合でもトータルの費用に大差はありませんでした。

総括

リアルタイムなアクセスログ解析のアーキテクチャを提案し、検討しました。

  • パフォーマンス
    • 理論上、毎秒28万リクエストまでスケールする
  • 運用
    • リアルタイムにアクセスログを収集し、必要に応じて IP アドレスに基づく位置情報を結合することができる
    • スキーマの変更は Cloud Pub/Sub Schema, Topic, Subscription を新規で作成することでダウンタイムなしで移行できる
  • コスト
    • 毎月 $463 (平均毎秒 1,000 リクエスト、1年間アクセスログを保持した場合)

理論上のパフォーマンスは算出できましたが、実際にどの程度の規模のリクエストをリアルタイムに捌けるのか、リクエスト数が増えてスループットの上限に達した際の遅延とその回復性といった詳細については、実際に負荷試験を行って確認しなければ分かりません。

このあたりを個人の趣味で進めるのは限界があるため、年が明けてから本業の方で本格的に取り組んでいこうと思います。

それでは皆様、良いお年を。

Discussion

ログインするとコメントできます