Blueskyの投稿画像にウォーターマークを追加するBot
Blueskyに投稿されたイラストに自動でウォーターマークを追加するBotを書いてみました。
この記事では、AWSのサービス(Step Functions、ECS、Eventbridgeなど)をCDKでIaCする例と、Bluesky の atprotocol の python SDK である atproto 等を組み合わせた実装例について紹介します。
作成したBotの機能要素は以下の通りです:
- フォロー/アンフォロー操作によるユーザ登録と削除
- DMを介したアプリパスワードの受取
- 登録ユーザーの画像投稿をウォーターマーク付きの画像に差し替える[1]
- Blueskyのfirehoseからリアルタイムで投稿を監視する
アーキテクチャ概要
システムは以下の主要コンポーネントで構成されています:
- ECS Service: Bluesky firehoseの常時監視
- Lambda Functions: 各処理ステップの実行
- Step Functions: ワークフローの制御
- SQS: 非同期処理のメッセージキュー
- S3: 画像ファイルの保存
- Secrets Manager: API認証情報の管理
利用技術スタック
インフラ・デプロイ
- AWS CDK (TypeScript): インフラのコード化
- Docker: コンテナ化
- AWS Lambda: サーバーレス関数実行
- AWS ECS Fargate: コンテナオーケストレーション
- AWS Step Functions: ワークフロー管理
アプリケーション
- Python 3.13: メインの開発言語
- Poetry: 依存関係管理
- atproto: Bluesky AT Protocol SDK
- Pillow: 画像処理ライブラリ。ウォーターマーク処理に使っています
- boto3: AWS SDK
主要な機能とユースケース
この仕組みは、以下の5種類のユースケースに対応する機能で構成されます:
- フォローバック: 新規フォロワーへの自動フォローバックとDM送信
- サインアップ: 新規フォロワーへの正式ユーザ登録手続
- ウォーターマーク画像登録: ユーザーが使いたいウォーターマーク画像の登録
- ウォーターマーク適用: フォロワーの画像投稿をウォーターマーク付きに差し替え
- サインアウト: アンフォロー時のユーザ登録情報抹消
各機能は、src/
配下のPythonプログラムとlib/
配下のCDKスタック(TypeScript)の組み合わせで実現されています。CDKスタックでAWSリソースをデプロイし、そのリソース上でPythonプログラムが実行される構成となっています。
1. フォローバック機能
この機能はsrc/followのPythonプログラムとlib/follow_flow_stack.tsのCDKスタックで実現されています。
CDKで構築されるAWSリソース
このCDKスタックにより、3つのLambda関数を順次実行するStep Functionsワークフローが構築されます。
src/follow配下のPythonプログラム
touch_user_file.py: ユーザー情報をS3に保存
followback.py: 自動フォローバック処理
send_dm.py: アプリパスワード提供の依頼
2. サインアップ機能
この機能は新規フォロワーへの正式ユーザ登録手続きを担当します。フォローバック後の新規ユーザーに対し、DMで正式な登録手続きを案内するための仕組みです。
正式な登録 とは、Bluesky のアプリパスワードを DM 経由で受領し、暗号化してS3上にユーザーファイルとして保存することを指します。ユーザーファイルのオブジェクト名はユーザーのDIDであるため、ユーザーとユーザーファイルは一意に紐付き、混同されることはありません。
なお、アプリパスワードは、ユーザーの画像投稿にウォーターマークを付与した投稿をユーザー名義で投稿し、元の投稿を削除する操作にのみ利用されます。
src/signupのPythonプログラムとlib/signup_flow_stack.tsのCDKスタックで実装されています。
CDKで構築されるAWSリソース
src/signup配下のPythonプログラム
getter.py: DMからアプリパスワードを取得して保存
notifier.py: 登録完了の通知
3. ウォーターマーク画像登録
この機能は、ユーザーが使いたいウォーターマーク画像をシステムに登録するためのものです。ユーザーが特定のaltを付けた画像を投稿すると、その画像がウォーターマークとしてS3に保存されます。
src/set_watermark_imgのPythonプログラムとlib/set_watermark_img_stack.tsのCDKスタックで実装されています。
CDKで構築されるAWSリソース
src/set_watermark_img 配下のPythonプログラム
executor.py: 投稿からウォーターマーク画像を取得してS3に保存
notifier.py: 設定完了通知
4. ウォーターマーク適用
この機能は、フォロワーが画像投稿を行った際に、登録済みのウォーターマークを自動で合成し、元の投稿をウォーターマーク付きの画像で差し替える処理を担当します。
差し替える処理とは、ウォーターマーク付きの画像を新たに投稿し、元の投稿を削除することを指します。firehoseからフォロワーの投稿をリアルタイムで監視し、画像投稿を即座に検知して動作するため、投稿から差替完了までの所要時間は長くて3分程度です。
src/watermarkingのPythonプログラムとlib/watermarking_flow_stack.tsのCDKスタックで実装されています。
鋭い方はすでに頭の中でツッコミを入れられていると思いますが、この仕様はあまりスマートではありません。
たとえば、WebUIを持つサードパーティのアプリケーションなどを提供し、そこから画像を投稿してもらう仕様にすることで、firehoseを監視する必要はありませんし、ウォーターマークが付与されていないオリジナル画像を Bluesky に投稿する必要性自体をなくすことができます。
しかし、そのような方法を採らず今回の仕様にした理由は、使う人が外部のアプリケーションではなく Bluesky の利用だけで完結できることを重視したためです。
ウォーターマークをつけるために外部のWebアプリケーションからわざわざ投稿をするのは、自分なら継続できないと思ったからです。(BlueskyのクライアントアプリでSNSを使っているのに、わざわざブラウザに切り替えるのはめんどくさいので…)
CDKで構築されるAWSリソース
src/watermarking配下のPythonプログラム
get_image.py: 元画像のダウンロード
apply_watermark.py: ウォーターマーク合成処理
post_watermarked.py: ウォーターマーク付き画像の投稿
del_original_post.py: 元投稿の削除
5. サインアウト機能
この機能は、いわゆる退会機能です。
フォロワーがBotをアンフォローしたら、そのフォロワーの登録情報をS3上から抹消する処理をします。アプリパスワードが保管されたユーザーファイルや、お預かりしたウォーターマーク画像のS3バケットからの削除を行います。プライバシー保護とデータ管理の観点から重要な機能です。
src/signoutのPythonプログラムとlib/signout_flow_stack.tsのCDKスタックで実装されています。
CDKで構築されるAWSリソース
src/signout配下のPythonプログラム
delete_user_files.py: ユーザーデータの削除
delete_watermarks.py: ウォーターマーク画像の削除
unfollow.py: アンフォロー
6.リアルタイム監視機能
ウォーターマーク画像の初回登録やその後の差し替えと、ウォーターマークの付加対象とすべき画像投稿に即座に反応する機能を支える基盤として、Bluesky firehoseのリアルタイム監視システムがあります。
firehose とは、Bluesky の AT Protocol におけるリアルタイムイベントストリームのことを指します。これにより、フォロワーの投稿をリアルタイムで監視し、必要な処理を即座に実行することが可能になります。
firehose は秒間10や100のオーダーを超えるメッセージが流れ込む仕組みなので、ECSで常時起動して 関心があるメッセージ だけを検知するサービスを立てています。
関心があるメッセージ とは、フォロワーの画像投稿とウォーターマーク画像の設定リクエスト(としての画像投稿)です。これらを検知して、先に紹介したウォーターマークの適用や画像の登録処理を起動するトリガーとしての役割を、ECS上で稼働するこの リアルタイム監視機能 が担っています。
src/firehoseのPythonプログラムとlib/firehose_stack.tsのCDKスタックで実装されています。
CDKで構築されるECSサービス
コンテナイメージをFargateのタスクとして実行する構成です。
src/firehose/listener.py
Fargate上で常時起動するプロセスです。
Bluesky firehoseから流れ込む大量のメッセージから、フォロワーの投稿を検知し、関心がある内容があった場合はSQSにメッセージを送ることで具体的な処理を別スタックのワークフローに行わせます。
余談ですが、firehose にどの程度のメッセージが流入しているかは、Firesky を見ると分かりやすいです。Fireskyにアクセスした瞬間にBluesky上に投稿されたすべてのポストが一気に流れていきます。
AWS CDKでのインフラ構築
スタック構成
プロジェクトは以下の7つのスタックに分割しています。各スタックは個別にデプロイができる設計にしてあります。
共通リソースの定義
各Cfnスタックから共通で利用する S3 Bucket や、Botユーザのアプリパスワードなどを安全に保管するための Secrets Manager などは、 common-resource というCfnスタックで作成しています。
プロデューサー・コンシューマーモデルの利用
このBotの主要な機能は Step Functions でワークフローとして構成されています。
BlueskyのようなSNSは大量のリクエストが短期間に発生し得るシステムなので、それを相手にするBot側では、過負荷を避けて処理可能なスピードで順番にリクエストを処理できるよう、ワークフローのトリガーにSQSを使ったプロデューサー・コンシューマーモデルを採用しています。
画像にウォーターマークを付加するワークフローを実行するEvent Bridge Pipes。Forehoseから流れ込んでくるSQSにメッセージをバッチサイズで定義された1件ずつ、ワークフローである StepFunctions に渡す形で実行する。
狙いは、特別に高性能な仕組みを用意しなくても、大量のリクエストを送ってくるSNSを相手にできるようにすることです。
たとえば、SQSに入ってきたメッセージはEventBridge Pipesが常に1件の Step Functionsのワークフローを実行するようコントロールしてくれる(batchSize: 1のパラメーターの指定により)ので、ワークフロー側を高性能にする必要が無くなります。
もちろん、利用が増えてバッチサイズ1ではスピードが不足してきたら、エラーにならない程度にサイズを上げることで並行数を増やしてスピードアップすることも簡単です。
これにより、Stepfunctions自体か、Stepfunctionsから実行されているLambdaなどのRate Limitまではバッチサイズを上げる拡張性も持つことができています。
リポジトリ
ソースコードは以下で公開しています:
詳細なデプロイ手順と運用に関する情報は、プロジェクトのREADME.mdをご参照ください。
モチベーション
あまり深く考えず思いつきと勢いで書いたコードなので、はっきりと狙いが無いというのが正直なところです。
強いて書くと、私は絵を描いてSNSに投稿しているのですが、界隈ではサインかウォーターマーク(透かし画像)を入れる習慣があります。
オシャレなウォーターマークを入れている方も多く、自分でもやりたいけど手間だから自動化できないかなと思い、ちょうど Bluesky の atprotocol も触れてみたかったので勢いで作ってみました。
技術的なノウハウ目当ての方や、Bluesky 上に投稿した画像へのウォーターマークの自動付加に興味がある方にとって、何かしらの参考になれば幸いです。
-
差し替えるという言い回しは厳密ではなく、ウォーターマーク付きの画像投稿を投稿してから、オリジナルの投稿を削除する処理を行います。オリジナルの投稿に含まれる画像ファイルを差し替えているわけではありません(できたらうれしいけど…) ↩︎
Discussion