🐍

[Python]EC2(非GCP環境のサーバ) → BigQueryにCloud Storage経由でデータをロードする

に公開

EC2(非GCP環境のサーバ)からBigQueryに、Cloud Storage経由でデータをロードするサンプルコード(覚書き)

前提条件

プログラム動作環境であるEC2(非GCP環境のサーバ)は作成済であるものとする

事前準備

(1) サービスアカウントの作成

データをロードしたいGCPのプロジェクトにサインインし、以下の条件でサービスアカウントを作成しておく

項目 実施内容
鍵の発行 JSON形式で発行
ロール BigQuery ジョブユーザー(bigquery.jobs.create)
BigQuery データ編集者(bigquery.tables.create)
Storage オブジェクト管理者

(2) BigQueryにて、データセットとテーブルの作成

以下の名称でデータセット、テーブルを作成する

項目 名称
データセット test
テーブル user_insights

※テーブルについては、GCPコンソール画面右上の「Cloud Shellをアクティブにする」から、コンソール画面を起動し、以下のコマンドを実行して作成する

bq mk \
  --table \
  --time_partitioning_field date \
  --time_partitioning_type DAY \
  --time_partitioning_expiration 94608000 \
  --description "This is user insights table" \
  test.user_insights \
  date:DATETIME,id:INTEGER,pv:INTEGER,uu:INTEGER,session:INTEGER,created_by:DATETIME,updated_by:DATETIME

(3) Cloud Storageにて、バケットとフォルダの作成

以下の名称でバケット、フォルダを作成する

項目 名称
バケット 任意の名前(※)
フォルダ user_insights

バケット名はグローバルに一意である必要があるため、これまでに作成されていないお好みの名前をつける

(4) Python実行環境の準備

EC2(非GCP環境のサーバ)にログイン&任意のディレクトリを作成し、Python実行用の仮想環境を準備する

mkdir -p [任意のディレクトリ] && cd [任意のディレクトリ]
python3 -m venv env
# 仮想環境に入る: 以後、プログラム実行の際も同様
. env/bin/activate 
# pipのインストール&アップグレード
python3 -m pip install --upgrade pip
# 必要なGCPパッケージをインストール
pip install google-auth
pip install google-cloud-bigquery
pip install google-cloud-storage
# インストール状況の確認
pip list

# 仮想環境から抜ける場合
deactivate

サンプルコード

load_csv_to_bigquery_via_gcs.py
import datetime
import csv

from google.oauth2 import service_account
from google.cloud import storage
from google.cloud import bigquery

# GCP接続設定: ./xxxxx.jsonは前提条件(1)で発行した鍵 本プログラムと同階層に配置する想定
credentials = service_account.Credentials.from_service_account_file(
    './xxxxx.json', 
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

storage_client = storage.Client(credentials=credentials)
bigquery_client = bigquery.Client(credentials=credentials, project=credentials.project_id)


def upload_csv(bucket_name, source_file_name, destination_blob_name):
    """ローカル → GCSにCSVファイルをアップロードする

    Parameters
    ----------
    bucket_name : string
        GCSバケット名
    source_file_name : string
        ローカル環境のCSVファイルのパス
    destination_blob_name : string
        GCSバケット上のCSVファイルのパス
    """
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def load_to_bigquery(gcs_uri):
    """Cloud Storage → BigQueryへデータをロードする

    Parameters
    ----------
    gcs_uri : ロード対象のCSVのgsutil URI
    """
    table_id = "test.user_insights"
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("date", "DATETIME"),
            bigquery.SchemaField("id", "INTEGER"),
            bigquery.SchemaField("pv", "INTEGER"),
            bigquery.SchemaField("uu", "INTEGER"),
            bigquery.SchemaField("session", "INTEGER"),
            bigquery.SchemaField("created_by", "DATETIME"),
            bigquery.SchemaField("updated_by", "DATETIME"),
        ],
        # 1行目がヘッダの場合、スキップ
        # skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    load_job = bigquery_client.load_table_from_uri(
        gcs_uri, table_id, job_config=job_config
    ) 

    load_job.result()

def delete_duplicate_record():
    """test.user_insightsテーブルの日付とidが重複するレコードを削除する
    """
    query = """
    delete from test.user_insights a 
    where exists(
    select * from test.user_insights b 
    where 
     a.date = b.date
     and a.date = b.date
     and a.updated_by < b.updated_by)
    """
    query_job = bigquery_client.query(query)

##### 以下、主処理 #####

try:
    # 投入データの整形
    current_time = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
    current_time_format = format(current_time, '%Y-%m-%d %H:%M:%S')
    data = [
        ['2023-01-01', 10001, 345, 201, 50, current_time_format, current_time_format],
        ['2023-01-01', 10002, 56, 32, 22, current_time_format, current_time_format],
        ['2023-01-01', 10003, 102, 76, 69, current_time_format, current_time_format],
        ['2023-02-01', 10001, 452, 333, 102, current_time_format, current_time_format],
        ['2023-02-01', 10002, 78, 51, 30, current_time_format, current_time_format],
        ['2023-02-01', 10003, 88, 54, 27, current_time_format, current_time_format],
    ]

    # 配列の中身をCSV出力
    fllename = 'user_insights.csv'
    f = open('user_insights.csv', 'w')
    writer = csv.writer(f)
    writer.writerows(data)
    f.close()

    # ファイルをGCSにアップロード
    bucket_name = 'xxxxxxx' # バケット名は前提条件(3)でつけた名前
    destination_blob_name = 'user_insights/' + fllename
    upload_csv(bucket_name, fllename, destination_blob_name)

    # Cloud Storage → BigQueryへロード
    gcs_uri = "gs://" + bucket_name + '/' + destination_blob_name
    load_to_bigquery(gcs_uri)

    # 重複レコードの削除
    delete_duplicate_record()
except Exception as e:
    print(e)
finally:
    print('all finish')

動作確認

上のサンプルコードを事前準備(1)で発行した鍵とともに事前準備(4) で作ったPython実行環境に配置し、以下のコマンドを実行

python3 load_csv_to_bigquery_via_gcs.py

参考

Discussion