🐷

Salesforce Bulk API 2.0 + Python で Salesforce →GCS 連携

2024/06/24に公開

Salesforce のデータを Cloud Storage に連携するバッチ処理を作ったので、その備忘録的記事です。

構成

処理一覧

  1. Cloud Scheduler をトリガーに Workflows を実行
  2. 取得対象オブジェクト項目名を取得
  3. 上記 2 で取得した項目名を元に SOQL を生成し Bulk API 2.0 のクエリジョブを作成
  4. クエリジョブのステータスをポーリング
  5. クエリジョブ結果を取得し Cloud Storage にストリーミングアップロード

Cloud Run で実行される処理は Python で実装してます。
2〜4 は Cloud Run Service を利用した API で 5 は Cloud Run Jobs を利用しています。
Cloud Storage へのアップロードはストリーミング形式です。

実装

Salesforce データ取得 & Cloud Storage アップロードする処理

Cloud Run で実行する処理です。
Flask のルーティングされた関数から呼び出されます。
Salesforce に対する処理は requests ライブラリと SalesForce Bulk API 2.0
Cloud Storage への処理は google.cloud ライブラリを使用しています。

import io
import os
import json
import requests
from google.cloud import storage
from datetime import datetime

API_VER = 'v59.0'
BASE_URL = 'https://Domain.my.salesforce.com'
TOKEN_URL = f'{BASE_URL}/services/oauth2/token'
API_BASE_URL = f'{BASE_URL}/services/data/{API_VER}'
EXCLUDE_LIST = ['address']


class SalesforceAPI:
    def __init__(self, client_id, client_secret):
        self.headers = self.get_access_token(client_id, client_secret)

    @staticmethod
    def get_access_token(client_id: str, client_secret: str) -> dict:
        # アクセストークンを取得
        payload = {
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret
        }
        response = requests.post(TOKEN_URL, data=payload)
        response.raise_for_status()
        tokens = response.json()
        return {
            'Authorization': f'{tokens.get("token_type")} {tokens.get("access_token")}',
            'Content-Type': 'application/json'
        }

    def get_fields(self, object_name: str) -> list:
        # Salesforce オブジェクトの項目名を取得
        describe_url = f'{API_BASE_URL}/sobjects/{object_name}/describe'
        response = requests.get(describe_url, headers=self.headers)
        response.raise_for_status()
        describe_data = response.json()
        field_names = [[field['name'], field['type']] for field in describe_data['fields']]
        fields = []
        for column, data_type in field_names:
            # Bulk APIは複合項目へのアクセスができないため除外
            if data_type not in EXCLUDE_LIST:
                fields.append(column)
        return fields

    def create_bulk_query_job(self, object_name: str, field_names: list) -> str:
        # クエリジョブ作成し クエリジョブIDを取得
        query = f"SELECT {', '.join(field_names)} FROM {object_name}"
        job_data = {
            'operation': 'query',
            'query': query,
        }
        job_create_url = f'{API_BASE_URL}/jobs/query'
        response = requests.post(job_create_url, headers=self.headers, data=json.dumps(job_data))
        response.raise_for_status()
        job_info = response.json()
        return job_info['id']

    def get_job_status(self, job_id: str) -> str:
        # クエリジョブの処理状況を取得
        job_status_url = f'{API_BASE_URL}/jobs/query/{job_id}'
        response = requests.get(job_status_url, headers=self.headers)
        response.raise_for_status()
        return response.json()['state']

    def export(self, job_id: str, bucket_name: str, blob_name: str):
        # Salesforce オブジェクトのデータを取得し Cloud Storage にストリーミングアップデート
        bucket = storage.Client().get_bucket(bucket_name)
        count = 0
        locator = ''
        try:
            while locator is not None:
                locator_uri = f'locator={locator}&' if count else ''
                results_url = f'{API_BASE_URL}/jobs/query/{job_id}/results?{locator_uri}'
                response = requests.get(results_url, headers=self.headers)

                records = int(response.headers.get('Sforce-NumberOfRecords', 0))
                count += records

                obj = io.BytesIO(response.content)
                blob = bucket.blob(blob_name)
                blob.upload_from_file(obj, content_type='text/csv')

                locator = response.headers.get('Sforce-Locator', '')
                locator = None if locator == 'null' else locator
        except Exception as e:
            raise e

API の バージョンや URL の ドメインは各自のものに置き換えてご利用ください。
Bulk API は複合項目のエクスポートに対応していないため、エクスポート対象から除外しています。

API エンドポイント

API エンドポイントは簡単ですが以下の感じです。
Salesforce オブジェクトから項目名を取得・返却する API の例です。
以下のように API から呼び出せるようにすると Cloud Workflows で自動化しやすいですね。

import os
from salesforce import SalesforceAPI
from flask import Flask, request, jsonify, abort

app = Flask(__name__)
salesforce_api = SalesforceAPI()


@app.route('/get_fields', methods=['GET'])
def get_fields():
    # Salesforce オブジェクトの項目名を取得
    try:
        # クエリパラメータからobject_nameを取得
        object_name = request.args.get('object_name')
        if not object_name:
            return jsonify({'error': 'object_name is required'}), 400

        # Salesforceの項目名を取得する処理
        fields = salesforce_api.get_fields(object_name)

        # 結果をJSON形式で返す
        return jsonify({'fields': fields})
    except Exception as e:
        abort(500, description=str(e))

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

ワークフローの定義

ジョブをポーリングする処理 (jobPolling) とデータ取得・アップロードする処理 (exportJob) は
サブワークフローで実装しています。

main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - job_location: [利用するリージョン]
          - object_name: ${args.object_name}
          - bucket_name: [Cloud Storage のバケット名]
    - getFields:
        call: http.get
        args:
          url: ${"https://[Cloud Run の URL]/get_fields?object_name=" + object_name}
          auth:
            type: OIDC
        result: fields_json
    - bulkQuery:
        call: http.post
        args:
          url: https://[Cloud Run の URL]/bulk_query
          body:
            fields: ${fields_json.body.fields}
            object_name: ${object_name}
          auth:
            type: OIDC
        result: job_id_json
    - polling:
        call: jobPolling
        args:
          job_id: ${job_id_json.body.job_id}
        result: status
    - check:
        switch:
          - condition: ${status != "JobComplete"}
            next: finally
        next: export
    - export:
        call: exportJob
        args:
          project_id: ${project_id}
          job_location: ${job_location}
          job_id: ${job_id_json.body.job_id}
          object_name: ${object_name}
          bucket_name: ${bucket_name}
        result: status

jobPolling:
  params: [job_id]
  steps:
    - bulkQueryJobPolling:
        call: http.get
        args:
          url: ${"https://[Cloud Run の URL]/get_status?job_id=" + job_id}
          auth:
            type: OIDC
        result: job_status
    - checkStatus:
        switch:
          - condition: ${job_status.body.status in ["UploadComplete", "InProgress"]}
            next: wait
        next: returnStatus
    - wait:
        call: sys.sleep
        args:
          seconds: 60
        next: bulkQueryJobPolling
    - returnStatus:
        return: ${job_status.body.status}


exportJob:
  params: [project_id, job_location, job_id, object_name, bucket_name]
  steps:
    - salesforceToStorage:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + "bi-sf2bq-export"}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                args:
                  - --job_id
                  - ${job_id}
                  - --object_name
                  - ${object_name}
                  - --bucket_name
                  - ${bucket_name}
        result: job_status

Workflows の引数は Salesforce オブジェクト名(object_name)のみです。
引数は Cloud Scheduler から渡します。

Cloud Run へのデプロイ

この記事ではお話ししませんが、デプロイに関する記事を書いたのでこちらを参照ください。

動作確認

Cloud Scheduler から引数を渡してスケジュール実行してみました。
ちゃんと引数が渡されていますね。(ファイルもちゃんと Cloud Storage にアップロードされていました)

まとめ

Salesforce のデータをデータ基盤に連携したいケースはたくさんあるかと思います。
有料のサービスもいくつかありますが、スモールスタートでデータ基盤を立ち上げる場合など、できるだけコストを抑えて連携したいという時に参考にしていただけると幸いです。

株式会社fundbook

Discussion