【一気通貫】EventDriven-Serverless-Functional-DDD【変幻自在】
ポエム編
何を試みたいか
↑ でもMajorityとあるように(このへんMajorityってマ?)、新しめの概念ではあるが考えとしては広まっている諸手法・技術要素があるかと思います。
関心事 | 伝統的手法 | 別の選択肢 |
---|---|---|
ビジネスプロセス(要件)定義 | 業務フロー、UML、ユーザストーリ等 | イベントストーミング |
パスルーティング 制御フロー |
MVCフレームワークに委譲 | クラウドコンポーネントに委譲 |
ドメインロジック | クラス駆動設計 | 関数型DDD |
ロジック実行プロセス | 常駐・コンテナ | 短命なFaaS |
事実の記録の永続化と、そのビューのキャッシュ | 事実とビューを一律に、ミュータブルに扱う | ES/CQRS |
サービス間の連携 | 同期・リクエストレスポンス | 非同期・イベント駆動 |
これらの手法の中には、それぞれの相性が非常によい、なんならお互いがお互いの成立前提になっているようなものもあると思います。
それらの組み合わせで、
- ビジネスプロセス設計 <-> システム・コード設計 <-> チーム設計まで一気通貫した統一メンタルモデルで開発できる
- ビジネス変化に変幻自在に追随する
アーキテクチャが可能になるのではないかと思い、実際に試してみようと思います。
また、上記それぞれの手法の個別の優れた解説記事は世に多いと思いますが、包括的な説明は多くは無さそうなので、それも試みられればと思います。
3 factor appがかなり似た概念であり、この紹介と言い換えても良いかと思います。
サンプルコード全体
何が嬉しいか
- ビジネス変化に即応する = アジャイルな = イノベーションのための = 一気通貫で変幻自在なアーキテクチャ
- つまり価値提供が早い&仮説検証回すのが早いシステム
の構成に有用なのではないかと考えます。
上記を支える要因は2点考えています。
- ビジネス構造と同期したシステム構造になる
- 自律的なチーム・サービス構成が可能
👈ビジネス構造と同期したシステム構造
後でも見ますが、今回のお題となるカフェビジネスのビジネスプロセスを、イベントストーミングで定義したものです。(エラーパスがないなどめっちゃ簡略化してます。)
続いて、システムアーキテクチャです。
まあ、こう見るとかなりわちゃわちゃしてややこしそうなのですが、1段階鳥瞰してみます。
ビジネスプロセスとシステムを、同じモデルで設計できると感じます。
ドメインがそのままシステムに写されてくるのってなんかDDDの目指すところっぽくね?と思いました。
しかし、別にモデルが似ること自体に価値があるわけではもちろんなく、より具体的に以下の旨み
があるのではないかと考えます。
- ビジネスの変更をシステムに即反映・同期しやすい
- ビジネス上の変更箇所がそのままシステムの変更箇所になる
- ビジネス構造と同じなので、ビジネスサイドや新規メンバーがシステムを理解しやすく、保守しやすい
- それぞれのサービスはイベントを介した緩い結合なので、開放閉鎖的に機能追加できる
- ex) ポイント制度を導入するとします。その場合、"Paidイベント"をsubしたサービスを追加することが考えられます。
この時、Publisher側の既存機能は何も意識する必要がありません。
- ex) ポイント制度を導入するとします。その場合、"Paidイベント"をsubしたサービスを追加することが考えられます。
- ビジネス上の変更箇所がそのままシステムの変更箇所になる
例えば、
- 業務フロー図 -> コンテナ上のモノリシックFWがappロジックを包含しているアーキテクチャ図 -> クラス図等のUML
は、必ずしも物事の切り方というかモデルが一気通貫してこないところがあり、これが開発プロジェクトの難易度を上げる一因になっているかもしれないとも思います。
🕺自律的なチーム・サービス
先にも見たように、サービス間が比較的疎結合であるためチームは自分たちの価値創出に専念できます。
物事を細かく分割することのメリットは、理解しやすいではなく、個々の自律性を高められることもあるかと思います。
より具体的には、
- 把握しづらい全体の中の"歯車感"ではなく、把握可能な規模であるためコントロール感がある。モチベアップに直結し、開発者体験が良くなる、要は楽しい。
- マイクロサービスの一般的なメリットを踏襲したところと思います。
- オーナーシップが明確であるため、チーム間の間に落ちたボールの見ないふり、押し付け合いのようなクッソ寒い現象の回避に役立つ
- チーム間の調整のような消耗につながるリスクのある仕事の発生を抑制できる
- ex) Orderチームが、"決済履歴一覧"のような画面を新設しようとします。
この時、Paymentチームになんらかタスクをお願いしてどうのこうの、の必要がありません。
Paymentイベントをsubして、Orderコンテキストの内部に自分たち用の"決済履歴一覧"のリードモデルを作ります。- イベントソーシング、CQRSによるメリットである要素かと思います。
参考になった動画) https://www.youtube.com/watch?v=7PYNmZhpT1A
- イベントソーシング、CQRSによるメリットである要素かと思います。
- ex) Orderチームが、"決済履歴一覧"のような画面を新設しようとします。
最後に少し話がズレるのですが、緩く結合したサービス同士がメッセージパッシング(この場合はイベント)を送り合っていくようなプログラムの進み方という点だけ見たときに、オブジェクト指向(クラス駆動ではない方の意味で)を思い出しました。
以後は説明のお題を軽く説明してから、ちゃんとコードを見せながらの実装に移っていきます。
お題の説明
webアプリ経由でオーダー->決済し、提供準備が出来たら店頭で受け取るようなカフェビジネスを題材にします。
全体の構造として、
- ビジネスプロセス(L1タスク) > 各ワークフロー(L2タスク) > ワークフローを構成するタスク群(L3タスク) > サブタスク群...
といったような階層構造で捉えています。
ビジネスプロセスアーキテクチャ
4つのコンテキスト(業務領域)に分かれます。以下ビジネスプロセス全体の流れです。
- [Order業務] ユーザはwebアプリからドリンクを注文します。
- 注文受付が成功すると、UI上の注文状況一覧に表示されます。
- [Payment業務] 事前登録してある(と言う体の)クレジットカード情報から、裏でStrip連携し決済します。
- 決済が正常に終了すると、ユーザの画面は決済完了のpushを受けます。
- 決済が正常に終了すると、ユーザの画面は決済完了のpushを受けます。
- [Serve業務] 決済が完了すると、店舗側では注文が入った通知を受けます。(ほんとの業務でslack通知で耐えるかはわかりません)
- バリスタはドリンクを作り、提供準備が完了したら、slack上で「準備完了」のボタンを押します。
- 提供準備完了が通知されると、ユーザは、再びUIから通知を受けます。
- [Engagement業務] 提供準備の完了と同時に、ユーザが事前登録している(と言う体の)emailアドレス宛にメールが飛びます。これには、準備完了の通知と同時に、レビューフォームへのリンクが含まれています。
- ユーザは、店頭でのドリンク受け取り後、レビューを書き込みます。
- 店員は、よきタイミングでレビューにコメントバックします。
システムアーキテクチャ
モノレポで、ディレクトリ構造も業務&チーム構造と同期しています。
せっかくなのでなんか気取ったネーミングをしてますが、それぞれのworkflowディレクトリ配下には、
- ux-logic
- 要はフロントエンド。Nextとかのコード。webアプリケーションロジック。
- business-logic
- 要はバックエンド。ほとんどlambdaに載せる関数。ドメインロジック。
- cloud-construct-logic
- 要はインフラ。インフラ的なクラウド部品に加え、アプリケーション寄りのクラウド部品(step functionsによる制御フロー定義とか)をcdkで書いている。
のようなサブディレクトリが切られています。
(力尽きてEngagementは丸っと未実装です😜)
前掲のAWSお絵描きは上からの俯瞰視点の図ですが、(それぞれのworkflow単位で)横から見た断面図的視点でも表現してみます。
個々のサービス(ワークフロー)はおよそこのようにクラウド部品が組み合わさって構成されます。
実装編
個々のワークフロー毎に、コードを挟みつつ個別に探訪していきます。
全体観
階層構造のイメージ
その前に全体の考え方をまだ補足させてください。
- ビジネスプロセス(L1タスク) > 各ワークフロー(L2タスク) > ワークフローを構成するタスク群 > サブタスク群...
ビジネスプロセス構造を上記の階層構造で捉える事を前述しましたが、システム構造も同様です。
- 全体としてビジネスプロセス自体があり
- イベントバスを中心として、各ワークフローがつながっている。
- ここで、各ワークフローには2種類の実装方針があります。
種類 | 実装部品 | 向いているドメイン |
---|---|---|
オーケストレーション | Step Functions | ・時間に沿って一方行に流れる仕事 ・トランザクションの一貫性が優先される場合 ・ただの関数ではなくステートマシンが欲しい時 |
コレオグラフィー | EventBridge | ・非同期的に、アクターが相互にコラボレーティブに動作して進行するような仕事 ・構成要素間の疎結合さが優先される場合 |
- 今回の例では、ビジネスプロセス(L1タスク)全体ではコレオグラフィー、配下のワークフロー(L2タスク)はオーケストレーションとコレオグラフィーの両方があります。(全体をオーケストレーションで構成する「ケースもあると思います。)
- それぞれがオーケストレーションかコレオグラフィーであるワークフローが、(時には相互に)入れ子になって全体を構成しているようなイメージです。
- L1タスク(コレオ)>L2タスク(オーケ)>L3タスク(コレオ)>L4タスク(オーケ)...
- L1タスク(コレオ)>L2タスク(オーケ)>L3タスク(コレオ)>L4タスク(オーケ)...
参考になった動画) https://www.youtube.com/watch?v=jYmZH7j_MXA&list=PLJo-rJlep0EC121zz6w967dtbNTbuwrBH
横断的なTIPS
常に意識しておかねばならない注意点と、セオリーと言われている点をまとめます。
⚠️注意点
✅ 冪等性
- 例えば、EventBridgeで配信されるイベントはAt Least Once(つまり、2回イベントが来る可能性)であるため、それに反応する関数の冪等性を常に意識する
awsの方の資料より拝借https://d1.awsstatic.com/serverless-jp/contents/202102_EDA_steps_patterns.pdf
✅ 順序性
- イベントが生成された順序で処理されるとは限らない。ネットワーク上で後発が先発を飛び越えてしまう可能性を考慮する
✅ 補償トランザクション
- マイクロサービスでお馴染み。例えば、Step FunctionのワークフローのTask1とTask2で異なるDBを更新していた場合、2でこけたら1に取り消し処理を打つ必要があることもある。
セオリー
✅ lambda less
- 直接統合を使い、不要なglue lambdaを設けない。例えば、EventBridgeターゲットやStepFunctionのタスクにlambdaを介さず連携できるサービスの場合に
- または、EvenBridge Pipesが対応している場合検討する
- それでも必要ならlambda
✅ avoid lambda chain
- lambdaピンボールとも。lambdaでlambdaを呼ぶようなことを避ける。step functionsで統合するなどする
その他
基本サイト。cdkコードはここからめっちゃパクりますし、基礎の解説記事もあります。
さて、ようやくそれぞれのディレクトリ(チームとサービスの単位とも一致)毎に実装をみていきましょう。
common
その名の通り各ワークフロー横断的なロジックを置きます。
今回はクラウド部品の定義しかありません。本来は共通の関数など入ってきそうです。
cloud-constructs
- Cafeビジネスプロセスのそれぞれのワークフローを緩く結合する、中央のイベントバスがあります。
- また、それぞれのイベントのログも定義します。
/common/cloud-constructs-logic/lib/event-bus-stack.ts
export class EventBusStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const cafeBusinessCentralBus = new EventBus(this, 'cafe-business-central', {
eventBusName: 'cafe-business-central-bus',
});
}
}
order-workflow
ここで特筆したい概念
イベントソーシング/CQRS
a.イベントストアとb.リードモデルとを峻別して考えます。
この2つは、以下の観点でモデル・ライフサイクル・性質が全く異なるものという意見です。
- a.たった一つの不変な事実の記録 と b.多様で可変の解釈
- a.本質的に管理することを避けられない状態 と b.パフォーマンス都合のキャッシュ
- a.データの実体 と b.実体ではないビュー (ビュー更新問題)
これを混同しないことにより、
- 保守の中でテーブルやその関係がどんどんカオス度を増していくことを抑えられる
- 結局、分析系や監査系にトランザクション結果を回すのだから、トランザクションの事実の記録をイベントとして保持して置けば良い
というメリットがあると考えます。
非常に感銘を受けた記事)
ux-logic
- lambdaでnext.jsを動作させます。
- フロントエンドは、GraphQLでMutationなどを行うと同時に、リードモデルの変更をsubscribeしています
- ここがややキモで、これにより非同期に突き放したロジックの実行結果がUIに伝えられます。
Nextのコードは今回の焦点ではないので特に触れません。
cloud-construct
- CDNやUI Computingのためのlambdaなどがあります。
- ユーザpool/IDpoolのためのCognitoもここにあります。(commonでもいいかも)
- AppSyncによるAPIがあります。API本体と必要なリゾルバを定義します。
/order-workflow/cloud-constructs-logic/lib/api/appsync.ts
export class AppSyncConstruct extends Construct {
public readonly graphqlApiArn: string;
public readonly graphqlApiUrl: string;
public readonly graphqlApiKey: string;
constructor(
scope: Construct, id: string,
userPool: UserPool,
identityPool: IdentityPool,
orderedEventStore: TableV2,
orderStateView: TableV2,
) {
super(scope, id);
// AppSync GraphQL API
const OrderAppSyncApi = new GraphqlApi(this, 'OrderAppSyncApi', {
name: 'OrderAppsyncAPI',
definition: Definition.fromFile(join(__dirname, '../../../schema.graphql')),
authorizationConfig: {
defaultAuthorization: {
authorizationType: AuthorizationType.USER_POOL,
userPoolConfig: {
userPool,
},
},
additionalAuthorizationModes: [
{
authorizationType: AuthorizationType.IAM,
},
],
},
logConfig: {
retention: logs.RetentionDays.ONE_WEEK,
fieldLogLevel: FieldLogLevel.ALL
},
});
OrderAppSyncApi.grantQuery(identityPool.unauthenticatedRole, 'getOrdersByUserID')
// AppSync Data Source -> DynamoDB table
const EventStoreSource = OrderAppSyncApi.addDynamoDbDataSource(
'OrderedEventSource',
orderedEventStore,
);
const ViewCacheSource = OrderAppSyncApi.addDynamoDbDataSource(
'OrderedViewCacheSource',
orderStateView,
);
// Functions and Resolvers
const createOrderFunction = new AppsyncFunction(this, 'createOrderFunction', {
name: 'createOrderFunction',
api: OrderAppSyncApi,
dataSource: EventStoreSource,
code: Code.fromAsset(join(__dirname, '../../../business-logic/resolver/ordered-store/Mutation.createOrder.js')),
runtime: FunctionRuntime.JS_1_0_0,
});
const updateOrderStateViewFunction = new AppsyncFunction(this, 'updateOrderStateViewFunction', {
name: 'updateOrderStateViewFunction',
api: OrderAppSyncApi,
dataSource: ViewCacheSource,
code: Code.fromAsset(join(__dirname, '../../../business-logic/resolver/ordered-view-cache//Mutation.updateOrderStateView.js')),
runtime: FunctionRuntime.JS_1_0_0,
});
const getOrderedListByUserFunc = new AppsyncFunction(
this,
'getOrderListByUserFunction',
{
name: 'getOrderListByUserFunction',
api: OrderAppSyncApi,
dataSource: ViewCacheSource,
code: Code.fromAsset(
join(__dirname, '../../../business-logic/resolver/ordered-view-cache/Query.getOrderListByUser.js')
),
runtime: FunctionRuntime.JS_1_0_0,
}
);
const passthrough = InlineCode.fromInline(`
export function request(...args) {
console.log(args);
return {}
}
export function response(ctx) {
return ctx.prev.result
}
`);
const createOrderResolver = new Resolver(this, 'createOrderResolver', {
api: OrderAppSyncApi,
typeName: 'Mutation',
fieldName: 'createOrder',
runtime: FunctionRuntime.JS_1_0_0,
pipelineConfig: [createOrderFunction],
code: passthrough,
});
const updateOrderStateViewResolver = new Resolver(this, 'updateOrderStateViewResolver', {
api: OrderAppSyncApi,
typeName: 'Mutation',
fieldName: 'updateOrderStateView',
runtime: FunctionRuntime.JS_1_0_0,
pipelineConfig: [updateOrderStateViewFunction],
code: passthrough,
});
const getOrderListByUserResolver = new Resolver(
this,
'getOrderListByUserResolver',
{
api: OrderAppSyncApi,
typeName: 'Query',
fieldName: 'getOrdersByUserID',
runtime: FunctionRuntime.JS_1_0_0,
pipelineConfig: [getOrderedListByUserFunc],
code: passthrough,
}
);
this.graphqlApiArn = OrderAppSyncApi.arn;
this.graphqlApiUrl = OrderAppSyncApi.graphqlUrl;
this.graphqlApiKey = OrderAppSyncApi.apiKey || '';
}
}
- イベントストアとリードモデルを定義しています。
/order-workflow/cloud-constructs-logic/lib/data/order-read-model.ts
export class ReadModelConstruct extends Construct {
public readonly orderStateView: TableV2;
constructor(
scope: Construct, id: string,
centralEventBusArn: string,
) {
super(scope, id);
const centralEventBus = events.EventBus.fromEventBusArn(
this, 'ExistingEventBusInOrder2',
centralEventBusArn,
);
/* Read Model */
const orderStateView = new TableV2(this, 'OrderStateCacheTable', {
partitionKey: { name: 'UserID', type: AttributeType.STRING },
sortKey: { name: 'OrderID', type: AttributeType.STRING },
billing: Billing.onDemand(),
removalPolicy: RemovalPolicy.DESTROY,
localSecondaryIndexes: [
{
indexName: 'StatusIndex',
sortKey: { name: 'Status', type: AttributeType.STRING },
projectionType: ProjectionType.ALL,
},
],
});
/* Derive from Event */
const OrderedConsumer = new NodejsFunction(this, 'orderedConsumer', {
memorySize: 1024,
timeout: Duration.seconds(5),
runtime: Runtime.NODEJS_20_X,
handler: 'handler',
entry: join(__dirname, '../../../business-logic/derive-order-state/from-ordered-event.ts'),
bundling: {
forceDockerBundling: false,
},
environment: {
TABLE_NAME: orderStateView.tableName,
}
});
orderStateView.grantReadWriteData(OrderedConsumer);
// catch ordered event
new Rule(this, 'OrderedRule', {
description: 'Listen to all Ordered events',
eventPattern: {
source: ['cafe.order'],
detailType: ['Ordered']
},
eventBus: centralEventBus,
}).addTarget(new LambdaFunction(OrderedConsumer));
this.orderStateView = orderStateView;
}
}
/order-workflow/cloud-constructs-logic/lib/data/order-event-store.ts
export class EventStoreConstruct extends Construct {
public readonly orderedEventStore: TableV2;
constructor(
scope: Construct, id: string,
centralEventBusArn: string,
) {
super(scope, id);
/* Event Store */
const orderedEventTable = new TableV2(this, 'OrderedEventTable', {
partitionKey: { name: 'UserID', type: AttributeType.STRING },
sortKey: { name: 'OrderDateTime', type: AttributeType.STRING },
billing: Billing.onDemand(),
removalPolicy: RemovalPolicy.DESTROY,
localSecondaryIndexes: [
{
indexName: 'StatusIndex',
sortKey: { name: 'Status', type: AttributeType.STRING },
projectionType: ProjectionType.ALL,
},
],
dynamoStream: StreamViewType.NEW_IMAGE
});
/* Change Data Capture */
const centralEventBus = events.EventBus.fromEventBusArn(this, 'ExistingEventBusInOrder', centralEventBusArn);
// log group to see output
const orderLogGroup = new LogGroup(this, 'ordered-log', {
logGroupName: '/aws/events/ordered',
retention: RetentionDays.ONE_DAY,
removalPolicy: RemovalPolicy.DESTROY
});
// Rule that matches any incoming event and sends it to a logGroup
const catchAll = new Rule(this, 'send-to-ordered-log', {
eventBus: centralEventBus,
ruleName: 'catch-ordered',
eventPattern: {
source: ['cafe.order'],
detailType: ['Ordered']
},
targets: [new CloudWatchLogGroup(orderLogGroup)]
});
const eventBridgeRole = new Role(this, 'events-role', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
});
orderLogGroup.grantWrite(eventBridgeRole);
const pipeRole = new Role(this, 'pipe-role', {
assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
});
orderedEventTable.grantStreamRead(pipeRole);
centralEventBus.grantPutEventsTo(pipeRole);
// Create new Pipe
const pipe = new CfnPipe(this, 'ordered-pipe', {
roleArn: pipeRole.roleArn,
//@ts-ignore
source: orderedEventTable.tableStreamArn,
sourceParameters: {
dynamoDbStreamParameters: {
startingPosition: StartingPosition.LATEST,
batchSize: 1,
},
filterCriteria: {
filters: [
{ pattern: '{"eventName" : ["INSERT"] }'},
],
},
},
// enrichment: splitterFunc.functionArn,
target: centralEventBusArn,
targetParameters: {
eventBridgeEventBusParameters: {
detailType: 'Ordered',
source: 'cafe.order',
},
},
});
this.orderedEventStore = orderedEventTable;
}
}
ポイント
✅ DyamoDB Streamによる変更データキャプチャを、EventBridge Pipesを通じてEventBusにpubします。
- イベントをキャッチし、リードモデルに派生させる仕組みがあります
/order-workflow/cloud-constructs-logic/lib/data/paid-event-deriver.ts
export class PaidDeriverConstruct extends Construct {
constructor(
scope: Construct, id: string,
centralEventBusArn: string,
env: {
account: string;
region: string;
},
graphqlApi: {
arn: string,
url: string,
}
) {
super(scope, id);
const centralEventBus = events.EventBus.fromEventBusArn(
this, 'ExistingEventBusInOrder3',
centralEventBusArn,
);
// catch paid event
const policyStatement = new PolicyStatement({
effect: Effect.ALLOW,
actions: ['appsync:GraphQL'],
resources: [`${graphqlApi.arn}/types/Mutation/*`],
})
const ebRuleRole = new Role(scope, 'AppSyncInvokeRole', {
assumedBy: new ServicePrincipal('events.amazonaws.com'),
inlinePolicies: {
PolicyStatement: new PolicyDocument({
statements: [policyStatement],
}),
},
})
const endpointId = cdk.Fn.select(
0,
cdk.Fn.split('.',cdk.Fn.select(1, cdk.Fn.split('://', graphqlApi.url)))
);
const graphqlEndpointArn = cdk.Fn.join('', [
'arn:aws:appsync:',
env.region,
':',
env.account,
':endpoints/graphql-api/',
endpointId
]);
const deadLetterQueue = new sqs.Queue(this, 'DeadLetterQueue');
// L2 constructはappsync target未対応
const mycfnRule = new events.CfnRule(scope, 'paidcfnRule', {
eventBusName: centralEventBus.eventBusName,
name: 'derive-paid',
description: 'Listen to all Paid events',
eventPattern: {
source: ['cafe.payment'],
"detail-type": ['Paid'],
},
targets: [
{
id: 'mutateOrderStatusAppsyncTarget',
arn: graphqlEndpointArn,
roleArn: ebRuleRole.roleArn,
deadLetterConfig: { arn: deadLetterQueue.queueArn },
appSyncParameters: {
graphQlOperation: /* GraphQL */
`mutation UpdateOrderStateView ($input: UpdateOrderStateViewInput!) {
updateOrderStateView(input: $input) {
UserID
OrderID
Status
}
}
` ,
},
inputTransformer: {
inputPathsMap: {
orderId: '$.detail.dynamodb.NewImage.OrderID.S',
userId: '$.detail.dynamodb.NewImage.UserID.S',
chargeResult: '$.detail.dynamodb.NewImage.ChargeResult.S',
},
inputTemplate: JSON.stringify({
"input": {
"OrderID": "<orderId>",
"UserID": "<userId>",
"Status": `<chargeResult>`,
"OrderItem": 'false',
}
})
},
},
],
})
deadLetterQueue.grantSendMessages(new iam.ServicePrincipal('events.amazonaws.com'));
}
}
ポイント
✅ OrderやPaid、Preparedイベントなど、UIにpushすべきリードモデル変更のトリガーをsubします。
✅ EventBridgeRulesから、Appsyncに直接統合できます。
✅ リードモデルの更新失敗をキャッチするため、デッドレターキューを統合しています。
business-logic
- AppSyncリゾルバの関数の実体があります。
/order-workflow/business-logic/resolver/ordered-store/Mutation.createOrder.js
export function request(ctx) {
return {
version: "2018-05-29",
operation: "PutItem",
key: {
"UserID": util.dynamodb.toDynamoDB(ctx.args.input.UserID),
"OrderDateTime": util.dynamodb.toDynamoDB(ctx.args.input.OrderDateTime)
},
attributeValues: {
"Status": util.dynamodb.toDynamoDB(ctx.args.input.Status),
"OrderTransaction": util.dynamodb.toDynamoDB(ctx.args.input.OrderTransaction)
}
};
}
export function response(ctx) {
return {
UserID: ctx.result.UserID,
OrderDateTime: ctx.result.OrderDateTime,
Status: ctx.result.Status,
OrderTransaction: ctx.result.OrderTransaction
};
}
ポイント
✅ マッピングテンプレートは,Javascriptリゾルバで書けます。VTLというやつではなく他の部分と統一的にロジックを書けます。
- イベント->リードモデルに派生させる際の変換ロジックが置かれています。
payment-workflow
ここで特筆したい概念
Step Functions / Event Bridgeの、外部SaaSとの直接統合
- step functionsのタスクとして、lambdaを介さずに外部SaaSをコールできます。
- event bridge は外部SaaSからのイベントを受けることができ、ターゲットにもできます。
cloud-construct
- Paymentイベントストアがあります
- 決済処理の制御フローと、その子タスク群を定義します。
/payment-workflow/cloud-constructs-logic/lib/transaction/payment-workflow.ts
export class PaymentWorkflowConstruct extends Construct {
constructor(
scope: Construct, id: string,
centralEventBusArn: string,
paymentEventStore: TableV2,
) {
super(scope, id);
// lambda
const validationLogic = new NodejsFunction(this, 'ValidateLambda', {
memorySize: 1024,
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
entry: path.join(__dirname, '../../../business-logic/validate-payment.ts'),
bundling: {
forceDockerBundling: false,
},
});
// states
const retrieveCardInfoTask = new sfn.Pass(this, 'AddCardNumber', {
parameters: {
'NewImage.$': '$.detail.dynamodb.NewImage',
'cardNumber': '1111-1111-1111-1111'
},
resultPath: '$.detail.dynamodb'
});
const cardValidateTask = new tasks.LambdaInvoke(
this,
"validate-card-info", {
lambdaFunction: validationLogic,
}
);
// Stripeのテスト用endpointをコール
// これ来そうなのでhttp直接統合は待つ
// https://github.com/aws/aws-cdk/issues/28278
// todo: db write 直接統合するstep
const ddbWrite = new tasks.DynamoPutItem(
this,
"payment-ddb-write-job", {
item: {
UserID: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.Payload.invoice.UserID')
),
PaymentDateTime: tasks.DynamoAttributeValue.fromString(
new Date().toISOString(), // TODO: stripeの結果
),
OrderID: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.Payload.invoice.OrderID')
),
ChargeResult: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.Payload.invoice.ChargeResult')
),
},
table: paymentEventStore,
}
);
// workflow as chanable
const definition = retrieveCardInfoTask
.next(cardValidateTask)
.next(ddbWrite);
// state machine
const sfnLogGroup = new logs.LogGroup(this, 'stpLogGroup');
const stateMachine = new sfn.StateMachine(this, 'PaymentStateMachine', {
definitionBody: DefinitionBody.fromChainable(definition),
stateMachineType: sfn.StateMachineType.EXPRESS,
stateMachineName: 'PaymentStateMachine',
logs: {
destination: sfnLogGroup,
level: sfn.LogLevel.ALL,
}
});
// catch order event
const centralBus = events.EventBus.fromEventBusArn(
this, 'centralEventBusOfpayment', centralEventBusArn
);
new events.Rule(this, 'PaymentCatchOrderedRule', {
description: 'Listen to all Ordered events from Payment workflow',
eventPattern: {
source: ['cafe.order'],
detailType: ['Ordered']
},
eventBus: centralBus,
}).addTarget(new targets.SfnStateMachine(stateMachine,{
input: events.RuleTargetInput.fromEventPath('$'),
}
));
}
}
ポイント
✅ Orderイベントの発行をトリガーにExpressワークフローが起動します
✅ StepFunctionsから、Stripeを直接統合できます。(実装しているとは言っていない)
business-logic
- 決済処理のstep functions workflowを構成する子タスクである関数が置かれています。
serve-workflow
ここで特筆したい概念
Step Functionsの種類
種類 | 用途 |
---|---|
Expressワークフロー | 普通の、すぐ終わる関数オーケストレーション |
標準ワークフロー | long-runなワークフロー(最大1年)。 例えば承認ステップを挟み、途中で一時停止するようなフロー |
また、承認ステップ等の他の処理を待つ一時停止を入れるには、タスクトークンを用いたコールバックパターンを使用します。
cloud-construct
- slackへのコールバックを含むlong-runワークフローが定義されています。
/serve-workflow/cloud-constructs-logic/lib/long-running-transaction/serve-workflow.ts
export class ServeWorkflowConstruct extends Construct {
constructor(
scope: Construct, id: string,
centralEventBusArn: string,
slackWebhookUrl: cdk.aws_secretsmanager.ISecret,
) {
super(scope, id);
// Lambda functions setup
const webhookFunction = new NodejsFunction(this, 'WebhookFunction', {
handler: 'index.handler',
entry: path.join(__dirname, '../../../business-logic/webHook.ts'),
environment: {
SECRET_ARN: slackWebhookUrl.secretArn,
},
bundling: {
forceDockerBundling: false,
},
});
slackWebhookUrl.grantRead(webhookFunction);
const approvalFunction = new NodejsFunction(this, 'ApprovalFunction', {
handler: 'index.handler',
entry: path.join(__dirname, '../../../business-logic/approval.ts'),
bundling: {
forceDockerBundling: false,
},
});
const stepFunctionsPolicy = new iam.PolicyStatement({
actions: ['states:SendTaskSuccess', 'states:SendTaskFailure'],
resources: ['*'],
});
approvalFunction.addToRolePolicy(stepFunctionsPolicy);
const publishEventFunction = new NodejsFunction(this, 'PublishEventFunction', {
handler: 'index.handler',
entry: path.join(__dirname, '../../../business-logic/publishEvent.ts'),
bundling: {
forceDockerBundling: false,
},
});
const eventBridgePolicy = new iam.PolicyStatement({
actions: ['events:PutEvents'],
resources: ['*'],
});
publishEventFunction.addToRolePolicy(eventBridgePolicy);
// Gateway for slack
const logGroup = new logs.LogGroup(
this, 'SlackCalbackApiGatewayLogGroup',
{
logGroupName: `/aws/apigateway/callback-api-access-log`,
retention: 1,
removalPolicy: cdk.RemovalPolicy.RETAIN_ON_UPDATE_OR_DELETE
},
);
const api = new apigateway.RestApi(this, 'ApprovalApi', {
restApiName: 'Approval Service',
deployOptions: {
dataTraceEnabled: true,
loggingLevel: apigateway.MethodLoggingLevel.INFO,
accessLogDestination: new apigateway.LogGroupLogDestination(logGroup),
accessLogFormat: apigateway.AccessLogFormat.clf()
}
});
const slackResource = api.root.addResource('slack');
slackResource.addMethod('POST', new apigateway.LambdaIntegration(approvalFunction));
slackResource.addCorsPreflight({
allowOrigins: apigateway.Cors.ALL_ORIGINS,
allowMethods: apigateway.Cors.ALL_METHODS
});
// Step Functions tasks
const callWebhookTask = new tasks.LambdaInvoke(this, 'CallWebhookTask', {
lambdaFunction: webhookFunction,
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: sfn.TaskInput.fromObject({
'taskToken': sfn.JsonPath.taskToken,
'input.$': '$'
}),
resultPath: '$.result',
});
const publishEventTask = new tasks.LambdaInvoke(this, 'PublishEventTask', {
lambdaFunction: publishEventFunction,
inputPath: '$',
});
// State machine definition
const definition = callWebhookTask
.next(new sfn.Choice(this, 'ApprovalCheck')
.when(sfn.Condition.stringEquals('$.result.status', 'approved'), publishEventTask)
.otherwise(new sfn.Fail(this, 'NotApproved', {
cause: 'The request was not approved',
})));
const sfnLogGroup = new logs.LogGroup(this, 'stpLogGroup');
const stateMachine = new sfn.StateMachine(this, 'StateMachine', {
definition,
timeout: cdk.Duration.minutes(30),
logs: {
destination: sfnLogGroup,
level: sfn.LogLevel.ALL,
}
});
// catch paid event
const centralBus = events.EventBus.fromEventBusArn(
this, 'centralEventBusOfserve', centralEventBusArn
);
new events.Rule(this, 'PaymentCatchOrderedRule', {
description: 'Listen to all Ordered events from Payment workflow',
eventPattern: {
source: ['cafe.payment'],
detailType: ['Paid']
},
eventBus: centralBus,
}).addTarget(new targets.SfnStateMachine(stateMachine,{
input: events.RuleTargetInput.fromEventPath('$'),
}
));
// event log
const preparedLogGroup = new logs.LogGroup(this, 'Prepaired-log', {
logGroupName: '/aws/events/prepared',
retention: logs.RetentionDays.ONE_DAY,
removalPolicy: cdk.RemovalPolicy.DESTROY
});
const catchAll = new events.Rule(this, 'send-to-Prepared-log', {
eventBus: centralBus,
ruleName: 'catch-prepared',
eventPattern: {
source: ['cafe.serve'],
detailType: ['Prepared']
},
targets: [new targets.CloudWatchLogGroup(preparedLogGroup)]
});
const eventBridgeRole = new iam.Role(this, 'events-role-prepared', {
assumedBy: new iam.ServicePrincipal('events.amazonaws.com'),
});
preparedLogGroup.grantWrite(eventBridgeRole);
}
}
business-logic
- slackのwebhookをコールしたり、callbackを受けたり、結果をEventBusにpushする関数があります。
終わりに
感想・お気持ち
全体的な所感として、冒頭に挙げたメリット以外にも、
実際の現象ベースで設計できる
という部分を感じました。
具体的には、
- 実際のプロセス、働き: ワークフローとそれを構成する関数
- 実際に起きた記録すべき事象: イベントストア
- 実際の画面の情報構造: リードモデル
のように、ふわふわしていない現実の事象ベースで素直に考えれば良い、という点に非常に魅力を感じます。
このメリットは、今現在は存在しないビジネスプロセスを創っていく場合でも変わらないと考えます。
なんならプロセス自体がガンガン変わっていくようなドメインの方が向いているのではないでしょうか。
ちまたで"抽象化"、"モデリング"と言われている手法を経由し設計することには、コード共通化・効率性に強みがあると考えます。
一方で、関係線がハコ同士に引かれたそれらしき虚構、ではなく本当にコードに落とすにあたり意味のあるモデルを書くには熟練の勘を要するとも考えます。
その意味では、"再現性"があるともこのアプローチのメリットは言い換えられるのではないかとも思いました。
あるいは、
- ビジネスプロセスアーキテクチャ = システムアーキテクチャ
となってくることから、自然とエンジニアもbiz視点で"ビジネスプロセス(ドメイン)そのものの設計"を行うようになり、
- ある意味DDDの極地?
- 生成AI前提世界における一つの道か?
などともとも思いました。
今後の潮流
- 3大クラウド -> 垂直SaaSクラウドへ
- Cloud flareを筆頭に、Vercel、Momentなど、特定領域に特化したサービスを組み合わせていく選択肢が増えていくかも
https://www.infoq.com/articles/cloud-computing-post-serverless-trends/
- Cloud flareを筆頭に、Vercel、Momentなど、特定領域に特化したサービスを組み合わせていく選択肢が増えていくかも
- コンテナ / FaaS -> wasm workerへ
- https://developers.cloudflare.com/workers/runtime-apis/webassembly/
- ドメインロジック(関数)の実行については、wasmにコンパイルしてマネージドされたworker系にただ置く、という方法はそのシンプルさから選択肢になってくると考える
- IfC
- インフラコードも含め、ビジネスロジックとして統一的に書く。ビジネスロジックとして書くことで、裏でいい感じにインフラリソースもデプロイされる。
- https://infrastructurefromcode.com/
- 確かに、step functionsのタスクをその連鎖をcdkで書いたりすると、インフラロジックとビジネスロジックの境界がなくなっていているのを感じ、その方向にいくのも納得できる
まだまだ、サーバレス特有の
- セキュリティ
- オブザーバビリティ
- テスティング
など抑えるべきトピックは多いですが、今回手始めに"アーキテクチャ編"ということでまとめました。
今回は、実践的ではない簡素化された例かつ運用をしたわけでもないので、実際にはまだ見ぬ辛みポイントがいっぱいあろうなとは考えます。
正直長いですが、もし最後まで読んでくださった方いたらありがとうございました!!
Discussion