BigQueryからSalesforceへの転送(Reverse ETL)について
はじめに
データエンジニアリング領域で仕事をしているmasaです。
最近、BigQueryからSalesforceへの転送(Reverse ETL)について実装する機会がありいろいろ調べていたのですが、あまりまとまった情報がなかったため自分の経験を記事にすることにしました。
対象読者はETL周りの実装を担当しているデータエンジニア、もしくはSalesforceの改修に責任を負っているRevOpsなどの方々です。
BigQueryからSalesforceへの転送をしたい背景
SalesforceなどのCRM上にある案件・契約データとプロダクトのDBなどにある利用状況データをDWH上で統合・集計して、再度CRMにインポートして使いたいというニーズはSaaS企業等で一定あるのかと思います。
例えば、CRMに再度インポートしたヘルススコアの数値を使ってカスタマーサクセスチームがアプローチ先の優先順位策定を行ったり、フリーミアムプランの利用状況を元にグロースチームがコンテンツ配信のセグメントわけを行ったり、経営管理のKPIに利用したり、などいろいろユースケースはありそうです。
どのような選択肢があるのか
BigQueryからSalesforceへの転送について、どのような方法で実現ができるのか検討しました。
①ETLのクラウドサービス
予算に余裕があるなら、ETL周りの処理をまるっとGUIで設定可能なクラウドサービスを使うのが一番早いと思います。
国産ならtrocco、海外製ならFivetran、またリバースETLに強みを持つツールでHightouchやCensusなどもあるようです。
②Salesforce内の機能
もちろんSalesforceにも、外部のデータを取り込む機能は存在します。
データインポートウィザード もしくはデータローダー
これらの機能は単発の転送ジョブで利用するのには良さそうですが定期実行のスケジューラーが組めない認識のため、日次で自動更新したいなどのケースには向かないかと考えています。
外部データソース
外部のデータソースをSalesforceのオブジェクトにマッピングできる機能です。対応可能なデータソースは複数ありSnowflakeやAmazon Athena等は対応しているものの、BigQueryには対応しておらずでした。将来BigQueryにも対応したらぜひ使ってみたいと思っています。
③SalesforceのAPI
今回はこの手法で実装することにしました。Salesforceには外部のデータをインポートできるAPIが複数公開されています(Bulk APIとSOAP APIがメジャーなようです)。
今回はPythonでスクリプトを書いたので、SalesforceのAPIに対応しているSimple Salesforceというモジュールを利用して実装することにしました。
from google.cloud import bigquery
import os
import csv
from simple_salesforce import Salesforce, SalesforceLogin
import pandas as pd
# 環境変数にGoogle Cloudの認証情報を設定し、BigQueryクライアントを初期化
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/path/to/your/credentials.json"
client = bigquery.Client()
# クエリの実行
query = """
SELECT * FROM `your_project.dateset.table`
"""
query_job = client.query(query)
results = query_job.result()
# 取得したクエリの結果をCSVファイルに書き込む
with open('output.csv', 'w', newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow([field.name for field in results.schema])
for row in results:
csv_writer.writerow(row.values())
# Salesforceのログイン認証
username = 'your_username'
password = 'your_password'
security_token = 'your_security_token'
domain = 'login' # 本番環境の場合は'login', sandbox環境の場合は'test'
session_id, instance = SalesforceLogin(
username=username, password=password, security_token=security_token, domain=domain
)
sf = Salesforce(instance=instance, session_id=session_id)
# CSVファイルを読み込み、Salesforceにインポート可能な形式に変換
data = pd.read_csv('output.csv')
data_dict = data.to_dict(orient='records')
# SalesforceのBulk APIを使ってSalesforceの取引先オブジェクトにデータをアップデート
job = sf.bulk.Account.update(data_dict)
今回は取引先オブジェクトにupdateしている例ですが、案件やリードなど他の標準オブジェクトはもちろん自社で構築したカスタムオブジェクトにもインポートできますし、upsertやinsertなどのインポート方法にも対応してます。
インフラの構築について
日次などで自動転送したい場合は上記のスクリプトを実行するインフラも別途構築する必要があります。
もし社内でdbtを利用しておりdbt Pythonモデルがすでに使える環境なら、dbt Pythonモデルを使うのが早いと思います。
from simple_salesforce import Salesforce, SalesforceLogin
import pandas as pd
# Salesforceのログイン認証
username = 'your_username'
password = 'your_password'
security_token = 'your_security_token'
domain = 'login'
# Pythonモデルを利用してSalesforceにインポート
def model(dbt, session):
my_sql_model_df = dbt.ref('your_input_model')
# Salesforceにログイン
session_id, instance = SalesforceLogin(
username=username, password=password, security_token=security_token, domain=domain
)
sf = Salesforce(instance=instance, session_id=session_id)
# データフレームをインポート可能な辞書形式に変換
data_dict = my_sql_model_df.to_dict(orient='records')
# SalesforceのBulk APIを使って取引先にアップデート
job = sf.bulk.Account.update(data_dict)
return my_sql_model_df
dbtの実行環境がない場合はワークフローエンジンで動かすか、
またGCPにインフラを寄せたいなら Cloud Scheduler -> Cloud Functions などの構成で動かす手もあるかもしれません。
どちらにしろ、インフラについては自社のすでにある環境に合わせた形が良さそうです。
所感
今回はBigQueryからSalesforceへの転送でしたが、BigQueryやSnowflakeなどのDWHツールから集計されたデータをCRMツールに入れてビジネス現場での優先順位策定やKPI可視化などに使いたいというケースは一般的なニーズとしてありそうと思います。
その上で、データパイプラインの構築はデータ基盤チーム、CRMの管轄はOpsやCSチームなどと担当が分かれているケースが多いと考えており、DWHからCRMへのリバースETLはその間のタスクになるためボールが浮きがちになりそうと思いました。(まとまった記事が少ないものそういう理由だったりして)
とはいえ最近はGCPとSalesforceの連携強化が進んでいるという話もあると思うので、このあたりもSalesforce内の機能で自動化されると嬉しいですね。
まとめ
BigQueryからのSalesforce転送(リバースETL)について
- 予算に余裕があるならtroccoなどのETLクラウドサービスを使うのが楽
- 予算に余裕がないならSalesforceのBulk APIを使ってインポート
- インフラは自社の環境に合わせて選択
- 将来Salesforce内に実装されるのを期待
Discussion