🦔

レイテンシ重視のOLAP DB、Apache Pinot触ってみた

2022/12/17に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#16です。

Apache Pinotとは

公式サイト曰く、

Apache Pinot, a real-time distributed OLAP datastore, purpose-built for low-latency high throughput analytics, perfect for user-facing analytical workloads.

です。
大規模なデータの分析用のデータベースといえばSnowflakeやBigQueryが有名ですが、Apache Pinotは少し違うカテゴリーの製品で、エンドユーザが見るダッシュボードのような、レイテンシが重要な用途に特化した分析用のデータベースです。

似たカテゴリの製品としては、

などがあります。

Podcastの発音や同名のワインの名前的に発音は「ピノ」な気がしています。

事例

ロゴが出ている企業だけでも、

  • LinkedIn
  • Uber
  • Slack
  • セブンイレブン

などの企業があります。特にUberとLinkedInはPinotに力を入れているようで(両社はPinotの開発にも関与しています)、

で事例の説明を行っています。

例えばUberのOperating Apache Pinot @ Uber Scaleでは、下図のような、ユーザ(レストラン経営者)が見るダッシュボードの裏側でPinotが使われており、数10TBのデータを100msのオーダーで処理している話が紹介されています。


(Uberのエンジニアブログより)


(Uberのエンジニアブログより)

制限

大規模データ・低レイテンシを実現する代償として、一般的なDB・DWHに比べ弱い部分もあります。例えば、一般的なDB、DWHが行える以下のクエリには制限があります(Druidも同じような制限があったはず)。

  • DML
    • 変更自体は出来そうですが、SQLのUPDATEとはだいぶ違う形式で、機能も制限あり
  • JOIN
    • 出来なくもないがおススメしないとのこと

これらの機能制限、また、構成がそれなりに重たいこともあり、汎用的なDWHや社内でのデータ分析のDWHの代替にはならず、Pinotの自己紹介にあるように、「user-facing analytics」に用途を限定した方が良さそうです。

連携

クライアントアクセス用のライブラリについては、

  • JDBC
  • Java
  • Python
  • Go

が用意されています。また、

  • Tableau(BI)
  • Trino・Presto(クエリエンジン)
  • Superset(BI)
  • ThirdEye(異常検知)

についてはドキュメントが用意されています(JDBCが使えるので、これ以外のツールでも連携できると思いますが)。

マネージドサービス・エンタープライズサービス

StarTreeというPinotの開発者が始めた企業が、エンタープライズ版やマネージドサービスを提供しています。
本番運用するなら検討しても良いかもしれません。

学習リソース

Pinotに関する学習リソースとして、

などがあります。

Apache Druidとの比較

Apache Druidとは事例アーキテクチャ共に、よく似ています。

比較記事ありますが(下)、私は違い・使い分けよく分かっていないので詳しい人教えてくれると嬉しいです。

アーキテクチャ・概念


(Pinot公式ページより)

Pinotは主に以下のコンポーネントで構成されています。結構多くのコンポーネントが必要ですね。

  • Controller(図のZooKeeper(ZK)とHelix)
  • Broker
  • Server(Offline, Realtime)
  • Segment Store(Deep Storage)
  • Ingestion Job
  • (この図では省略)Minion

それぞれ簡単に説明すると、

  • Brokerはユーザからのクエリを行い、実際にデータを提供します
  • Controllerは各種サーバ・Segment(テーブルを分割したもの)の管理・管理API・WebUIの提供を行います
  • Serverはデータの提供を行います。RealtimeとOfflineの二種類があります(後述)
  • Segment Store、Ingestion Jobは、Segmentの保存とOfflineデータの取り込みを行います
  • Minionは非同期で何か作業をします

といった役割分担になっています。

Pinotでも他のDB同様にテーブルの概念がありますが、Pinotではデータの取り込み・テーブル(正確にはテーブルを分割したSegment)が3種類に分けられます。

  • Kafkaなどのキューからデータを読む、「Data in Motion」(左下)とそこにつながる「Realtime」
  • ファイルシステム・オブジェクトストレージからデータを読む、「Data at Rest」(右下)とそこにつながる「Offline」
  • 「Data in Motion」「Data at Rest」の両方を含む「Hybrid」
    • テーブルに対応するRealtime ServerとOffline Serverが両方あるテーブル(一つのServerで兼務も可能ですが本番ではおススメしないらしい)

クエリの実行は、RealtimeテーブルでもOfflineテーブルでも同じようにできますが、

といった機能(と性能)の違いがあります。

試してみる

Getting Startedでは、

  • ローカル
  • Docker
  • Kubernetes(Helm)
  • パブリッククラウド(Azure、AWS、GCP)

でのデプロイ方法と、

  • バッチインポート(Offline Segmnet)
  • ストリーミングインポート
  • (Kuberenetesのみ)BI(Apache Superset)との連携
  • (Kuberenetesのみ)Trino・Prestoとの連携

の例が記載されています。今回はDockerを使い

  • 各コンポーネントの起動
  • テーブルの定義
  • Offlineテーブルに、ファイルからデータを取り込み
    • standaloneスクリプトを動かします
  • Onlineテーブルに、Kafkaからデータを取り込み

を行いました。チュートリアルそのままだと動かない箇所があったため、一部を変更しています。

Ubuntu 20.04 (Windows10のWSL2上)、minikube kubernetes-version v1.23.8で試しました。

各コンポーネントの起動

https://docs.pinot.apache.org/basics/getting-started/running-pinot-in-docker

  • Zookeeper
  • Controller
  • Broker
  • Server

のコンポーネントをDockerコンテナを起動します。

起動する前にネットワークの作成と、コンテナイメージの指定をしておきます。

docker network create -d bridge pinot-demo
PINOT_IMAGE="apachepinot/pinot:0.9.3"

各コンポーネントを起動します。

docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-controller \
    -p 9000:9000 \
    -e JAVA_OPTS="-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log" \
    -d ${PINOT_IMAGE} StartController \
    -zkAddress pinot-zookeeper:2181

docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-broker \
    -p 8099:8099 \
    -e JAVA_OPTS="-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log" \
    -d ${PINOT_IMAGE} StartBroker \
    -zkAddress pinot-zookeeper:2181

docker run --rm -ti \
    --network=pinot-demo \
    --name pinot-server \
    -p 8098:8098 \
    -e JAVA_OPTS="-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log" \
    -d ${PINOT_IMAGE} StartServer \
    -zkAddress pinot-zookeeper:2181

localhost:9000をWebブラウザで開くと、PinotのWebUIが表示されるはずです。

Offlineテーブルの定義・データの読み込み

https://docs.pinot.apache.org/basics/getting-started/pushing-your-data-to-pinot

テーブル定義

サンプルのファイルを作成します。

mkdir /tmp/pinot-quick-start/rawdata/

cat > /tmp/pinot-quick-start/rawdata/transcript.csv << EOS
studentID,firstName,lastName,gender,subject,score,timestampInEpoch
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000
EOS

テーブル定義を作成します。Pinotでは一般的にRDBのようにCREATE TABLEではテーブル(多分)作れず、

をJSONで記載し、テーブル作成を行います。
(余談ですがWeb UIからもテーブル作成は可能です)

cat > /tmp/pinot-quick-start/transcript-schema.json << EOS
 {
   "schemaName": "transcript",
   "dimensionFieldSpecs": [
     {
    "name": "studentID",
       "dataType": "INT"
     },
     {
       "name": "firstName",
       "dataType": "STRING"
     },
     {
       "name": "lastName",
       "dataType": "STRING"
     },
     {
       "name": "gender",
       "dataType": "STRING"
     },
     {
       "name": "subject",
       "dataType": "STRING"
     }
   ],
   "metricFieldSpecs": [
     {
       "name": "score",
       "dataType": "FLOAT"
     }
   ],
   "dateTimeFieldSpecs": [{
     "name": "timestampInEpoch",
     "dataType": "LONG",
     "format" : "1:MILLISECONDS:EPOCH",
     "granularity": "1:MILLISECONDS"
   }]
} 
EOS

cat > /tmp/pinot-quick-start/transcript-table-offline.json << EOS
{
  "tableName": "transcript",
  "segmentsConfig" : {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "replication" : "1",
    "schemaName" : "transcript"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : [],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

EOS

Pinot Controllerでテーブルを登録します。チュートリアルではcontrollerHostはmanual-pinot-controllerですが、Dockerコンテナ起動に合わせてpinot-controllerに変えています。

docker run --rm -ti \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-offline.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 -exec

ファイルの読み込み

PinotでOfflineテーブルにファイルをアップロードするには

  • Spark
  • Hadoop
  • Flink
  • Standalone

方法があります。ここではStandaloneで取り込みます。
(本番では他の三つの方法が推奨らしい)

Standaloneの取り込みを行う設定ファイルを準備します。入力ファイルの場所・フォーマット、取り込み先のテーブルなどを定義してます。

cat > /tmp/pinot-quick-start/docker-job-spec.yml << EOS
 executionFrameworkSpec:
   name: 'standalone'
   segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
   segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
   segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
 jobType: SegmentCreationAndTarPush
 inputDirURI: '/tmp/pinot-quick-start/rawdata/'
 includeFileNamePattern: 'glob:**/*.csv'
 outputDirURI: '/tmp/pinot-quick-start/segments/'
 overwriteOutput: true
 pinotFSSpecs:
   - scheme: file
     className: org.apache.pinot.spi.filesystem.LocalPinotFS
 recordReaderSpec:
   dataFormat: 'csv'
   className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
   configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
 tableSpec:
   tableName: 'transcript'
   schemaURI: 'http://pinot-controller:9000/tables/transcript/schema'
   tableConfigURI: 'http://pinot-controller:9000/tables/transcript'
 pinotClusterSpecs:
   - controllerURI: 'http://pinot-controller:9000'
 pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
EOS

なお、チュートリアルの設定ファイルから以下を変更しています(元々の設定ファイルだとエラーになりました)。

  • 「manual-pinot-controller」を「pinot-controller」に
  • pushJobSpecを追加

処理を実行します。

docker run --rm -ti \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:latest LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/docker-job-spec.yml

PinotのWebUI(localhost:9000)でテーブルの確認ができます。

Onlineテーブルの定義・データの読み込み

https://docs.pinot.apache.org/basics/getting-started/pushing-your-streaming-data-to-pinot

PinotのRealtimeテーブルでは、

  • Apache Kafka
  • Apache Pulsar
  • Amazon Kinesi

に対応しています。DockerでKafkaを動かしてみます。

docker run \
    --network pinot-demo --name=kafka \
    -e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ADVERTISED_HOST_NAME=kafka \
    -e  ALLOW_PLAINTEXT_LISTENER=yes\
    -d bitnami/kafka:latest

なお、KAFKA_ZOOKEEPER_CONNECTはmanual-pinot-zookeeperから、pinot-zookeeperに変更しています。

Kafkaのトピックを作成します。

docker exec \
  -t kafka \
   /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --partitions=1 --replication-factor=1 \
  --create --topic transcript-topic

なお、

  • kafka-topics.shの場所を変更
  • zookeeperオプションをbootstrap-serverに変更

しています。

Offlineテーブルと同様にテーブル定義を行います。

cat > /tmp/pinot-quick-start/transcript-schema-realtime.json << EOS
{
  "schemaName": "transcript_realtime",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [{
    "name": "timestampInEpoch",
    "dataType": "LONG",
    "format" : "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }]
}
EOS
cat >  /tmp/pinot-quick-start/transcript-table-realtime.json << EOS
{
  "tableName": "transcript_realtime",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript_realtime",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcript-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
EOS
docker run \
    --network=pinot-demo \
    -v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
    --name pinot-streaming-table-creation \
    apachepinot/pinot:latest AddTable \
    -schemaFile /tmp/pinot-quick-start/transcript-schema-realtime.json \
    -tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 \
    -exec

Kafkaにデータを流します。

# Kafkaに流すデータの準備
cat > /tmp/pinot-quick-start/transcript.json <<EOS
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestampInEpoch":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestampInEpoch":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestampInEpoch":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestampInEpoch":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestampInEpoch":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestampInEpoch":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestampInEpoch":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestampInEpoch":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestampInEpoch":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestampInEpoch":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestampInEpoch":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestampInEpoch":1572854400000}
EOS

# publish
docker cp  /tmp/pinot-quick-start/transcript.json kafka:/tmp/transcript.json
docker exec   -t kafka  sh -c  '/opt/bitnami/kafka/bin/kafka-console-producer.sh     --broker-list localhost:9092 --topic transcript-topic < /tmp/transcript.json'

PinotのWebUI(localhost:9000)でテーブルの確認ができます。

クライアントアクセス

Pythonでのアクセスを試してみます。

pip install pinotdb
from pinotdb import connect
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
curs.execute('SELECT * FROM transcript')
result = curs.execute('SELECT * FROM transcript')
[x for x in result]

テーブルのデータが取得できていそうです。

[['Lucy', 'Female', 'Smith', 3.8, 200, 'Maths', 1570863600000],
 ['Lucy', 'Female', 'Smith', 3.5, 200, 'English', 1571036400000],
 ['Bob', 'Male', 'King', 3.2, 201, 'Maths', 1571900400000],
 ['Nick', 'Male', 'Young', 3.6, 202, 'Physics', 1572418800000]]

Discussion