🐚

ECS Exec Commandの標準出力をPythonで取得する

2024/08/22に公開

はじめに

LambdaからECS Execコマンドを使って、実行しているときにうまくいかない時がありました。正確な理由は不明なのですが色々と試していると標準出力を取得していないことが原因で失敗しているようでした。そのため、以下の記事を参考に標準出力を取得します。

参考にした記事だと標準出力の最初の一部しか取得できなかったので、修正したコードを載せています。AWSやboto3のドキュメントを見てもここら付近の記述は見つけられなかったので、以下の記事は大変参考になりました。

https://blog.father.gedow.net/2022/04/11/aws-ecs-exec-output/

準備

実行に必要な以下のライブラリをインストールします。

$ pip install boto3 websocket-client construct

ECS Exec Commandの標準出力を取得するクラスEcsExecを作成・説明します。

get_task_arn()は新規に作成した関数で、実行中のクラスタのサービスの中にあるタスクを1つ取得するようにしています。これは複数タスクが立ち上がっているときに、一意にタスクを取得するために利用します。

execute_command()は参考元の記事とほとんど変わりありません。

get_execute_command_output()は、結果を取得する方法を大きく変えています。具体的には結果を取得するごとにメッセージをyieldすることで逐次データを返しています。そのため、この関数を呼び時はfor文などで取得する必要があります。

sample.py
import json
import time
import uuid
from logging import INFO, getLogger
from typing import Generator, Tuple

import boto3
import construct
import websocket

logger = getLogger()
logger.setLevel(INFO)


class EcsExec:
    def __init__(self):
        self.session = boto3.session.Session()
        self.region = self.session.region_name
        self.ecs = boto3.client("ecs", region_name=self.region)

    def get_task_arn(self, cluster: str, service_name: str) -> str:
        tasks = self.ecs.list_tasks(
            cluster=cluster,
            serviceName=service_name,
            desiredStatus="RUNNING",
        )
        return tasks["taskArns"][0]

    def execute_command(self, command, cluster, task_arn, container=None, interactive=True, retry=10):
        params = {
            "cluster": cluster,
            "task": task_arn,
            "command": command,
            "interactive": interactive,
        }
        if container:
            params["container"] = container

        sleep = 5
        for i in range(retry):
            try:
                response = self.ecs.execute_command(**params)
                return response
            except Exception as e:
                if "Wait and try again" in str(e):
                    logger.info(f"Not ready task = {task_arn} in {cluster}")
                    logger.info(f"Try again ... wait {sleep} seconds ...")
                    time.sleep(sleep)
                    continue

                logger.info(f"Failed execute command. task = {task_arn} in {cluster}")
                logger.info(f"Error: {e}")
                return False

        return None

    @staticmethod
    def get_execute_command_output(stream_url: str, token_value: str) -> Generator[Tuple[str, str], None, None]:
        connection = websocket.create_connection(url=stream_url)
        try:
            init_payload = {"MessageSchemaVersion": "1.0", "RequestId": str(uuid.uuid4()), "TokenValue": token_value}
            connection.send(json.dumps(init_payload))

            agent_message_header = construct.Struct(
                "HeaderLength" / construct.Int32ub,
                "MessageType" / construct.PaddedString(32, "ascii"),
            )
            agent_message_payload = construct.Struct(
                "PayloadLength" / construct.Int32ub,
                "Payload" / construct.PaddedString(construct.this.PayloadLength, "ascii"),
            )
            while True:
                response = connection.recv()
                if type(response) is str:
                    yield "channel_closed", ""
                    break

                message = agent_message_header.parse(response)
                logger.info(f"{message.MessageType=}, {message.HeaderLength=}, {message=}")
                if "channel_closed" in message.MessageType:
                    yield "channel_closed", ""
                    break

                if "output_stream_data" in message.MessageType:
                    payload_message = agent_message_payload.parse(response[message.HeaderLength :])
                    yield "output_stream_data", payload_message.Payload
        except websocket.WebSocketTimeoutException as e:
            logger.error(f"{e=}")
            yield "timeout", ""
        finally:
            connection.close()

使い方

最後に使い方を説明します。参考にした記事からあまり変わっていませんが、メッセージを取得するときにfor文で取得しています。

sample.py
ecs = EcsExec()
# task_arnを稼働中のサービスから取得する
# コンソールなどから取得しても可
task_arn = ecs.get_task_arn(cluster="...", service_name="...")
# コマンドを実行する
res = ecs.execute_command(command="ls", cluster="...", container="...", task_arn=task_arn)
logger.info(f"{res=}")

# 標準出力を取得する
session = res["session"]
for m in ecs.get_execute_command_output(stream_url=session["streamUrl"], token_value=session["tokenValue"]):
    logger.info(f"{m=}")

おわりに

備忘録的に簡単にまとめました。誰かの参考になれば幸いです。

GitHubで編集を提案

Discussion