Cloud SQLにある大量なテーブルをBigQueryに入れる話
経緯
こんにちは、データエンジニアのjcです。
昨年度から大規模なデータ分析基盤の構築に携わっています。最近Cloud SQLにある6つのDBの数百個のテーブルを日次洗い替えでBigQueryにあるデータ基盤に入れるタスクを取り組んでいます。
Cloud SQLとBigQuery両方ともGCPのサービスのため、federated queriesを利用すると簡単にできそうに見えますが、
実際に行ってみると、以下の3つの課題を気づきました。
- BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない
- プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない
- Cloud SQLにあるテーブルの定義をそのまま取ってきてもBigQueryではMySQLとPostgreSQLの一部の型が対応されていない
少し苦労していましたが、幸い解決方法を見つけました。
今後躓く方もいるかもしれないので、知見を共有したいと思います。
BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない
BigQueryはクエリの結果によってテーブルを作成できるので、事前にテーブルを作っておく必要がありません。
大量なテーブルを一々作成するのは現実的ではない課題の解決法としてはDBのメタ情報(descriptionを含めて)をそのまま生かしてテーブル作成用クエリを生成し、テーブルを作成します。
例えばPostgreSQLの場合、まずはテーブルのメタ情報
SELECT
schemaname,
relname AS table_name,
obj_description(relid) AS description
FROM pg_catalog.pg_statio_all_tables
WHERE schemaname = '{YOUR_SCHEMA}'
とカラムのメタ情報を取得します。
SELECT
c.table_schema,
c.table_name,
c.column_name,
c.data_type,
pgd.description
FROM pg_catalog.pg_statio_all_tables AS st
INNER JOIN pg_catalog.pg_description pgd on (
pgd.objoid = st.relid
)
RIGHT JOIN information_schema.columns c ON (
pgd.objsubid = c.ordinal_position and
c.table_schema = st.schemaname and
c.table_name = st.relname
)
WHERE c.table_schema = '{YOUR_SCHEMA}';
上記のメタ情報をfor文で回してテーブル作成用のクエリテンプレートの$columns
$pg_schema
$table_name
に入れたら、テーブル作成用のクエリが生成されます。Airflowとかで定期的にクエリを実行すれば問題解決です。
from textwrap import dedent
from string import Template
query_template = Template(
dedent(
"""
DROP TABLE IF EXISTS {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME};
CREATE TABLE
{YOUR_BQ_DATASET}.{YOUR_TABLE_NAME} AS (
SELECT
*
FROM
EXTERNAL_QUERY("{YOUR_CONNECTION}",
\"\"\" SELECT $columns FROM $pg_schema.$table_name \"\"\"));
"""
)
)
また、機会があれば別記事で紹介しますが、もう少し工夫すれば、テーブルとカラムのdescriptionもBigQueryに入れることが可能です。
プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない
上記のクエリテンプレートを利用すればこの課題も解決できます。しかし、毎回テーブルを作り直しているため、予想外な通信エラーなどによってCREATE
文の実行が失敗し、DROP
文だけ実行された場合、テーブルが消えます。BigQueryのTransactionsを使えばいいじゃんって言われそうなところでしたが、
残念ながら、外部クエリEXTERNAL_QUERY
を利用している場合、Transactionsを使えません。
回避策としては、DROP
をする前に一回SELECT
で外部クエリをテストすると良いでしょう。
クエリテンプレートはこんな感じになりました。
from textwrap import dedent
from string import Template
init_query_temp = Template(
dedent(
"""
SELECT
*
FROM
EXTERNAL_QUERY("{connection}",
\"\"\" SELECT $columns FROM $pg_schema.$table_name LIMIT 1 \"\"\");
DROP TABLE IF EXISTS {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME};
CREATE TABLE {YOUR_BQ_DATASET}.{YOUR_TABLE_NAME} AS (
SELECT
*
FROM
EXTERNAL_QUERY("{connection}",
\"\"\" SELECT $columns FROM $pg_schema.$table_name \"\"\"));
"""
)
)
Transactionsと比べたら確実な方法ではないですが、現時点ではもっと良さそうな方法を見つけていません。
BigQueryで対応されていないデータ型がある
そもそもCloudSQLとBigQueryの用途が異なるため、BigQueryで対応されていないデータ型が存在するのも当たり前ですね。
Unsupported type: money, time with time zone, inet, path, pg_lsn, point, polygon, tsquery, tsvector, txid_snapshot, uuid, box, cidr, circle, interval, jsonb, line, lseg, macaddr, macaddr8
全ての非対応型を文字列に変換するというシンプルな方法で解決できます。
pandas
で実装したスクリプトを共有しておきます。
import bigquery
from textwrap import dedent
from string import Template
BIGQUERY_UNSUPPORTED_TYPE = set(
[
"money",
"time with time zone",
"inet",
"path",
"pg_lsn",
"point",
"polygon",
"tsquery",
"tsvector",
"txid_snapshot",
"uuid",
"box",
"cidr",
"circle",
"interval",
"jsonb",
"line",
"lseg",
"macaddr",
"macaddr8",
]
)
pg_columns_meta_sql = Template(
dedent(
"""
SELECT
*
FROM EXTERNAL_QUERY("$connection",
\"\"\"
SELECT
c.table_schema,
c.table_name,
c.column_name,
c.data_type,
pgd.description
FROM pg_catalog.pg_statio_all_tables AS st
INNER JOIN pg_catalog.pg_description pgd on (
pgd.objoid = st.relid
)
RIGHT JOIN information_schema.columns c ON (
pgd.objsubid = c.ordinal_position and
c.table_schema = st.schemaname and
c.table_name = st.relname
)
WHERE c.table_schema = '$pg_schema';
\"\"\");
"""
)
)
bq_client = bigquery.Client(credentials="{YOUR_CREDENTIALS}", project="{YOUR_PROJECT_ID}")
columns_meta_df = (
bq_client.query(
pg_columns_meta_sql.substitute(connection=connection, pg_schema=pg_schema),
)
.result()
.to_dataframe()
)
columns_meta_df["casted_column_name"] = columns_meta_df["column_name"].where(
~columns_meta_df["data_type"].isin(BIGQUERY_UNSUPPORTED_TYPE),
columns_meta_df["column_name"] + "::varchar",
)
まとめ
CloudSQLにあるデータをBigQueryに入れるのは簡単そうに見えますが、テーブル数が多くなると、意外と課題がありました。
以上です。ご参考になれば幸いです。
Discussion