✈️

PythonとActivityPubでリマインダーBotを作ろう

に公開

先日、apkitというライブラリを使って、簡単なActivityPubのBotを作ってみました。この記事では、その際に書いたコードを元に、どうやって作ったのかを解説していこうと思います。

どんなBot?

今回作ったのは、メンションで時間を指定すると、その時間後にリプライで通知してくれる、というシンプルなリマインダーBotです。
例えば、「@reminder@your.host.com 10m オーブンを確認して!」のようにメンションを送ると、10分後に「🔔 Reminder for @your_username: オーブンを確認して!」といった感じのリプライを返してくれます。

使ったもの

開発に使ったのは、主に以下のライブラリです。

  • uv: 最近話題の高速なパッケージ管理ツール。pipでも大丈夫です。
  • apkit[server]: 今回の主役。ActivityPubサーバーを簡単に作れるPythonライブラリです。
  • uvicorn: apkitで作ったサーバーを動かすためのASGIサーバー。
  • cryptography: ActivityPubで必須の署名鍵を生成するために使います。

まずは、これらの依存関係をインストールします。

# プロジェクトの初期化
uv init

# 依存関係のインストール
uv add "apkit[server]" uvicorn cryptography

コード全体像

コードはmain.pyという一つのファイルにまとめています。全体の流れはこんな感じです。

  1. 必要なライブラリのインポートと基本設定
  2. ActivityPubで使う署名鍵の準備
  3. Bot自身の情報であるActorを定義
  4. apkitサーバーの初期化
  5. 送られてきた投稿(Activity)を保存する場所の確保
  6. リマインダーの処理ロジック
  7. 各種エンドポイント(/actor/outboxなど)の定義
  8. 他のサーバーからアクティビティを受信したときの処理
  9. サーバーの起動

それでは、各部分を詳しく見ていきましょう。

1. インポートと基本設定

まずは、必要なものをimportして、Botの基本的な設定をします。

# main.py

import asyncio
import logging
import re
import uuid
import os
from datetime import timedelta, datetime

# FastAPIやapkitなど、必要なライブラリをインポート
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization as crypto_serialization

from apkit.config import AppConfig
from apkit.server import ActivityPubServer
from apkit.server.types import Context, ActorKey
from apkit.server.responses import ActivityResponse
from apkit.models import (
    Actor, Application, CryptographicKey, Follow, Create, Note, Mention, Actor as APKitActor, OrderedCollection,
)
from apkit.client import WebfingerResource, WebfingerResult, WebfingerLink
from apkit.client.asyncio.client import ActivityPubClient

# --- ロギング設定 ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Botの基本設定 ---
HOST = "your.host.com"  # 自分のドメインに書き換えてね
USER_ID = "reminder"      # BotのID(ユーザー名)

HOSTUSER_IDは、Botを動かす環境に合わせて書き換えてください。ここで設定した値が、BotのID(@reminder@your.host.comのような形式)になります。

2. 署名鍵の準備

ActivityPubでは、サーバー間でアクティビティをやり取りする際にHTTP Signaturesという仕組みで署名を行います。そのための鍵を準備します。

# main.py (続き)

# --- 鍵の永続化 ---
KEY_FILE = "private_key.pem"

# 秘密鍵ファイルがあれば読み込み、なければ新しく作る
if os.path.exists(KEY_FILE):
    logger.info(f"秘密鍵を読み込みます: {KEY_FILE}")
    with open(KEY_FILE, "rb") as f:
        private_key = crypto_serialization.load_pem_private_key(f.read(), password=None)
else:
    logger.info(f"秘密鍵が見つからないので、新しく作ります: {KEY_FILE}")
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    with open(KEY_FILE, "wb") as f:
        f.write(private_key.private_bytes(
            encoding=crypto_serialization.Encoding.PEM,
            format=crypto_serialization.PrivateFormat.PKCS8,
            encryption_algorithm=crypto_serialization.NoEncryption()
        ))

# 秘密鍵から公開鍵を生成
public_key_pem = private_key.public_key().public_bytes(
    encoding=crypto_serialization.Encoding.PEM,
    format=crypto_serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')

初回起動時にprivate_key.pemというファイル名で秘密鍵が作られ、次回からはそれを読み込んで使うようにしています。

3. Actorの定義

次に、Bot自身を表すActorを定義します。ActorはActivityPubの世界での「登場人物」のようなものです。今回はBotなので、Applicationというタイプを使いました。

# main.py (続き)

# --- Actorの定義 ---
actor = Application(
    id=f"https://{HOST}/actor",
    name="Reminder Bot",
    preferredUsername=USER_ID,
    summary="リマインダーBotです。`@reminder 5m オーブンを確認` のようにメンションを送ると、リマインドを送信します。",
    inbox=f"https://{HOST}/inbox",      # 他のサーバーからアクティビティを受け取る場所
    outbox=f"https://{HOST}/outbox",    # こちらからアクティビティを外に発信する場所
    publicKey=CryptographicKey(         # 公開鍵の情報
        id=f"https://{HOST}/actor#main-key",
        owner=f"https://{HOST}/actor",
        publicKeyPem=public_key_pem
    )
)

4. apkitサーバーの初期化

いよいよapkitの登場です。ActivityPubServerを初期化します。

# main.py (続き)

# --- 鍵取得関数の定義 ---
async def get_keys_for_actor(identifier: str) -> list[ActorKey]:
    """指定されたIDのActorの鍵を返す関数"""
    if identifier == actor.id:
        # 今回はBot自身の鍵だけを返す
        return [ActorKey(key_id=actor.publicKey.id, private_key=private_key)]
    return []

# --- サーバーの初期化 ---
app = ActivityPubServer(apkit_config=AppConfig(
    actor_keys=get_keys_for_actor  # 鍵取得関数を登録
))

apkitが投稿に署名するとき、ここで登録したget_keys_for_actor関数が呼ばれて、秘密鍵を取得する仕組みです。

5. アクティビティの保存場所

Botが作成した投稿(Noteアクティビティ)は、どこかに保存しておかないと、一部のActivityPub実装で投稿を作成してくれません (Iceshrimpなど。)。そこで、今回は一時的な辞書にActivityを保存することにします。

# main.py (続き)

# --- アクティビティのインメモリ保存領域 ---
ACTIVITY_STORE = {} # 作成したアクティビティを保存しておく場所
CACHE = {}          # 一度取得したアクティビティをキャッシュする場所
CACHE_TTL = timedelta(minutes=5) # キャッシュの有効期限(5分)

ACTIVITY_STOREが永続的な保存場所(今回はメモリ上なのでサーバーを再起動すると消えます)、CACHEが一時的なキャッシュです。同じ投稿に何度もアクセスがあったときに、高速に返せるようにしています。

6. リマインダー処理

ここがBotのメインロジックです。メンションから時間とメッセージを抜き出して、指定時間後にリプライを送信します。

# main.py (続き)

# --- リマインダーの解析ロジック ---
def parse_reminder(text: str) -> tuple[timedelta | None, str | None, str | None]:
    """'5m 何かをする' のようなリマインダーテキストを解析する"""
    # 正規表現で時間、単位、メッセージを抽出
    pattern = re.compile(r"^\s*(\d+)\s*(s|m|h|d)\s*(.*)", re.IGNORECASE)
    match = pattern.search(text)
    if not match:
        return None, None, None

    value, unit, message = match.groups()
    value = int(value)
    original_time_string = f"{value}{unit.lower()}"
    
    unit = unit.lower()
    if unit == 's': # 秒
        delta = timedelta(seconds=value)
    elif unit == 'm': # 分
        delta = timedelta(minutes=value)
    elif unit == 'h': # 時間
        delta = timedelta(hours=value)
    elif unit == 'd': # 日
        delta = timedelta(days=value)
    else:
        return None, None, None
        
    if not message.strip():
        message = "Here's your reminder!"

    return delta, message.strip(), original_time_string

# --- リマインダー送信関数 ---
async def send_reminder(ctx: Context, delay: timedelta, message: str, target_actor: APKitActor, original_note: Note):
    """指定された時間待機し、リマインダーを送信する"""
    logger.info(f"{delay} 後にリマインドをスケジュールしました: {message}")
    await asyncio.sleep(delay.total_seconds()) # ここで待機!
    
    logger.info(f"{target_actor.id} にリマインダーを送信します")
    
    # リマインドの投稿(Note)を作成
    reminder_note = Note(...)
    # それをCreateアクティビティで包む
    reminder_create = Create(...)
    
    # 作成したアクティビティを保存
    ACTIVITY_STORE[reminder_note.id] = reminder_note
    ACTIVITY_STORE[reminder_create.id] = reminder_create
    
    # 相手のinboxに送信!
    keys = await get_keys_for_actor(f"https://{HOST}/actor")
    await ctx.send(keys, target_actor, reminder_create)
    logger.info(f"リマインダーを送信しました")

7. エンドポイントの定義

ActivityPubサーバーとして機能するために必要なエンドポイントを定義します。apkitはFastAPIをベースにしているので、@app.get(...)のようなデコレータで簡単に定義できます。

# main.py (続き)

# Actor情報を返すエンドポイント
@app.get("/actor")
async def get_actor_endpoint():
    return ActivityResponse(actor)

# Outbox(送信済み投稿一覧)のエンドポイント
@app.get("/outbox")
async def get_outbox_endpoint():
    items = sorted(ACTIVITY_STORE.values(), key=lambda x: x.id, reverse=True)
    outbox_collection = OrderedCollection(
        id=actor.outbox,
        totalItems=len(items),
        orderedItems=items
    )
    return ActivityResponse(outbox_collection)

# 個別の投稿(Note)を返すエンドポイント
@app.get("/notes/{note_id}")
async def get_note_endpoint(note_id: uuid.UUID):
    note_uri = f"https://{HOST}/notes/{note_id}"
    
    # まずキャッシュを探す
    if note_uri in CACHE and (datetime.now() - CACHE[note_uri]["timestamp"]) < CACHE_TTL:
        logger.info(f"Cache hit: {note_uri}")
        return ActivityResponse(CACHE[note_uri]["activity"])
        
    # なければストアから探す
    if note_uri in ACTIVITY_STORE:
        logger.info(f"Cache miss: {note_uri}")
        activity = ACTIVITY_STORE[note_uri]
        # 見つかったらキャッシュに保存
        CACHE[note_uri] = {"activity": activity, "timestamp": datetime.now()}
        return ActivityResponse(activity)
        
    return Response(status_code=404) # 見つからなかったら404

# ... (他のエンドポイントも同様)

7.1 Webfingerの定義

Webfingerは、他のサーバーから「@user@host」みたいな形式でユーザーを見つけるための仕組みです。これを実装しておかないと、せっかく作ったBotが誰も使えない…なんて悲しいことになるので、忘れずに対応しておきたいところです。ここでは簡易的な実装にとどめておきます。

@app.webfinger()
async def webfinger_endpoint(request: Request, acct: WebfingerResource) -> Response:
    if not acct.url:
        if acct.username == USER_ID and acct.host == HOST: # USER_IDとHOSTが一致したら返す
            link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id) # type: ignore
            wf_result = WebfingerResult(subject=acct, links=[link])
            return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
    else:
        if acct.url == f"https://{HOST}/actor": # urlからもactorを解決できるようにしておく
            link = WebfingerLink(rel="self", type="application/activity+json", href=actor.id) # type: ignore
            wf_result = WebfingerResult(subject=acct, links=[link])
            return JSONResponse(wf_result.to_json(), media_type="application/jrd+json")
    return JSONResponse({"message": "Not Found"}, status_code=404)

8. アクティビティの受信処理

@app.on(...)デコレータを使うと、特定のタイプのアクティビティがinboxにPOSTされたときの処理を書くことができます。

# main.py (続き)

# Followされたときの処理
@app.on(Follow)
async def on_follow_activity(ctx: Context):
    """フォローを自動的に承認する"""
    activity = ctx.activity
    if not isinstance(activity, Follow): 
        return Response(status_code=400)
    
    async with ActivityPubClient() as client:
        follower_actor = await client.actor.fetch(activity.actor.id if isinstance(activity.actor, Actor) else activity.actor) # pyright: ignore[reportArgumentType, reportAttributeAccessIssue]
        if not follower_actor: 
            return Response(status_code=400)

    accept_activity = activity.accept(
        id=f"https://{HOST}/activity/follows/{follower_actor}",
        actor=actor
    ) # Followアクティビティに対してのacceptを生成する
    keys = await get_keys_for_actor(f"https://{HOST}/actor")
    await ctx.send(keys, follower_actor, accept_activity) # type: ignore
    logger.info(f"Accepted follow from {follower_actor.id}") # type: ignore
    return Response(status_code=202)


# Createアクティビティ(Note)を受け取ったときの処理
@app.on(Create)
async def on_create_activity(ctx: Context):
    """メンションを解析してリマインダーをスケジュールする"""
    activity = ctx.activity
    # Noteじゃない場合は無視
    if not (isinstance(activity, Create) and isinstance(activity.object, Note)):
        return Response(status_code=202)

    note = activity.object
    
    # 自分宛てのメンションかチェック
    is_mentioned = False
    if note.tag:
        for tag in note.tag:
            if isinstance(tag, Mention) and tag.href == actor.id:
                is_mentioned = True
                break
    
    if not is_mentioned:
        return Response(status_code=202)

    # ... (リマインダー解析処理)
    delay, message, time_str = parse_reminder(command_text)

    # リマインダーの形式に合致したら、バックグラウンドタスクとして登録
    if delay and message and sender_actor:
        asyncio.create_task(send_reminder(ctx, delay, message, sender_actor, note))
        reply_content = f"<p>{time_str} 後にリマインドします。</p>"
    else:
        # 形式が違う場合は使い方を返信
        reply_content = "<p>フォーマットが無効です。`@reminder [時間] [メッセージ]`の形式で送ってください。</p><p>例: `@reminder 10m オーブンを確認`</p>"

    reply_note = Note(
        id=f"https://{HOST}/notes/{uuid.uuid4()}",
        attributedTo=actor.id,
        inReplyTo=note.id, # type: ignore
        content=reply_content,
        to=[sender_actor.id], # type: ignore
        tag=[Mention(href=sender_actor.id, name=f"@{sender_actor.preferredUsername}")] # type: ignore
    )
    reply_create = Create(
        id=f"https://{HOST}/creates/{uuid.uuid4()}",
        actor=actor.id,
        object=reply_note,
        to=[sender_actor.id] # type: ignore
    )
    
    ACTIVITY_STORE[reply_note.id] = reply_note
    ACTIVITY_STORE[reply_create.id] = reply_create
    
    keys = await get_keys_for_actor(f"https://{HOST}/actor")
    await ctx.send(keys, sender_actor, reply_create) # type: ignore
    
    return Response(status_code=202)

on_create_activityの中で、メンションの内容をparse_reminderで解析し、成功すればsend_reminderasyncio.create_taskでバックグラウンドタスクとして実行しています。

9. サーバーの起動

最後に、uvicornでサーバーを起動します。

# main.py (続き)

if __name__ == "__main__":
    import uvicorn
    logger.info("uvicornサーバーを起動します...")
    uvicorn.run(app, host="0.0.0.0", port=8000)

Botの動かし方

  1. main.pyHOSTUSER_IDを、自分の環境に合わせて設定します。

  2. ターミナルで以下のコマンドを実行します。

    uvicorn main:app --host 0.0.0.0 --port 8000
    
  3. Botがhttp://0.0.0.0:8000で起動します。

これで、Fediverseのどこからでも@reminder@your.host.comのようにメンションを送れば、Botが応答してくれるはずです。

まとめ

今回はapkitを使って、ごく簡単なActivityPub Botを作ってみました。メモリ上にしかデータを保存しないので、サーバーを再起動するとリマインダーが消えてしまうなど、まだまだ改善の余地はたくさんあります。

例えば、以下のような改良が考えられます。

  • SQLiteやPostgreSQLなどのデータベースを使って、アクティビティを永続化する
  • リマインダーのタスク管理を、サーバーが再起動しても消えないように、もっと堅牢な仕組みにする(例えばRedisやCeleryを使うなど)
  • 繰り返しリマインダー(「毎日9時にリマインド」など)に対応する

この記事が、誰かがActivityPubで何かを作るきっかけになれば嬉しいです。

https://fedi-libs.github.io/apkit/

https://github.com/fedi-libs/apkit

https://github.com/AmaseCocoa/activitypub-reminder-bot

Discussion