PythonとActivityPubでリマインダーBotを作ろう
先日、apkit
というライブラリを使って、簡単なActivityPubのBotを作ってみました。この記事では、その際に書いたコードを元に、どうやって作ったのかを解説していこうと思います。
どんなBot?
今回作ったのは、メンションで時間を指定すると、その時間後にリプライで通知してくれる、というシンプルなリマインダーBotです。
例えば、「@reminder@your.host.com 10m オーブンを確認して!
」のようにメンションを送ると、10分後に「🔔 Reminder for @your_username: オーブンを確認して!」といった感じのリプライを返してくれます。
使ったもの
開発に使ったのは、主に以下のライブラリです。
-
uv: 最近話題の高速なパッケージ管理ツール。
pip
でも大丈夫です。 - apkit[server]: 今回の主役。ActivityPubサーバーを簡単に作れるPythonライブラリです。
-
uvicorn:
apkit
で作ったサーバーを動かすためのASGIサーバー。 - cryptography: ActivityPubで必須の署名鍵を生成するために使います。
まずは、これらの依存関係をインストールします。
# プロジェクトの初期化
uv init
# 依存関係のインストール
uv add "apkit[server]" uvicorn cryptography
コード全体像
コードはmain.py
という一つのファイルにまとめています。全体の流れはこんな感じです。
- 必要なライブラリのインポートと基本設定
- ActivityPubで使う署名鍵の準備
- Bot自身の情報であるActorを定義
-
apkit
サーバーの初期化 - 送られてきた投稿(Activity)を保存する場所の確保
- リマインダーの処理ロジック
- 各種エンドポイント(
/actor
や/outbox
など)の定義 - 他のサーバーからアクティビティを受信したときの処理
- サーバーの起動
それでは、各部分を詳しく見ていきましょう。
1. インポートと基本設定
まずは、必要なものをimport
して、Botの基本的な設定をします。
# main.py
import asyncio
import logging
import re
import uuid
import os
from datetime import timedelta, datetime
# FastAPIやapkitなど、必要なライブラリをインポート
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization as crypto_serialization
from apkit.config import AppConfig
from apkit.server import ActivityPubServer
from apkit.server.types import Context, ActorKey
from apkit.server.responses import ActivityResponse
from apkit.models import (
Actor, Application, CryptographicKey, Follow, Create, Note, Mention, Actor as APKitActor, OrderedCollection,
)
from apkit.client import WebfingerResource, WebfingerResult, WebfingerLink
from apkit.client.asyncio.client import ActivityPubClient
# --- ロギング設定 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Botの基本設定 ---
HOST = "your.host.com" # 自分のドメインに書き換えてね
USER_ID = "reminder" # BotのID(ユーザー名)
HOST
とUSER_ID
は、Botを動かす環境に合わせて書き換えてください。ここで設定した値が、BotのID(@reminder@your.host.com
のような形式)になります。
2. 署名鍵の準備
ActivityPubでは、サーバー間でアクティビティをやり取りする際にHTTP Signaturesという仕組みで署名を行います。そのための鍵を準備します。
# main.py (続き)
# --- 鍵の永続化 ---
KEY_FILE = "private_key.pem"
# 秘密鍵ファイルがあれば読み込み、なければ新しく作る
if os.path.exists(KEY_FILE):
logger.info(f"秘密鍵を読み込みます: {KEY_FILE}")
with open(KEY_FILE, "rb") as f:
private_key = crypto_serialization.load_pem_private_key(f.read(), password=None)
else:
logger.info(f"秘密鍵が見つからないので、新しく作ります: {KEY_FILE}")
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
with open(KEY_FILE, "wb") as f:
f.write(private_key.private_bytes(
encoding=crypto_serialization.Encoding.PEM,
format=crypto_serialization.PrivateFormat.PKCS8,
encryption_algorithm=crypto_serialization.NoEncryption()
))
# 秘密鍵から公開鍵を生成
public_key_pem = private_key.public_key().public_bytes(
encoding=crypto_serialization.Encoding.PEM,
format=crypto_serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
初回起動時にprivate_key.pem
というファイル名で秘密鍵が作られ、次回からはそれを読み込んで使うようにしています。
3. Actorの定義
次に、Bot自身を表すActorを定義します。ActorはActivityPubの世界での「登場人物」のようなものです。今回はBotなので、Application
というタイプを使いました。
# main.py (続き)
# --- Actorの定義 ---
actor = Application(
id=f"https://{HOST}/actor",
name="Reminder Bot",
preferredUsername=USER_ID,
summary="リマインダーBotです。`@reminder 5m オーブンを確認` のようにメンションを送ると、リマインドを送信します。",
inbox=f"https://{HOST}/inbox", # 他のサーバーからアクティビティを受け取る場所
outbox=f"https://{HOST}/outbox", # こちらからアクティビティを外に発信する場所
publicKey=CryptographicKey( # 公開鍵の情報
id=f"https://{HOST}/actor#main-key",
owner=f"https://{HOST}/actor",
publicKeyPem=public_key_pem
)
)
apkit
サーバーの初期化
4. いよいよapkit
の登場です。ActivityPubServer
を初期化します。
# main.py (続き)
# --- 鍵取得関数の定義 ---
async def get_keys_for_actor(identifier: str) -> list[ActorKey]:
"""指定されたIDのActorの鍵を返す関数"""
if identifier == actor.id:
# 今回はBot自身の鍵だけを返す
return [ActorKey(key_id=actor.publicKey.id, private_key=private_key)]
return []
# --- サーバーの初期化 ---
app = ActivityPubServer(apkit_config=AppConfig(
actor_keys=get_keys_for_actor # 鍵取得関数を登録
))
apkit
が投稿に署名するとき、ここで登録したget_keys_for_actor
関数が呼ばれて、秘密鍵を取得する仕組みです。
5. アクティビティの保存場所
Botが作成した投稿(Note
アクティビティ)は、どこかに保存しておかないと、一部のActivityPub実装で投稿を作成してくれません (Iceshrimpなど。)。そこで、今回は一時的な辞書にActivityを保存することにします。
# main.py (続き)
# --- アクティビティのインメモリ保存領域 ---
ACTIVITY_STORE = {} # 作成したアクティビティを保存しておく場所
CACHE = {} # 一度取得したアクティビティをキャッシュする場所
CACHE_TTL = timedelta(minutes=5) # キャッシュの有効期限(5分)
ACTIVITY_STORE
が永続的な保存場所(今回はメモリ上なのでサーバーを再起動すると消えます)、CACHE
が一時的なキャッシュです。同じ投稿に何度もアクセスがあったときに、高速に返せるようにしています。
6. リマインダー処理
ここがBotのメインロジックです。メンションから時間とメッセージを抜き出して、指定時間後にリプライを送信します。
# main.py (続き)
# --- リマインダーの解析ロジック ---
def parse_reminder(text: str) -> tuple[timedelta | None, str | None, str | None]:
"""'5m 何かをする' のようなリマインダーテキストを解析する"""
# 正規表現で時間、単位、メッセージを抽出
pattern = re.compile(r"^\s*(\d+)\s*(s|m|h|d)\s*(.*)", re.IGNORECASE)
match = pattern.search(text)
if not match:
return None, None, None
value, unit, message = match.groups()
value = int(value)
original_time_string = f"{value}{unit.lower()}"
unit = unit.lower()
if unit == 's': # 秒
delta = timedelta(seconds=value)
elif unit == 'm': # 分
delta = timedelta(minutes=value)
elif unit == 'h': # 時間
delta = timedelta(hours=value)
elif unit == 'd': # 日
delta = timedelta(days=value)
else:
return None, None, None
if not message.strip():
message = "Here's your reminder!"
return delta, message.strip(), original_time_string
# --- リマインダー送信関数 ---
async def send_reminder(ctx: Context, delay: timedelta, message: str, target_actor: APKitActor, original_note: Note):
"""指定された時間待機し、リマインダーを送信する"""
logger.info(f"{delay} 後にリマインドをスケジュールしました: {message}")
await asyncio.sleep(delay.total_seconds()) # ここで待機!
logger.info(f"{target_actor.id} にリマインダーを送信します")
# リマインドの投稿(Note)を作成
reminder_note = Note(...)
# それをCreateアクティビティで包む
reminder_create = Create(...)
# 作成したアクティビティを保存
ACTIVITY_STORE[reminder_note.id] = reminder_note
ACTIVITY_STORE[reminder_create.id] = reminder_create
# 相手のinboxに送信!
keys = await get_keys_for_actor(f"https://{HOST}/actor")
await ctx.send(keys, target_actor, reminder_create)
logger.info(f"リマインダーを送信しました")
7. エンドポイントの定義
ActivityPubサーバーとして機能するために必要なエンドポイントを定義します。apkit
はFastAPIをベースにしているので、@app.get(...)
のようなデコレータで簡単に定義できます。
# main.py (続き)
# Actor情報を返すエンドポイント
@app.get("/actor")
async def get_actor_endpoint():
return ActivityResponse(actor)
# Outbox(送信済み投稿一覧)のエンドポイント
@app.get("/outbox")
async def get_outbox_endpoint():
items = sorted(ACTIVITY_STORE.values(), key=lambda x: x.id, reverse=True)
outbox_collection = OrderedCollection(
id=actor.outbox,
totalItems=len(items),
orderedItems=items
)
return ActivityResponse(outbox_collection)
# 個別の投稿(Note)を返すエンドポイント
@app.get("/notes/{note_id}")
async def get_note_endpoint(note_id: uuid.UUID):
note_uri = f"https://{HOST}/notes/{note_id}"
# まずキャッシュを探す
if note_uri in CACHE and (datetime.now() - CACHE[note_uri]["timestamp"]) < CACHE_TTL:
logger.info(f"Cache hit: {note_uri}")
return ActivityResponse(CACHE[note_uri]["activity"])
# なければストアから探す
if note_uri in ACTIVITY_STORE:
logger.info(f"Cache miss: {note_uri}")
activity = ACTIVITY_STORE[note_uri]
# 見つかったらキャッシュに保存
CACHE[note_uri] = {"activity": activity, "timestamp": datetime.now()}
return ActivityResponse(activity)
return Response(status_code=404) # 見つからなかったら404
# ... (他のエンドポイントも同様)
7.1 Webfingerの定義
Webfingerは、他のサーバーから「@user@host」みたいな形式でユーザーを見つけるための仕組みです。これを実装しておかないと、せっかく作ったBotが誰も使えない…なんて悲しいことになるので、忘れずに対応しておきたいところです。ここでは簡易的な実装にとどめておきます。
@app.webfinger()
async def webfinger_endpoint(request: Request, acct: WebfingerResource) -> Response:
if not acct.url:
if acct.username == USER_ID and acct.host == HOST: # USER_IDとHOSTが一致したら返す
link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id) # type: ignore
wf_result = WebfingerResult(subject=acct, links=[link])
return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
else:
if acct.url == f"https://{HOST}/actor": # urlからもactorを解決できるようにしておく
link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id) # type: ignore
wf_result = WebfingerResult(subject=acct, links=[link])
return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
return JSONResponse({"message": "Not Found"}, status_code=404)
8. アクティビティの受信処理
@app.on(...)
デコレータを使うと、特定のタイプのアクティビティがinbox
にPOSTされたときの処理を書くことができます。
# main.py (続き)
# Followされたときの処理
@app.on(Follow)
async def on_follow_activity(ctx: Context):
"""フォローを自動的に承認する"""
activity = ctx.activity
if not isinstance(activity, Follow):
return Response(status_code=400)
async with ActivityPubClient() as client:
follower_actor = await client.actor.fetch(activity.actor.id if isinstance(activity.actor, Actor) else activity.actor) # pyright: ignore[reportArgumentType, reportAttributeAccessIssue]
if not follower_actor:
return Response(status_code=400)
accept_activity = activity.accept(
id=f"https://{HOST}/activity/follows/{follower_actor}",
actor=actor
) # Followアクティビティに対してのacceptを生成する
keys = await get_keys_for_actor(f"https://{HOST}/actor")
await ctx.send(keys, follower_actor, accept_activity) # type: ignore
logger.info(f"Accepted follow from {follower_actor.id}") # type: ignore
return Response(status_code=202)
# Createアクティビティ(Note)を受け取ったときの処理
@app.on(Create)
async def on_create_activity(ctx: Context):
"""メンションを解析してリマインダーをスケジュールする"""
activity = ctx.activity
# Noteじゃない場合は無視
if not (isinstance(activity, Create) and isinstance(activity.object, Note)):
return Response(status_code=202)
note = activity.object
# 自分宛てのメンションかチェック
is_mentioned = False
if note.tag:
for tag in note.tag:
if isinstance(tag, Mention) and tag.href == actor.id:
is_mentioned = True
break
if not is_mentioned:
return Response(status_code=202)
# ... (リマインダー解析処理)
delay, message, time_str = parse_reminder(command_text)
# リマインダーの形式に合致したら、バックグラウンドタスクとして登録
if delay and message and sender_actor:
asyncio.create_task(send_reminder(ctx, delay, message, sender_actor, note))
reply_content = f"<p>{time_str} 後にリマインドします。</p>"
else:
# 形式が違う場合は使い方を返信
reply_content = "<p>フォーマットが無効です。`@reminder [時間] [メッセージ]`の形式で送ってください。</p><p>例: `@reminder 10m オーブンを確認`</p>"
reply_note = Note(
id=f"https://{HOST}/notes/{uuid.uuid4()}",
attributedTo=actor.id,
inReplyTo=note.id, # type: ignore
content=reply_content,
to=[sender_actor.id], # type: ignore
tag=[Mention(href=sender_actor.id, name=f"@{sender_actor.preferredUsername}")] # type: ignore
)
reply_create = Create(
id=f"https://{HOST}/creates/{uuid.uuid4()}",
actor=actor.id,
object=reply_note,
to=[sender_actor.id] # type: ignore
)
ACTIVITY_STORE[reply_note.id] = reply_note
ACTIVITY_STORE[reply_create.id] = reply_create
keys = await get_keys_for_actor(f"https://{HOST}/actor")
await ctx.send(keys, sender_actor, reply_create) # type: ignore
return Response(status_code=202)
on_create_activity
の中で、メンションの内容をparse_reminder
で解析し、成功すればsend_reminder
をasyncio.create_task
でバックグラウンドタスクとして実行しています。
9. サーバーの起動
最後に、uvicorn
でサーバーを起動します。
# main.py (続き)
if __name__ == "__main__":
import uvicorn
logger.info("uvicornサーバーを起動します...")
uvicorn.run(app, host="0.0.0.0", port=8000)
Botの動かし方
-
main.py
のHOST
とUSER_ID
を、自分の環境に合わせて設定します。 -
ターミナルで以下のコマンドを実行します。
uvicorn main:app --host 0.0.0.0 --port 8000
-
Botが
http://0.0.0.0:8000
で起動します。
これで、Fediverseのどこからでも@reminder@your.host.com
のようにメンションを送れば、Botが応答してくれるはずです。
まとめ
今回はapkit
を使って、ごく簡単なActivityPub Botを作ってみました。メモリ上にしかデータを保存しないので、サーバーを再起動するとリマインダーが消えてしまうなど、まだまだ改善の余地はたくさんあります。
例えば、以下のような改良が考えられます。
- SQLiteやPostgreSQLなどのデータベースを使って、アクティビティを永続化する
- リマインダーのタスク管理を、サーバーが再起動しても消えないように、もっと堅牢な仕組みにする(例えばRedisやCeleryを使うなど)
- 繰り返しリマインダー(「毎日9時にリマインド」など)に対応する
この記事が、誰かがActivityPubで何かを作るきっかけになれば嬉しいです。
Discussion