📘

Salesforce Data Cloud Ingestion APIを使ったCSVファイルのアップロード:Pythonによる実例付きガイド

2024/09/20に公開

TL;DR

SalesforceのData Cloud Ingestion APIには、データをリアルタイムで取り込むStreamingタイプと、大量のデータを一括で処理するBulkタイプの2種類があります。本記事では、Bulk Ingestion APIを使用して、Pythonを用いた具体的な実行例とともに、CSVファイルを効率的にData Cloudにアップロードする方法を紹介します。

Data Cloud設定: Ingestion APIスキーマの設定

Data Cloud Ingestion APIを使用してデータをアップロードする前に、取り込みAPIのスキーマ設定を行う必要があります。このスキーマは、取り込むデータの構造やフォーマットを定義するもので、正確に設定することでスムーズなデータ処理が可能となります。

Salesforce公式ヘルプ: Ingestion APIスキーマファイルの要件

参考スキーマ例:
以下は、スキーマの具体例です。このスキーマでは、id(文字列)、email(Eメール形式の文字列)、score(数値)の3つのフィールドを持つデータ構造を定義しています。

test.yaml
openapi: 3.0.3
components:
  schemas:
    Test:
      type: object
      properties:
        id:
          type: string
        email:
          type: string
          format: email
        score:
          type: number

Salesforce接続アプリケーションの作成手順

Data Cloud Ingestion APIを使用するためには、Salesforceの接続アプリケーションを作成し、OAuthスコープを設定する必要があります。接続アプリケーションの作成方法については、以下の公式ヘルプページを参考にしてください。

Salesforce公式ヘルプ: 接続アプリケーションの作成

OAuthスコープの設定
接続アプリケーションを作成する際に、最低以下のOAuthスコープを設定してください:

cdp_ingest_api: Data Cloud Ingestion APIへのアクセスを許可
refresh_token, offline_access: トークンの再取得やオフラインアクセスを許可

これにより、Data Cloud Ingestion APIに対するリクエストが可能となります。

アクセストークン取得

このステップでは、Pythonを使用してSalesforce APIおよびData Cloud APIのアクセストークンを取得する方法を紹介します。トークン取得は2段階のプロセスで、まずSalesforce APIからアクセストークンを取得し、そのトークンを使ってData Cloud APIのアクセストークンを取得します。今回の例では、1つのpythonファイルにこれらの処理を統合しています。

なお、本記事では、Salesforce APIのアクセストークン取得にはパスワード方式を使用していますが、セキュリティ要件に応じてJWT方式を選択することも可能です。

get_access_tokens.py
import requests

def get_salesforce_access_token(client_id, client_secret, username, password):
   """
   Salesforce APIからアクセストークンを取得します。
   """
   url = 'https://login.salesforce.com/services/oauth2/token'
   data = {
       'grant_type': 'password',
       'client_id': client_id,
       'client_secret': client_secret,
       'username': username,
       'password': password  # 必要に応じてセキュリティトークンを付加
   }

   try:
       response = requests.post(url, data=data)
       response.raise_for_status()
       json_response = response.json()
       access_token = json_response.get('access_token')
       if access_token:
           return access_token
       else:
           print("Access token not found in the response.")
   except requests.exceptions.HTTPError as e:
       print(f"HTTP error during Salesforce token retrieval: {e} - {response.text}")
   except Exception as e:
       print(f"Error during Salesforce token retrieval: {e}")
   return None

def get_data_cloud_access_token(salesforce_access_token, data_cloud_domain):
   """
   Data Cloud APIからアクセストークンを取得します。
   """
   url = f'https://{data_cloud_domain}/services/a360/token'
   headers = {
       'Authorization': f'Bearer {salesforce_access_token}',
       'Content-Type': 'application/x-www-form-urlencoded'
   }
   data = {
       'grant_type': 'urn:salesforce:grant-type:external:cdp',
       'subject_token': salesforce_access_token,
       'subject_token_type': 'urn:ietf:params:oauth:token-type:access_token'
   }

   try:
       response = requests.post(url, headers=headers, data=data)
       response.raise_for_status()
       json_response = response.json()
       access_token = json_response.get('access_token')
       if access_token:
           return access_token
       else:
           print("Data Cloud Access token not found in the response.")
   except requests.exceptions.HTTPError as e:
       print(f"HTTP error during Data Cloud token retrieval: {e} - {response.text}")
   except Exception as e:
       print(f"Error during Data Cloud token retrieval: {e}")
   return None

CSVファイルのアップロード

このステップでは、モジュールとしてインポートしたトークン取得処理を活用して、CSVファイルをData Cloud Ingestion APIにアップロードする手順を紹介します。トークン取得処理は get_access_tokens.py で定義されており、このモジュールをインポートして upload_csv.py から呼び出す構成です。

注意点:
以下の変数は、Data Cloud設定で定義したIngestion APIの設定に応じて、適切に定義する必要があります。
JOB_OBJECT_NAME:実際のオブジェクト名に置き換えてください。
JOB_SOURCE_NAME:実際のAPIソース名に置き換えてください。
JOB_OPERATION:操作タイプを指定します(upsertまたはdelete)。

upload_csv.py
import sys
import requests
from get_access_tokens import get_salesforce_access_token, get_data_cloud_access_token

# -------------------------------
# Salesforce 認証情報
# -------------------------------
SALESFORCE_CLIENT_ID = '<クライアントID>'
SALESFORCE_CLIENT_SECRET = '<クライアントシークレット>'
SALESFORCE_USERNAME = '<Salesforceのユーザー名>'
SALESFORCE_PASSWORD = '<Salesforceのパスワード>'

# -------------------------------
# Data Cloud 設定
# -------------------------------
DATA_CLOUD_DOMAIN = '<Data Cloudの組織のドメイン>.my.salesforce.com'
BULK_API_ENDPOINT = '<Bulk APIエンドポイント>'

# -------------------------------
# ジョブ設定
# -------------------------------
JOB_OBJECT_NAME = '<オブジェクト名>'
JOB_SOURCE_NAME = '<ソース名>'
JOB_OPERATION = '<操作タイプ (upsert または delete)>'

# -------------------------------
# ファイルパス
# -------------------------------
CSV_FILE_PATH = '/path/to/your/file/example.csv'

def create_job(access_token, endpoint, object_name, source_name, operation):
    """
    Data CloudのBulk APIを使用してジョブを作成します。
    """
    url = f'{endpoint}/api/v1/ingest/jobs'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    payload = {
        "object": object_name,
        "sourceName": source_name,
        "operation": operation
    }

    try:
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()
        job = response.json()
        print('Job Created Successfully:', job)
        return job
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error during job creation: {e} - {response.text}")
    except Exception as e:
        print(f"Error during job creation: {e}")
    return None

def upload_csv_file(access_token, endpoint, job_id, file_path):
    """
    Data CloudのBulk APIを使用してCSVファイルをアップロードします。
    """
    url = f'{endpoint}/api/v1/ingest/jobs/{job_id}/batches'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'text/csv'
    }

    try:
        with open(file_path, 'rb') as f:
            response = requests.put(url, headers=headers, data=f)
            response.raise_for_status()
            if response.status_code == 202:
                print('CSV file uploaded successfully.')
                return True
            else:
                print(f"Error during CSV upload: {response.status_code} - {response.text}")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error during CSV upload: {e} - {response.text}")
    except Exception as e:
        print(f"Error during CSV upload: {e}")
    return False

def close_job(access_token, endpoint, job_id):
    """
    ジョブの状態を 'UploadComplete' に更新し、ジョブをクローズします。
    """
    url = f'{endpoint}/api/v1/ingest/jobs/{job_id}'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    payload = {
        "state": "UploadComplete"
    }

    try:
        response = requests.patch(url, headers=headers, json=payload)
        response.raise_for_status()
        job_info = response.json()
        print('Job Closed Successfully:', job_info)
        return job_info
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error during job closing: {e} - {response.text}")
    except Exception as e:
        print(f"Error during job closing: {e}")
    return None

def main():
    """
    メイン処理: Salesforce APIからアクセストークンを取得し、
    Data CloudのBulk APIを使用してジョブを作成・CSVファイルをアップロード・ジョブをクローズします。
    """
    try:
        # Salesforce APIアクセストークンを取得
        salesforce_token = get_salesforce_access_token(
            SALESFORCE_CLIENT_ID,
            SALESFORCE_CLIENT_SECRET,
            SALESFORCE_USERNAME,
            SALESFORCE_PASSWORD
        )
        if not salesforce_token:
            print("Failed to obtain Salesforce API Access Token.")
            sys.exit(1)
        print("Salesforce API Access Token obtained.")

        # Data Cloud APIアクセストークンを取得
        data_cloud_token = get_data_cloud_access_token(
            salesforce_token,
            DATA_CLOUD_DOMAIN
        )
        if not data_cloud_token:
            print("Failed to obtain Data Cloud API Access Token.")
            sys.exit(1)
        print("Data Cloud API Access Token obtained.")

        # ジョブを作成
        job = create_job(
            data_cloud_token,
            BULK_API_ENDPOINT,
            JOB_OBJECT_NAME,
            JOB_SOURCE_NAME,
            JOB_OPERATION
        )
        if not job:
            print("Job creation failed.")
            sys.exit(1)

        job_id = job.get('id')
        if not job_id:
            print("Job ID not found.")
            sys.exit(1)
        print(f'Job Created with ID: {job_id}')

        # CSVファイルのアップロード
        upload_success = upload_csv_file(
            data_cloud_token,
            BULK_API_ENDPOINT,
            job_id,
            CSV_FILE_PATH
        )
        if not upload_success:
            print("CSV upload failed.")
            sys.exit(1)

        # ジョブをクローズ
        closed_job = close_job(
            data_cloud_token,
            BULK_API_ENDPOINT,
            job_id
        )
        if not closed_job:
            print("Failed to close the job.")
            sys.exit(1)
        print("Job closed successfully.")

    except requests.exceptions.RequestException as e:
        print('HTTP Request failed:', e)
        sys.exit(1)
    except Exception as e:
        print('An error occurred:', e)
        sys.exit(1)

if __name__ == "__main__":
    main()

ジョブのステータス確認

ジョブの状態や実行内容を確認することで、処理が正常に行われたかを把握することができます。

check_job_status.py
import sys
import requests
from get_access_tokens import get_salesforce_access_token, get_data_cloud_access_token

# -------------------------------
# Salesforce 認証情報
# -------------------------------
SALESFORCE_CLIENT_ID = '<クライアントID>'
SALESFORCE_CLIENT_SECRET = '<クライアントシークレット>'
SALESFORCE_USERNAME = '<Salesforceのユーザー名>'
SALESFORCE_PASSWORD = '<Salesforceのパスワード>'

# -------------------------------
# Data Cloud 設定
# -------------------------------
DATA_CLOUD_DOMAIN = '<Data Cloudの組織のドメイン>.my.salesforce.com'
BULK_API_ENDPOINT = '<Bulk APIエンドポイント>'

# -------------------------------
# ジョブ設定
# -------------------------------
JOB_ID = '<ジョブID>'

def get_job_info(access_token, endpoint, job_id):
    """
    Data CloudのBulk APIを使用してジョブのステータスを取得します。
    """
    url = f'{endpoint}/api/v1/ingest/jobs/{job_id}'
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        job_info = response.json()
        
        # ジョブ情報の表示
        for key, value in job_info.items():
            print(f"{key}: {value}")
        
        return job_info
    except requests.exceptions.HTTPError as e:
        print(f"HTTP error during job info retrieval: {e} - {response.text}")
    except Exception as e:
        print(f"Error during job info retrieval: {e}")
    return None

def main():
    try:
        # Salesforce APIアクセストークンを取得
        salesforce_token = get_salesforce_access_token(
            SALESFORCE_CLIENT_ID,
            SALESFORCE_CLIENT_SECRET,
            SALESFORCE_USERNAME,
            SALESFORCE_PASSWORD
        )
        if not salesforce_token:
            print("Failed to obtain Salesforce API Access Token.")
            sys.exit(1)
        print("Salesforce API Access Token obtained.")

        # Data Cloud APIアクセストークンを取得
        data_cloud_token = get_data_cloud_access_token(
            salesforce_token,
            DATA_CLOUD_DOMAIN
        )
        if not data_cloud_token:
            print("Failed to obtain Data Cloud API Access Token.")
            sys.exit(1)
        print("Data Cloud API Access Token obtained.")

        # ジョブのステータスを取得
        job_info = get_job_info(
            data_cloud_token,
            BULK_API_ENDPOINT,
            JOB_ID
        )
        if job_info:
            print("Job information retrieved successfully.")
        else:
            print("Failed to retrieve job information.")
    except requests.exceptions.RequestException as e:
        print('HTTP Request failed:', e)
        sys.exit(1)
    except Exception as e:
        print('An error occurred:', e)
        sys.exit(1)

if __name__ == "__main__":
    main()

データストリームの更新履歴から取り込みを確認

CSVファイルのアップロードが成功した後、Data Cloudの管理画面で、データの取り込みが正しく行われたかどうかを確認することができます。データストリームの更新履歴を確認し、取り込まれたデータの状態を確認します。
データストリームの更新履歴

※本記事は、私が所属する会社とは一切関係のない事柄です。

Discussion