⚡
AWS Lambda + EventBridgeで宿泊在庫の自動調整システム
はじめに
複数のOTA(Booking.com、Expedia等)で客室を販売する際、在庫の同期は重要な課題です。オーバーブッキングを防ぎつつ、販売機会を最大化する自動調整システムをAWS Lambda + EventBridgeで実装します。
システムアーキテクチャ
# serverless.yml
service: hotel-inventory-sync
provider:
name: aws
runtime: python3.9
region: ap-northeast-1
functions:
inventoryAdjuster:
handler: handler.adjust_inventory
events:
- eventBridge:
pattern:
source:
- hotel.booking
detail-type:
- ReservationCreated
- ReservationCancelled
environment:
INVENTORY_TABLE: ${self:custom.inventoryTable}
OTA_SECRETS_ARN: ${self:custom.otaSecretsArn}
timeout: 30
memorySize: 512
inventoryMonitor:
handler: handler.monitor_inventory
events:
- schedule: rate(5 minutes)
environment:
THRESHOLD_PERCENTAGE: 20
ALERT_TOPIC_ARN: ${self:custom.alertTopicArn}
resources:
Resources:
InventoryTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: hotel-inventory
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: room_type_date
AttributeType: S
KeySchema:
- AttributeName: room_type_date
KeyType: HASH
在庫調整ロジックの実装
# handler.py
import json
import boto3
from datetime import datetime
from typing import Dict, List
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
secrets_client = boto3.client('secretsmanager')
table = dynamodb.Table(os.environ['INVENTORY_TABLE'])
class InventoryAdjuster:
def __init__(self):
self.ota_clients = self._initialize_ota_clients()
def _initialize_ota_clients(self) -> Dict:
"""OTA API認証情報の取得"""
try:
secret = secrets_client.get_secret_value(
SecretId=os.environ['OTA_SECRETS_ARN']
)
credentials = json.loads(secret['SecretString'])
return {
'booking': BookingAPIClient(credentials['booking']),
'expedia': ExpediaAPIClient(credentials['expedia']),
'agoda': AgodaAPIClient(credentials['agoda'])
}
except ClientError as e:
logger.error(f"Failed to retrieve OTA credentials: {e}")
raise
def process_reservation(self, event_detail: Dict) -> Dict:
"""予約イベント処理と在庫調整"""
room_type = event_detail['room_type']
check_in = event_detail['check_in_date']
check_out = event_detail['check_out_date']
quantity = event_detail['quantity']
action = event_detail['action'] # 'reserve' or 'cancel'
results = {
'processed_dates': [],
'sync_status': {},
'errors': []
}
# 日付範囲で在庫更新
current_date = datetime.strptime(check_in, '%Y-%m-%d')
end_date = datetime.strptime(check_out, '%Y-%m-%d')
while current_date < end_date:
date_str = current_date.strftime('%Y-%m-%d')
try:
# DynamoDBから現在の在庫取得
inventory = self._get_inventory(room_type, date_str)
# 在庫調整
if action == 'reserve':
new_available = max(0, inventory['available'] - quantity)
else: # cancel
new_available = inventory['available'] + quantity
# 最大在庫数を超えないようにする
new_available = min(new_available, inventory['total_rooms'])
# DynamoDB更新(条件付き書き込みで競合制御)
self._update_inventory(room_type, date_str, new_available, inventory['version'])
# 各OTAに在庫同期
sync_results = self._sync_to_otas(room_type, date_str, new_available)
results['sync_status'][date_str] = sync_results
results['processed_dates'].append(date_str)
except Exception as e:
error_msg = f"Failed to process {date_str}: {str(e)}"
logger.error(error_msg)
results['errors'].append(error_msg)
# リトライロジック
if not self._retry_with_backoff(room_type, date_str, action, quantity):
# Dead Letter Queueに送信
self._send_to_dlq(event_detail, error_msg)
current_date = current_date + timedelta(days=1)
return results
def _sync_to_otas(self, room_type: str, date: str, available: int) -> Dict:
"""OTA在庫同期(並列処理)"""
sync_results = {}
for ota_name, client in self.ota_clients.items():
try:
# タイムアウトとリトライ設定
response = client.update_availability(
room_type=room_type,
date=date,
available=available,
timeout=10,
max_retries=3
)
sync_results[ota_name] = {
'status': 'success',
'response_code': response.get('code')
}
except Exception as e:
sync_results[ota_name] = {
'status': 'failed',
'error': str(e)
}
# 特定OTAのエラーは全体を止めない
logger.warning(f"OTA sync failed for {ota_name}: {e}")
# Circuit Breaker パターン
if self._should_circuit_break(ota_name):
self._disable_ota_temporarily(ota_name)
return sync_results
def _retry_with_backoff(self, room_type: str, date: str,
action: str, quantity: int, attempt: int = 0) -> bool:
"""指数バックオフでリトライ"""
if attempt >= 3:
return False
wait_time = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait_time)
try:
return self.process_single_date(room_type, date, action, quantity)
except Exception:
return self._retry_with_backoff(room_type, date, action, quantity, attempt + 1)
def adjust_inventory(event, context):
"""Lambda ハンドラー"""
adjuster = InventoryAdjuster()
try:
# EventBridgeイベントの詳細取得
event_detail = event['detail']
# 在庫調整実行
result = adjuster.process_reservation(event_detail)
# CloudWatchメトリクス送信
send_metrics(result)
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
logger.error(f"Critical error in inventory adjustment: {e}")
# SNSアラート送信
send_alert(f"Inventory sync critical failure: {str(e)}")
raise
エラーハンドリングとモニタリング
# monitoring.py
def monitor_inventory(event, context):
"""在庫レベル監視(5分ごと実行)"""
threshold = int(os.environ['THRESHOLD_PERCENTAGE'])
# 明日から7日間の在庫をチェック
for days_ahead in range(1, 8):
check_date = (datetime.now() + timedelta(days=days_ahead)).strftime('%Y-%m-%d')
inventory_levels = get_all_room_inventory(check_date)
for room_type, data in inventory_levels.items():
occupancy_rate = ((data['total'] - data['available']) / data['total']) * 100
if data['available'] <= (data['total'] * threshold / 100):
send_low_inventory_alert(room_type, check_date, data['available'])
まとめ
このシステムにより、複数OTAとの在庫同期を自動化し、オーバーブッキングリスクを最小化できます。EventBridgeによるイベント駆動、DynamoDBの条件付き書き込み、そして包括的なエラーハンドリングにより、高い信頼性を実現しています。
次回は、このシステムと連携するフロントエンドのリアルタイム更新について解説します。
Discussion