
Running the Python SDK in an Airflow DAG job

Published 2022/09/02

In the article "How to import the OCI Python SDK in Airflow" I managed to import `oci`, but this time I ran into the following error.

[2022-09-01 06:51:57,215] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/oci_list.py", line 68, in object_storage_list
    config = oci.config.from_file()
  File "/home/airflow/.local/lib/python3.6/site-packages/oci/config.py", line 107, in from_file
    raise ConfigFileNotFound("Could not find config file at {}, please follow the instructions in the link to setup the config file https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm".format(expanded_file_location))
oci.exceptions.ConfigFileNotFound: Could not find config file at /home/***/.oci/config, please follow the instructions in the link to setup the config file https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm

Working around the error

I tried various things but could not get the SDK to read the config file, so I gave up...
Instead, I worked around it by writing the config values directly in the DAG.
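For context on the first error: called with no arguments, `oci.config.from_file()` reads `~/.oci/config` with the `DEFAULT` profile, and `~` is expanded for the user the task process runs as, which is why the message points at a path under `/home/` inside the Airflow container (the `***` appears to be Airflow's log masking). Below is a minimal, standard-library-only sketch for checking, from inside a task, which path the worker resolves and whether a file exists there. The helper name `describe_default_oci_config` is hypothetical, and the default path is assumed to match the SDK's. If the file lives elsewhere, for example on a mounted volume, `from_file` also accepts an explicit `file_location` and `profile_name`.

```python
import os

# Assumed to match the SDK's default location for from_file(): ~/.oci/config
DEFAULT_OCI_CONFIG = os.path.join("~", ".oci", "config")


def describe_default_oci_config(path=DEFAULT_OCI_CONFIG):
    """Hypothetical helper: return the expanded path and whether a file exists there."""
    # expanduser resolves "~" using the HOME of the current (worker) process
    expanded = os.path.expanduser(path)
    return expanded, os.path.isfile(expanded)


if __name__ == "__main__":
    # Call this from a PythonOperator callable to see the worker's view
    location, exists = describe_default_oci_config()
    print(location, "exists" if exists else "missing")
```

Logging the result from a throwaway task shows whether the problem is the home directory, the file being absent from the container, or something else.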

Hard-coding the config values

Writing it like this got around the error:

import oci


def object_storage_list(**kwargs):
    # Build the PEM string from the key body instead of reading ~/.oci/config
    pem_prefix = '-----BEGIN RSA PRIVATE KEY-----\n'
    pem_suffix = '\n-----END RSA PRIVATE KEY-----'
    # Adjacent string literals are joined without inserting any whitespace
    key = (
        "MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCqUzAq791phsWNj/PSSqGgrCn"
        "lKC37wyfFbmeDmsX0yZfuoTgEDU1xup4CuHbB1klu2mW879LQ1uAa2hQuu/QzWh49Tlx30L2yRrSOugXBV"
        # ~ omitted ~
        "5yqMErUTw7Zj+T9XiQZUmcktH6Mo0pKciQKBkOa8jsCrduQvu0VZuixzLo0vPZlU8NcnbLSc="
    )
    key_content = '{}{}{}'.format(pem_prefix, key, pem_suffix)
    # Config passed directly as a dict ("key_content" instead of "key_file")
    config_with_key_content = {
        "user": 'ocid1.user.oc1..aaaaaaaa**************',
        "key_content": key_content,
        "fingerprint": '8b:*****************ce:93',
        "tenancy": 'ocid1.tenancy.oc1..aaaaaaaa***********',
        "region": 'ap-tokyo-1'
    }
    # Raises an exception if a required value is missing or invalid
    oci.config.validate_config(config_with_key_content)
    object_storage_client = oci.object_storage.ObjectStorageClient(config_with_key_content)
    namespace_name = "********"
    bucket_name = "test"
    get_object_list = object_storage_client.list_objects(namespace_name, bucket_name)
    return get_object_list.data
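Hard-coding the key puts it in the DAG file itself. A variant of the same dict-based approach is to read the values from environment variables on the worker. This is a minimal standard-library sketch; the variable names (`OCI_USER`, `OCI_KEY_CONTENT`, `OCI_FINGERPRINT`, `OCI_TENANCY`, `OCI_REGION`) and the helper `config_from_env` are hypothetical, not something the SDK or Airflow defines. The returned dict has the same keys as `config_with_key_content` above and would be passed to `oci.config.validate_config` and `ObjectStorageClient` in the same way.

```python
import os

# Hypothetical mapping: OCI config key -> environment variable holding its value
ENV_VARS = {
    "user": "OCI_USER",
    "key_content": "OCI_KEY_CONTENT",
    "fingerprint": "OCI_FINGERPRINT",
    "tenancy": "OCI_TENANCY",
    "region": "OCI_REGION",
}


def config_from_env(environ=None):
    """Build an OCI-style config dict from environment variables (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    missing = [var for var in ENV_VARS.values() if not environ.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    config = {key: environ[var] for key, var in ENV_VARS.items()}
    # Assumption: the PEM is stored on one line with literal "\n"; restore real newlines
    config["key_content"] = config["key_content"].replace("\\n", "\n")
    return config
```

Failing early with the list of missing variable names makes a misconfigured worker easier to spot than a later authentication error.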

It ran, but another error

[2022-09-01 07:15:30,151] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1344, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1925, in xcom_push
    session=session,
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 79, in set
    value = XCom.serialize_value(value)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/xcom.py", line 226, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'ListObjects' is not JSON serializable

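Time to investigate again...
The traceback shows the failure happens in `xcom_push`: Airflow pushes the PythonOperator's return value to XCom, and `XCom.serialize_value` calls `json.dumps` on it. The `ListObjects` object returned by the SDK cannot be serialized to JSON, hence the error.
So I forcibly converted the return value to a string.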
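To see the mechanism in isolation, the sketch below uses hypothetical stand-in classes (`FakeObjectSummary`, `FakeListObjects`) instead of the real SDK models, so it runs with the standard library alone. `json.dumps` rejects an arbitrary object, while a list of plain strings serializes fine. With the real SDK the equivalent would presumably be something like `[o.name for o in get_object_list.data.objects]`, since the `ListObjects` model exposes its entries through `objects`; treat that as an assumption to check against the SDK reference.

```python
import json


class FakeObjectSummary:
    """Hypothetical stand-in for the SDK's ObjectSummary model."""

    def __init__(self, name):
        self.name = name


class FakeListObjects:
    """Hypothetical stand-in for the SDK's ListObjects model."""

    def __init__(self, objects):
        self.objects = objects


def to_xcom_value(list_objects):
    """Reduce the listing to plain types json.dumps can handle (object names only)."""
    return [obj.name for obj in list_objects.objects]


data = FakeListObjects([FakeObjectSummary("test.txt")])

try:
    json.dumps(data)  # roughly what XCom does with the raw return value
except TypeError as exc:
    print("TypeError:", exc)

print(json.dumps(to_xcom_value(data)))  # ["test.txt"]
```

Returning a list of names instead of `str(...)` keeps the XCom value structured, so a downstream task can iterate over it without parsing a string.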

Change
return str(get_object_list.data.objects[0])  # first object in the listing, as a JSON-serializable string
Result
*** Reading local file: /opt/airflow/logs/test_oci/object_list/2022-09-01T08:14:01.768048+00:00/1.log
[2022-09-01 08:14:08,303] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [queued]>
[2022-09-01 08:14:08,324] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [queued]>
[2022-09-01 08:14:08,325] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2022-09-01 08:14:08,325] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2022-09-01 08:14:08,327] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2022-09-01 08:14:08,336] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): object_list> on 2022-09-01T08:14:01.768048+00:00
[2022-09-01 08:14:08,347] {standard_task_runner.py:52} INFO - Started process 65631 to run task
[2022-09-01 08:14:08,352] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'test_oci', 'object_list', '2022-09-01T08:14:01.768048+00:00', '--job-id', '70', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/oci_list.py', '--cfg-path', '/tmp/tmpv2gue29i', '--error-file', '/tmp/tmpy4xncbn5']
[2022-09-01 08:14:08,352] {standard_task_runner.py:77} INFO - Job 70: Subtask object_list
[2022-09-01 08:14:08,390] {logging_mixin.py:104} INFO - Running <TaskInstance: test_oci.object_list 2022-09-01T08:14:01.768048+00:00 [running]> on host eefb3556693f
[2022-09-01 08:14:08,449] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=test_oci
AIRFLOW_CTX_TASK_ID=object_list
AIRFLOW_CTX_EXECUTION_DATE=2022-09-01T08:14:01.768048+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-01T08:14:01.768048+00:00
[2022-09-01 08:14:08,547] {python.py:151} INFO - Done. Returned value was: {
  "archival_state": null,
  "etag": null,
  "md5": null,
  "name": "test.txt",
  "size": null,
  "storage_tier": null,
  "time_created": null,
  "time_modified": null
}
[2022-09-01 08:14:08,576] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=test_oci, task_id=object_list, execution_date=20220901T081401, start_date=20220901T081408, end_date=20220901T081408
[2022-09-01 08:14:08,601] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2022-09-01 08:14:08,641] {local_task_job.py:151} INFO - Task exited with return code 0
