🏎️

【Dart/Flutter】DartでBlueskyのFirehose APIを簡単に使用する

2023/06/10に公開

概要

どうも、真也です。

この記事ではBluesky APIにおけるFirehose APIについて、DartFlutterで簡単に使用するための方法を紹介します。

この際にDartFlutterを試してみたいという方は、次の公式サイトを参考にしてインストールしてみてください。

https://dart.dev

https://flutter.dev

Firehose APIとは

そもそもFirehose APIとはなんなのかわからない方もいるかと思いますが、簡単に表現するとbsky.socialなどのBlueskyの特定のインスタンスで投稿されたポストやLikeなどのイベントをリアルタイムで取得できる長寿命のStream APIです。また、Firehose APIを使用すると、Blueskyで自分がフォローしていないユーザーの活動もリアルタイムで取得できるので、これらのリアルタイム情報を使用して統計をとったりBOTを作成することができます

例えば、BlueskyのFirehose APIを使用したサービスには次のようなものがあります。

https://skyfeed.app

https://bsky.jazco.dev

https://firesky.tv

上記の例以外にも、特定のBlueskyインスタンスのリアルタイム情報を使用したトレンド情報の集計や、投稿されたポスト情報からワードクラウドを作成したりと様々な活用方法がありそうです。この記事を読んでくれた方は是非DartFlutterFirehose APIを使用した開発に挑戦してみてください。

使用するパッケージ

さて、Firehose APIとはなんなのか簡単に触れたところで、実際にDartFireshose APIを使用する方法を紹介していこうと思います。まず、DartFlutterでBluesky APIを簡単にあつかうために、次のパッケージをインストールしてください。

https://pub.dev/packages/bluesky

このblueskyパッケージは私が開発保守しているパッケージなのですが、その名のとおりBlueskyのAPIをラッピングしたパッケージで、Firehose APIを含めた全てのエンドポイントをサポートしています。blueskyパッケージは既に多くのアプリで使用された実績があり、例えば先ほど紹介したSkyFeed.appでも使われています。Bluesky関連のアプリをDartFlutterで作ってみたい方は是非試してみてください!

お馴染みの次のコマンドでblueskyパッケージをインストールしてください。

Dart:

dart pub add bluesky
dart pub get

Flutter:

flutter pub add bluesky
flutter pub get

pubspec.yamlの依存性にblueskyパッケージが次のように追加されれば成功です。この記事では現在(2023/06/10)の最新ビルドであるv0.5.7を使用します。

name: bluesky_firehose
description: A sample for Bluesky Firehose API
version: 1.0.0

environment:
  sdk: ^3.0.0

dependencies:
  bluesky: ^0.5.7

dev_dependencies:
  lints: ^2.0.0
  test: ^1.21.0

実装する

早速blueskyパッケージを使用してFirehose APIを使用するための実装をしていこうと思いますが、結論から始めると次のコードでFirehose APIを簡単に使用できます。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  final subscription = await bluesky.sync.subscribeRepoUpdates();

  await for (final event in subscription.data.stream) {
    event.when(
      commit: print,
      handle: print,
      migrate: print,
      tombstone: print,
      info: print,
      unknown: print,
    );
  }
}

はい、これだけです。

Firehose APIと聞くと通信処理の実装が難しい印象を持つ方もいるかもしれませんが、blueskyパッケージを使用すると上記のように難しい処理は全て安全にカプセル化されているので、単純にawait bluesky.sync.subscribeRepoUpdates();を実行するだけで長寿命のStreamを取得することができます。簡単ですね。

上記の実装について要点を確認しながらもう少しだけ詳しく説明していきます。

blueskyパッケージのインポート

まずは当然ながら、blueskyパッケージの機能を使用するために次のようにblueskyパッケージをインポートします。

import 'package:bluesky/bluesky.dart';

Blueskyオブジェクトをインスタンス化

blueskyパッケージから提供されるほとんどの機能はBlueskyオブジェクトから使用できます。次のようにしてインスタンス化します。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();
}

今回使用するFirehose APIは認証情報を必要としないため、.anonymous()コンストラクタを使用してBlueskyオブジェクトをインスタンス化しています。もしFirehose API以外にもポストを投稿したり、特定のポストにLikeしたりといった認証情報を必要するエンドポイントも使用する場合は、次のように.fromSession()コンストラクタを使用します。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final session = await createSession(
    identifier: '登録アカウントのハンドルまたはメールアドレス',
    password: '登録アカウントのパスワードまたはアプリパスワード',
  );

  final bluesky = Bluesky.fromSession(session.data);
}

今回の記事では.fromSession()コンストラクタについてはあまり触れませんが、認証情報を必要とするエンドポイントを使用する際には使用が必須だと覚えていただければ大丈夫です。

Firehose APIに接続

Blueskyオブジェクトをインスタンス化できたら、次はFirehose APIに接続するためのメソッドを実行しましょう。blueskyパッケージにおいてFirehose APIは、Blueskyオブジェクト配下のSyncServicesubscribeRepoUpdates()から接続できます。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  // この処理を追加
  final subscription = await bluesky.sync.subscribeRepoUpdates();
}

この1行だけでBlueskyのFirehose APIに接続することができます。

実際のFirehose APIと接続する際の通信処理はWebSocketプロトコルで行われるのですが、blueskyパッケージを使用すると実装者はこれらの難しい処理をほとんど意識する必要はありません。

また、WebSocket以外にも、CBOR形式のレスポンスデータをデコードする必要があったり、一部でCIDなどのIPFS関連の知識が必要だったりと全てを自分で実装するのがとても大変です。しかし、これらの難しい処理はblueskyパッケージの中で安全かつ高度にカプセル化されているため、blueskyパッケージを使用する実装者はより重要なタスクに時間を割り当てることができます。

Firehose APIからイベントを取得する

先の例でFirehose APIに接続ができたら次のようにしてリアルタイムにイベントを取得することができます。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  final subscription = await bluesky.sync.subscribeRepoUpdates();

  // この処理を追加
  await for (final event in subscription.data.stream) {
    // "event"はUnion型のため、
    //".when"を使用して各フィールドにアクセスする。
    event.when(
      commit: print,
      handle: print,
      migrate: print,
      tombstone: print,
      info: print,
      unknown: print,
    );
  }
}

await for (final event in subscription.data.stream)を実行することで、Firehose APIからリアルタイムで発生したイベントを取得することができます。

ただ、event.whenとしている箇所はDartを使用している方でも少し難しく見えるかもしれません、なぜeventがこのような構造になるのかというと、eventがUnion型であり、commithandleといったように発生したイベントによって異なる構造のレスポンスが返却されるからです。

また、DartではUnion型は言語仕様としてサポートされていないため、上記の例のように擬似的にUnion型を実装しています。このようにAT ProtocolBlueskyのAPIではいくつかのエンドポイントでUnion型のレスポンスが返却されるのですが、blueskyパッケージを使用する場合にはこれらのUnion型のオブジェクトは全て.when(...)を使用することでイベントを安全にあつかうことができます。

例えば上記の例だと、ポストが作成されたなどのcommit関連のイベントが発生した場合はcommitprintが実行され、アカウントのハンドル名が変更されたなどのhandle関連のイベントが発生した場合にはhandleprintが実行されるといった具合です。

unknownというフィールドはblueskyパッケージでサポートしているほぼ全てのUnion型でありますが、これはblueskyパッケージでサポートされていない構造のレスポンスが返却された場合に実行されます。unknownが実行される際にはレスポンスから取得した生のJSON(Map<String, dynamic>)がコールバックで渡されるので、急な公式の仕様追加にも対応することができます。

commitイベントについて深掘り

ここまででblueskyパッケージを使用してFirehose APIをあつかう方法がわかってきたと思いますので、Firehose APIを使用した際に取得できる各イベントについてより詳しく見ていこうと思います。

先ほどの例だと、commithandlemigratetombstoneinfoといったように多くのイベントが定義されていたと思いますが、現在の公式AT Protocolの実装では主にcommithandleのみがサポートされています。その他のイベントについてはLexiconに定義だけがある状態で実装されていなかったり、infoのように実装されてはいるけどもほとんど発生することがないイベントになっています。(少なくとも私はinfoが発生したイベントを見たことがありません。)

そのため、この記事ではcommitイベントとhandleイベントについて説明します。

まずはcommitイベントについてですが、bsky.socialといった特定のインスタンスで発生したポスト投稿やリポスト、そしてLikeやFollowなどのイベントは全てこのcommitイベントに流れてきます。つまり、現状のBlueskyのFirehose APIで最も大きなウェイトを占めているのがこのcommitイベントです。むしろ、handleイベントとデータ量を比較するとcommitイベントしかないと言っても過言ではありません

先ほどの例だと単純にprintしかしていなかった処理を次のように修正します。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  final subscription = await bluesky.sync.subscribeRepoUpdates();

  await for (final event in subscription.data.stream) {
    event.when(
      commit: (data) {
        // 複数のオペレーションが含まれる場合があります。
        for (final op in data.ops) {
          // どのようなオペレーションが行われたかで
          // 処理を分けます。
          switch (op.action) {
            // 作成系のイベント(ポスト投稿、フォロー追加等)
            case RepoAction.create:
            // 更新系のイベント(プロフィール更新等)
            case RepoAction.update:
              print(op.uri); // 作成/更新されたレコードのAT URI
              print(op.record); // 作成/更新されたレコード

              break;
            case RepoAction.delete:
              // 削除系のイベント(フォロー解除等)
              print(op.uri); // 削除されたレコードのAT URI

              break;
          }
        }
      },
      handle: print,
      migrate: print,
      tombstone: print,
      info: print,
      unknown: print,
    );
  }
}

上記の修正された例を見ると、commitイベントについて処理がいくつか追加されたことが一目でわかると思います。ただ、やっていることはとても単純で、発生したcommitイベントの全てのオペレーション情報(data.ops)を確認し、それぞれのオペレーション(op)のアクション(create/update/delete)に応じて処理を分けています。

オペレーションのアクションに応じて処理を分ける理由はアクションごとに取得できる項目が異なるためですが、現在(2023/06/10)の公式の仕様ではcreateupdateは取得できる項目が同じで、deleteのみがレコード情報を取得できないという差があります。

handleイベントについて深掘り

さて、次はhandleイベントについて詳しく見ていこうと思うのですが、次の例のようにhandleイベントはcommitイベントと比較するとかなり単純です。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  final subscription = await bluesky.sync.subscribeRepoUpdates();

  await for (final event in subscription.data.stream) {
    event.when(
      commit: print,
      handle: (data) {
        // 特定のアカウントがハンドルを変更した際に発生する。
        print(data.did);
        print(data.handle);
      },
      migrate: print,
      tombstone: print,
      info: print,
      unknown: print,
    );
  }
}

handleイベントは特定のアカウントがハンドルを変更した際に発生するという特性から、commitイベントのようにオペレーションの種類がなく、またアクションの概念もありません。そのため、handleイベントから取得できる主な情報は、アカウントのハンドルを変更したユーザーのDID変更後のアカウントのハンドル名です。

番外編: commitイベントをもっと簡単にあつかう

先に説明したFirehose APIcommitイベントですが、実際に使用する際には発生したオペレーションのAT URIやレコードの情報を見て、そのレコードがLikeなのかFollowなのかといった判断をする処理が必要になるのですが、はっきり言ってこれを毎回実装するのは面倒です。

なので、blueskyパッケージではFirehose APIcommitイベントについて、RepoCommitAdaptorを使用したより簡単な解決策を提供しています。先に使用したcommitイベントの例を次のように修正します。

import 'package:bluesky/bluesky.dart';

Future<void> main() async {
  final bluesky = Bluesky.anonymous();

  final subscription = await bluesky.sync.subscribeRepoUpdates();

  // この処理を追加
  final repoCommitAdaptor = RepoCommitAdaptor(
    onCreatePost: (data) {
      // ポストが投稿された際に発生するイベント
      print(data.author);
      print(data.uri);
      print(data.record.text);
      print(data.record.embed);
    },
    onCreateFollow: (data) {
      // 特定のユーザーをフォローした際に発生するイベント
    },
    onUpdateProfile: (data) {
      // プロフィールが更新された際に発生するイベント
      print(data.author);
      print(data.uri);
      print(data.record.displayName);
      print(data.record.description);
    },
    onDeleteLike: (data) {
      // Likeが削除された際に発生するイベント
      print(data.uri);
    },
  );

  await for (final event in subscription.data.stream) {
    event.when(
      commit: repoCommitAdaptor.execute, // RepoCommitAdaptorを実行
      handle: print,
      migrate: print,
      tombstone: print,
      info: print,
      unknown: print,
    );
  }
}

RepoCommitAdaptorを使用した場合と使用しなかった先の例を比較すると違いが一目瞭然だと思います。

まず、RepoCommitAdaptorを使用する際の最も大きなメリットは、blueskyパッケージを使用してアプリを開発する実装者がcommitイベントの煩雑なハンドリング処理を意識することなく、特定のオペレーションだけを集中的に取得/処理できることです。つまりRepoCommitAdaptorを使用することで、例えば投稿されたポストのデータだけをあつかいたい場合や、削除されたフォローのデータだけをあつかいたいといったオンデマンドな実装を簡単にすることができます。

次にRepoCommitAdaptorをした際のメリットは、作成/更新されたレコードを特定のオブジェクトに変換された状態で取得できることです。RepoCommitAdaptorを使用しない場合には、各オペレーションのレコード情報は生のJSONオブジェクトとして格納されます。そのため、RepoCommitAdaptorを使用することで、各オペレーションのレコードオブジェクトを非常に安全かつ簡単にあつかうことができます。

blueskyパッケージでFirehose APIcommitイベントをあつかう場合は、RepoCommitAdaptorを使用することを強くおすすめします

最後に

ここまでblueskyパッケージを使用したDartでBlueskyのFirehose APIを簡単に使用する方法を説明してきました。

この記事ではFlutterでの実装は紹介しませんでしたが、横道にそれ過ぎる気がしたのと、Dartの実装だけで十分カバーできると考えたので割愛しました。先にも余談で触れたように、Flutterで実装する際には、StreamBuilderを使用するとStreamを綺麗に処理することができます。

また先に触れたように、blueskyパッケージは全てのエンドポイントをサポートしており、また一時的なサーバーエラーや通信障害時に対処するExponential BackOff and Jitterアルゴリズムを使用した高度な自動リトライ機能が組み込まれていますDartFlutterBluesky関連のアプリを開発する際は、ぜひblueskyパッケージを試してみてください。

もし実装時の質問などがあればBlueskyの私のアカウントにでもメンションを飛ばしてください。

GitHubで編集を提案

Discussion