SimpleHttpOperatorで日本語を使いたい
tl;dr
- SimpleHttpOperatorでは、デフォルトではdataパラメータで日本語送れないよ
- encodeすると送れるようになるよ
- マクロ(Jinjaテンプレート)が使えなくなるので、必要な場合はpre_execute経由すると良いよ
SimpleHttpOperatorとは
AirflowのOperatorの一つで名前の通りHTTP(S)のリクエストを行います。パラメータとしては、
- method(GETとかPOSTとか)
- data(POSTのbody、GETのURL param)
- HTTPヘッダー
- responseの扱い(ログに残すか、内容をチェックしTask成功の判断)
- 認証
dataパラメータで日本語を送りたい
前述の通り、SimpleHttpOperatorではdataパラメータでリクエストに使うデータ(POSTのbody、GETのクエリパラメータ)を送ることができます。しかし、素直な実装では日本語の内容を送るとエラーになります。
例えば、以下のDAGを実行すると、
from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(
'simple_http_dag2',
description='SimpleHttpOperator',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
default_args={'retries': 1},
) as dag:
SimpleHttpOperator(
task_id='en_post',
http_conn_id='',
endpoint='https://httpbin.org/post',
data='{"a": "b"}',
log_response=True,
headers={"Content-Type": "application/json"},
)
SimpleHttpOperator(
task_id='ja_post',
http_conn_id='',
endpoint='https://httpbin.org/post',
data='{"a": "日本語"}',
log_response=True,
headers={"Content-Type": "application/json"},
)
英語のdataを送るTask Instanceは成功します(以下Task Instanceのログの抜粋)。
2022-05-06, 22:44:21 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 22:44:22 UTC] {http.py:106} INFO - {
"args": {},
"data": "{\"a\": \"b\"}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "10",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.27.1",
"X-Amzn-Trace-Id": "Root=1-6275a4c4-0902cdb667985a0364483f6b"
},
"json": {
"a": "b"
},
"origin": "60.113.53.109",
"url": "https://httpbin.org/post"
}
日本語のdataを送るTask Instanceは失敗します。
UnicodeEncodeError: 'latin-1' codec can't encode characters in position 7-9: Body ('日本語') is not valid Latin-1. Use body.encode('utf-8') if you want to send it encoded in UTF-8.
エラーの原因
SimpleHttpOperatorは、HttpHookを経由してrequests.Requestを呼び出します。
req = requests.Request(self.method, url, data=data, headers=headers, **request_kwargs)
requests.Requestのdataは、日本語の場合bytesにencoodeして渡す必要があります(例えばこのQita記事を参考)。SimpleHttpOperator・HttpHookでは引数をそのまま(※)渡しているため、dataパラメータに日本語を設定するとエラーになるわけです。
※正確にはマクロ(Jinja2テンプレート)を展開して
回避策1:encodeして渡す
Taskのdataパラメータをencodeするのが自明な回避策で、これで動きます。ただし、一点落とし穴があります(後述)。
前述のDAGを変えて実行してみます
# Task定義以外は省略
SimpleHttpOperator(
task_id='ja_post',
http_conn_id='',
endpoint='https://httpbin.org/post',
# encodeを追加
data='{"a": "日本語"}'.encode('utf-8'),
log_response=True,
headers={"Content-Type": "application/json"},
)
動いたようです
[2022-05-06, 23:04:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:04:20 UTC] {http.py:106} INFO - {
"args": {},
"data": "{\"a\": \"\u65e5\u672c\u8a9e\"}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "18",
"Content-Type": "application/json",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.27.1",
"X-Amzn-Trace-Id": "Root=1-6275a972-3a0acaac0c3d666a4e35ddbd"
},
"json": {
"a": "\u65e5\u672c\u8a9e"
},
"origin": "60.113.53.109",
"url": "https://httpbin.org/post"
}
回避策1の問題:マクロ(Jinjaテンプレート)が展開されない
Operatorのパラメータ(※)には、Jinjaテンプレートでマクロを記載することができます。回避策1でdataパラメータをencodeしてbytesで渡すと、マクロが展開されなくなります。
実行時に決まる値(XComやAirflow Variables、DAG Runのパラメータ等)を使う場合は、マクロが必要ですので、これは嬉しくない話です。
※正確にはtemplate_fieldsで指定されているパラメータ。SimpleHttpOperatorの場合endpoint、data、headers
例えば下のDAGを実行すると、
# Task定義以外は省略
SimpleHttpOperator(
task_id='macro_post',
http_conn_id='',
endpoint='https://httpbin.org/post',
data='{{ run_id }} 日本語'.encode('utf-8'),
log_response=True,
)
エラーにこそなりませんが、httpbinの結果からマクロが展開されずに渡されたことがわかります(dataの部分の{}が残っていることに注目)。
[2022-05-06, 23:04:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:04:20 UTC] {http.py:106} INFO - {
"args": {},
"data": "{{ run_id }} \u65e5\u672c\u8a9e",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "22",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.27.1",
"X-Amzn-Trace-Id": "Root=1-6275a972-6dfe5d7a561151a72665922e"
},
"json": null,
"origin": "60.113.53.109",
"url": "https://httpbin.org/post"
}
マクロ展開されない原因
AbstractOperator(すべてのOperatorの親クラス)のrender_templateメソッドで、template_fieldsで指定されたパラメータを実際のパラメータに変換(render)しますが、この時、パラメータが文字列(str)の場合だけJinjaテンプレートとして読み込みます。
(具体的には下の分岐です)
if isinstance(value, str):
if any(value.endswith(ext) for ext in self.template_ext): # A filepath.
template = jinja_env.get_template(value)
else:
template = jinja_env.from_string(value)
dag = self.get_dag()
if dag and dag.render_template_as_native_obj:
return render_template_as_native(template, context)
return render_template_to_string(template, context)
回避策2:pre_execute+encode
BaseOperator(全てのOperatorの親クラス)には、executeの直前フックできるpre_executeというメソッドがあります。このメソッドを利用することで
- マクロ(Jinjaテンプレート)を展開(pre_executeよりさらに前の処理)
- strからbytesに変換(pre_executeに記載)
- SimpleHttpOperatorの処理を実行(execute)
の順に処理を行い、マクロを展開しつつ日本語を使うことがで可能となります。
# Task定義以外は省略
class SimpleHttpOperator2(SimpleHttpOperator):
def pre_execute(self, context):
super().pre_execute(context)
self.data = self.data.encode('utf-8')
SimpleHttpOperator2(
task_id='macro_post_with_hook',
http_conn_id='',
endpoint='https://httpbin.org/post',
data='{{ run_id }} 日本語',
log_response=True,
)
マクロ(Jinjaテンプレート)を展開しつつ、日本語を送ることができました。
[2022-05-06, 23:13:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:13:19 UTC] {http.py:106} INFO - {
"args": {},
"data": "manual__2022-05-06T23:04:19.514089+00:00 \u65e5\u672c\u8a9e",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "50",
"Host": "httpbin.org",
"User-Agent": "python-requests/2.27.1",
"X-Amzn-Trace-Id": "Root=1-6275ab8d-30bf5e2a08357ade66145790"
},
"json": null,
"origin": "60.113.53.109",
"url": "https://httpbin.org/post"
}
Discussion