100日チャレンジ day56 (イベント駆動によるECサイト)
昨日
100日チャレンジに感化されたので、アレンジして自分でもやってみます。
やりたいこと
- 世の中のさまざまなドメインの簡易実装をつくり、バックエンドの実装に慣れる(dbスキーマ設計や、関数の分割、使いやすいインターフェイスの切り方に慣れる
- 設計力(これはシステムのオーバービューを先に自分で作ってaiに依頼できるようにする
- 生成aiをつかったバイブコーティングになれる
- 実際にやったことはzennのスクラップにまとめ、成果はzennのブログにまとめる(アプリ自体の公開は必須ではないかコードはgithubにおく)
できたもの

イベント駆動 を学ぶ
はい、承知いたしました。イベント駆動型のシステムをGo言語で開発するのですね。購入、商品確保、配送をそれぞれ独立したサービスとして構築しましょう。
まずはプロジェクトの初期設定として、新しい日のディレクトリを作成し、基本的なドキュメントを準備します。
新しい日(day56)のプロジェクトディレクトリ day56_event_driven_architecture を作成します。
次に、このディレクトリ内に README.md と PROGRESS.md を作成します。
mkdir day56_event_driven_architecture
day56_event_driven_architecture ディレクトリを作成しました。
次に、このディレクトリ直下に README.md を作成し、今回作るアプリケーションの概要を記述します。
また、PROGRESS.md を作成し、開発の進捗を管理できるようにします。
README.md と PROGRESS.md を作成しました。
それでは、イベント駆動システムの仕様書を作成します。
イベント駆動型システム 仕様書
1. 概要
本システムは、オンラインショップにおける注文処理をイベント駆動アーキテクチャで構築するものです。購入、在庫管理、配送の各プロセスを独立したマイクロサービスとして開発し、非同期のイベント通知によって連携させます。これにより、スケーラビリティと耐障害性の高いシステムを目指します。
2. アーキテクチャ
2.1. 全体構成図
2.2. 使用技術スタック
- 言語: Go
-
メッセージブローカー: NATS (Goクライアント:
nats.go) - データベース: SQLite (各サービスが個別のDBファイルを持つ)
-
DBアクセス:
database/sqlパッケージ,github.com/mattn/go-sqlite3ドライバ -
API形式: 各サービスの必要に応じて HTTP/JSON (Go標準の
net/httpパッケージ) - 設定管理: 環境変数 or 設定ファイル (簡易的なもの)
-
ロギング: Go標準の
logパッケージ
3. サービス詳細
3.1. 購入サービス (Order Service)
-
役割:
- ユーザーからの注文リクエストを受け付ける。
- 注文情報を永続化する。
-
OrderCreatedイベントを発行する。 - 在庫確保失敗時 (
StockReservationFailed) や配送失敗時 (ShipmentFailed) のイベントをリッスンし、注文ステータスを更新する。
-
APIエンドポイント:
-
POST /orders: 新規注文作成- リクエストボディ例:
{"userId": "user123", "items": [{"productId": "prod001", "quantity": 2, "price": 1000}, {"productId": "prod002", "quantity": 1, "price": 500}]} - レスポンス例 (成功時):
{"orderId": "orderXYZ", "status": "PENDING"}
- リクエストボディ例:
-
GET /orders/:orderId: 注文情報取得
-
-
発行するイベント:
-
OrderCreated- トピック:
orders.created - ペイロード例:
{"orderId": "orderXYZ", "userId": "user123", "items": [{"productId": "prod001", "quantity": 2}, ...], "totalAmount": 2500, "timestamp": "2023-10-27T10:00:00Z"}
- トピック:
-
-
リッスンするイベント:
-
StockReserved(トピック:inventory.reserved)- アクション: 注文ステータスを
AWAITING_SHIPMENTに更新
- アクション: 注文ステータスを
-
StockReservationFailed(トピック:inventory.reservation_failed)- アクション: 注文ステータスを
CANCELLED_NO_STOCKに更新、ユーザーに通知(今回はログ出力で代替)
- アクション: 注文ステータスを
-
ShipmentCompleted(トピック:shipping.completed)- アクション: 注文ステータスを
COMPLETEDに更新
- アクション: 注文ステータスを
-
ShipmentFailed(トピック:shipping.failed)- アクション: 注文ステータスを
SHIPMENT_FAILEDに更新、ユーザーに通知(今回はログ出力で代替)
- アクション: 注文ステータスを
-
-
データモデル (SQLite):
ordersテーブル-
id(TEXT, PRIMARY KEY) - 注文ID -
user_id(TEXT) -
total_amount(INTEGER) -
status(TEXT) - 例:PENDING,AWAITING_SHIPMENT,SHIPPED,COMPLETED,CANCELLED_NO_STOCK,SHIPMENT_FAILED -
created_at(TEXT) - ISO8601形式 -
updated_at(TEXT) - ISO8601形式
order_itemsテーブル -
id(INTEGER, PRIMARY KEY AUTOINCREMENT) -
order_id(TEXT, FOREIGN KEY REFERENCES orders(id)) -
product_id(TEXT) -
quantity(INTEGER) -
price_at_purchase(INTEGER) - 購入時の単価
-
3.2. 在庫サービス (Inventory Service)
-
役割:
- 商品の在庫数を管理する。
-
OrderCreatedイベントをリッスンし、在庫引き当てを試みる。 - 在庫確保成功なら
StockReservedイベントを発行。 - 在庫確保失敗なら
StockReservationFailedイベントを発行。 - 配送失敗 (
ShipmentFailed) イベントをリッスンし、在庫を戻す(補償トランザクション)。
-
APIエンドポイント (管理用・任意):
-
GET /products/:productId/stock: 特定商品の在庫数確認 -
POST /products/:productId/stock: 特定商品の在庫数更新 (手動調整用)
-
-
発行するイベント:
-
StockReserved- トピック:
inventory.reserved - ペイロード例:
{"orderId": "orderXYZ", "items": [{"productId": "prod001", "quantityReserved": 2}, ...], "timestamp": "2023-10-27T10:01:00Z"}
- トピック:
-
StockReservationFailed- トピック:
inventory.reservation_failed - ペイロード例:
{"orderId": "orderXYZ", "reason": "Insufficient stock for prod001", "failedItems": [{"productId": "prod001", "requestedQuantity": 2, "availableQuantity": 1}], "timestamp": "2023-10-27T10:01:00Z"}
- トピック:
-
-
リッスンするイベント:
-
OrderCreated(トピック:orders.created)- アクション: 注文内の各商品について在庫を確認し、引き当てる。
-
ShipmentFailed(トピック:shipping.failed)- アクション: 対象注文の確保済み在庫を戻す。ペイロードに
orderIdとitems(失敗した商品情報) が含まれる想定。
- アクション: 対象注文の確保済み在庫を戻す。ペイロードに
-
-
データモデル (SQLite):
productsテーブル-
id(TEXT, PRIMARY KEY) - 商品ID -
name(TEXT) -
stock_quantity(INTEGER) -
reserved_quantity(INTEGER DEFAULT 0) - 予約済み在庫数(オプショナル、より厳密な管理のため) -
updated_at(TEXT)
-
3.3. 配送サービス (Shipping Service)
-
役割:
-
StockReservedイベントをリッスンし、配送プロセスを開始する。 - 配送情報を永続化する。
-
ShipmentInitiatedイベントを発行。 - 配送処理の完了/失敗に応じて
ShipmentCompleted/ShipmentFailedイベントを発行する。(今回はダミー処理として、一定時間後に成功またはランダムで失敗とする)
-
-
APIエンドポイント (任意):
-
GET /shipments/:shipmentId: 配送状況確認
-
-
発行するイベント:
-
ShipmentInitiated- トピック:
shipping.initiated - ペイロード例:
{"shipmentId": "shipABC", "orderId": "orderXYZ", "shippingAddress": "...", "timestamp": "2023-10-27T10:02:00Z"}
- トピック:
-
ShipmentCompleted- トピック:
shipping.completed - ペイロード例:
{"shipmentId": "shipABC", "orderId": "orderXYZ", "trackingNumber": "track123", "timestamp": "2023-10-27T10:05:00Z"}
- トピック:
-
ShipmentFailed- トピック:
shipping.failed - ペイロード例:
{"shipmentId": "shipABC", "orderId": "orderXYZ", "reason": "Address not found", "items": [{"productId": "prod001", "quantity": 2}, ...], "timestamp": "2023-10-27T10:05:00Z"}(在庫サービスが補償処理できるよう、商品情報を含める)
- トピック:
-
-
リッスンするイベント:
-
StockReserved(トピック:inventory.reserved)- アクション: 新しい配送記録を作成し、配送処理を開始する (ダミー)。
-
-
データモデル (SQLite):
shipmentsテーブル-
id(TEXT, PRIMARY KEY) - 配送ID -
order_id(TEXT, UNIQUE) - 対応する注文ID -
status(TEXT) - 例:PENDING,PROCESSING,SHIPPED,DELIVERED,FAILED -
shipping_address(TEXT) - (今回はOrderCreatedイベントから取得するか、ダミーで生成) -
tracking_number(TEXT, NULLABLE) -
created_at(TEXT) -
updated_at(TEXT)
-
4. イベントフロー例
4.1. 正常系フロー (注文成功 → 在庫確保成功 → 配送成功)
4.2. 異常系フロー (在庫確保成功 → 配送失敗 → 在庫戻し)
5. Goプロジェクト ディレクトリ構成案
day56_event_driven_architecture/
├── cmd/
│ ├── order_service/
│ │ └── main.go
│ ├── inventory_service/
│ │ └── main.go
│ ├── shipping_service/
│ │ └── main.go
├── internal/
│ ├── config/
│ │ └── config.go # 設定読み込み
│ ├── database/
│ │ └── sqlite.go # SQLite初期化・ヘルパー
│ ├── event/
│ │ ├── nats.go # NATS接続・パブリッシュ・サブスクライブ
│ │ └── types.go # イベントの型定義
│ ├── order/ # 購入サービス関連
│ │ ├── api.go
│ │ ├── handler.go
│ │ ├── model.go
│ │ └── repository.go
│ ├── inventory/ # 在庫サービス関連
│ │ ├── handler.go
│ │ ├── model.go
│ │ └── repository.go
│ ├── shipping/ # 配送サービス関連
│ │ ├── handler.go
│ │ ├── model.go
│ │ └── repository.go
│ └── util/
│ └── util.go # 共通ユーティリティ
├── go.mod
├── go.sum
├── README.md
├── PROGRESS.md
└── docker-compose.yml # (オプション) NATS と各サービスを起動するため
-
cmd/: 各サービスのmainパッケージを配置。 -
internal/: プロジェクト内部でのみ使用される共有パッケージを配置。-
config: 設定ファイルの読み込みなど。 -
database: データベース接続やマイグレーション処理。 -
event: NATSクライアントのラッパーやイベント構造体の定義。 - 各サービス (
order,inventory,shipping) のドメインロジック、APIハンドラ、リポジトリ。
-
-
docker-compose.yml: NATSサーバーと各Goサービスをローカルで簡単に起動するためのファイル(オプション)。
6. 開発ステップ (PROGRESS.md と連携)
- 仕様定義 (完了)
-
プロジェクト初期セットアップ
-
day56_event_driven_architectureディレクトリに移動。 - 各サービス (
order_service,inventory_service,shipping_service) のcmdディレクトリとmain.goファイルを作成。 - ルートで
go mod init github.com/lirlia/100day_challenge_backend/day56_event_driven_architectureを実行。 - 必要なパッケージ (
nats.go,go-sqlite3など) をgo getする。 -
internal配下の基本ディレクトリ構造を作成。
-
-
メッセージブローカー (NATS) のセットアップ
-
docker-compose.ymlでNATSサーバーを起動できるようにする(またはローカルにNATSをインストール)。 -
internal/event/nats.goでNATS接続、Publish/Subscribeの基本関数を実装。
-
-
各サービスの実装 (購入 → 在庫 → 配送の順)
-
データモデルとDB (SQLite): 各サービスに対応する
internal/<service>/model.goとinternal/<service>/repository.go(DB操作) を実装。internal/database/sqlite.goでDB初期化処理。 -
イベント処理:
- 発行側: イベントペイロードを定義 (
internal/event/types.go) し、nats.Publishを呼び出す。 - リッスン側:
nats.Subscribeでイベントを待ち受け、internal/<service>/handler.goで処理ロジックを実装。
- 発行側: イベントペイロードを定義 (
-
APIエンドポイント (購入サービス):
internal/order/api.goでHTTPハンドラを実装し、cmd/order_service/main.goでルーティング設定とサーバー起動。
-
データモデルとDB (SQLite): 各サービスに対応する
-
サービス間連携テスト:
-
OrderCreatedイベント発行からStockReserved/StockReservationFailedを経て、ShipmentCompleted/ShipmentFailedまで一連のフローをテスト。 - 各サービスのログを確認し、イベントが正しく伝播し処理されているか確認。
- 失敗系(在庫不足、配送失敗と補償処理)もテスト。
-
- README更新: 最終的な構成や実行方法などを追記。
上記仕様書について、いかがでしょうか?
特に確認したい点や変更したい点があれば教えてください。
問題なければ、この仕様に基づいて開発を進めます。
まずは、プロジェクトの初期セットアップ(ディレクトリ作成とGoモジュールの初期化)から開始します。仕様書、ありがとうございます。非常にわかりやすいです。
この内容で進めてください。
最初のステップとして、プロジェクトの初期セットアップをお願いします。
day56_event_driven_architecture ディレクトリに移動し、Goのプロジェクトとして初期化し、提案のあったディレクトリ構造を作成してください。
また、NATSとSQLiteのGoライブラリも go get してください。
- カレントディレクトリの確認
-
day56_event_driven_architectureへの移動 -
go mod initの実行 - 必要なディレクトリの作成 (
cmd/order_service,cmd/inventory_service,cmd/shipping_service,internal/config,internal/database,internal/event,internal/order,internal/inventory,internal/shipping,internal/util) - 各サービス (
order,inventory,shipping) のmain.goファイルのプレースホルダ作成 -
go getでgithub.com/nats-io/nats.goとgithub.com/mattn/go-sqlite3をインストール -
docker-compose.ymlの雛形作成 (NATSサービスのみ) - 作成されたファイルとディレクトリ構造の確認
上記ステップでお願いします。
func main() {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = defaultNatsURL
}
if err := event.ConnectNATS(natsURL); err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
if err := shipping.InitShippingDB(); err != nil {
log.Fatalf("Failed to initialize shipping database: %v", err)
}
sub, err := event.SubscribeToEvent(event.StockReservedSubject, shipping.HandleStockReservedEvent)
if err != nil {
log.Fatalf("Failed to subscribe to StockReservedEvent: %v", err)
}
log.Printf("Shipping Service subscribed to %s", event.StockReservedSubject)
// Keep the service running and wait for signals to gracefully shutdown
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
log.Println("Shutdown signal received. Unsubscribing and closing NATS connection...")
if sub != nil {
sub.Unsubscribe()
log.Println("Unsubscribed from NATS subject", event.StockReservedSubject)
}
event.CloseNATS()
os.Exit(0)
}()
log.Println("Shipping Service is running. Waiting for events or shutdown signal...")
runtime.Goexit()
}
func HandleStockReservedEvent(msg *nats.Msg) {
log.Printf("Received StockReservedEvent for subject: %s", msg.Subject)
var stockEvent event.StockReservedEvent
if err := json.Unmarshal(msg.Data, &stockEvent); err != nil {
log.Printf("Error unmarshalling StockReservedEvent: %v. Message data: %s", err, string(msg.Data))
return
}
log.Printf("Processing shipment for order %s with %d item types", stockEvent.OrderID, len(stockEvent.Items))
// For simplicity, assume we get UserID and ShippingAddress from the Order event or a shared context.
// Here, we'll use placeholders or derive them if possible.
// In a real system, OrderService might include UserID in StockReservedEvent or ShippingService might query OrderService.
// Let's assume OrderCreatedEvent is also listened to by ShippingService to get UserID and Address, or they are passed through.
// For now, using placeholders.
dummyUserID := "user_placeholder"
dummyShippingAddress := "123 Main St, Anytown, USA"
// 1. Create Shipment Record
shipmentItems := make([]event.OrderItem, len(stockEvent.Items))
for i, item := range stockEvent.Items {
shipmentItems[i] = event.OrderItem{ProductID: item.ProductID, Quantity: item.Quantity}
}
createdShipment, err := CreateShipment(stockEvent.OrderID, dummyUserID, dummyShippingAddress, shipmentItems)
if err != nil {
log.Printf("CRITICAL: Failed to create shipment record for order %s: %v", stockEvent.OrderID, err)
// This is a severe issue. May need to alert or retry.
// If we can't even create a shipment record, we can't proceed to publish failure for this shipment.
return
}
log.Printf("Shipment record %s created for order %s", createdShipment.ID, stockEvent.OrderID)
// 2. Publish ShipmentInitiatedEvent
shipmentInitiatedEv := event.ShipmentInitiatedEvent{
ShipmentID: createdShipment.ID,
OrderID: createdShipment.OrderID,
ShippingAddress: createdShipment.ShippingAddress,
Timestamp: time.Now(),
}
if err := event.PublishEvent(event.ShipmentInitiatedSubject, shipmentInitiatedEv); err != nil {
log.Printf("CRITICAL: Failed to publish ShipmentInitiatedEvent for shipment %s (order %s): %v", createdShipment.ID, createdShipment.OrderID, err)
// If this fails, subsequent events won't make sense. Consider a retry or a specific failure state.
// For now, we will attempt to continue the dummy process and publish a failure if that occurs.
}
log.Printf("ShipmentInitiatedEvent published for shipment %s", createdShipment.ID)
// 3. Simulate Shipment Processing (Dummy Logic)
go simulateShipmentProcessing(createdShipment, stockEvent.Items) // Run in a goroutine to not block the event handler
}
なるほど、イベント駆動はこういう実装なのか。
NATS を topic でサブスクして goroutine ね〜。なるほ。
