🚀

実践!LambdaアーキテクチャのスピードレイヤーとKafkaストリーミング設計入門

に公開

前回の記事では、Lambdaアーキテクチャの設計原則と実装について紹介しました。

その中で3つのレイヤーを取り上げました:

  • Batch Layer – 遅いが正確。履歴データ全体を処理して最終的一貫性を担保する
  • Speed Layer – 増分データをリアルタイムに処理する
  • Serving Layer – 両レイヤーの結果をマージし、統一的なクエリインターフェースを提供する

今日からは、スピードレイヤーの実装に踏み込みます。

まずKafka Consumerの作成から始め、ストリーム処理の中核メカニズムを段階的に探っていきます。

本日の記事では、スピードレイヤーの重要コンポーネントであるKafka Consumerを実装します。これは注文データを即時に受信してServing DBに書き込む役割を担います(バッチレイヤーは毎日、履歴データ全体の処理に注力します)。

⚠️ 重要

本記事のコードは教育・概念実証を目的とした擬似コードです。
実運用を想定した完成品ではありません。
現場の課題を理解・解決するための設計思考や全体像の把握に焦点を当ててください。
低レベルなコード詳細は流し読みでも構いません。

Step 1: スピードレイヤーのKafka Consumer:注文をServing DBに取り込む

最もシンプルなバージョンから始めましょう:

from kafka import KafkaConsumer
import json

# ordersトピックを購読
consumer = KafkaConsumer('orders')

print("[Speed Layer] 新しい注文を待機中...")

for message in consumer:
    order = json.loads(message.value.decode('utf-8'))

    insert_db(order)
    conn.commit()
    print(f"[Speed Layer] 注文 {order['id']} を挿入しました")

このロジックはシンプルです:

  • Kafkaがリアルタイムの注文データストリームを提供する
  • Consumerがストリームを消費して処理する
  • Serving DBが処理済み結果を保存する
  • コアタスク:受信データを可能な限り速くストレージに書き込む

Step 2: Serving DBの設計

ここでは2つの主要テーブルを用意します:

  • orders_batch_summary – バッチレイヤーが日次で計算した事前集計済みの履歴データ
  • orders_realtime – スピードレイヤーが送るリアルタイムの詳細注文データ

ダッシュボードからのクエリ時には、無効な注文(status = 'removed')を除外しつつ両テーブルをマージします:

SELECT status, SUM(count) AS total
FROM (
    SELECT status, count(*)
    FROM orders_batch_summary
    WHERE status != 'removed'

    UNION ALL

    SELECT status, COUNT(*) AS count
    FROM orders_realtime
    WHERE status != 'removed'
    GROUP BY status
) t
GROUP BY status;

しかし、会社の成長に伴い、開発者ごとに独自のConsumerが増えると、実装はちぐはぐになり、やがてコードベースはスパゲッティ化して保守不能になります。

ここで必要なのはリファクタリングです。全員が共通フレームワーク上で開発できるよう、統一的なストリーム処理アーキテクチャを確立します。

Source抽象化レイヤーの設計

チーム開発では、Kafka Consumerの実装がバラバラだと統合が難しくなります。

その解決策が、統一されたSourceインターフェースの定義です。

Sourceアーキテクチャ設計

    ┌─────────────┐
    │ BaseSource  │  ◄── 抽象インターフェース
    │             │
    │ + run()     │
    └─────────────┘
           △
           │ implements
    ┌─────────────┐
    │KafkaSource  │  ◄── 具象実装
    │             │
    │ + run()     │
    └─────────────┘

Sourceコアの段階的な説明

Step 1: BaseSource 抽象インターフェースの定義

from abc import ABC, abstractmethod

class BaseSource(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self):
        pass

要点:

  • 各Sourceは一意のnameを持つ
  • run()は抽象メソッドで、サブクラスに実装を強制

Step 2: SimpleKafkaSource の初期化

class SimpleKafkaSource(BaseSource):
    def __init__(self, name: str, topic: str, broker_address: str = "localhost:9092"):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer = None
        self.message_handler = self._default_handler

要点:

  • BaseSourceを継承し、統一インターフェースに準拠
  • 差し替え可能なmessage_handlerで処理ロジックを外部注入できる柔軟性

Step 3: Kafka Consumerのセットアップ

def _setup_consumer(self):
    self.consumer = KafkaConsumer(
        self.topic,
        bootstrap_servers=self.broker_address,
        group_id=f"simple-source-{self.name}",
        auto_offset_reset='latest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
    )

要点:

  • 競合回避のためgroup_idを自動生成
  • auto_offset_reset='latest'で最新メッセージから消費開始
  • JSONを自動デシリアライズ

Step 4: コア実行ロジック

def run(self):
    self._setup_consumer()  # Consumerを初期化

    for message in self.consumer:  # メッセージを継続的に監視
        self.message_handler({
            'key': message.key,
            'value': message.value,
            'topic': message.topic,
            'offset': message.offset
        })

実行フロー:

  1. Kafka Consumerを初期化
  2. トピックからメッセージを継続的に取得
  3. メッセージを標準フォーマットにラップ
  4. message_handlerを呼び出して処理

要点:

Sourceは**取り込み(ingestion)**のみに専念し、実際の処理は外部から注入されたmessage_handlerに委譲することで高い柔軟性を確保します。

import logging
import json
from abc import ABC, abstractmethod
from typing import Optional, Callable, Any
from kafka import KafkaConsumer

logger = logging.getLogger(__name__)

class BaseSource(ABC):
    """すべてのSourceの基本抽象クラス"""

    def __init__(self, name: str):
        self.name = name
        self._running = False

    @abstractmethod
    def run(self):
        """メイン実行メソッド"""
        pass

    def stop(self):
        """Sourceを停止"""
        self._running = False
        logger.info(f"Source {self.name} を停止しました")

class SimpleKafkaSource(BaseSource):
    """シンプルなKafka Source実装"""

    def __init__(
        self,
        name: str,
        topic: str,
        broker_address: str = "localhost:9092",
        consumer_group: Optional[str] = None,
        message_handler: Optional[Callable[[Any], None]] = None
    ):
        super().__init__(name)
        self.topic = topic
        self.broker_address = broker_address
        self.consumer_group = consumer_group or f"simple-source-{name}"
        self.message_handler = message_handler or self._default_handler
        self.consumer: Optional[KafkaConsumer] = None

    def _default_handler(self, message):
        print(f"[{self.name}] メッセージ受信: {message}")

    def _setup_consumer(self):
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.broker_address,
                group_id=self.consumer_group,
                auto_offset_reset='latest',
                value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
                key_deserializer=lambda m: m.decode('utf-8') if m else None
            )
            logger.info(f"Consumerをセットアップ: topic={self.topic}, group={self.consumer_group}")
        except Exception as e:
            logger.error(f"Consumerのセットアップに失敗: {e}")
            raise

    def run(self):
        logger.info(f"Source {self.name} を起動 (topic={self.topic})")
        self._setup_consumer()
        self._running = True

        try:
            while self._running:
                message_batch = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        if not self._running:
                            break
                        try:
                            self.message_handler({
                                'key': message.key,
                                'value': message.value,
                                'topic': message.topic,
                                'partition': message.partition,
                                'offset': message.offset,
                                'timestamp': message.timestamp
                            })
                        except Exception as e:
                            logger.error(f"メッセージ処理でエラー: {e}")
        except KeyboardInterrupt:
            logger.info("割り込みシグナルを受信")
        except Exception as e:
            logger.error(f"実行ループでエラー: {e}")
        finally:
            if self.consumer:
                self.consumer.close()
            logger.info(f"Source {self.name} を終了")

    def stop(self):
        super().stop()
        if self.consumer:
            self.consumer.close()

Sink抽象化レイヤーの設計

スピードレイヤーのアーキテクチャでは、Sourceが入力、Sinkが出力を担当します。

実装の不統一を避けるため、統一されたSinkインターフェースを定義します。

Sinkアーキテクチャ設計

    ┌─────────────┐
    │  BaseSink   │  ◄── 抽象インターフェース
    │             │
    │ + write()   │
    └─────────────┘
           △
           │ implements
    ┌──────────────────┐
    │SimplePostgreSQL  │  ◄── 具象実装
    │Sink              │
    │ + write()        │
    └──────────────────┘

Sinkコアの段階的な説明

Step 1: Base Sinkインターフェースの定義

from abc import ABC, abstractmethod

class BaseSink(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message):
        pass

    def setup(self):
        pass  # 既定では何もしない

要点:

  • 各Sinkは一意のnameを持つ
  • write()が実際の書き込みを担当
  • setup()は任意でオーバーライド可能

Step 2: Simple PostgreSQL Sinkの初期化

class SimplePostgreSQLSink(BaseSink):
    def __init__(self, name: str, host: str, dbname: str, table_name: str):
        super().__init__(name)
        self.host = host
        self.dbname = dbname
        self.table_name = table_name
        self.connection = None

要点:

  • BaseSinkを継承して一貫性を担保
  • DB接続情報を保持
  • 遅延初期化(connection = None

Step 3: write() のコアロジック

def write(self, message):
    # カラムを自動検出してDBへ挿入
    data = message.get('value', {})
    # ... SQLを動的に生成して挿入

要点:

message['value']のフィールド構造を自動検出し、INSERT SQLを動的生成して書き込みます。
これにより、Sinkは多様なスキーマに自動適応できます。

完全版 Sink実装

import logging
from abc import ABC, abstractmethod
from typing import Any, Dict

try:
    import psycopg2
    from psycopg2.extras import Json
    from psycopg2 import sql
except ImportError:
    psycopg2 = None
    print("警告: psycopg2がインストールされていません。pip install psycopg2-binary を実行してください")

logger = logging.getLogger(__name__)

class BaseSink(ABC):
    """すべてのSinkの基本抽象クラス"""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def write(self, message: Dict[str, Any]):
        """メッセージを書き込む"""
        pass

    def setup(self):
        """接続を初期化"""
        pass

    def close(self):
        """接続をクローズ"""
        pass

class SimplePostgreSQLSink(BaseSink):
    """自動スキーマ検出を備えたPostgreSQL Sink"""

    def __init__(
        self,
        name: str,
        host: str,
        port: int,
        dbname: str,
        user: str,
        password: str,
        table_name: str
    ):
        super().__init__(name)
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
        self.table_name = table_name
        self.connection = None

    def setup(self):
        if psycopg2 is None:
            raise ImportError("psycopg2が必要です")

        self.connection = psycopg2.connect(
            host=self.host,
            port=self.port,
            dbname=self.dbname,
            user=self.user,
            password=self.password
        )
        logger.info(f"PostgreSQLに接続: {self.host}:{self.port}/{self.dbname}")

    def write(self, message: Dict[str, Any]):
        """フィールドを自動検出してPostgreSQLに書き込む"""
        data = message.get('value', {})
        # ... 動的フィールド検出とSQL実行ロジック

    def close(self):
        if self.connection:
            self.connection.close()
            logger.info("PostgreSQL接続をクローズしました")

Simple Streaming Engine:統合管理レイヤー

Source(入力)とSink(出力)の間には、オーケストレーション、モニタリング、ライフサイクル管理を担う統合管理レイヤーが必要です。

このコンポーネントがSimpleStreamingEngineです。

Simple Streaming Engineアーキテクチャ設計

    ┌─────────────────────┐
    │SimpleStreamingEngine│  ◄── 中央管理コンポーネント
    │                     │
    │    +add_source()    │
    │    +add_sink()      │
    │    + run()          │
    └─────────────────────┘
           │
           │ manages
           ▼
    ┌──────────────┐    ┌──────────────┐
    │    Source    │───▶│     Sink     │
    │              │    │              │
    │ KafkaSource  │    │PostgreSQLSink│
    └──────────────┘    └──────────────┘

Simple Streaming Engineコアコードの段階的な解説

Step 1: Simple Streaming Engineの初期化

class SimpleStreamingEngine:
    def __init__(self, name: str = "simple-streaming-app"):
        self.name = name
        self._sources = []  # Sourceのリスト
        self._sinks = []    # Sinkのリスト

要点:

  • SimpleStreamingEngineSourceSinkの2つのリストを管理
  • 両者に対して統一的な登録インターフェースを提供

Step 2: SourceとSinkの登録

def add_source(self, source: BaseSource):
    self._sources.append(source)

def add_sink(self, sink: BaseSink):
    self._sinks.append(sink)

要点:

  • 複数のSource/Sinkをサポートするリスト管理
  • BaseSource/BaseSinkを実装していれば登録可能

Step 3: コア実行ロジック

def run(self):
    # すべてのSinkを初期化
    for sink in self._sinks:
        sink.setup()

    # 各Sourceにメッセージハンドラを設定
    for source in self._sources:
        source.message_handler = self._create_message_handler()
        source.run()  # データ消費を開始

実行フロー:

  1. すべてのSinkの接続初期化
  2. Sourceにハンドラを注入
  3. Sourceを起動し、データ処理を開始

Step 4: メッセージハンドラのコアロジック

def _create_message_handler(self):
    def handler(message):
        # 受信メッセージをすべてのSinkへ転送
        for sink in self._sinks:
            sink.write(message)
    return handler

詳細なデータフロー説明:

  1. SimpleStreamingEngine起動時:

    # SimpleStreamingEngine.run() 内
    for source in self._sources:
        source.message_handler = self._create_message_handler()  # ハンドラ注入
        source.run()  # Sourceを起動
    
  2. Sourceがデータを受信したとき:

    # SimpleKafkaSource.run() 内
    for message in self.consumer:  # Kafkaから取得
        formatted_message = {
            'key': message.key,
            'value': message.value
        }
        self.message_handler(formatted_message)  # 注入済みハンドラを呼ぶ
    
  3. ハンドラがメッセージを転送:

    # _create_message_handler() が返すhandler
    def handler(message):  # Sourceからの標準化メッセージ
        for sink in self._sinks:
            sink.write(message)
    

全体のデータフロー:

KafkaSource.run()message_handler()Sink.write()

要点:

SimpleStreamingEngine関数注入により、SourceがSinkを意識せず動作します。これにより完全な疎結合を実現しています。

Simple Streaming Engineが必要な理由

  • 疎結合設計:SourceとSinkが独立し、相互に交換可能
  • スケーラビリティ:複数Sink(例:PostgreSQL+Elasticsearch)を同時に扱える
  • 統一管理:登録・実行のAPIを一元化

完全版 Simple Streaming Engineコード

import logging
from typing import List
from .source import BaseSource
from .sink import BaseSink

logger = logging.getLogger(__name__)

class SimpleStreamingEngine:
    """
    最小構成のストリーミング処理エンジン
    """

    def __init__(self, name: str = "simple-streaming-engine"):
        self.name = name
        self._sources: List[BaseSource] = []  # Sourceの一覧
        self._sinks: List[BaseSink] = []      # Sinkの一覧

    def add_source(self, source: BaseSource):
        """
        ストリーミングエンジンにSourceを登録
        """
        self._sources.append(source)

    def add_sink(self, sink: BaseSink):
        """
        ストリーミングエンジンにSinkを登録
        """
        self._sinks.append(sink)

    def run(self):
        """
        ストリーミングエンジンを起動し、データストリームを処理
        """
        # すべてのSinkを初期化
        for sink in self._sinks:
            sink.setup()

        # 各Sourceにメッセージハンドラを割り当て
        for source in self._sources:
            source.message_handler = self._create_message_handler()
            source.run()  # 消費開始

    def _create_message_handler(self):
        """
        受信データをすべてのSinkにディスパッチするハンドラを生成
        """
        def handler(message):
            for sink in self._sinks:
                sink.write(message)
        return handler

すべてをつなげる:自動データフローの実行

ここまでで以下がそろいました:

  • Source:Kafkaからデータを取り込む
  • Sink:PostgreSQLにデータを書き込む
  • SimpleStreamingEngine:両者を接続・管理・オーケストレーションする

これらを組み合わせると、エンドツーエンドの自動データフローが起動します。

# 1. SimpleStreamingEngineを作成
engine = SimpleStreamingEngine(...)

# 2. Kafka Sourceを作成
orders_source = SimpleKafkaSource(...)

# 3. PostgreSQL Sinkを作成
pg_sink = SimplePostgreSQLSink(...)

# 4. 組み上げて起動
engine.add_source(orders_source)
engine.add_sink(pg_sink)
engine.run()  # 処理開始: Kafka → PostgreSQL

Summary

このセクションでは、スピードレイヤーの中核実装を見てきました:

  • Batch Layer は信頼性の高い履歴データ処理を提供
  • Speed Layer はリアルタイム性とストリーミング処理を提供
  • スピードレイヤーなしには、Lambdaアーキテクチャの「真のリアルタイム性」は実現できない

そして、Source–Sink–SimpleStreamingEngine というアーキテクチャにより、次の要素を構築しました:

  • 統一的なデータ処理インターフェース
  • スケーラブルなストリーム処理フレームワーク
  • スピードレイヤーの機能実装一式

Day 5 Preview: パフォーマンスボトルネックへの対処

最初は順調に動作し、Consumerは注文データを問題なく処理します。

しかし高負荷下では:

  • コンソールにレイテンシ警告が出始める
  • Consumerの処理能力が上限に達する
  • 注文が処理待ちとしてキューに積まれていく

次回は、スピードレイヤーのパフォーマンス課題と、高スループット向け最適化に取り組みます。

Discussion