Closed6

100日チャレンジ day56 (イベント駆動によるECサイト)

riddle_tecriddle_tec

昨日
https://zenn.dev/gin_nazo/scraps/d3b0b39197cd82


https://blog.framinal.life/entry/2025/04/14/154104

100日チャレンジに感化されたので、アレンジして自分でもやってみます。

やりたいこと

  • 世の中のさまざまなドメインの簡易実装をつくり、バックエンドの実装に慣れる(dbスキーマ設計や、関数の分割、使いやすいインターフェイスの切り方に慣れる
  • 設計力(これはシステムのオーバービューを先に自分で作ってaiに依頼できるようにする
  • 生成aiをつかったバイブコーティングになれる
  • 実際にやったことはzennのスクラップにまとめ、成果はzennのブログにまとめる(アプリ自体の公開は必須ではないかコードはgithubにおく)

できたもの

https://github.com/lirlia/100day_challenge_backend/tree/main/day56_event_driven_architecture

riddle_tecriddle_tec

はい、承知いたしました。イベント駆動型のシステムをGo言語で開発するのですね。購入、商品確保、配送をそれぞれ独立したサービスとして構築しましょう。

まずはプロジェクトの初期設定として、新しい日のディレクトリを作成し、基本的なドキュメントを準備します。

新しい日(day56)のプロジェクトディレクトリ day56_event_driven_architecture を作成します。
次に、このディレクトリ内に README.mdPROGRESS.md を作成します。

mkdir day56_event_driven_architecture

day56_event_driven_architecture ディレクトリを作成しました。
次に、このディレクトリ直下に README.md を作成し、今回作るアプリケーションの概要を記述します。
また、PROGRESS.md を作成し、開発の進捗を管理できるようにします。

README.mdPROGRESS.md を作成しました。

それでは、イベント駆動システムの仕様書を作成します。

イベント駆動型システム 仕様書

1. 概要

本システムは、オンラインショップにおける注文処理をイベント駆動アーキテクチャで構築するものです。購入、在庫管理、配送の各プロセスを独立したマイクロサービスとして開発し、非同期のイベント通知によって連携させます。これにより、スケーラビリティと耐障害性の高いシステムを目指します。

2. アーキテクチャ

2.1. 全体構成図

2.2. 使用技術スタック

  • 言語: Go
  • メッセージブローカー: NATS (Goクライアント: nats.go)
  • データベース: SQLite (各サービスが個別のDBファイルを持つ)
  • DBアクセス: database/sql パッケージ, github.com/mattn/go-sqlite3 ドライバ
  • API形式: 各サービスの必要に応じて HTTP/JSON (Go標準の net/http パッケージ)
  • 設定管理: 環境変数 or 設定ファイル (簡易的なもの)
  • ロギング: Go標準の log パッケージ

3. サービス詳細

3.1. 購入サービス (Order Service)

  • 役割:
    • ユーザーからの注文リクエストを受け付ける。
    • 注文情報を永続化する。
    • OrderCreated イベントを発行する。
    • 在庫確保失敗時 (StockReservationFailed) や配送失敗時 (ShipmentFailed) のイベントをリッスンし、注文ステータスを更新する。
  • APIエンドポイント:
    • POST /orders: 新規注文作成
      • リクエストボディ例: {"userId": "user123", "items": [{"productId": "prod001", "quantity": 2, "price": 1000}, {"productId": "prod002", "quantity": 1, "price": 500}]}
      • レスポンス例 (成功時): {"orderId": "orderXYZ", "status": "PENDING"}
    • GET /orders/:orderId: 注文情報取得
  • 発行するイベント:
    • OrderCreated
      • トピック: orders.created
      • ペイロード例: {"orderId": "orderXYZ", "userId": "user123", "items": [{"productId": "prod001", "quantity": 2}, ...], "totalAmount": 2500, "timestamp": "2023-10-27T10:00:00Z"}
  • リッスンするイベント:
    • StockReserved (トピック: inventory.reserved)
      • アクション: 注文ステータスを AWAITING_SHIPMENT に更新
    • StockReservationFailed (トピック: inventory.reservation_failed)
      • アクション: 注文ステータスを CANCELLED_NO_STOCK に更新、ユーザーに通知(今回はログ出力で代替)
    • ShipmentCompleted (トピック: shipping.completed)
      • アクション: 注文ステータスを COMPLETED に更新
    • ShipmentFailed (トピック: shipping.failed)
      • アクション: 注文ステータスを SHIPMENT_FAILED に更新、ユーザーに通知(今回はログ出力で代替)
  • データモデル (SQLite): orders テーブル
    • id (TEXT, PRIMARY KEY) - 注文ID
    • user_id (TEXT)
    • total_amount (INTEGER)
    • status (TEXT) - 例: PENDING, AWAITING_SHIPMENT, SHIPPED, COMPLETED, CANCELLED_NO_STOCK, SHIPMENT_FAILED
    • created_at (TEXT) - ISO8601形式
    • updated_at (TEXT) - ISO8601形式
      order_items テーブル
    • id (INTEGER, PRIMARY KEY AUTOINCREMENT)
    • order_id (TEXT, FOREIGN KEY REFERENCES orders(id))
    • product_id (TEXT)
    • quantity (INTEGER)
    • price_at_purchase (INTEGER) - 購入時の単価

3.2. 在庫サービス (Inventory Service)

  • 役割:
    • 商品の在庫数を管理する。
    • OrderCreated イベントをリッスンし、在庫引き当てを試みる。
    • 在庫確保成功なら StockReserved イベントを発行。
    • 在庫確保失敗なら StockReservationFailed イベントを発行。
    • 配送失敗 (ShipmentFailed) イベントをリッスンし、在庫を戻す(補償トランザクション)。
  • APIエンドポイント (管理用・任意):
    • GET /products/:productId/stock: 特定商品の在庫数確認
    • POST /products/:productId/stock: 特定商品の在庫数更新 (手動調整用)
  • 発行するイベント:
    • StockReserved
      • トピック: inventory.reserved
      • ペイロード例: {"orderId": "orderXYZ", "items": [{"productId": "prod001", "quantityReserved": 2}, ...], "timestamp": "2023-10-27T10:01:00Z"}
    • StockReservationFailed
      • トピック: inventory.reservation_failed
      • ペイロード例: {"orderId": "orderXYZ", "reason": "Insufficient stock for prod001", "failedItems": [{"productId": "prod001", "requestedQuantity": 2, "availableQuantity": 1}], "timestamp": "2023-10-27T10:01:00Z"}
  • リッスンするイベント:
    • OrderCreated (トピック: orders.created)
      • アクション: 注文内の各商品について在庫を確認し、引き当てる。
    • ShipmentFailed (トピック: shipping.failed)
      • アクション: 対象注文の確保済み在庫を戻す。ペイロードに orderIditems (失敗した商品情報) が含まれる想定。
  • データモデル (SQLite): products テーブル
    • id (TEXT, PRIMARY KEY) - 商品ID
    • name (TEXT)
    • stock_quantity (INTEGER)
    • reserved_quantity (INTEGER DEFAULT 0) - 予約済み在庫数(オプショナル、より厳密な管理のため)
    • updated_at (TEXT)

3.3. 配送サービス (Shipping Service)

  • 役割:
    • StockReserved イベントをリッスンし、配送プロセスを開始する。
    • 配送情報を永続化する。
    • ShipmentInitiated イベントを発行。
    • 配送処理の完了/失敗に応じて ShipmentCompleted / ShipmentFailed イベントを発行する。(今回はダミー処理として、一定時間後に成功またはランダムで失敗とする)
  • APIエンドポイント (任意):
    • GET /shipments/:shipmentId: 配送状況確認
  • 発行するイベント:
    • ShipmentInitiated
      • トピック: shipping.initiated
      • ペイロード例: {"shipmentId": "shipABC", "orderId": "orderXYZ", "shippingAddress": "...", "timestamp": "2023-10-27T10:02:00Z"}
    • ShipmentCompleted
      • トピック: shipping.completed
      • ペイロード例: {"shipmentId": "shipABC", "orderId": "orderXYZ", "trackingNumber": "track123", "timestamp": "2023-10-27T10:05:00Z"}
    • ShipmentFailed
      • トピック: shipping.failed
      • ペイロード例: {"shipmentId": "shipABC", "orderId": "orderXYZ", "reason": "Address not found", "items": [{"productId": "prod001", "quantity": 2}, ...], "timestamp": "2023-10-27T10:05:00Z"} (在庫サービスが補償処理できるよう、商品情報を含める)
  • リッスンするイベント:
    • StockReserved (トピック: inventory.reserved)
      • アクション: 新しい配送記録を作成し、配送処理を開始する (ダミー)。
  • データモデル (SQLite): shipments テーブル
    • id (TEXT, PRIMARY KEY) - 配送ID
    • order_id (TEXT, UNIQUE) - 対応する注文ID
    • status (TEXT) - 例: PENDING, PROCESSING, SHIPPED, DELIVERED, FAILED
    • shipping_address (TEXT) - (今回はOrderCreatedイベントから取得するか、ダミーで生成)
    • tracking_number (TEXT, NULLABLE)
    • created_at (TEXT)
    • updated_at (TEXT)

4. イベントフロー例

4.1. 正常系フロー (注文成功 → 在庫確保成功 → 配送成功)

4.2. 異常系フロー (在庫確保成功 → 配送失敗 → 在庫戻し)

5. Goプロジェクト ディレクトリ構成案

day56_event_driven_architecture/
├── cmd/
│   ├── order_service/
│   │   └── main.go
│   ├── inventory_service/
│   │   └── main.go
│   ├── shipping_service/
│   │   └── main.go
├── internal/
│   ├── config/
│   │   └── config.go      # 設定読み込み
│   ├── database/
│   │   └── sqlite.go      # SQLite初期化・ヘルパー
│   ├── event/
│   │   ├── nats.go        # NATS接続・パブリッシュ・サブスクライブ
│   │   └── types.go       # イベントの型定義
│   ├── order/             # 購入サービス関連
│   │   ├── api.go
│   │   ├── handler.go
│   │   ├── model.go
│   │   └── repository.go
│   ├── inventory/         # 在庫サービス関連
│   │   ├── handler.go
│   │   ├── model.go
│   │   └── repository.go
│   ├── shipping/          # 配送サービス関連
│   │   ├── handler.go
│   │   ├── model.go
│   │   └── repository.go
│   └── util/
│       └── util.go        # 共通ユーティリティ
├── go.mod
├── go.sum
├── README.md
├── PROGRESS.md
└── docker-compose.yml     # (オプション) NATS と各サービスを起動するため
  • cmd/: 各サービスの main パッケージを配置。
  • internal/: プロジェクト内部でのみ使用される共有パッケージを配置。
    • config: 設定ファイルの読み込みなど。
    • database: データベース接続やマイグレーション処理。
    • event: NATSクライアントのラッパーやイベント構造体の定義。
    • 各サービス (order, inventory, shipping) のドメインロジック、APIハンドラ、リポジトリ。
  • docker-compose.yml: NATSサーバーと各Goサービスをローカルで簡単に起動するためのファイル(オプション)。

6. 開発ステップ (PROGRESS.md と連携)

  1. 仕様定義 (完了)
  2. プロジェクト初期セットアップ
    • day56_event_driven_architecture ディレクトリに移動。
    • 各サービス (order_service, inventory_service, shipping_service) の cmd ディレクトリと main.go ファイルを作成。
    • ルートで go mod init github.com/lirlia/100day_challenge_backend/day56_event_driven_architecture を実行。
    • 必要なパッケージ (nats.go, go-sqlite3 など) を go get する。
    • internal 配下の基本ディレクトリ構造を作成。
  3. メッセージブローカー (NATS) のセットアップ
    • docker-compose.yml でNATSサーバーを起動できるようにする(またはローカルにNATSをインストール)。
    • internal/event/nats.go でNATS接続、Publish/Subscribeの基本関数を実装。
  4. 各サービスの実装 (購入 → 在庫 → 配送の順)
    • データモデルとDB (SQLite): 各サービスに対応する internal/<service>/model.gointernal/<service>/repository.go (DB操作) を実装。internal/database/sqlite.go でDB初期化処理。
    • イベント処理:
      • 発行側: イベントペイロードを定義 (internal/event/types.go) し、nats.Publish を呼び出す。
      • リッスン側: nats.Subscribe でイベントを待ち受け、internal/<service>/handler.go で処理ロジックを実装。
    • APIエンドポイント (購入サービス): internal/order/api.go でHTTPハンドラを実装し、cmd/order_service/main.go でルーティング設定とサーバー起動。
  5. サービス間連携テスト:
    • OrderCreated イベント発行から StockReserved/StockReservationFailed を経て、ShipmentCompleted/ShipmentFailed まで一連のフローをテスト。
    • 各サービスのログを確認し、イベントが正しく伝播し処理されているか確認。
    • 失敗系(在庫不足、配送失敗と補償処理)もテスト。
  6. README更新: 最終的な構成や実行方法などを追記。

上記仕様書について、いかがでしょうか?
特に確認したい点や変更したい点があれば教えてください。
問題なければ、この仕様に基づいて開発を進めます。
まずは、プロジェクトの初期セットアップ(ディレクトリ作成とGoモジュールの初期化)から開始します。仕様書、ありがとうございます。非常にわかりやすいです。
この内容で進めてください。

最初のステップとして、プロジェクトの初期セットアップをお願いします。
day56_event_driven_architecture ディレクトリに移動し、Goのプロジェクトとして初期化し、提案のあったディレクトリ構造を作成してください。
また、NATSとSQLiteのGoライブラリも go get してください。

  1. カレントディレクトリの確認
  2. day56_event_driven_architecture への移動
  3. go mod init の実行
  4. 必要なディレクトリの作成 (cmd/order_service, cmd/inventory_service, cmd/shipping_service, internal/config, internal/database, internal/event, internal/order, internal/inventory, internal/shipping, internal/util)
  5. 各サービス (order, inventory, shipping) の main.go ファイルのプレースホルダ作成
  6. go getgithub.com/nats-io/nats.gogithub.com/mattn/go-sqlite3 をインストール
  7. docker-compose.yml の雛形作成 (NATSサービスのみ)
  8. 作成されたファイルとディレクトリ構造の確認

上記ステップでお願いします。

riddle_tecriddle_tec

func main() {

	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = defaultNatsURL
	}

	if err := event.ConnectNATS(natsURL); err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}

	if err := shipping.InitShippingDB(); err != nil {
		log.Fatalf("Failed to initialize shipping database: %v", err)
	}

	sub, err := event.SubscribeToEvent(event.StockReservedSubject, shipping.HandleStockReservedEvent)
	if err != nil {
		log.Fatalf("Failed to subscribe to StockReservedEvent: %v", err)
	}
	log.Printf("Shipping Service subscribed to %s", event.StockReservedSubject)

	// Keep the service running and wait for signals to gracefully shutdown
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-c
		log.Println("Shutdown signal received. Unsubscribing and closing NATS connection...")
		if sub != nil {
			sub.Unsubscribe()
			log.Println("Unsubscribed from NATS subject", event.StockReservedSubject)
		}
		event.CloseNATS()
		os.Exit(0)
	}()

	log.Println("Shipping Service is running. Waiting for events or shutdown signal...")
	runtime.Goexit()
}
func HandleStockReservedEvent(msg *nats.Msg) {
	log.Printf("Received StockReservedEvent for subject: %s", msg.Subject)
	var stockEvent event.StockReservedEvent
	if err := json.Unmarshal(msg.Data, &stockEvent); err != nil {
		log.Printf("Error unmarshalling StockReservedEvent: %v. Message data: %s", err, string(msg.Data))
		return
	}

	log.Printf("Processing shipment for order %s with %d item types", stockEvent.OrderID, len(stockEvent.Items))

	// For simplicity, assume we get UserID and ShippingAddress from the Order event or a shared context.
	// Here, we'll use placeholders or derive them if possible.
	// In a real system, OrderService might include UserID in StockReservedEvent or ShippingService might query OrderService.
	// Let's assume OrderCreatedEvent is also listened to by ShippingService to get UserID and Address, or they are passed through.
	// For now, using placeholders.
	dummyUserID := "user_placeholder"
	dummyShippingAddress := "123 Main St, Anytown, USA"

	// 1. Create Shipment Record
	shipmentItems := make([]event.OrderItem, len(stockEvent.Items))
	for i, item := range stockEvent.Items {
		shipmentItems[i] = event.OrderItem{ProductID: item.ProductID, Quantity: item.Quantity}
	}

	createdShipment, err := CreateShipment(stockEvent.OrderID, dummyUserID, dummyShippingAddress, shipmentItems)
	if err != nil {
		log.Printf("CRITICAL: Failed to create shipment record for order %s: %v", stockEvent.OrderID, err)
		// This is a severe issue. May need to alert or retry.
		// If we can't even create a shipment record, we can't proceed to publish failure for this shipment.
		return
	}
	log.Printf("Shipment record %s created for order %s", createdShipment.ID, stockEvent.OrderID)

	// 2. Publish ShipmentInitiatedEvent
	shipmentInitiatedEv := event.ShipmentInitiatedEvent{
		ShipmentID:      createdShipment.ID,
		OrderID:         createdShipment.OrderID,
		ShippingAddress: createdShipment.ShippingAddress,
		Timestamp:       time.Now(),
	}
	if err := event.PublishEvent(event.ShipmentInitiatedSubject, shipmentInitiatedEv); err != nil {
		log.Printf("CRITICAL: Failed to publish ShipmentInitiatedEvent for shipment %s (order %s): %v", createdShipment.ID, createdShipment.OrderID, err)
		// If this fails, subsequent events won't make sense. Consider a retry or a specific failure state.
		// For now, we will attempt to continue the dummy process and publish a failure if that occurs.
	}
	log.Printf("ShipmentInitiatedEvent published for shipment %s", createdShipment.ID)

	// 3. Simulate Shipment Processing (Dummy Logic)
	go simulateShipmentProcessing(createdShipment, stockEvent.Items) // Run in a goroutine to not block the event handler
}

なるほど、イベント駆動はこういう実装なのか。
NATS を topic でサブスクして goroutine ね〜。なるほ。

このスクラップは3ヶ月前にクローズされました