BigQueryのパーティション分割テーブルで特定のパーティションを効率的に削除する方法
パーティション分割テーブルとは
BigQueryのパーティション分割テーブルとは、テーブルを日付などのセグメントに分割することでクエリを効率的に実行できるタイプの種類である。table_yyyymmdd
と日付suffixをテーブルに付けて管理するシャーディングテーブルとの違いとして、
- 高いクエリパフォーマンス
- スキーマやメタデータの不要な重複を避けられる
といったメリットが挙げられている。個人的にはワイルドカードを使用した際のネーミング被り事故を防げる点が一番ありがたいことだと思っている(注1)。
dbtでもパーティション分割テーブルの利用がデフォルトとして想定されており、スキーマの変更など、従来は管理が大変だったケースについても簡単に対応できるようになっているため、dbtを使っていると自然と利用機会が増えてくる。
(注1: 例えば、order_yyyymmdd
というテーブルがあった際に、誰かが同じデータセットに order_summary_yyyymmdd
というテーブルを作り始めると、元のorder_*
を参照するクエリは、order_summary_*
も参照することになり、クエリ実行時に高確率でエラーになる。それなりに頻発する事故だが、事前に予見して回避するのはなかなか難しい。)
パーティション分割テーブル特有の悩み
パーティション分割テーブルは、1テーブルで大規模データを管理する思想なため、増分更新ができるdbtのincrementalモデルと相性が良い。
とはいえ、incrementalモデルあるあるだが、運用を続けていると様々な悩みが出てくる。今回の悩みは、incrementalモデルでレコードを追加したパーティション分割テーブルに不正データが混入してしまった際の対処法である。
不正データが発生する原因の具体例としては、
- uniqueなキーが計測システムのバグで重複した
- 上流システムの変更が連携されておらず、これまでとは異なるデータが混入した
などが考えられる。本番環境では確実に回避すべきレベルの事象だが、開発環境でお試しでデータパイプラインを構築しているフェーズだと、これくらいのトラブルは起こりうる。
事前に仕込んでおいたテストなどで検知した後はリカバリ作業を行うことになるが、一部のケースでは、build済の追加先テーブルから不正データを削除する必要がある。というのも、incremental_strategyにinsert_overwriteを使用していれば、不正データを含むパーティションごと削除できるので、原因となる事象を解消した後に再度buildするだけで良いが、以下のように追加済のデータを上書きしない処理が入っているincrementalモデルでは、不正データを置き換えることができない。
{% if is_incremental() %}
where event_time > (select max(event_time) from {{ this }})
{% endif %}
そのため手動でのデータ削除が必要になるが、困ったことにGUIからは特定のパーティションのみ削除することができず、bqコマンドかAPIの二択しかない。加えて、この2つも複数のパーティションを一度に削除することはできないので、複数のパーティションが汚染されていた場合は、手作業だと地獄絵図となる。
トラブルの際は平常心のまま作業できることは少なく、二次災害を避けるためにも手作業での削除は実施したくない。そこで、不正データをミスなく削除する方法をあらかじめ用意しておく。
やり方
使用環境
- Google Colaboratory
- BigQuery
今回はColaboratoryからbqコマンドを投げる方法を紹介する。
といっても大したことはなく、対象のテーブルと期間を指定してその期間分だけループさせてbqコマンドを投げるだけなので、すぐに実装できる。
実装したコード
import datetime
import google.colab import auth
auth.authenticate_user()
project: str = "your_project_id"
dataset: str = "your_dataset_id"
table: str = "your_table_id"
start_date: datetime = datetime("yyyy", "mm", "dd")
end_date: datetime = datetime("yyyy", "mm", "dd")
if start_date > end_date:
raise ValueError("start_date must be earlier than end_date")
while start_date <= end_date:
target_partition = f"{project}:{dataset}.{table}\${start_date.strftime('%Y%m%d')}"
!bq rm -force -table {target_partition}
print(f"deleted {target_partition}")
start_date += timedelta(days=1)
なお、Pythonライブラリで書く場合にはdelete_tableメソッド使うことができる。テーブルの指定方法などはデコレーターを使う点は共通しているので、ほぼbqコマンドと同じように使える。
追加する前に検知したい場合は
上記は追加してしまったデータをどう削除するかという議論だったが、社内で利用者が多岐にわたるテーブルやクライアントに見えるデータについては、そもそも不正データが本番テーブルに書き込まれないことを優先したいケースもある。
まずはsourceなど、上流のデータに対して確実にテストを書いておくことが重要だが、他にも、
- データ抽出・変換処理をephemeralのモデルとして分離し、ephemeralモデルに対してテストを投げる
- 実際の利用者に見えない層でテーブルを作成し、テストに通過したら本番テーブルに書き込む
といった1ステップを置くことで、不正データをユーザー側に露出させることなくデータ管理ができる。ただし、この方法は処理するデータ量や管理するモデル数を増やすことになり、DWHのコスト面などで負荷が大きいので、不正データを本番環境に絶対に出したくない場合のみに使うのが良いだろう。
Discussion