🐍
[Python]EC2(非GCP環境のサーバ) → BigQueryにCloud Storage経由でデータをロードする
EC2(非GCP環境のサーバ)からBigQueryに、Cloud Storage経由でデータをロードするサンプルコード(覚書き)
前提条件
プログラム動作環境であるEC2(非GCP環境のサーバ)は作成済であるものとする
事前準備
(1) サービスアカウントの作成
データをロードしたいGCPのプロジェクトにサインインし、以下の条件でサービスアカウントを作成しておく
項目 | 実施内容 |
---|---|
鍵の発行 | JSON形式で発行 |
ロール | BigQuery ジョブユーザー(bigquery.jobs.create) BigQuery データ編集者(bigquery.tables.create) Storage オブジェクト管理者 |
(2) BigQueryにて、データセットとテーブルの作成
以下の名称でデータセット、テーブルを作成する
項目 | 名称 |
---|---|
データセット | test |
テーブル | user_insights |
※テーブルについては、GCPコンソール画面右上の「Cloud Shellをアクティブにする」から、コンソール画面を起動し、以下のコマンドを実行して作成する
bq mk \
--table \
--time_partitioning_field date \
--time_partitioning_type DAY \
--time_partitioning_expiration 94608000 \
--description "This is user insights table" \
test.user_insights \
date:DATETIME,id:INTEGER,pv:INTEGER,uu:INTEGER,session:INTEGER,created_by:DATETIME,updated_by:DATETIME
(3) Cloud Storageにて、バケットとフォルダの作成
以下の名称でバケット、フォルダを作成する
項目 | 名称 |
---|---|
バケット | 任意の名前(※) |
フォルダ | user_insights |
バケット名はグローバルに一意である必要があるため、これまでに作成されていないお好みの名前をつける
(4) Python実行環境の準備
EC2(非GCP環境のサーバ)にログイン&任意のディレクトリを作成し、Python実行用の仮想環境を準備する
mkdir -p [任意のディレクトリ] && cd [任意のディレクトリ]
python3 -m venv env
# 仮想環境に入る: 以後、プログラム実行の際も同様
. env/bin/activate
# pipのインストール&アップグレード
python3 -m pip install --upgrade pip
# 必要なGCPパッケージをインストール
pip install google-auth
pip install google-cloud-bigquery
pip install google-cloud-storage
# インストール状況の確認
pip list
# 仮想環境から抜ける場合
deactivate
サンプルコード
load_csv_to_bigquery_via_gcs.py
import datetime
import csv
from google.oauth2 import service_account
from google.cloud import storage
from google.cloud import bigquery
# GCP接続設定: ./xxxxx.jsonは前提条件(1)で発行した鍵 本プログラムと同階層に配置する想定
credentials = service_account.Credentials.from_service_account_file(
'./xxxxx.json',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
storage_client = storage.Client(credentials=credentials)
bigquery_client = bigquery.Client(credentials=credentials, project=credentials.project_id)
def upload_csv(bucket_name, source_file_name, destination_blob_name):
"""ローカル → GCSにCSVファイルをアップロードする
Parameters
----------
bucket_name : string
GCSバケット名
source_file_name : string
ローカル環境のCSVファイルのパス
destination_blob_name : string
GCSバケット上のCSVファイルのパス
"""
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
def load_to_bigquery(gcs_uri):
"""Cloud Storage → BigQueryへデータをロードする
Parameters
----------
gcs_uri : ロード対象のCSVのgsutil URI
"""
table_id = "test.user_insights"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("date", "DATETIME"),
bigquery.SchemaField("id", "INTEGER"),
bigquery.SchemaField("pv", "INTEGER"),
bigquery.SchemaField("uu", "INTEGER"),
bigquery.SchemaField("session", "INTEGER"),
bigquery.SchemaField("created_by", "DATETIME"),
bigquery.SchemaField("updated_by", "DATETIME"),
],
# 1行目がヘッダの場合、スキップ
# skip_leading_rows=1,
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
)
load_job = bigquery_client.load_table_from_uri(
gcs_uri, table_id, job_config=job_config
)
load_job.result()
def delete_duplicate_record():
"""test.user_insightsテーブルの日付とidが重複するレコードを削除する
"""
query = """
delete from test.user_insights a
where exists(
select * from test.user_insights b
where
a.date = b.date
and a.date = b.date
and a.updated_by < b.updated_by)
"""
query_job = bigquery_client.query(query)
##### 以下、主処理 #####
try:
# 投入データの整形
current_time = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
current_time_format = format(current_time, '%Y-%m-%d %H:%M:%S')
data = [
['2023-01-01', 10001, 345, 201, 50, current_time_format, current_time_format],
['2023-01-01', 10002, 56, 32, 22, current_time_format, current_time_format],
['2023-01-01', 10003, 102, 76, 69, current_time_format, current_time_format],
['2023-02-01', 10001, 452, 333, 102, current_time_format, current_time_format],
['2023-02-01', 10002, 78, 51, 30, current_time_format, current_time_format],
['2023-02-01', 10003, 88, 54, 27, current_time_format, current_time_format],
]
# 配列の中身をCSV出力
fllename = 'user_insights.csv'
f = open('user_insights.csv', 'w')
writer = csv.writer(f)
writer.writerows(data)
f.close()
# ファイルをGCSにアップロード
bucket_name = 'xxxxxxx' # バケット名は前提条件(3)でつけた名前
destination_blob_name = 'user_insights/' + fllename
upload_csv(bucket_name, fllename, destination_blob_name)
# Cloud Storage → BigQueryへロード
gcs_uri = "gs://" + bucket_name + '/' + destination_blob_name
load_to_bigquery(gcs_uri)
# 重複レコードの削除
delete_duplicate_record()
except Exception as e:
print(e)
finally:
print('all finish')
動作確認
上のサンプルコードを事前準備(1)で発行した鍵とともに事前準備(4) で作ったPython実行環境に配置し、以下のコマンドを実行
python3 load_csv_to_bigquery_via_gcs.py
参考
-
サービス アカウント キー ファイルを使用してクライアントを作成する | BigQuery | Google Cloud
-
SQLで重複を削除するサンプルコード 最新データを残してdeleteするには? | POTEPAN STYLE
→ 上記のプログラム中では最大のもの以外を削除したいので、not existsではなくexistsを使っている
Discussion