pythonで咄嗟に実装できて便利だった機能3選
こんにちは。株式会社シグマアイリサーチャーの@k_arakiです。
今回は初めて携わったアプリ開発で簡単に導入できて便利だった機能を3つ紹介します。
はじめに
この記事を書くきっかけとなった体験についてお話しします。
入社してまもなく、ある企業に対する業務サポートの効果検証プロジェクトに参加しました。
まず初めにサポート用のプログラムだけ作成し、過去のデータを用いて有効性を調べました。
その次のステップである試験運用を行うために簡易的なアプリ化が必要でした。
今回の話はこの時の開発に関するものです。
本体のプログラムはそれまでの検証で既に出来上がっているため、アプリとしての開発項目は以下の2つでした。
- 業務サポートプログラムのWebAPI化
- データの入出力・整形を担当するフロントエンド開発
あくまで試験用だったこともあり、要件定義は表面的に必要な機能の洗い出し程度のものでした。
その結果出来上がったのは、負荷に弱かったり無駄の多い、試験運用にすら問題のあるアプリでした。
「業務サポートに必要な機能」まではしっかり考えていたのですが、「業務サポートに必要な機能に必要な機能」あたりからの見通しが甘かったと今では反省しています…
試験運用期間を十分に確保するため、続く修正作業も素早く終わらせることが重要でした。
そのために意識したことはこの2つでした。
- 今あるものを極力変えずに付け足す
- (やり方さえ分かれば)簡単なものを使う
根こそぎ変えた方が最終的には良いものになることは勿論あり得ます。
一方でクオリティより早さが求められる場面や後戻りのしにくい状況ではこのような立ち回りも必要になる、というのがここで得た教訓でした。
本記事ではこの時に調査・実装した機能のうち、他のアプリでも使えそうなものをピックアップして紹介します。
試験的なアプリ開発程度であればサクッと追加できて役に立つことがあるかもしれません。
反対に大規模であったり本格的なアプリ開発には非推奨なものもあるため予めご了承ください。
また、この記事には時間の限られた新入社員が半ば場当たり的に取り組んだ行動記録という側面も含まれます。
変なところもあるでしょうが何卒ご容赦ください。
アプリの概要
本題のための事前情報として、私が開発していた試験用アプリの詳細に触れておきます。
1企業内の5名程度を対象とした、試験用の業務サポートアプリです。
AWS上に立ち上げた2つのインスタンスから構成されています。
各インスタンスについてざっくりと記載しておくと
- コアインスタンス
- 業務サポートプログラムの本体であるWebAPI
- PythonのFastAPIを使用
- 後述のフロントより10倍ほど高価(計算時にメモリが必要なため)
- フロントインスタンス
- ユーザからデータを受け取りWebAPIに送信/受信した結果を表示
- PythonのStreamlitを使用
- コアと比べると安価
両方のインスタンスでDockerを用いて仮想環境を作成しています。
Point1:バックグラウンド処理で通信を安定させる
もともと作っておいた関数のWebAPI化が最初の課題でした。
Pythonを使う場合FastAPIというフレームワークが便利です。
以下に使用例を載せます(hoge:WebAPI化したい機能を書いた関数)。
from fastapi import Body, FastAPI
app = FastAPI()
@app.post("/do_task")
async def do_hoge(request=Body(...)):
response = hoge(request)
return response
基本的にこれで問題なく動作するのですが、中身の処理時間が長い場合にはそう上手くはいきません。
先ほどの例を鵜呑みにした私を待ち受けていたのは通信中断によるエラーの山でした。
上記の例だと、関数hogeの処理が終わるまで通信を維持し続ける仕様になります。
これが想定より負荷のかかるもので通信中断の原因となっていました。
では通信時間を短縮しようと考えた時、真っ先に削るべきは処理中の待ち時間でした。
一度処理が始まれば、フロント側では終わった時に結果が分かればそれで十分です。
作業中ずっとWebAPIに張り付いて監視する必要は特にありません。
そこで処理開始時と進捗確認の時にだけ通信を行い、計算自体はバックグラウンドで実行されるような改良を考えました。
実装方法
Celeryという非同期タスク処理用のライブラリを使ってバックグラウンドでの処理を実現します。
Celeryを使う場合、以下の様に追加で複数のコンテナを起動する必要があります。
- worker:タスクを実行する
- redis:実行中のタスクを管理するデータベース
- flower:workerのモニタリング(必須ではないがあると便利)
そのため、まずコア側のdocker-composeにこれらのコンテナの情報を記入します。
参考までに元々あった部分を含めた全文を載せておきます。
ハイライト箇所が新しく追加した部分です。
(自分で調べていた時はここで苦戦していました…)
version: '3.9'
services:
core:
build:
context: .
dockerfile: Dockerfile
container_name: core
restart: unless-stopped
ports:
- 8502:8502
+ depends_on:
+ - redis
+ worker:
+ build: .
+ volumes:
+ - .:/src
+ command: poetry run celery -A app.main.celery_app worker --loglevel=info
+ depends_on:
+ - redis
+ restart: always
+ redis:
+ image: "redis:alpine"
+ ports:
+ - "6379:6379"
+ restart: always
+ flower:
+ image: mher/flower:latest
+ command: celery flower --url_prefix=flower
+ environment:
+ CELERY_BROKER_URL: redis://redis:6379/0
+ FLOWER_PORT: 5555
+ # flowerにアクセスするときの認証情報
+ FLOWER_BASIC_AUTH: {ユーザ名}:{パスワード}
+ depends_on:
+ - redis
+ restart: always
+ flower-healthchecker:
+ build:
+ context: ./flower
+ dockerfile: Dockerfile
+ ports:
+ - 5556:5556
+ restart: always
+ nginx:
+ image: nginx:alpine
+ ports:
+ - "5555:5555"
+ volumes:
+ - ./flower/nginx.conf:/etc/nginx/nginx.conf
+ depends_on:
+ - flower
+ - flower-healthchecker
+ restart: always
一応Dockerfileも載せておきますが、Celery導入前後での変更は特にありません。
FROM python:3.9.4-slim
EXPOSE 8502
ENV PYTHONUNBUFFERED=1
RUN apt-get update \
&& apt-get install -y gcc python3-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /src
RUN pip install poetry
COPY . .
RUN poetry config virtualenvs.create false \
&& poetry install --no-dev
CMD ["poetry", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0",
"--port", "8502", "--timeout-keep-alive", "3600", "--workers", "4"]
続けてPythonのコード本体の変更点です。
まずWebAPI化したい処理をFastAPIではなくCeleryのタスクとして定義します。
from celery import Celery
celery_app = Celery(
'tasks',
broker='redis://redis:6379',
backend='redis://redis:6379'
)
@celery_app.task
def do_hoge(request=Body(...)):
response = hoge(request)
return response
通信に関わる部分はFastAPIを使って定義します。
delayを使うとタスクがバックグラウンドで開始されます。
この時の通信はタスク開始が確認できたところで終了するため、タスクの種類問わず短時間で済みます。
また、delayを実行すると返ってくるidはこの時開始されたタスクへの問い合わせに使います。
from fastapi import Body, FastAPI
app = FastAPI()
@app.post("/start_task")
async def start_task(request=Body(...)):
task = do_hoge.delay(request)
return {"task_id": task.id}
タスクの進捗確認にはAsyncResultを使います。
開始時に取得したidを入力すると、該当タスクの状態が以下のどれか分かります。
- PENDING(実行中)
- SUCCESS(成功)
- FAILURE(タスクの内部で問題が発生)
タスクが完了していた場合にはその出力も取得できます。
@app.get("/task_status/{task_id}")
async def get_task_status(task_id: str):
task = do_hoge.AsyncResult(task_id)
return {"task_status": task.status, "result": task.result}
後はタスク開始リクエスト後、定期的に進捗確認を行うようにフロント側を修正すれば目的であった通信時間の短縮は完了です。
余談にはなりますがidを使えばタスクの中断も可能です。
この場合はcontrol.revokeを使います。
ここまでの2つの操作とは異なり、Celeryインスタンスに対しての操作ですのでご注意ください。
@app.delete("/delete_task/{task_id}")
async def delete_task(task_id: str):
celery_app.control.revoke(task_id, terminate=True)
return {"status": "task revoked"}
私が勉強に使ったものを載せておくので、Celeryについてより詳しく知りたい方はこちらを参考にしてください。
Point2:インスタンス起動は使いたい時だけ
次はAWSのコスト低減に関わるお話です。
AWSのインスタンスは従量課金制のため、使わない時間は停止しておいた方がお得です。
今回紹介する節約方法は、アプリ(フロントインスタンス)にコアインスタンスの起動/停止機能を追加し、利用時だけユーザに起動してもらうというものです。
こちらの方法はインスタンスを使う日時が不定期な場合にはおすすめできます。
一方で、もし予め使用日時が十分絞り込める場合には予め時間帯を限定する方が簡単かつ起動の手間もかからないので便利です。
その他にも今回のやり方が有効であるための条件がいくつか存在するのでご注意ください。
-
コアとフロントのインスタンスが別
もし単一のインスタンスに全ての機能を載せていた場合、当然ながら停止中はアプリにアクセスできません。
そのため、アプリに起動用の機能を追加しても起動できないから使えないという本末転倒な事態になってしまいます。 -
コアのインスタンスがフロントと比べて相対的に高価
本機能を使う場合、コアの起動/停止を実行できるようにフロントは起動し続ける事になります。
今回のアプリでは処理をメインで行うコアインスタンスだけがメモリの都合上高価(フロントの10倍以上)であったため、コアだけの停止であっても大幅なコストカットに成功しています。 -
ユーザが1社内かつ少人数
1人のユーザによって全ユーザが共有するコアインスタンスを停止する事が可能になります。
そのため不特定多数向けのアプリに本機能を実装することはお勧めしません。
実装方法
まずは起動/停止対象のインスタンスを指定します。
今回の場合コアインスタンスのIDが必要になります。
セキュリティの観点から、インスタンスIDは.envファイルなどで別途管理してください。
import os
import boto3
INSTANCE_ID = os.environ["CORE_INSTANCE_ID"]
EC2 = boto3.resource('ec2')
instance = EC2.Instance(id=INSTANCE_ID)
現在のインスタンスが起動しているかの確認方法です。
インスタンスの状態には
- pending:起動中
- running:稼働中
- stopping:停止実行中
- stopped:停止中
があり、先ほどの準備に続けてinstance.state['Name']
で取得できます。
インスタンスを起動する場合にはinstance.start()
、停止する場合にはinstance.stop()
をそれぞれ実行してください。
注意点として、インスタンスが起動してから内部のコンテナが起動するまでには時間差があります。
インスタンス起動直後にAPIへリクエストを送ると404エラーが出るのでご注意を。
Point3:アクセス中のユーザを確認
3つ目と銘打ちましたが、正確にはPoint2の使い勝手を良くするための補助的な機能です。
Point2の使用条件でも触れましたが、本アプリではユーザ全員で1つのコアインスタンスを使用しています。
そのため自分が使い終わったからとインスタンスを停止すると、他ユーザのアクセスも中断されてしまいます。
かといって全員が「まだ使っている人がいるかも」と停止ボタンを押さなければその分コストもかかってしまいます。
この問題を回避するために、現在コアインスタンスにアクセスしている人の一覧を取得する機能を実装しました。
なお今回紹介するやり方はWebAPIだけで完結します。
新しいインスタンスやサービスを利用する必要はありません。
WebAPIだけで完結させた背景には認証機能との連携失敗があります。
今回のアプリではCognitoという認証機能を導入していました。
この機能はAWS上で提供されているため、同じくAWS上でアプリを作る際に使う方もいらっしゃると思います。
認証ができるのならそこからアクセス中のユーザも分かるのでは?と思ったのですが
Cognitoから取得できるのは「各ユーザのアカウントが有効かどうか」だけでした。
他の認証サービスなら話は違ったのかもしれませんが、検討や導入のコストが対価に見合わなかったので今必要なユーザ確認部分だけ別で作った次第です。
実装方法
必要な機能はアクセス中のユーザ一覧に対する
- 閲覧
- 自分を一覧に追加
- 自分を一覧から削除
の3つの操作です。
以下で説明するコードはPoint1で書いたコードの下に書き加えるだけでOKです。
処理が短時間のため、ここではCeleryを使う必要はありません。
import os
import json
import datetime as dt
import fasteners
from fastapi import Body, FastAPI
app = FastAPI()
# 起動するたびに空の辞書で上書き(一度停止する = 誰も使っていない状態になったと判定)
active_user_folder_path = f"active_users"
if not os.path.exists(active_user_folder_path):
os.makedirs(active_user_folder_path)
active_user_list_path = active_user_folder_path + "/active_user_list.json"
active_user_list = {}
with open(active_user_list_path, "w") as file:
json.dump(active_user_list, file)
timezone_jst = dt.timezone(dt.timedelta(hours=9))
# 閲覧
@app.get("/get_active_user")
def get_active_user():
with open(active_user_list_path, "r") as file:
active_user_list = json.load(file)
return active_user_list
# 追加(request:ユーザ名(str))
@fasteners.interprocess_locked("/active_users/active_user_list.json")
@app.post("/add_active_user")
def update_active_user(request=Body(...)):
with open(active_user_list_path, "r") as file:
active_user_list = json.load(file)
username = request["username"]
now = dt.datetime.now(timezone_jst)
active_user_list[username] = now.strftime("%Y/%m/%d %H:%M")
with open(active_user_list_path, "w") as file:
json.dump(active_user_list, file)
return active_user_list
# 削除(request:ユーザ名(str))
@fasteners.interprocess_locked("/active_users/active_user_list.json")
@app.delete("/delete_active_user")
def delete_active_user(request=Body(...)):
with open(active_user_list_path, "r") as file:
active_user_list = json.load(file)
username = request["username"]
if username in active_user_list:
del active_user_list[username]
with open(active_user_list_path, "w") as file:
json.dump(active_user_list, file)
return active_user_list
このコードで作成されるユーザ一覧は以下のように辞書として保存されています。
{
"user1":"2023/12/21 18:58",
"user2":"2023/12/21 17:40",
}
コアインスタンスを使い始める時はadd_active_userにリクエストを送信、使い終わったらdelete_active_userにリクエストを送信、と運用すれば上記の辞書にはアクセス中のユーザだけが記録されることになります。
各種リクエストを送信するには専用のボタンを用意する他、「明確に使い始め/使い終わりと結びつく操作」のついでに自動で送信されるようにしておくと便利です。
今回作っていたアプリではコアインスタンスにPoint1で定義した処理開始のリクエストを送る際にadd_active_userにもリクエストが送られるようにしています。
名前だけでなく時間も記録されているのはdeleteし忘れへの対策です。
deleteを実行し忘れるとインスタンスが停止されるまで一覧に名前が残り続けてしまいます。
こんな時でも「最後に使った時刻が1日前なら無視する」などとルールを決めておけば心置きなくインスタンスを停止できます。
また、変数としての保持が面倒だったためユーザ一覧はファイルとして保存されるようにしています。
このためファイルの書き換えを行うユーザ追加・削除機能に排他ロックを導入する必要があります。
やり方は簡単で、デコレータ@fasteners.interprocess_locked("ロックしたいファイル名")
をつけるだけです。
ただし上の例と同じようにfastenersのデコレータ→fastapiのデコレータの順に書かないとエラーが出てしまうのでご注意ください。
時刻に関する補足です。
現在時刻を取得する時、上のコードでは
timezone_jst = dt.timezone(dt.timedelta(hours=9))
now = dt.datetime.now(timezone_jst)
と書いています。
自前のPCやサーバで動かす場合には
now = dt.datetime.now()
で良いのですが、AWS上で動かす場合には先ほどの例のように書いて下さい。
AWS上ではデフォルトで取得される時刻が世界標準時になっており、日本の時刻を取得したいことを明示する必要があります(9時間ずらすだけで十分)。
まとめ
ここまでお読みいただきありがとうございます。
今回は私が簡易版のアプリを作成した時に役立った、簡単に実装できる機能を3つ紹介しました。
試験用のアプリなどでは、しばしば本筋の機能以外はさっさと作りたい・作らなければならないという事があると思います。
その時にこの記事が少しでも皆様のお役に立てば幸いです。
Discussion