Open7

MWAA(Amazon Managed Workflow for Apache Airflow)の動作環境について

yu_s_1985yu_s_1985

MWAAの動作環境についての参考記事

英語記事だけど、こちらめちゃくちゃ参考になる。
Amazon Managed Workflows for Apache Airflow — Configuration: Understanding Amazon MWAA’s Configuration Options

MWAAのブラックボックス的な部分を調査してその結果を貼ってくれている。
特に参考になったのは下記のgist

MWAAではS3に配置しているdagファイルを読み込んでくれるけど、それがMWAA環境上のどこに置かれるのか。
dags_folder = /usr/local/airflow/dags
base_log_folder = /usr/local/airflow/logs

あたり。

yu_s_1985yu_s_1985

MWAAのairflow.cfgについて

同記事で、MWAAのairflow.cfgはカスタマイズ可能とも記載されている。
airflow.cfgを直接公開はしていないのでそれを直接編集するのは無理だが、主だった設定はAirflow設定オプション(コンソールからも設定可能)で設定することができる。
内部では環境変数で渡されるとか。

While AWS doesn’t expose the airflow.cfg in the Apache Airflow UI of your environment, you can change the default Apache Airflow configuration options directly within the Amazon MWAA console and continue using all other settings in airflow.cfg. The configuration options changed in the Amazon MWAA console are translated into environment variables.

実際にコンソールで設定可能なのはこれを書いたタイミングでは下記の通り。

Airflow設定オプション一覧

ここは自分で構築しててサラッと見落としていた部分でした…。

yu_s_1985yu_s_1985

そもそもMWAAを使うための準備について書いていなかった。
MWAAを使うには最低限airflow-から始まる名前のS3バケットが必要です。
S3バケット名は全世界で一意でなければいけないため注意。
この辺は最初に貼ったクラスメソッドさんの記事を見たほうが早そう。

yu_s_1985yu_s_1985

動いているプロセス

bash operatorでps auxを実行してプロセスを見てみました。
IPはプライベートIPなので別に隠さなくても良かったけど一応マスクしてます。
test_dockerが今回ps auxを動かしたdagです。
なんでそんな名前になっているかは後述。

見てわかるのは、AirflowがCeleryExecutorで動いてるんだなあ、くらいです。
他のものは動いてなさそう。

[2021-02-05 08:00:37,756] {{bash_operator.py:146}} INFO - Running command: ps aux
[2021-02-05 08:00:37,803] {{bash_operator.py:153}} INFO - Output:
[2021-02-05 08:00:37,963] {{bash_operator.py:157}} INFO - USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
[2021-02-05 08:00:38,002] {{bash_operator.py:157}} INFO - airflow      1  0.0  2.7 422016 105116 ?       Ss   Jan29   0:07 python3 /usr/local/airflow/config/console_handler.py --stream worker_console airflow worker -s
[2021-02-05 08:00:38,041] {{bash_operator.py:157}} INFO - airflow     30  0.0  3.0 501968 116000 ?       S    Jan29   7:25 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:MainProcess] -active- (worker -s)
[2021-02-05 08:00:38,083] {{bash_operator.py:157}} INFO - airflow     34  0.0  2.5 430560 96364 ?        S    Jan29   0:00 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:ForkPoolWorker-1]
[2021-02-05 08:00:38,125] {{bash_operator.py:157}} INFO - airflow     35  0.0  2.4 428476 92948 ?        S    Jan29   0:00 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:ForkPoolWorker-2]
[2021-02-05 08:00:38,168] {{bash_operator.py:157}} INFO - airflow     36  0.0  2.4 428480 92948 ?        S    Jan29   0:00 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:ForkPoolWorker-3]
[2021-02-05 08:00:38,214] {{bash_operator.py:157}} INFO - airflow     37  0.0  2.5 430848 96812 ?        S    Jan29   0:00 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:ForkPoolWorker-4]
[2021-02-05 08:00:38,257] {{bash_operator.py:157}} INFO - airflow     38  0.0  2.4 428900 94788 ?        S    Jan29   0:00 [celeryd: celery@ip-xxx-xxx-xxx-xxx.ap-northeast-1.compute.internal:ForkPoolWorker-5]
[2021-02-05 08:00:38,300] {{bash_operator.py:157}} INFO - airflow   3052 47.3  3.0 431664 115012 ?       S    08:00   0:01 /usr/bin/python3 /usr/local/bin/airflow run test_docker test 2021-02-05T08:00:32.709497+00:00 --local --pool default_pool -sd /usr/local/airflow/dags/test_docker.py
[2021-02-05 08:00:38,541] {{bash_operator.py:157}} INFO - airflow   3056 15.0  2.8 435784 108160 ?       S    08:00   0:00 airflow task runner: test_docker test 2021-02-05T08:00:32.709497+00:00 244
[2021-02-05 08:00:38,577] {{bash_operator.py:157}} INFO - airflow   3057  0.0  0.0  11900  2636 ?        Ss   08:00   0:00 bash /tmp/airflowtmpqmoxlk9j/testu2b_lytj
[2021-02-05 08:00:38,618] {{bash_operator.py:157}} INFO - airflow   3058  0.0  0.0  52208  3556 ?        R    08:00   0:00 ps aux
[2021-02-05 08:00:47,652] {{bash_operator.py:161}} INFO - Command exited with return code 0
yu_s_1985yu_s_1985

DockerOperatorは使えない

上述の通り、ps auxでプロセスを見るとairflowしか動いていないことがわかる。
つまり、MWAAの環境上ではdockerdが起動していない
これはとてもハマったのですが、dockerdが起動していないのでDockerOperatorは使えません
コンテナを動かしたければECSOperatorを使いましょう。
EKSでもいいかもしれないけど、EKSを既に使っているのにMWAAを新しく使うメリットがあるのかはちょっと自分にはわかりません。
なんでps auxを実行したdagがtest_dockerというdag名だったのかというと、元々DockerOperatorを動かそうとしてテストしている過程で「そもそもdockerd動いてないんじゃ…?」と思ってps auxを動かしたからですね。

MWAAの環境を魔改造してdockerdをインストールして無理やり動かすとかはできなくもないかもしれないけど、設定変更とかするたびにもとに戻ったりすると大変なことになりそうなので、そこは要検証。

ECS Operatorのハマりどころ

以下はMWAAではなくてECS Operatorのハマりどころなのですが、
ECS Operatorはドキュメントに

For further information, look at the documentation of run_task() method in boto3.

とか書いてあるのに、boto3とオプションの指定形式が一致していません
boto3のECS.Clientのrun_taskのドキュメント
ECS Operator
boto3ではキャメルケースなのに対し、ECS Operatorはスネークケースで指定する必要があります。

Github(ECS Operator)
具体的には

    def __init__(
        self,
        *,
        task_definition: str,
        cluster: str,
        overrides: dict,  # pylint: disable=too-many-arguments
        aws_conn_id: Optional[str] = None,
        region_name: Optional[str] = None,
        launch_type: str = 'EC2',
        group: Optional[str] = None,
        placement_constraints: Optional[list] = None,
        placement_strategy: Optional[list] = None,
        platform_version: str = 'LATEST',
        network_configuration: Optional[dict] = None,
        tags: Optional[dict] = None,
        awslogs_group: Optional[str] = None,
        awslogs_region: Optional[str] = None,
        awslogs_stream_prefix: Optional[str] = None,
        propagate_tags: Optional[str] = None,
        reattach: bool = False,
        **kwargs,
    ):

となっている。
boto3のオプション指定と形は似ているがそこが違うので、boto3のドキュメントから値をコピペして設定しようとするとうまく行きません。
この辺はAWSのリソースに触るOperator自体そうなってるかもしれないです(未確認)

これで時間をたんまり溶かしました。documentにプルリクすべき?

yu_s_1985yu_s_1985

構築・設定変更に時間がかかる

MWAAは構築は楽チンですが、環境が構築されるまでに30分ぐらい待ちが発生します。
きちんと計測したわけではないので正確ではないですが。
変更についても少し時間がかかります。
なので設定変更を一つ一つ試すのが非常に面倒です。

対して、S3にアップロードしたファイルの読み込みは非常に早く、アップロードしてからすぐに反映されます。
こちらもきちんと計測したわけではありませんが、アップロードしてAirflowの画面を更新したら大体は反映されてました。