🦁
Running the OCI Python SDK in an Airflow DAG Job
In the article on importing the OCI Python SDK in Airflow, I got oci to import, but running it now fails with the following error.
[2022-09-01 06:51:57,215] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/oci_list.py", line 68, in object_storage_list
config = oci.config.from_file()
File "/home/airflow/.local/lib/python3.6/site-packages/oci/config.py", line 107, in from_file
raise ConfigFileNotFound("Could not find config file at {}, please follow the instructions in the link to setup the config file https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm".format(expanded_file_location))
oci.exceptions.ConfigFileNotFound: Could not find config file at /home/***/.oci/config, please follow the instructions in the link to setup the config file https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm
Working around the error
I tried various approaches but could not get the task to read the config file, so I gave up on it...
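Before giving up on the config file, a quick diagnostic run inside the task can show which path is actually being checked and whether the key_file listed in the config is visible from the worker. One possible cause, which is only a guess here, is that the file exists on the host but not inside the Airflow worker container. This is a minimal stdlib-only sketch, assuming the usual INI-style OCI config with a key_file entry; inspect_oci_config is a hypothetical helper, not part of the SDK, and it expands "~" with os.path.expanduser, which appears to match the expanded path in the ConfigFileNotFound message.

```python
import configparser
import os


def inspect_oci_config(path="~/.oci/config", profile="DEFAULT"):
    """Report where an OCI-style config file resolves and whether its key_file exists.

    Hypothetical debugging helper; it does not call the OCI SDK.
    """
    expanded = os.path.expanduser(path)
    report = {"config_path": expanded, "config_exists": os.path.isfile(expanded)}
    if not report["config_exists"]:
        return report

    parser = configparser.ConfigParser()
    parser.read(expanded)
    # [DEFAULT] is special in configparser: has_section() never reports it,
    # so treat it as found when it defines at least one key.
    found = parser.has_section(profile) or (
        profile == "DEFAULT" and bool(parser.defaults())
    )
    report["profile_found"] = found
    if not found:
        return report

    key_file = parser[profile].get("key_file")
    report["key_file"] = os.path.expanduser(key_file) if key_file else None
    report["key_file_exists"] = bool(key_file) and os.path.isfile(report["key_file"])
    return report


if __name__ == "__main__":
    # Inside a PythonOperator callable, printing this sends it to the task log.
    print(inspect_oci_config())
```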
Instead, I worked around the error by writing the config information directly into the DAG.
Hard-coding the config information
Writing it as follows got around the error.
import oci


def object_storage_list(**kwargs):
    # PEM header/footer wrapped around the base64 key body below
    pem_prefix = '-----BEGIN RSA PRIVATE KEY-----\n'
    pem_suffix = '\n-----END RSA PRIVATE KEY-----'
    # Private key body (base64), split across adjacent string literals
    key = (
        "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCqUzAq791phsWNj/PSSqGgrCn"
        "lKC37wyfFbmeDmsX0yZfuoTgEDU1xup4CuHbB1klu2mW879LQ1uAa2hQuu/QzWh49Tlx30L2yRrSOugXBV"
        # ~omitted~
        "5yqMErUTw7Zj+T9XiQZUmcktH6Mo0pKciQKBkOa8jsCrduQvu0VZuixzLo0vPZlU8NcnbLSc="
    )
    key_content = '{}{}{}'.format(pem_prefix, key, pem_suffix)
    # Pass the key itself via "key_content" instead of a "key_file" path
    config_with_key_content = {
        "user": 'ocid1.user.oc1..aaaaaaaa**************',
        "key_content": key_content,
        "fingerprint": '8b:*****************ce:93',
        "tenancy": 'ocid1.tenancy.oc1..aaaaaaaa***********',
        "region": 'ap-tokyo-1'
    }
    # Raises an exception if required entries are missing or invalid
    oci.config.validate_config(config_with_key_content)
    object_storage_client = oci.object_storage.ObjectStorageClient(config_with_key_content)
    namespace_name = "********"
    bucket_name = "test"
    get_object_list = object_storage_client.list_objects(namespace_name, bucket_name)
    return get_object_list.data
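Hard-coding the key works, but the same dictionary could also be assembled at run time so the secret does not live in the DAG file. This is a minimal sketch, assuming the values are supplied as environment variables with hypothetical names (OCI_USER, OCI_FINGERPRINT, OCI_TENANCY, OCI_REGION, OCI_KEY_BODY); none of these names are defined by OCI or Airflow. It also wraps the base64 body at 64 characters per line, the usual PEM layout, instead of emitting one long line. The result has the same keys as config_with_key_content above.

```python
import os

# Hypothetical environment variable names; they are not defined by OCI or Airflow.
ENV_KEYS = {
    "user": "OCI_USER",
    "fingerprint": "OCI_FINGERPRINT",
    "tenancy": "OCI_TENANCY",
    "region": "OCI_REGION",
}
KEY_BODY_VAR = "OCI_KEY_BODY"  # base64 body of the private key, without header/footer


def to_pem(body, label="RSA PRIVATE KEY"):
    """Wrap a base64 key body in PEM armor with 64 characters per line."""
    compact = "".join(body.split())  # drop any whitespace or newlines already present
    lines = [compact[i:i + 64] for i in range(0, len(compact), 64)]
    return "-----BEGIN {0}-----\n{1}\n-----END {0}-----\n".format(label, "\n".join(lines))


def build_oci_config(environ=None):
    """Build a config dict that uses key_content, reading values from the environment."""
    environ = os.environ if environ is None else environ
    required = list(ENV_KEYS.values()) + [KEY_BODY_VAR]
    missing = [name for name in required if not environ.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    config = {field: environ[var] for field, var in ENV_KEYS.items()}
    config["key_content"] = to_pem(environ[KEY_BODY_VAR])
    return config
```

In the task, config_with_key_content = build_oci_config() would replace the literal dictionary, and the validate_config and ObjectStorageClient calls stay the same.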
It ran, but hit another error
[2022-09-01 07:15:30,151] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1344, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1925, in xcom_push
session=session,
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 79, in set
value = XCom.serialize_value(value)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 226, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'ListObjects' is not JSON serializable
So, back to investigating...
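Looking at the traceback, the function's return value is pushed to XCom (xcom_push), and XCom.serialize_value passes it to json.dumps. The ListObjects object returned here cannot be serialized to JSON, which appears to be what causes the error.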
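The failing step can be reproduced without Airflow or OCI. This sketch mimics the json.dumps(value).encode('UTF-8') call from the traceback; the ListObjects class below is a hypothetical stand-in, not the SDK's real model.

```python
import json


class ListObjects:
    """Hypothetical stand-in for an SDK response model (not the real oci class)."""

    def __init__(self, names):
        self.names = names


def xcom_like_serialize(value):
    """Mimic the json.dumps(value).encode('UTF-8') call shown in the traceback."""
    return json.dumps(value).encode("UTF-8")


try:
    xcom_like_serialize(ListObjects(["test.txt"]))
except TypeError as exc:
    # Same kind of error as in the task log: the type is not JSON serializable
    print("TypeError:", exc)

# Plain dicts, lists and strings serialize without trouble
print(xcom_like_serialize({"name": "test.txt"}))  # b'{"name": "test.txt"}'
```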
So, as a quick fix, I forced the first entry into a string before returning it.
The fix
return str(get_object_list.data[0])
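str(get_object_list.data[0]) passes only the first entry, and only as text. If downstream tasks need every object, one option is to return plain Python types that json.dumps accepts, such as a list of object names. This is a minimal sketch, assuming the response exposes data.objects as a list of summaries with a name attribute (as the OCI ListObjects and ObjectSummary models do); the SimpleNamespace objects are stand-ins so the snippet runs without the SDK or a bucket, and object_names is a hypothetical helper.

```python
import json
from types import SimpleNamespace


def object_names(list_objects_data):
    """Return the object names from a ListObjects-like value as a JSON-friendly list."""
    return [summary.name for summary in list_objects_data.objects]


# Stand-in for get_object_list.data; in the DAG the real SDK response would be used.
fake_data = SimpleNamespace(
    objects=[SimpleNamespace(name="test.txt"), SimpleNamespace(name="b.csv")]
)

names = object_names(fake_data)
print(json.dumps(names))  # ["test.txt", "b.csv"]
```

Inside the task this would become return object_names(get_object_list.data). The SDK's oci.util.to_dict helper should be another way to turn a model into plain dicts. If no downstream task needs the value at all, setting do_xcom_push=False on the PythonOperator should skip the XCom push entirely.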
Result
*** Reading local file: /opt/airflow/logs/test_oci/object_list/2022-09-01T08:14:01.768048+00:00/1.log
[2022-09-01 08:14:08,303] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [queued]>
[2022-09-01 08:14:08,324] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [queued]>
[2022-09-01 08:14:08,325] {taskinstance.py:1067} INFO -
--------------------------------------------------------------------------------
[2022-09-01 08:14:08,325] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2022-09-01 08:14:08,327] {taskinstance.py:1069} INFO -
--------------------------------------------------------------------------------
[2022-09-01 08:14:08,336] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): object_list> on 2022-09-01T08:14:01.768048+00:00
[2022-09-01 08:14:08,347] {standard_task_runner.py:52} INFO - Started process 65631 to run task
[2022-09-01 08:14:08,352] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'test_oci', 'object_list', '2022-09-01T08:14:01.768048+00:00', '--job-id', '70', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/oci_list.py', '--cfg-path', '/tmp/tmpv2gue29i', '--error-file', '/tmp/tmpy4xncbn5']
[2022-09-01 08:14:08,352] {standard_task_runner.py:77} INFO - Job 70: Subtask object_list
[2022-09-01 08:14:08,390] {logging_mixin.py:104} INFO - Running <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [running]> on host eefb3556693f
[2022-09-01 08:14:08,449] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=test_oci
AIRFLOW_CTX_TASK_ID=object_list
AIRFLOW_CTX_EXECUTION_DATE=2022-09-01T08:14:01.768048+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-01T08:14:01.768048+00:00
[2022-09-01 08:14:08,547] {python.py:151} INFO - Done. Returned value was: {
"archival_state": null,
"etag": null,
"md5": null,
"name": "test.txt",
"size": null,
"storage_tier": null,
"time_created": null,
"time_modified": null
}
[2022-09-01 08:14:08,576] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=test_oci, task_id=object_list, execution_date=20220901T081401, start_date=20220901T081408, end_date=20220901T081408
[2022-09-01 08:14:08,601] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2022-09-01 08:14:08,641] {local_task_job.py:151} INFO - Task exited with return code 0