実践!LambdaアーキテクチャのスピードレイヤーとKafkaストリーミング設計入門
前回の記事では、Lambdaアーキテクチャの設計原則と実装について紹介しました。
その中で3つのレイヤーを取り上げました:
- Batch Layer – 遅いが正確。履歴データ全体を処理して最終的一貫性を担保する
- Speed Layer – 増分データをリアルタイムに処理する
- Serving Layer – 両レイヤーの結果をマージし、統一的なクエリインターフェースを提供する
今日からは、スピードレイヤーの実装に踏み込みます。
まずKafka Consumerの作成から始め、ストリーム処理の中核メカニズムを段階的に探っていきます。
本日の記事では、スピードレイヤーの重要コンポーネントであるKafka Consumerを実装します。これは注文データを即時に受信してServing DBに書き込む役割を担います(バッチレイヤーは毎日、履歴データ全体の処理に注力します)。
⚠️ 重要
本記事のコードは教育・概念実証を目的とした擬似コードです。
実運用を想定した完成品ではありません。
現場の課題を理解・解決するための設計思考や全体像の把握に焦点を当ててください。
低レベルなコード詳細は流し読みでも構いません。
Step 1: スピードレイヤーのKafka Consumer:注文をServing DBに取り込む
最もシンプルなバージョンから始めましょう:
from kafka import KafkaConsumer
import json
# ordersトピックを購読
consumer = KafkaConsumer('orders')
print("[Speed Layer] 新しい注文を待機中...")
for message in consumer:
order = json.loads(message.value.decode('utf-8'))
insert_db(order)
conn.commit()
print(f"[Speed Layer] 注文 {order['id']} を挿入しました")
このロジックはシンプルです:
- Kafkaがリアルタイムの注文データストリームを提供する
- Consumerがストリームを消費して処理する
- Serving DBが処理済み結果を保存する
- コアタスク:受信データを可能な限り速くストレージに書き込む
Step 2: Serving DBの設計
ここでは2つの主要テーブルを用意します:
-
orders_batch_summary– バッチレイヤーが日次で計算した事前集計済みの履歴データ -
orders_realtime– スピードレイヤーが送るリアルタイムの詳細注文データ
ダッシュボードからのクエリ時には、無効な注文(status = 'removed')を除外しつつ両テーブルをマージします:
SELECT status, SUM(count) AS total
FROM (
SELECT status, count(*)
FROM orders_batch_summary
WHERE status != 'removed'
UNION ALL
SELECT status, COUNT(*) AS count
FROM orders_realtime
WHERE status != 'removed'
GROUP BY status
) t
GROUP BY status;
しかし、会社の成長に伴い、開発者ごとに独自のConsumerが増えると、実装はちぐはぐになり、やがてコードベースはスパゲッティ化して保守不能になります。
ここで必要なのはリファクタリングです。全員が共通フレームワーク上で開発できるよう、統一的なストリーム処理アーキテクチャを確立します。
Source抽象化レイヤーの設計
チーム開発では、Kafka Consumerの実装がバラバラだと統合が難しくなります。
その解決策が、統一されたSourceインターフェースの定義です。
Sourceアーキテクチャ設計
┌─────────────┐
│ BaseSource │ ◄── 抽象インターフェース
│ │
│ + run() │
└─────────────┘
△
│ implements
┌─────────────┐
│KafkaSource │ ◄── 具象実装
│ │
│ + run() │
└─────────────┘
Sourceコアの段階的な説明
Step 1: BaseSource 抽象インターフェースの定義
from abc import ABC, abstractmethod
class BaseSource(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def run(self):
pass
要点:
- 各Sourceは一意の
nameを持つ -
run()は抽象メソッドで、サブクラスに実装を強制
Step 2: SimpleKafkaSource の初期化
class SimpleKafkaSource(BaseSource):
def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
super().__init__(name)
self.topic = topic
self.broker_address = broker_address
self.consumer = None
self.message_handler = self._default_handler
要点:
-
BaseSourceを継承し、統一インターフェースに準拠 - 差し替え可能な
message_handlerで処理ロジックを外部注入できる柔軟性
Step 3: Kafka Consumerのセットアップ
def _setup_consumer(self):
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.broker_address,
group_id=f"simple-source-{self.name}",
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)
要点:
- 競合回避のため
group_idを自動生成 -
auto_offset_reset='latest'で最新メッセージから消費開始 - JSONを自動デシリアライズ
Step 4: コア実行ロジック
def run(self):
self._setup_consumer() # Consumerを初期化
for message in self.consumer: # メッセージを継続的に監視
self.message_handler({
'key': message.key,
'value': message.value,
'topic': message.topic,
'offset': message.offset
})
実行フロー:
- Kafka Consumerを初期化
- トピックからメッセージを継続的に取得
- メッセージを標準フォーマットにラップ
-
message_handlerを呼び出して処理
要点:
Sourceは**取り込み(ingestion)**のみに専念し、実際の処理は外部から注入されたmessage_handlerに委譲することで高い柔軟性を確保します。
import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer
logger = logging.getLogger(__name__)
class BaseSource(ABC):
"""すべてのSourceの基本抽象クラス"""
def __init__(self, name: str):
self.name = name
self._running = False
@abstractmethod
def run(self):
"""メイン実行メソッド"""
pass
def stop(self):
"""Sourceを停止"""
self._running = False
logger.info(f"Source {self.name} を停止しました")
class SimpleKafkaSource(BaseSource):
"""シンプルなKafka Source実装"""
def __init__(
self,
name: str,
topic: str,
broker_address: str = "localhost:9092",
consumer_group: Optional[str] = None,
message_handler: Optional[Callable[[Any], None]] = None
):
super().__init__(name)
self.topic = topic
self.broker_address = broker_address
self.consumer_group = consumer_group or f"simple-source-{name}"
self.message_handler = message_handler or self._default_handler
self.consumer: Optional[KafkaConsumer] = None
def _default_handler(self, message):
print(f"[{self.name}] メッセージ受信: {message}")
def _setup_consumer(self):
try:
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.broker_address,
group_id=self.consumer_group,
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
key_deserializer=lambda m: m.decode('utf-8') if m else None
)
logger.info(f"Consumerをセットアップ: topic={self.topic}, group={self.consumer_group}")
except Exception as e:
logger.error(f"Consumerのセットアップに失敗: {e}")
raise
def run(self):
logger.info(f"Source {self.name} を起動 (topic={self.topic})")
self._setup_consumer()
self._running = True
try:
while self._running:
message_batch = self.consumer.poll(timeout_ms=1000)
for topic_partition, messages in message_batch.items():
for message in messages:
if not self._running:
break
try:
self.message_handler({
'key': message.key,
'value': message.value,
'topic': message.topic,
'partition': message.partition,
'offset': message.offset,
'timestamp': message.timestamp
})
except Exception as e:
logger.error(f"メッセージ処理でエラー: {e}")
except KeyboardInterrupt:
logger.info("割り込みシグナルを受信")
except Exception as e:
logger.error(f"実行ループでエラー: {e}")
finally:
if self.consumer:
self.consumer.close()
logger.info(f"Source {self.name} を終了")
def stop(self):
super().stop()
if self.consumer:
self.consumer.close()
Sink抽象化レイヤーの設計
スピードレイヤーのアーキテクチャでは、Sourceが入力、Sinkが出力を担当します。
実装の不統一を避けるため、統一されたSinkインターフェースを定義します。
Sinkアーキテクチャ設計
┌─────────────┐
│ BaseSink │ ◄── 抽象インターフェース
│ │
│ + write() │
└─────────────┘
△
│ implements
┌──────────────────┐
│SimplePostgreSQL │ ◄── 具象実装
│Sink │
│ + write() │
└──────────────────┘
Sinkコアの段階的な説明
Step 1: Base Sinkインターフェースの定義
from abc import ABC, abstractmethod
class BaseSink(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def write(self, message):
pass
def setup(self):
pass # 既定では何もしない
要点:
- 各Sinkは一意の
nameを持つ -
write()が実際の書き込みを担当 -
setup()は任意でオーバーライド可能
Step 2: Simple PostgreSQL Sinkの初期化
class SimplePostgreSQLSink(BaseSink):
def __init__(self, name: str, host: str, dbname: str, table_name: str):
super().__init__(name)
self.host = host
self.dbname = dbname
self.table_name = table_name
self.connection = None
要点:
-
BaseSinkを継承して一貫性を担保 - DB接続情報を保持
- 遅延初期化(
connection = None)
Step 3: write() のコアロジック
def write(self, message):
# カラムを自動検出してDBへ挿入
data = message.get('value', {})
# ... SQLを動的に生成して挿入
要点:
message['value']のフィールド構造を自動検出し、INSERT SQLを動的生成して書き込みます。
これにより、Sinkは多様なスキーマに自動適応できます。
完全版 Sink実装
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict
try:
import psycopg2
from psycopg2.extras import Json
from psycopg2 import sql
except ImportError:
psycopg2 = None
print("警告: psycopg2がインストールされていません。pip install psycopg2-binary を実行してください")
logger = logging.getLogger(__name__)
class BaseSink(ABC):
"""すべてのSinkの基本抽象クラス"""
def __init__(self, name: str):
self.name = name
@abstractmethod
def write(self, message: Dict[str, Any]):
"""メッセージを書き込む"""
pass
def setup(self):
"""接続を初期化"""
pass
def close(self):
"""接続をクローズ"""
pass
class SimplePostgreSQLSink(BaseSink):
"""自動スキーマ検出を備えたPostgreSQL Sink"""
def __init__(
self,
name: str,
host: str,
port: int,
dbname: str,
user: str,
password: str,
table_name: str
):
super().__init__(name)
self.host = host
self.port = port
self.dbname = dbname
self.user = user
self.password = password
self.table_name = table_name
self.connection = None
def setup(self):
if psycopg2 is None:
raise ImportError("psycopg2が必要です")
self.connection = psycopg2.connect(
host=self.host,
port=self.port,
dbname=self.dbname,
user=self.user,
password=self.password
)
logger.info(f"PostgreSQLに接続: {self.host}:{self.port}/{self.dbname}")
def write(self, message: Dict[str, Any]):
"""フィールドを自動検出してPostgreSQLに書き込む"""
data = message.get('value', {})
# ... 動的フィールド検出とSQL実行ロジック
def close(self):
if self.connection:
self.connection.close()
logger.info("PostgreSQL接続をクローズしました")
Simple Streaming Engine:統合管理レイヤー
Source(入力)とSink(出力)の間には、オーケストレーション、モニタリング、ライフサイクル管理を担う統合管理レイヤーが必要です。
このコンポーネントがSimpleStreamingEngineです。
Simple Streaming Engineアーキテクチャ設計
┌─────────────────────┐
│SimpleStreamingEngine│ ◄── 中央管理コンポーネント
│ │
│ +add_source() │
│ +add_sink() │
│ + run() │
└─────────────────────┘
│
│ manages
▼
┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Sink │
│ │ │ │
│ KafkaSource │ │PostgreSQLSink│
└──────────────┘ └──────────────┘
Simple Streaming Engineコアコードの段階的な解説
Step 1: Simple Streaming Engineの初期化
class SimpleStreamingEngine:
def __init__(self, name: str = "simple-streaming-app"):
self.name = name
self._sources = [] # Sourceのリスト
self._sinks = [] # Sinkのリスト
要点:
-
SimpleStreamingEngineはSourceとSinkの2つのリストを管理 - 両者に対して統一的な登録インターフェースを提供
Step 2: SourceとSinkの登録
def add_source(self, source: BaseSource):
self._sources.append(source)
def add_sink(self, sink: BaseSink):
self._sinks.append(sink)
要点:
- 複数のSource/Sinkをサポートするリスト管理
-
BaseSource/BaseSinkを実装していれば登録可能
Step 3: コア実行ロジック
def run(self):
# すべてのSinkを初期化
for sink in self._sinks:
sink.setup()
# 各Sourceにメッセージハンドラを設定
for source in self._sources:
source.message_handler = self._create_message_handler()
source.run() # データ消費を開始
実行フロー:
- すべてのSinkの接続初期化
- Sourceにハンドラを注入
- Sourceを起動し、データ処理を開始
Step 4: メッセージハンドラのコアロジック
def _create_message_handler(self):
def handler(message):
# 受信メッセージをすべてのSinkへ転送
for sink in self._sinks:
sink.write(message)
return handler
詳細なデータフロー説明:
-
SimpleStreamingEngine起動時:
# SimpleStreamingEngine.run() 内 for source in self._sources: source.message_handler = self._create_message_handler() # ハンドラ注入 source.run() # Sourceを起動 -
Sourceがデータを受信したとき:
# SimpleKafkaSource.run() 内 for message in self.consumer: # Kafkaから取得 formatted_message = { 'key': message.key, 'value': message.value } self.message_handler(formatted_message) # 注入済みハンドラを呼ぶ -
ハンドラがメッセージを転送:
# _create_message_handler() が返すhandler def handler(message): # Sourceからの標準化メッセージ for sink in self._sinks: sink.write(message)
全体のデータフロー:
Kafka → Source.run() → message_handler() → Sink.write()
要点:
SimpleStreamingEngineは関数注入により、SourceがSinkを意識せず動作します。これにより完全な疎結合を実現しています。
Simple Streaming Engineが必要な理由
- 疎結合設計:SourceとSinkが独立し、相互に交換可能
- スケーラビリティ:複数Sink(例:PostgreSQL+Elasticsearch)を同時に扱える
- 統一管理:登録・実行のAPIを一元化
完全版 Simple Streaming Engineコード
import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink
logger = logging.getLogger(__name__)
class SimpleStreamingEngine:
"""
最小構成のストリーミング処理エンジン
"""
def __init__(self, name: str = "simple-streaming-engine"):
self.name = name
self._sources: List[BaseSource] = [] # Sourceの一覧
self._sinks: List[BaseSink] = [] # Sinkの一覧
def add_source(self, source: BaseSource):
"""
ストリーミングエンジンにSourceを登録
"""
self._sources.append(source)
def add_sink(self, sink: BaseSink):
"""
ストリーミングエンジンにSinkを登録
"""
self._sinks.append(sink)
def run(self):
"""
ストリーミングエンジンを起動し、データストリームを処理
"""
# すべてのSinkを初期化
for sink in self._sinks:
sink.setup()
# 各Sourceにメッセージハンドラを割り当て
for source in self._sources:
source.message_handler = self._create_message_handler()
source.run() # 消費開始
def _create_message_handler(self):
"""
受信データをすべてのSinkにディスパッチするハンドラを生成
"""
def handler(message):
for sink in self._sinks:
sink.write(message)
return handler
すべてをつなげる:自動データフローの実行
ここまでで以下がそろいました:
- Source:Kafkaからデータを取り込む
- Sink:PostgreSQLにデータを書き込む
- SimpleStreamingEngine:両者を接続・管理・オーケストレーションする
これらを組み合わせると、エンドツーエンドの自動データフローが起動します。
# 1. SimpleStreamingEngineを作成
engine = SimpleStreamingEngine(...)
# 2. Kafka Sourceを作成
orders_source = SimpleKafkaSource(...)
# 3. PostgreSQL Sinkを作成
pg_sink = SimplePostgreSQLSink(...)
# 4. 組み上げて起動
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run() # 処理開始: Kafka → PostgreSQL
Summary
このセクションでは、スピードレイヤーの中核実装を見てきました:
- Batch Layer は信頼性の高い履歴データ処理を提供
- Speed Layer はリアルタイム性とストリーミング処理を提供
- スピードレイヤーなしには、Lambdaアーキテクチャの「真のリアルタイム性」は実現できない
そして、Source–Sink–SimpleStreamingEngine というアーキテクチャにより、次の要素を構築しました:
- 統一的なデータ処理インターフェース
- スケーラブルなストリーム処理フレームワーク
- スピードレイヤーの機能実装一式
Day 5 Preview: パフォーマンスボトルネックへの対処
最初は順調に動作し、Consumerは注文データを問題なく処理します。
しかし高負荷下では:
- コンソールにレイテンシ警告が出始める
- Consumerの処理能力が上限に達する
- 注文が処理待ちとしてキューに積まれていく
次回は、スピードレイヤーのパフォーマンス課題と、高スループット向け最適化に取り組みます。
Discussion