【Dart/Flutter】DartでBlueskyのFirehose APIを簡単に使用する
概要
どうも、真也です。
この記事ではBluesky APIにおけるFirehose APIについて、DartやFlutterで簡単に使用するための方法を紹介します。
この際にDartやFlutterを試してみたいという方は、次の公式サイトを参考にしてインストールしてみてください。
Firehose APIとは
そもそもFirehose APIとはなんなのかわからない方もいるかと思いますが、簡単に表現するとbsky.social
などのBlueskyの特定のインスタンスで投稿されたポストやLikeなどのイベントをリアルタイムで取得できる長寿命のStream APIです。また、Firehose APIを使用すると、Blueskyで自分がフォローしていないユーザーの活動もリアルタイムで取得できるので、これらのリアルタイム情報を使用して統計をとったりBOTを作成することができます。
例えば、BlueskyのFirehose APIを使用したサービスには次のようなものがあります。
上記の例以外にも、特定のBlueskyインスタンスのリアルタイム情報を使用したトレンド情報の集計や、投稿されたポスト情報からワードクラウドを作成したりと様々な活用方法がありそうです。この記事を読んでくれた方は是非DartやFlutterでFirehose APIを使用した開発に挑戦してみてください。
使用するパッケージ
さて、Firehose APIとはなんなのか簡単に触れたところで、実際にDartでFireshose APIを使用する方法を紹介していこうと思います。まず、DartやFlutterでBluesky APIを簡単にあつかうために、次のパッケージをインストールしてください。
このbluesky
パッケージは私が開発保守しているパッケージなのですが、その名のとおりBlueskyのAPIをラッピングしたパッケージで、Firehose APIを含めた全てのエンドポイントをサポートしています。bluesky
パッケージは既に多くのアプリで使用された実績があり、例えば先ほど紹介したSkyFeed.appでも使われています。Bluesky関連のアプリをDartやFlutterで作ってみたい方は是非試してみてください!
お馴染みの次のコマンドでbluesky
パッケージをインストールしてください。
Dart:
dart pub add bluesky
dart pub get
Flutter:
flutter pub add bluesky
flutter pub get
pubspec.yaml
の依存性にbluesky
パッケージが次のように追加されれば成功です。この記事では現在(2023/06/10)の最新ビルドであるv0.5.7
を使用します。
name: bluesky_firehose
description: A sample for Bluesky Firehose API
version: 1.0.0
environment:
sdk: ^3.0.0
dependencies:
bluesky: ^0.5.7
dev_dependencies:
lints: ^2.0.0
test: ^1.21.0
実装する
早速bluesky
パッケージを使用してFirehose APIを使用するための実装をしていこうと思いますが、結論から始めると次のコードでFirehose APIを簡単に使用できます。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
final subscription = await bluesky.sync.subscribeRepoUpdates();
await for (final event in subscription.data.stream) {
event.when(
commit: print,
handle: print,
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
はい、これだけです。
Firehose APIと聞くと通信処理の実装が難しい印象を持つ方もいるかもしれませんが、bluesky
パッケージを使用すると上記のように難しい処理は全て安全にカプセル化されているので、単純にawait bluesky.sync.subscribeRepoUpdates();
を実行するだけで長寿命のStreamを取得することができます。簡単ですね。
上記の実装について要点を確認しながらもう少しだけ詳しく説明していきます。
bluesky
パッケージのインポート
まずは当然ながら、bluesky
パッケージの機能を使用するために次のようにbluesky
パッケージをインポートします。
import 'package:bluesky/bluesky.dart';
Blueskyオブジェクトをインスタンス化
bluesky
パッケージから提供されるほとんどの機能はBluesky
オブジェクトから使用できます。次のようにしてインスタンス化します。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
}
今回使用するFirehose APIは認証情報を必要としないため、.anonymous()
コンストラクタを使用してBluesky
オブジェクトをインスタンス化しています。もしFirehose API以外にもポストを投稿したり、特定のポストにLikeしたりといった認証情報を必要するエンドポイントも使用する場合は、次のように.fromSession()
コンストラクタを使用します。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final session = await createSession(
identifier: '登録アカウントのハンドルまたはメールアドレス',
password: '登録アカウントのパスワードまたはアプリパスワード',
);
final bluesky = Bluesky.fromSession(session.data);
}
今回の記事では.fromSession()
コンストラクタについてはあまり触れませんが、認証情報を必要とするエンドポイントを使用する際には使用が必須だと覚えていただければ大丈夫です。
Firehose APIに接続
Bluesky
オブジェクトをインスタンス化できたら、次はFirehose APIに接続するためのメソッドを実行しましょう。bluesky
パッケージにおいてFirehose APIは、Bluesky
オブジェクト配下のSyncService
のsubscribeRepoUpdates()
から接続できます。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
// この処理を追加
final subscription = await bluesky.sync.subscribeRepoUpdates();
}
この1行だけでBlueskyのFirehose APIに接続することができます。
実際のFirehose APIと接続する際の通信処理はWebSocketプロトコルで行われるのですが、bluesky
パッケージを使用すると実装者はこれらの難しい処理をほとんど意識する必要はありません。
また、WebSocket以外にも、CBOR形式のレスポンスデータをデコードする必要があったり、一部でCIDなどのIPFS関連の知識が必要だったりと全てを自分で実装するのがとても大変です。しかし、これらの難しい処理はbluesky
パッケージの中で安全かつ高度にカプセル化されているため、bluesky
パッケージを使用する実装者はより重要なタスクに時間を割り当てることができます。
Firehose APIからイベントを取得する
先の例でFirehose APIに接続ができたら次のようにしてリアルタイムにイベントを取得することができます。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
final subscription = await bluesky.sync.subscribeRepoUpdates();
// この処理を追加
await for (final event in subscription.data.stream) {
// "event"はUnion型のため、
//".when"を使用して各フィールドにアクセスする。
event.when(
commit: print,
handle: print,
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
await for (final event in subscription.data.stream)
を実行することで、Firehose APIからリアルタイムで発生したイベントを取得することができます。
ただ、event.when
としている箇所はDartを使用している方でも少し難しく見えるかもしれません、なぜevent
がこのような構造になるのかというと、event
がUnion型であり、commit
やhandle
といったように発生したイベントによって異なる構造のレスポンスが返却されるからです。
また、DartではUnion型は言語仕様としてサポートされていないため、上記の例のように擬似的にUnion型を実装しています。このようにAT ProtocolやBlueskyのAPIではいくつかのエンドポイントでUnion型のレスポンスが返却されるのですが、bluesky
パッケージを使用する場合にはこれらのUnion型のオブジェクトは全て.when(...)
を使用することでイベントを安全にあつかうことができます。
例えば上記の例だと、ポストが作成されたなどのcommit
関連のイベントが発生した場合はcommit
のprint
が実行され、アカウントのハンドル名が変更されたなどのhandle
関連のイベントが発生した場合にはhandle
のprint
が実行されるといった具合です。
unknown
というフィールドはbluesky
パッケージでサポートしているほぼ全てのUnion型でありますが、これはbluesky
パッケージでサポートされていない構造のレスポンスが返却された場合に実行されます。unknown
が実行される際にはレスポンスから取得した生のJSON(Map<String, dynamic>)がコールバックで渡されるので、急な公式の仕様追加にも対応することができます。
commit
イベントについて深掘り
ここまででbluesky
パッケージを使用してFirehose APIをあつかう方法がわかってきたと思いますので、Firehose APIを使用した際に取得できる各イベントについてより詳しく見ていこうと思います。
先ほどの例だと、commit
、handle
、migrate
、tombstone
、info
といったように多くのイベントが定義されていたと思いますが、現在の公式AT Protocolの実装では主にcommit
とhandle
のみがサポートされています。その他のイベントについてはLexiconに定義だけがある状態で実装されていなかったり、info
のように実装されてはいるけどもほとんど発生することがないイベントになっています。(少なくとも私はinfo
が発生したイベントを見たことがありません。)
そのため、この記事ではcommit
イベントとhandle
イベントについて説明します。
まずはcommit
イベントについてですが、bsky.social
といった特定のインスタンスで発生したポスト投稿やリポスト、そしてLikeやFollowなどのイベントは全てこのcommit
イベントに流れてきます。つまり、現状のBlueskyのFirehose APIで最も大きなウェイトを占めているのがこのcommit
イベントです。むしろ、handle
イベントとデータ量を比較するとcommit
イベントしかないと言っても過言ではありません。
先ほどの例だと単純にprint
しかしていなかった処理を次のように修正します。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
final subscription = await bluesky.sync.subscribeRepoUpdates();
await for (final event in subscription.data.stream) {
event.when(
commit: (data) {
// 複数のオペレーションが含まれる場合があります。
for (final op in data.ops) {
// どのようなオペレーションが行われたかで
// 処理を分けます。
switch (op.action) {
// 作成系のイベント(ポスト投稿、フォロー追加等)
case RepoAction.create:
// 更新系のイベント(プロフィール更新等)
case RepoAction.update:
print(op.uri); // 作成/更新されたレコードのAT URI
print(op.record); // 作成/更新されたレコード
break;
case RepoAction.delete:
// 削除系のイベント(フォロー解除等)
print(op.uri); // 削除されたレコードのAT URI
break;
}
}
},
handle: print,
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
上記の修正された例を見ると、commit
イベントについて処理がいくつか追加されたことが一目でわかると思います。ただ、やっていることはとても単純で、発生したcommit
イベントの全てのオペレーション情報(data.ops)を確認し、それぞれのオペレーション(op)のアクション(create/update/delete)に応じて処理を分けています。
オペレーションのアクションに応じて処理を分ける理由はアクションごとに取得できる項目が異なるためですが、現在(2023/06/10)の公式の仕様ではcreateとupdateは取得できる項目が同じで、deleteのみがレコード情報を取得できないという差があります。
handle
イベントについて深掘り
さて、次はhandle
イベントについて詳しく見ていこうと思うのですが、次の例のようにhandle
イベントはcommit
イベントと比較するとかなり単純です。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
final subscription = await bluesky.sync.subscribeRepoUpdates();
await for (final event in subscription.data.stream) {
event.when(
commit: print,
handle: (data) {
// 特定のアカウントがハンドルを変更した際に発生する。
print(data.did);
print(data.handle);
},
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
handle
イベントは特定のアカウントがハンドルを変更した際に発生するという特性から、commit
イベントのようにオペレーションの種類がなく、またアクションの概念もありません。そのため、handle
イベントから取得できる主な情報は、アカウントのハンドルを変更したユーザーのDIDと変更後のアカウントのハンドル名です。
commit
イベントをもっと簡単にあつかう
番外編: 先に説明したFirehose APIのcommit
イベントですが、実際に使用する際には発生したオペレーションのAT URIやレコードの情報を見て、そのレコードがLikeなのかFollowなのかといった判断をする処理が必要になるのですが、はっきり言ってこれを毎回実装するのは面倒です。
なので、bluesky
パッケージではFirehose APIのcommit
イベントについて、RepoCommitAdaptor
を使用したより簡単な解決策を提供しています。先に使用したcommit
イベントの例を次のように修正します。
import 'package:bluesky/bluesky.dart';
Future<void> main() async {
final bluesky = Bluesky.anonymous();
final subscription = await bluesky.sync.subscribeRepoUpdates();
// この処理を追加
final repoCommitAdaptor = RepoCommitAdaptor(
onCreatePost: (data) {
// ポストが投稿された際に発生するイベント
print(data.author);
print(data.uri);
print(data.record.text);
print(data.record.embed);
},
onCreateFollow: (data) {
// 特定のユーザーをフォローした際に発生するイベント
},
onUpdateProfile: (data) {
// プロフィールが更新された際に発生するイベント
print(data.author);
print(data.uri);
print(data.record.displayName);
print(data.record.description);
},
onDeleteLike: (data) {
// Likeが削除された際に発生するイベント
print(data.uri);
},
);
await for (final event in subscription.data.stream) {
event.when(
commit: repoCommitAdaptor.execute, // RepoCommitAdaptorを実行
handle: print,
migrate: print,
tombstone: print,
info: print,
unknown: print,
);
}
}
RepoCommitAdaptor
を使用した場合と使用しなかった先の例を比較すると違いが一目瞭然だと思います。
まず、RepoCommitAdaptor
を使用する際の最も大きなメリットは、bluesky
パッケージを使用してアプリを開発する実装者がcommitイベントの煩雑なハンドリング処理を意識することなく、特定のオペレーションだけを集中的に取得/処理できることです。つまりRepoCommitAdaptor
を使用することで、例えば投稿されたポストのデータだけをあつかいたい場合や、削除されたフォローのデータだけをあつかいたいといったオンデマンドな実装を簡単にすることができます。
次にRepoCommitAdaptor
をした際のメリットは、作成/更新されたレコードを特定のオブジェクトに変換された状態で取得できることです。RepoCommitAdaptor
を使用しない場合には、各オペレーションのレコード情報は生のJSONオブジェクトとして格納されます。そのため、RepoCommitAdaptor
を使用することで、各オペレーションのレコードオブジェクトを非常に安全かつ簡単にあつかうことができます。
bluesky
パッケージでFirehose APIのcommit
イベントをあつかう場合は、RepoCommitAdaptor
を使用することを強くおすすめします。
最後に
ここまでbluesky
パッケージを使用したDartでBlueskyのFirehose APIを簡単に使用する方法を説明してきました。
この記事ではFlutterでの実装は紹介しませんでしたが、横道にそれ過ぎる気がしたのと、Dartの実装だけで十分カバーできると考えたので割愛しました。先にも余談で触れたように、Flutterで実装する際には、StreamBuilder
を使用するとStreamを綺麗に処理することができます。
また先に触れたように、bluesky
パッケージは全てのエンドポイントをサポートしており、また一時的なサーバーエラーや通信障害時に対処するExponential BackOff and Jitterアルゴリズムを使用した高度な自動リトライ機能が組み込まれています。DartやFlutterでBluesky関連のアプリを開発する際は、ぜひbluesky
パッケージを試してみてください。
もし実装時の質問などがあればBlueskyの私のアカウントにでもメンションを飛ばしてください。
宣伝
この記事では私が開発保守しているbluesky
パッケージについて主に触れましたが、私はbluesky
パッケージ以外にも多くのAT ProtocolとBluesky関連のパッケージを開発しています。全てをこの場で紹介するのも野暮なので、もし興味のある方は以下のモノレポを確認してみてください。きっとAT ProtocolやBluesky関連のアプリ開発に役立つパッケージがあります。
Discussion