👓

【python】salesforceのデータをpythonからAPIで取得してcsvファイル出力する

2021/07/30に公開

やること

pyhtonのライブラリを利用してsalesforceからデータを取って来て
JSONをcsvに変換してファイル出力します。

背景

salesforceのデータをBIツールやデータ分析で簡単に活用できるようにDWHに蓄積するために、
データレイクにどのDBにもロードしやすいcsvファイルとして保持したいと思いPoCしてみます。

使用ライブラリ

今回は以下を使います。
simple-salesforce
https://github.com/simple-salesforce/simple-salesforce
環境にインストール

pip install simple-salesforce

前提

  • SF通信設定のパターンは複数ある。詳細はsimple-salesfoceのreadme参照。
    • 今回はsalesforce側でAPI発行サーバのIPを許諾設定したパターンのため
      セキュリティトークンは不要。
  • API名はsalesfoceのオブジェクト名により決まる。salesforceにて確認。
  • レスポンス形式はJSON
  • APIは1コールで2000レコードまで取得できる。
    • 2000レコード以上存在する場合はレスポンスに付与されるnextRecordsUrlを見て
      続きを取得するためAPIを再度発行
  • bulk api機能で一気に取得することもできるがサーバにかなり負荷がかかるのであまりおすすめできない。

コード

sf_api.py

#!/usr/bin/env python
# coding: utf-8
import sys
import os
import json
import time
import csv
from simple_salesforce import Salesforce
#########################################################################
# SF-API発行関数
# 第一引数:取得したいAPI名(api_name)
#########################################################################
def do_sf_api(api_name):
    # SF通信設定
    sf = Salesforce(username='ユーザ名', password='パスワード', security_token='')
    # API名でディスパッチして取得オブジェクト毎のSQQL作成
    if api_name == 'AAA':
        # 「AAA」オブジェクトのデータを取得するクエリ
        query_str = 'SELECT id FROM AAA'
    elif api_name == 'BBB':
        # 「BBB」オブジェクトのデータを取得するクエリ
        query_str = 'SELECT id FROM BBB'
    else:
        print('no match api kind')
        return 1
    # API発行
    res = sf.query(query_str)
    # csv書き込みモード初期値:上書き(write)モード
    mode = 'w'
    # 書き込みモード判別カウンタ
    cnt  = 0
    while True:
        if cnt > 0:
            # 書き込みモードを追記(append)に変更
            mode = 'a'
        if not res['done']:
	    // 取得できたところまでファイル出力
            do_output_csv(res, api_name, mode)
	    // データ取得が完了していないので再度API発行
            res = sf.query_more(res['nextRecordsUrl'],identifier_is_url=True)
        else:
	    // 最後の取得データをファイル出力
            do_output_csv(res, api_name, mode)
            break
        # カウンタ累積代入
        cnt+=1
#########################################################################
# csv出力関数
# 第一引数:APIのレスポンスデータ(api_res)
# 第二引数:ファイル名の一部。API名を渡している(file_name)
# 第三引数:ファイル出力モード(mode)
#########################################################################
def do_output_csv(api_res, file_name, mode):
    # 辞書型を文字列型に変換
    json_str_res = json.dumps(api_res)
    # 文字列型からJSON value内の改行コードを除去して、を辞書型に変換
    json_dict = json.loads(json_str_res.replace('\\r\\n','').replace('\\n', ''))
    # list of dictの抽出
    target_dicts = json_dict['records']
    # attributes要素を除外
    for i in range(len(target_dicts)):
        try:
            del target_dicts[i]['attributes']
        except KeyError:
            pass
    # csvファイル出力:カレントディレクトリに「sf_API名.csv」というファイルで出力
    with open('./sf_{}.csv'.format(file_name), mode) as f:
        # dialectの登録
        csv.register_dialect('dialect01', doublequote=True, quoting=csv.QUOTE_ALL)
        # DictWriter作成
        writer = csv.DictWriter(f, fieldnames=target_dicts[0].keys(), dialect='dialect01')
        # csvへの書き込み
        if mode == 'w':
            # 初回書き込み時(modeがappendではなくwriteの場合)はヘッダ付与
            writer.writeheader()
        for target_dict in target_dicts:
            writer.writerow(target_dict)
    return 0
#########################################################################
# main 処理
# 第一引数:取得したいAPI名(api_name)
#########################################################################
if __name__ == "__main__":
    # 処理開始時間
    start = time.time()
    # 引数情報を代入
    args = sys.argv
    api_name = args[1]
    # API発行
    try:
        res = do_sf_api(api_name)
    except Exception as e:
        print('do_sf_api error:' + e)
        sys.exit(1)
    # 処理終了時間
    elapsed_time = time.time() - start
    # 処理時間出力
    print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
    # 終了コード
    sys.exit(0)

使用方法

./sf_api.py <API名>

最後に

上記コードで無事に取得、出力することを確認できました。
今は単純にハードコーディングしてディスパッチしていますが、
実際にバッチとかで組み込む際はSQQL(salesforceのクエリ)を外部定義して
ファイル読込するとかしといたほうが運用しやすそうですね。

Discussion