Zenn
🐱

[DDD] 戦術的設計パターン Part 4 整合性

に公開

1年以上前になりますが、 DDD の戦術的設計パターンについて記事を書きました。

  1. ドメインレイヤー
  2. アプリケーションレイヤー
  3. プレゼンテーション / インフラストラクチャー レイヤー
  4. 整合性 (本記事)

本記事では、 データ整合性を保つための設計パターンについてまとめます。

リポジトリ

https://github.com/minericefield/ddd-onion-lit

その他、採用アーキテクチャーやテーマについては Part 1 冒頭をご参照ください。

トランザクション制御

IDDD を参考にトランザクション制御の設計方針を確認します。

  • 大前提として集約は強整合性境界を示す
  • 単一のコマンドは単一の集約のみを操作する
    • (参照するだけなら複数のリポジトリからそれぞれ異なる集約を参照しても良いが、実際に変更を加える集約は一つ)

現時点で、 集約・コマンド 共に正しく設計できており、あとは副作用を持つそれぞれのユースケースでトランザクション整合性を担保できるようにすれば良さそうです。

トランザクション制御はアプリケーション層で行います。
アプリケーション層の責務はシナリオの流れの調整です。
データ整合性含むタスクの進行管理や調整の責務をアプリケーション層が引き受けるようにして、概念的な凝集度を高めていきます。

インターフェース

今回は以下のようなインターフェースを用意しました。

application/shared/repository-transactor.ts
export abstract class RepositoryTransactor {
  abstract handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation>;
}

コミットロールバック など、丸っと抽象化するようなインターフェースにしました。
副作用を伴う操作(manipulation)を受け取り、トランザクションとして実行することでリポジトリ操作における原始性や一貫性を保証します。

トランザクションから例外が発生した際に、他の外部ドライバーや別のストアに対しては異なる方法でロールバックやリトライを命じなければいけない、という場面もあるかもしれません。
それら全ての整合性が担保されるものだと勘違いされるリスクを回避するため RepositoryTransactor という命名にしてみました。

スレッドローカルストア

Transactor を実装する最も簡単な方法は、データストアのコネクションやプロセスを特定できる参照をそのままスレッドローカルなストア(Javaでいうような)に割り当てる方法でしょう。
おそらく以下のような実装になると思います
(node:async_hooks を使用していきます)。

example-repository-transactor.ts
export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly als: AsyncLocalStorage<EntityManager>,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      return this.als.run(entityManager, manipulation);
    });
  }
}

一方で、リクエストからレスポンスまでの処理全体を通じて、異なる種類の情報を同一のスコープで一貫して保持したいというニーズもあるかと思われます。
本アプリケーションでは、インタラクションの開始時点で対応するスレッドローカルストアは初期化されており、任意の情報をその中で参照・保持できるものとします。
以上を踏まえて、ストアと Transactor を実装していきます。

infrastructure/node-async-hooks/als-thread-context.ts
export class AlsThreadContext {
  constructor(private readonly als: AsyncLocalStorage<Map<string, any>>) {}

  run<T>(run: () => Promise<T>): Promise<T> {
    const threadId = Math.random().toString(32).substring(2);

    return this.als.run(new Map([['threadId', threadId]]), run);
  }

  get<T>(key: string): T | undefined {
    return this.als.getStore().get(key);
  }

  set(key: string, value: any) {
    this.als.getStore().set(key, value); // or enterWith
  }
}

ストアは Map にし、せっかくなので適当に threadId も割り当ててあげます
(もしイベントのハンドリングなどで既にストアが初期化されていた場合、直前のコンテキストもスタックのように詰めておくのもありかもしれません)。

node:async_hooks なども使わずに リクエスト単位のインジェクションスコープ でストアのインスタンスを保持することで、似たような機能をもっと簡単に実現できそうです。
しかし、今回のストアは特定のオペレーションに依存せずに、ヘッドレスなアプリケーションなどでも任意のタイミングで使用できます。

こちらをミドルウェアから実行するようにします。

app.module.ts
export class AppModule implements NestModule {
  constructor(private readonly alsThreadContext: AlsThreadContext) {}

  configure(consumer: MiddlewareConsumer) {
    consumer
      .apply((_req, _res, next) => {
        this.alsThreadContext.run(next);
      })
      .forRoutes('*');
  }
}

Transactor の実装

ストアを使って Transactor を実装します。

infrastructure/mysql/typeorm/transactors/typeorm-repository-transactor.ts
export class TypeormRepositoryTransactor implements RepositoryTransactor {
  constructor(
    private readonly dataSource: DataSource,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  async handle<T>(
    manipulation: () => Promise<T>,
  ): ReturnType<typeof manipulation> {
    return this.dataSource.transaction((entityManager) => {
      this.alsThreadContext.set(
        ENTITY_MANAGER_THREAD_CONTEXT_KEY,
        entityManager,
      );

      return manipulation();
    });
  }
}

ストアに EntityManager を保持させました。

リポジトリの実装

EntityManager からTypeormリポジトリを取得するようします。

最初に、Typeormリポジトリを取得するヘルパーを作りました。

infrastructure/mysql/typeorm/repositories/shared/get-typeorm-repositories.ts
export default class GetTypeormRepositories {
  constructor(
    private readonly threadContext: AlsThreadContext,
    private readonly dataSource: DataSource,
  ) {}

  handle<T extends readonly Models[keyof Models][]>(
    ...models: T
  ): { [K in keyof T]: Repository<InstanceType<T[K]>> } {
    const entityManager = this.threadContext.get<EntityManager>(
      ENTITY_MANAGER_THREAD_CONTEXT_KEY,
    );

    /**
     * map cannot recognize tuple types
     */
    return models.map((model) =>
      (entityManager ?? this.dataSource).getRepository(model),
    ) as { [K in keyof T]: Repository<InstanceType<T[K]>> };
  }
}

リポジトリの実装ではこちらのヘルパーを経由してTypeormリポジトリを取得します。

infrastructure/mysql/typeorm/repositories/task.typeorm-repository.ts
export class TaskTypeormRepository implements TaskRepository {
  constructor(private readonly getRepositories: GetTypeormRepositories) {}

  async update(task: Task) {
    const [taskRepository, taskAssignmentRepository, taskCommentRepository] =
      this.getRepositories.handle(
        models.Task,
        models.TaskAssignment,
        models.TaskComment,
      );

    await taskRepository.update(task.id.value, { name: task.name.value });

    await taskAssignmentRepository.delete({ taskId: task.id.value });
    task.userId &&
      (await taskAssignmentRepository.save({
        taskId: task.id.value,
        userId: task.userId.value,
      }));

    await taskCommentRepository.delete({ taskId: task.id.value });
    await taskCommentRepository.save(
      task.comments.map((comment) => ({
        id: comment.id.value,
        userId: comment.userId.value,
        content: comment.content,
        postedAt: comment.postedAt,
        taskId: task.id.value,
      })),
    );
  }

  async find() {
    const [taskRepository] = this.getRepositories.handle(models.Task);
    const tasks = await taskRepository.find({
      relations: {
        taskAssignment: true,
        taskComments: true,
      },
    });
    return tasks.map((task) =>
      Task.reconstitute(
        new TaskId(task.id),
        new TaskName(task.name),
        task.taskComments.map(
          (taskComment) =>
            new Comment(
              new CommentId(taskComment.id),
              new UserId(taskComment.userId),
              taskComment.content,
              taskComment.postedAt,
            ),
        ),
        task.taskAssignment?.userId && new UserId(task.taskAssignment.userId),
      ),
    );
  }
}

ユースケースの実装

ユースケースで Transactor を走らせるようにします。

application/task/create-task.usecase.ts
export class CreateTaskUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly taskIdFactory: TaskIdFactory,
    private readonly repositoryTransactor: RepositoryTransactor,
  ) {}

  /**
   * @throws {TaskNameCharactersExceededException}
   */
  async handle(
    requestDto: CreateTaskUseCaseRequestDto,
  ): Promise<CreateTaskUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create task.
       */
      const task = Task.create(
        await this.taskIdFactory.handle(),
        new TaskName(requestDto.taskName),
      );

      /**
       * Store it.
       */
      await this.taskRepository.insert(task);

      return new CreateTaskUseCaseResponseDto(task);
    });
  }
}

とりあえず処理を丸ごと機械的に Transactor に渡す方針で良いでしょう。
これで、集約の状態変更がアトミックに永続化されることが保証されるようになりました。
トランザクション制御は以上になります。

ロギング

脱線しますが、せっかくストアに threadId を仕込んだので、ログに埋め込んでみます。
トレーシングやモニタリングの際に役立つでしょう。

src/infrastructure/nestjs-common/nestjs-common-console-logger.ts
export class NestjsCommonConsoleLogger extends ConsoleLogger implements Logger {
  constructor(private readonly alsThreadContext: AlsThreadContext) {
    super();
  }

  log(message: any) {
    super.log({
      threadId: this.alsThreadContext.get('threadId'),
      message: message,
    });
  }

かなり機能に乏しいですが id を message と一緒に出力する簡単なロガーを作ってみました。

適当にログを埋め込んでみます。

{
  "threadId": "131j1fffra",
  "message": "Request url: /tasks/061a7db5-2d24-416e-9564-d1813db2cb91, method: GET"
}
{
  "threadId": "131j1fffra",
  "message": "task found in FindTaskUseCase: task name: Review the project documentation"
}
{
  "threadId": "131j1fffra",
  "message": "Response time: 95ms"
}

1つ目と最後のログはプレゼンテーション層のインターセプターで出力したものになります。
真ん中のログはアプリケーション層のユースケースで出力したものです。
threadId をもとにインタラクション全体を追跡できるようになりました。

集約間の整合性

異なる集約間の整合性は結果整合性で担保されます。
今回は例として、以下の要件を追加して簡単な実装を加えてみます。

  • ユーザーが新規作成された際にオンボーディングタスクが割り当てられる
  • オンボーディングタスクは以下の二つ
    • Create your GitHub account
    • Review the project documentation

ドメインイベント

domain/user/user-created.domain-event.ts
export class UserCreated extends DomainEvent {
  constructor(readonly userId: UserId) {
    super();
  }

  get name() {
    return UserCreated.name;
  }
}

ユーザー作成のドメインイベントを用意しました。

イベントの配信

イベント配信インフラ

今回イベントの配信にはサンプルとして、固定で @nestjs/event-emitter を使用することにしました。

domain/shared/domain-event-publisher.ts
export abstract class DomainEventPublisher {
  abstract handle(...domainEvents: DomainEvent[]): void;
}
infrastructure/nestjs-event-emitter/nestjs-event-emitter-domain-event-publisher.ts
export class NestjsEventEmitterDomainEventPublisher
  implements DomainEventPublisher
{
  constructor(private readonly eventEmitter: EventEmitter2) {}

  handle(...domainEvents: DomainEvent[]) {
    domainEvents.forEach((domainEvent) => {
      this.eventEmitter.emitAsync(domainEvent.name, domainEvent);
    });
  }
}

実際のアプリケーションでは、イベントによっては公開言語に変換して他のコンテキストや外部システムに伝える必要なども出てくるかもしれません。
DomainEventPublisher という命名は ドメイン駆動設計 サンプルコード&FAQ を参考にしました。

イベントを生成、配信するタイミング

イベントを配信するタイミングやその実装方法には様々な選択肢があるようです。

  • エンティティからイベントを配信する
    • 例えば、 user.changeName が実行されたらその内部で UserChangedName イベントが生成、配信されるような形
      • ドメインイベントの静的メソッドなどを介してイベントの生成と配信をその内部で同時に行うこともあれば、イベントを生成してから別途パブリッシャーに渡す場合もある
    • 暗黙のコピーによる自動変更追跡機(ダーティーチェック)をもつORMと組み合わせて実現したりする
    • ドメインモデルの純粋性を保つことができる一方でデメリットも多い
      • パフォーマンスへの影響
      • テスト、デバックが難しくなる
      • 複数のコンテキスト間や非同期処理との連携に不向きな場合もある
  • エンティティではイベントの蓄積のみを行い、配信を任意のタイミングに遅延する
    • 上で挙げたデメリットを克服する手段
    • dotnetのサンプルコード ではエンティティの永続化とイベント配信をセットで行っている
      • リポジトリパターンではリポジトリからイベントを配信することになるが、いずれにしろイベントをエンティティで蓄積するパターンとの親和性が高い
    • パブリッシャーをリポジトリに渡してリポジトリから配信を実行するパターンが ドメイン駆動設計 サンプルコード&FAQ で紹介されていた
      • taskRepository.insert(task, domainEventPublisher) のような形
        • リポジトリにエンティティ以外を渡すという例外的な設計だが、リポジトリから暗黙的にイベントを配信するパターンと比べて処理が追跡しやすくなる

今回は、エンティティでイベントを蓄積し簡単にアプリケーションサービスから配信してみます。

  • リポジトリ本来の責務に忠実
  • 万が一トランザクションをコミットする前に同期的にイベントを配信したいとなった時も簡単に対応できる (そもそも望ましくない例外的なシナリオだが)

イベントの蓄積

イベントを蓄積する仕組みを、ユーザーにミニマムで追加しました
(イベントのクリアとかもありません)。

domain/shared/domain-events-storable-aggregate-root.ts
export abstract class DomainEventStorableAggregateRoot<T extends DomainEvent> {
  private _events: T[] = [];

  get events(): T[] {
    return [...this._events];
  }

  protected addEvent(event: T) {
    this._events = [...this._events, event];
  }
}
domain/user/user.aggregate-root.ts
export class User extends DomainEventStorableAggregateRoot<
  UserFooBar | UserCreated
> {
  private constructor(
    readonly id: UserId,
    readonly name: string,
    readonly emailAddress: UserEmailAddress,
  ) {
    super();
  }

  static create(id: UserId, name: string, emailAddress: UserEmailAddress) {
    const user = new User(id, name, emailAddress);
    user.addEvent(new UserCreated(id));
    return user;
  }

  static reconstitute(
    id: UserId,
    name: string,
    emailAddress: UserEmailAddress,
  ) {
    return new User(id, name, emailAddress);
  }
}

static create から user.addEvent(new UserCreated(id)) されます。

配信を呼び出す

アプリケーションサービスから domainEventPublisher を実行します。

application/user/create-user.usecase.ts
export class CreateUserUseCase {
  constructor(
    private readonly userRepository: UserRepository,
    private readonly userIdFactory: UserIdFactory,
    private readonly userEmailAddressIsNotDuplicated: UserEmailAddressIsNotDuplicated,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly domainEventPublisher: DomainEventPublisher,
  ) {}

  /**
   * @throws {InvalidUserEmailAddressFormatException}
   * @throws {DuplicatedUserEmailAddressException}
   */
  async handle(
    requestDto: CreateUserUseCaseRequestDto,
  ): Promise<CreateUserUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Create userEmailAddress.
       */
      const userEmailAddress = new UserEmailAddress(requestDto.emailAddress);
      await this.userEmailAddressIsNotDuplicated.handle(userEmailAddress);

      /**
       * Create user.
       */
      const user = User.create(
        await this.userIdFactory.handle(),
        requestDto.name,
        userEmailAddress,
      );

      /**
       * Store it.
       */
      await this.userRepository.insert(user);

      /**
       * Publish domain events.
       */
      this.domainEventPublisher.handle(...user.events);

      return new CreateUserUseCaseResponseDto(user);
    });
  }
}

domainEventPublisher の実装は emitAsync(ハンドラーが非同期) になってますが、明示的に repositoryTransactor の外で publish し、トランザクションから独立している旨を強調するのもありかもしれません。

イベントのハンドリング

IDDD では adapter でイベントを受け取りアプリケーションサービスを呼び出しています
(IDDD はヘキサゴナル)。
こちらを参考にします。

オンボーディングタスクを作成するアプリケーションサービスは以下になります。

application/task/create-onboarding-tasks.usecse.ts
export class CreateOnboardingTasksUseCase {
  constructor(
    private readonly taskRepository: TaskRepository,
    private readonly userRepository: UserRepository,
    private readonly createOnboardingTasks: CreateOnboardingTasks,
    private readonly repositoryTransactor: RepositoryTransactor,
    private readonly logger: Logger,
  ) {}

  /**
   * @throws {NotFoundApplicationException}
   */
  async handle(
    requestDto: CreateOnboardingTasksUseCaseRequestDto,
  ): Promise<CreateOnboardingTasksUseCaseResponseDto> {
    return this.repositoryTransactor.handle(async () => {
      /**
       * Find user.
       */
      const userId = new UserId(requestDto.userId);
      if (!(await this.userRepository.findOneById(userId))) {
        throw new NotFoundApplicationException('User not found.');
      }

      /**
       * Create onboarding tasks.
       */
      const tasks = await this.createOnboardingTasks.handle(userId);
      await this.taskRepository.insertMany(tasks);

      this.logger.log(`Onboarding tasks created for user id: ${userId.value}`);

      return new CreateOnboardingTasksUseCaseResponseDto(tasks);
    });
  }
}

オンボーディングタスクを ドメインサービス から生成し、最後に永続化します。

アプリケーションサービスをリスナーから呼び出します。

infrastructure/nestjs-event-emitter/nestjs-event-emitter-user-created-domain-event-listener.ts
export class NestjsEventEmitterUserCreatedDomainEventListener {
  constructor(
    private readonly createOnboardingTasksUseCase: CreateOnboardingTasksUseCase,
    private readonly alsThreadContext: AlsThreadContext,
  ) {}

  @OnEvent(UserCreated.name, { async: true, promisify: true })
  async handle(userCreated: UserCreated) {
    return this.alsThreadContext.run(async () => {
      await this.createOnboardingTasksUseCase.handle({
        userId: userCreated.userId.value,
      });
    });
  }
}

これで結果整合性を用いた複数集約間の整合性担保を実現できました。

参考文献

レバテック開発部

Discussion

ログインするとコメントできます