AWS Lambda + EventBridgeで宿泊在庫の自動調整システム

に公開

はじめに

複数のOTA(Booking.com、Expedia等)で客室を販売する際、在庫の同期は重要な課題です。オーバーブッキングを防ぎつつ、販売機会を最大化する自動調整システムをAWS Lambda + EventBridgeで実装します。

システムアーキテクチャ

# serverless.yml
service: hotel-inventory-sync

provider:
  name: aws
  runtime: python3.9
  region: ap-northeast-1

functions:
  inventoryAdjuster:
    handler: handler.adjust_inventory
    events:
      - eventBridge:
          pattern:
            source:
              - hotel.booking
            detail-type:
              - ReservationCreated
              - ReservationCancelled
    environment:
      INVENTORY_TABLE: ${self:custom.inventoryTable}
      OTA_SECRETS_ARN: ${self:custom.otaSecretsArn}
    timeout: 30
    memorySize: 512

  inventoryMonitor:
    handler: handler.monitor_inventory
    events:
      - schedule: rate(5 minutes)
    environment:
      THRESHOLD_PERCENTAGE: 20
      ALERT_TOPIC_ARN: ${self:custom.alertTopicArn}

resources:
  Resources:
    InventoryTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: hotel-inventory
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: room_type_date
            AttributeType: S
        KeySchema:
          - AttributeName: room_type_date
            KeyType: HASH

在庫調整ロジックの実装

# handler.py
import json
import boto3
from datetime import datetime
from typing import Dict, List
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
secrets_client = boto3.client('secretsmanager')
table = dynamodb.Table(os.environ['INVENTORY_TABLE'])

class InventoryAdjuster:
    def __init__(self):
        self.ota_clients = self._initialize_ota_clients()
        
    def _initialize_ota_clients(self) -> Dict:
        """OTA API認証情報の取得"""
        try:
            secret = secrets_client.get_secret_value(
                SecretId=os.environ['OTA_SECRETS_ARN']
            )
            credentials = json.loads(secret['SecretString'])
            
            return {
                'booking': BookingAPIClient(credentials['booking']),
                'expedia': ExpediaAPIClient(credentials['expedia']),
                'agoda': AgodaAPIClient(credentials['agoda'])
            }
        except ClientError as e:
            logger.error(f"Failed to retrieve OTA credentials: {e}")
            raise

    def process_reservation(self, event_detail: Dict) -> Dict:
        """予約イベント処理と在庫調整"""
        room_type = event_detail['room_type']
        check_in = event_detail['check_in_date']
        check_out = event_detail['check_out_date']
        quantity = event_detail['quantity']
        action = event_detail['action']  # 'reserve' or 'cancel'
        
        results = {
            'processed_dates': [],
            'sync_status': {},
            'errors': []
        }
        
        # 日付範囲で在庫更新
        current_date = datetime.strptime(check_in, '%Y-%m-%d')
        end_date = datetime.strptime(check_out, '%Y-%m-%d')
        
        while current_date < end_date:
            date_str = current_date.strftime('%Y-%m-%d')
            
            try:
                # DynamoDBから現在の在庫取得
                inventory = self._get_inventory(room_type, date_str)
                
                # 在庫調整
                if action == 'reserve':
                    new_available = max(0, inventory['available'] - quantity)
                else:  # cancel
                    new_available = inventory['available'] + quantity
                    # 最大在庫数を超えないようにする
                    new_available = min(new_available, inventory['total_rooms'])
                
                # DynamoDB更新(条件付き書き込みで競合制御)
                self._update_inventory(room_type, date_str, new_available, inventory['version'])
                
                # 各OTAに在庫同期
                sync_results = self._sync_to_otas(room_type, date_str, new_available)
                results['sync_status'][date_str] = sync_results
                results['processed_dates'].append(date_str)
                
            except Exception as e:
                error_msg = f"Failed to process {date_str}: {str(e)}"
                logger.error(error_msg)
                results['errors'].append(error_msg)
                
                # リトライロジック
                if not self._retry_with_backoff(room_type, date_str, action, quantity):
                    # Dead Letter Queueに送信
                    self._send_to_dlq(event_detail, error_msg)
            
            current_date = current_date + timedelta(days=1)
        
        return results

    def _sync_to_otas(self, room_type: str, date: str, available: int) -> Dict:
        """OTA在庫同期(並列処理)"""
        sync_results = {}
        
        for ota_name, client in self.ota_clients.items():
            try:
                # タイムアウトとリトライ設定
                response = client.update_availability(
                    room_type=room_type,
                    date=date,
                    available=available,
                    timeout=10,
                    max_retries=3
                )
                sync_results[ota_name] = {
                    'status': 'success',
                    'response_code': response.get('code')
                }
                
            except Exception as e:
                sync_results[ota_name] = {
                    'status': 'failed',
                    'error': str(e)
                }
                
                # 特定OTAのエラーは全体を止めない
                logger.warning(f"OTA sync failed for {ota_name}: {e}")
                
                # Circuit Breaker パターン
                if self._should_circuit_break(ota_name):
                    self._disable_ota_temporarily(ota_name)
        
        return sync_results

    def _retry_with_backoff(self, room_type: str, date: str, 
                           action: str, quantity: int, attempt: int = 0) -> bool:
        """指数バックオフでリトライ"""
        if attempt >= 3:
            return False
            
        wait_time = (2 ** attempt) + random.uniform(0, 1)
        time.sleep(wait_time)
        
        try:
            return self.process_single_date(room_type, date, action, quantity)
        except Exception:
            return self._retry_with_backoff(room_type, date, action, quantity, attempt + 1)

def adjust_inventory(event, context):
    """Lambda ハンドラー"""
    adjuster = InventoryAdjuster()
    
    try:
        # EventBridgeイベントの詳細取得
        event_detail = event['detail']
        
        # 在庫調整実行
        result = adjuster.process_reservation(event_detail)
        
        # CloudWatchメトリクス送信
        send_metrics(result)
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
        
    except Exception as e:
        logger.error(f"Critical error in inventory adjustment: {e}")
        
        # SNSアラート送信
        send_alert(f"Inventory sync critical failure: {str(e)}")
        
        raise

エラーハンドリングとモニタリング

# monitoring.py
def monitor_inventory(event, context):
    """在庫レベル監視(5分ごと実行)"""
    threshold = int(os.environ['THRESHOLD_PERCENTAGE'])
    
    # 明日から7日間の在庫をチェック
    for days_ahead in range(1, 8):
        check_date = (datetime.now() + timedelta(days=days_ahead)).strftime('%Y-%m-%d')
        
        inventory_levels = get_all_room_inventory(check_date)
        
        for room_type, data in inventory_levels.items():
            occupancy_rate = ((data['total'] - data['available']) / data['total']) * 100
            
            if data['available'] <= (data['total'] * threshold / 100):
                send_low_inventory_alert(room_type, check_date, data['available'])

まとめ

このシステムにより、複数OTAとの在庫同期を自動化し、オーバーブッキングリスクを最小化できます。EventBridgeによるイベント駆動、DynamoDBの条件付き書き込み、そして包括的なエラーハンドリングにより、高い信頼性を実現しています。

次回は、このシステムと連携するフロントエンドのリアルタイム更新について解説します。

Discussion