🐍

[Python]EC2(非GCP環境のサーバ) → BigQueryにCloud Storage経由でデータをロードする

2023/02/08に公開

EC2(非GCP環境のサーバ)からBigQueryに、Cloud Storage経由でデータをロードするサンプルコード(覚書き)

前提条件

プログラム動作環境であるEC2(非GCP環境のサーバ)は作成済であるものとする

事前準備

(1) サービスアカウントの作成

データをロードしたいGCPのプロジェクトにサインインし、以下の条件でサービスアカウントを作成しておく

項目 実施内容
鍵の発行 JSON形式で発行
ロール BigQuery ジョブユーザー(bigquery.jobs.create)
BigQuery データ編集者(bigquery.tables.create)
Storage オブジェクト管理者

(2) BigQueryにて、データセットとテーブルの作成

以下の名称でデータセット、テーブルを作成する

項目 名称
データセット test
テーブル user_insights

※テーブルについては、GCPコンソール画面右上の「Cloud Shellをアクティブにする」から、コンソール画面を起動し、以下のコマンドを実行して作成する

bq mk \
  --table \
  --time_partitioning_field date \
  --time_partitioning_type DAY \
  --time_partitioning_expiration 94608000 \
  --description "This is user insights table" \
  test.user_insights \
  date:DATETIME,id:INTEGER,pv:INTEGER,uu:INTEGER,session:INTEGER,created_by:DATETIME,updated_by:DATETIME

(3) Cloud Storageにて、バケットとフォルダの作成

以下の名称でバケット、フォルダを作成する

項目 名称
バケット 任意の名前(※)
フォルダ user_insights

バケット名はグローバルに一意である必要があるため、これまでに作成されていないお好みの名前をつける

(4) Python実行環境の準備

EC2(非GCP環境のサーバ)にログイン&任意のディレクトリを作成し、Python実行用の仮想環境を準備する

mkdir -p [任意のディレクトリ] && cd [任意のディレクトリ]
python3 -m venv env
# 仮想環境に入る: 以後、プログラム実行の際も同様
. env/bin/activate 
# pipのインストール&アップグレード
python3 -m pip install --upgrade pip
# 必要なGCPパッケージをインストール
pip install google-auth
pip install google-cloud-bigquery
pip install google-cloud-storage
# インストール状況の確認
pip list

# 仮想環境から抜ける場合
deactivate

サンプルコード

load_csv_to_bigquery_via_gcs.py
import datetime
import csv

from google.oauth2 import service_account
from google.cloud import storage
from google.cloud import bigquery

# GCP接続設定: ./xxxxx.jsonは前提条件(1)で発行した鍵 本プログラムと同階層に配置する想定
credentials = service_account.Credentials.from_service_account_file(
    './xxxxx.json', 
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

storage_client = storage.Client(credentials=credentials)
bigquery_client = bigquery.Client(credentials=credentials, project=credentials.project_id)


def upload_csv(bucket_name, source_file_name, destination_blob_name):
    """ローカル → GCSにCSVファイルをアップロードする

    Parameters
    ----------
    bucket_name : string
        GCSバケット名
    source_file_name : string
        ローカル環境のCSVファイルのパス
    destination_blob_name : string
        GCSバケット上のCSVファイルのパス
    """
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def load_to_bigquery(gcs_uri):
    """Cloud Storage → BigQueryへデータをロードする

    Parameters
    ----------
    gcs_uri : ロード対象のCSVのgsutil URI
    """
    table_id = "test.user_insights"
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("date", "DATETIME"),
            bigquery.SchemaField("id", "INTEGER"),
            bigquery.SchemaField("pv", "INTEGER"),
            bigquery.SchemaField("uu", "INTEGER"),
            bigquery.SchemaField("session", "INTEGER"),
            bigquery.SchemaField("created_by", "DATETIME"),
            bigquery.SchemaField("updated_by", "DATETIME"),
        ],
        # 1行目がヘッダの場合、スキップ
        # skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    load_job = bigquery_client.load_table_from_uri(
        gcs_uri, table_id, job_config=job_config
    ) 

    load_job.result()

def delete_duplicate_record():
    """test.user_insightsテーブルの日付とidが重複するレコードを削除する
    """
    query = """
    delete from test.user_insights a 
    where exists(
    select * from test.user_insights b 
    where 
     a.date = b.date
     and a.date = b.date
     and a.updated_by < b.updated_by)
    """
    query_job = bigquery_client.query(query)

##### 以下、主処理 #####

try:
    # 投入データの整形
    current_time = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
    current_time_format = format(current_time, '%Y-%m-%d %H:%M:%S')
    data = [
        ['2023-01-01', 10001, 345, 201, 50, current_time_format, current_time_format],
        ['2023-01-01', 10002, 56, 32, 22, current_time_format, current_time_format],
        ['2023-01-01', 10003, 102, 76, 69, current_time_format, current_time_format],
        ['2023-02-01', 10001, 452, 333, 102, current_time_format, current_time_format],
        ['2023-02-01', 10002, 78, 51, 30, current_time_format, current_time_format],
        ['2023-02-01', 10003, 88, 54, 27, current_time_format, current_time_format],
    ]

    # 配列の中身をCSV出力
    fllename = 'user_insights.csv'
    f = open('user_insights.csv', 'w')
    writer = csv.writer(f)
    writer.writerows(data)
    f.close()

    # ファイルをGCSにアップロード
    bucket_name = 'xxxxxxx' # バケット名は前提条件(3)でつけた名前
    destination_blob_name = 'user_insights/' + fllename
    upload_csv(bucket_name, fllename, destination_blob_name)

    # Cloud Storage → BigQueryへロード
    gcs_uri = "gs://" + bucket_name + '/' + destination_blob_name
    load_to_bigquery(gcs_uri)

    # 重複レコードの削除
    delete_duplicate_record()
except Exception as e:
    print(e)
finally:
    print('all finish')

動作確認

上のサンプルコードを事前準備(1)で発行した鍵とともに事前準備(4) で作ったPython実行環境に配置し、以下のコマンドを実行

python3 load_csv_to_bigquery_via_gcs.py

参考

Discussion