👓
【python】salesforceのデータをpythonからAPIで取得してcsvファイル出力する
やること
pyhtonのライブラリを利用してsalesforceからデータを取って来て
JSONをcsvに変換してファイル出力します。
背景
salesforceのデータをBIツールやデータ分析で簡単に活用できるようにDWHに蓄積するために、
データレイクにどのDBにもロードしやすいcsvファイルとして保持したいと思いPoCしてみます。
使用ライブラリ
今回は以下を使います。
simple-salesforce
環境にインストール
pip install simple-salesforce
前提
- SF通信設定のパターンは複数ある。詳細はsimple-salesfoceのreadme参照。
- 今回はsalesforce側でAPI発行サーバのIPを許諾設定したパターンのため
セキュリティトークンは不要。
- 今回はsalesforce側でAPI発行サーバのIPを許諾設定したパターンのため
- API名はsalesfoceのオブジェクト名により決まる。salesforceにて確認。
- レスポンス形式はJSON
- APIは1コールで2000レコードまで取得できる。
- 2000レコード以上存在する場合はレスポンスに付与される
nextRecordsUrl
を見て
続きを取得するためAPIを再度発行
- 2000レコード以上存在する場合はレスポンスに付与される
- bulk api機能で一気に取得することもできるがサーバにかなり負荷がかかるのであまりおすすめできない。
コード
sf_api.py
#!/usr/bin/env python
# coding: utf-8
import sys
import os
import json
import time
import csv
from simple_salesforce import Salesforce
#########################################################################
# SF-API発行関数
# 第一引数:取得したいAPI名(api_name)
#########################################################################
def do_sf_api(api_name):
# SF通信設定
sf = Salesforce(username='ユーザ名', password='パスワード', security_token='')
# API名でディスパッチして取得オブジェクト毎のSQQL作成
if api_name == 'AAA':
# 「AAA」オブジェクトのデータを取得するクエリ
query_str = 'SELECT id FROM AAA'
elif api_name == 'BBB':
# 「BBB」オブジェクトのデータを取得するクエリ
query_str = 'SELECT id FROM BBB'
else:
print('no match api kind')
return 1
# API発行
res = sf.query(query_str)
# csv書き込みモード初期値:上書き(write)モード
mode = 'w'
# 書き込みモード判別カウンタ
cnt = 0
while True:
if cnt > 0:
# 書き込みモードを追記(append)に変更
mode = 'a'
if not res['done']:
// 取得できたところまでファイル出力
do_output_csv(res, api_name, mode)
// データ取得が完了していないので再度API発行
res = sf.query_more(res['nextRecordsUrl'],identifier_is_url=True)
else:
// 最後の取得データをファイル出力
do_output_csv(res, api_name, mode)
break
# カウンタ累積代入
cnt+=1
#########################################################################
# csv出力関数
# 第一引数:APIのレスポンスデータ(api_res)
# 第二引数:ファイル名の一部。API名を渡している(file_name)
# 第三引数:ファイル出力モード(mode)
#########################################################################
def do_output_csv(api_res, file_name, mode):
# 辞書型を文字列型に変換
json_str_res = json.dumps(api_res)
# 文字列型からJSON value内の改行コードを除去して、を辞書型に変換
json_dict = json.loads(json_str_res.replace('\\r\\n','').replace('\\n', ''))
# list of dictの抽出
target_dicts = json_dict['records']
# attributes要素を除外
for i in range(len(target_dicts)):
try:
del target_dicts[i]['attributes']
except KeyError:
pass
# csvファイル出力:カレントディレクトリに「sf_API名.csv」というファイルで出力
with open('./sf_{}.csv'.format(file_name), mode) as f:
# dialectの登録
csv.register_dialect('dialect01', doublequote=True, quoting=csv.QUOTE_ALL)
# DictWriter作成
writer = csv.DictWriter(f, fieldnames=target_dicts[0].keys(), dialect='dialect01')
# csvへの書き込み
if mode == 'w':
# 初回書き込み時(modeがappendではなくwriteの場合)はヘッダ付与
writer.writeheader()
for target_dict in target_dicts:
writer.writerow(target_dict)
return 0
#########################################################################
# main 処理
# 第一引数:取得したいAPI名(api_name)
#########################################################################
if __name__ == "__main__":
# 処理開始時間
start = time.time()
# 引数情報を代入
args = sys.argv
api_name = args[1]
# API発行
try:
res = do_sf_api(api_name)
except Exception as e:
print('do_sf_api error:' + e)
sys.exit(1)
# 処理終了時間
elapsed_time = time.time() - start
# 処理時間出力
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
# 終了コード
sys.exit(0)
使用方法
./sf_api.py <API名>
最後に
上記コードで無事に取得、出力することを確認できました。
今は単純にハードコーディングしてディスパッチしていますが、
実際にバッチとかで組み込む際はSQQL(salesforceのクエリ)を外部定義して
ファイル読込するとかしといたほうが運用しやすそうですね。
chameleonmeme.com/ きっかけは、偶然同じ現場で働いていたエンジニア3人の 「もっと仕事にのめり込んだり、熱中したいよね」という雑談でした。 営業から開発、サービスの提供まですべての工程を自分たちの手で行い、 気の合う仲間と楽しく仕事をすることで熱中するためにチームをスタートしました。
Discussion