🐚
ECS Exec Commandの標準出力をPythonで取得する
はじめに
LambdaからECS Execコマンドを使って、実行しているときにうまくいかない時がありました。正確な理由は不明なのですが色々と試していると標準出力を取得していないことが原因で失敗しているようでした。そのため、以下の記事を参考に標準出力を取得します。
参考にした記事だと標準出力の最初の一部しか取得できなかったので、修正したコードを載せています。AWSやboto3のドキュメントを見てもここら付近の記述は見つけられなかったので、以下の記事は大変参考になりました。
準備
実行に必要な以下のライブラリをインストールします。
$ pip install boto3 websocket-client construct
ECS Exec Commandの標準出力を取得するクラスEcsExec
を作成・説明します。
get_task_arn()
は新規に作成した関数で、実行中のクラスタのサービスの中にあるタスクを1つ取得するようにしています。これは複数タスクが立ち上がっているときに、一意にタスクを取得するために利用します。
execute_command()
は参考元の記事とほとんど変わりありません。
get_execute_command_output()
は、結果を取得する方法を大きく変えています。具体的には結果を取得するごとにメッセージをyield
することで逐次データを返しています。そのため、この関数を呼び時はfor
文などで取得する必要があります。
sample.py
import json
import time
import uuid
from logging import INFO, getLogger
from typing import Generator, Tuple
import boto3
import construct
import websocket
logger = getLogger()
logger.setLevel(INFO)
class EcsExec:
def __init__(self):
self.session = boto3.session.Session()
self.region = self.session.region_name
self.ecs = boto3.client("ecs", region_name=self.region)
def get_task_arn(self, cluster: str, service_name: str) -> str:
tasks = self.ecs.list_tasks(
cluster=cluster,
serviceName=service_name,
desiredStatus="RUNNING",
)
return tasks["taskArns"][0]
def execute_command(self, command, cluster, task_arn, container=None, interactive=True, retry=10):
params = {
"cluster": cluster,
"task": task_arn,
"command": command,
"interactive": interactive,
}
if container:
params["container"] = container
sleep = 5
for i in range(retry):
try:
response = self.ecs.execute_command(**params)
return response
except Exception as e:
if "Wait and try again" in str(e):
logger.info(f"Not ready task = {task_arn} in {cluster}")
logger.info(f"Try again ... wait {sleep} seconds ...")
time.sleep(sleep)
continue
logger.info(f"Failed execute command. task = {task_arn} in {cluster}")
logger.info(f"Error: {e}")
return False
return None
@staticmethod
def get_execute_command_output(stream_url: str, token_value: str) -> Generator[Tuple[str, str], None, None]:
connection = websocket.create_connection(url=stream_url)
try:
init_payload = {"MessageSchemaVersion": "1.0", "RequestId": str(uuid.uuid4()), "TokenValue": token_value}
connection.send(json.dumps(init_payload))
agent_message_header = construct.Struct(
"HeaderLength" / construct.Int32ub,
"MessageType" / construct.PaddedString(32, "ascii"),
)
agent_message_payload = construct.Struct(
"PayloadLength" / construct.Int32ub,
"Payload" / construct.PaddedString(construct.this.PayloadLength, "ascii"),
)
while True:
response = connection.recv()
if type(response) is str:
yield "channel_closed", ""
break
message = agent_message_header.parse(response)
logger.info(f"{message.MessageType=}, {message.HeaderLength=}, {message=}")
if "channel_closed" in message.MessageType:
yield "channel_closed", ""
break
if "output_stream_data" in message.MessageType:
payload_message = agent_message_payload.parse(response[message.HeaderLength :])
yield "output_stream_data", payload_message.Payload
except websocket.WebSocketTimeoutException as e:
logger.error(f"{e=}")
yield "timeout", ""
finally:
connection.close()
使い方
最後に使い方を説明します。参考にした記事からあまり変わっていませんが、メッセージを取得するときにfor文で取得しています。
sample.py
ecs = EcsExec()
# task_arnを稼働中のサービスから取得する
# コンソールなどから取得しても可
task_arn = ecs.get_task_arn(cluster="...", service_name="...")
# コマンドを実行する
res = ecs.execute_command(command="ls", cluster="...", container="...", task_arn=task_arn)
logger.info(f"{res=}")
# 標準出力を取得する
session = res["session"]
for m in ecs.get_execute_command_output(stream_url=session["streamUrl"], token_value=session["tokenValue"]):
logger.info(f"{m=}")
おわりに
備忘録的に簡単にまとめました。誰かの参考になれば幸いです。
Discussion