👻

RIDFファイルをparquetファイルにする/Kafkaにストリーミングする

2024/08/26に公開

RIDFフォーマット

RIDFは、理化学研究所の仁科加速器研究センターで主に使われているデータフォーマットである。詳細はこちらを参照[https://ribf.riken.jp/RIBFDAQ/index.php?DAQ%2FManual%2FDataformat]。
基本的にはBlocks[Events[Segments[]]]の順に入れ子になった構造の繰り返しで、segmentデータの中身はVMEモジュールのバッファから読み出してきたバイトデータである。ただし、どのVMEモジュールから読み出してきたかを特定するためのIDが複数定義されていて、

  • Device: 実験デバイスの大分類
  • Focal plane: 加速器ビームラインの焦点面
  • Detector: 検出器の種類
  • Module: VMEモジュールの種類

の4つがある。
Segmentデータの中身はVMEモジュールの種類によって異なるため、それぞれデコーダを用意しなくてはならない。標準の解析ソフトとしてANAROOT[https://ribf.riken.jp/RIBFDAQ/index.php?Tools%2FAnalysis%2FANAROOT%2FInstallation]というものが公開されている。

デコーダの改造

デコーダの改造要件として、以下がある。

  • CERN ROOTに依存しない。
  • 最低限のrawdataをparquetファイルに書き出せる。
  • 同様にKafkaにストリーミングできる。
  • rawdataのEventBlockをKafkaからストリーミングで受け取れる。

ANAROOTを使ってデコードするのは簡単で、TArtEventStoreを定義してファイルをOpenし、whileループでGetNextEvent()を呼んでいくだけである。デコードされたデータには、あらかじめTArtRawEventObjectのポインターをGetしておいて、そこからアクセスできる。

/// Open the input ridf file using anaroot
TArtEventStore *estore = new TArtEventStore();
estore->Open("filename.ridf");
TArtRawEventObject *rawevent = estore->GetRawEventObject();

/// event loop
while (estore->GetNextEvent())
{
    rawevent->必要なデータにアクセス
}

以前書いた記事のようにしてraweventをApache Arrowに詰める。

スキーマはTArtRawEventObjectに従う。とりあえず以下のメンバーだけ書き出せれば良いということにする。

TArtRawEventObject.hh
  // run number
  int run_number;

  // event number
  int event_number;

  // time stamp information
  unsigned long long int time_stamp;

  // array of data segments
  std::vector<TArtRawSegmentObject *> segment_array;
TArtRawSegmentObject.hh
  // device id, BigRIPS, ZDC, Sharaq
  int device;
  // focal plane
  int fp;
  // detector id, such as DALI, PPAC ...
  int detector;
  // module id, such as CAEN-V490
  int module;

  int address;     // supposed to be same as EFN in DAQ
  int nmodule;     // number of modules
  int ngoodmodule; // number of good modules
  int size;        // data size

  // array of data
  std::vector<TArtRawDataObject *> data_array;
TArtRawDataObject.hh
  // geometry code slot # in a crate
  int geo;

  // channel in the (CAMAC/VME) module
  int channel;

  // data value
  unsigned int value;

  // edge: -1:undefined, 0: rising, 1: falling
  int edge;

構造としては以下のような感じ。

int: runnumber,
u_int64_t: event_number,
u_int64_t: timestamp,
list:
    - int: device,
    - int: focal,
    - int: module,
    - int: detector,
    - list: [int: geo, int: channel, int: value, int: edge]

edgeというのは、頻繁に使われているV1X90 TDCのために、入力ロジック信号のrising edgeかfalling edgeかのフラグをストアする場所である。

parquet出力

最終的なコードはGithubを参照

実際にridfファイルをparquetに変換してみた。

>>> import pandas as pd
>>> df = pd.read_parquet("test.parquet")
>>> pd.set_option('display.max_colwidth',150)
>>> df.head(1)
   event_id  runnumber              ts                                                                                                                                                segdata
0         0       1029  20507102988732  [{'dev': 0, 'fp': 63, 'mod': 24, 'det': 1, 'hits': [{'geo': 7, 'ch': 0, 'value': 6190, 'edge': 0}, {'geo': 7, 'ch': 0, 'value': 6736, 'edge': 1}]}...
>>> 

event_id, runnumber, timestamp, segdataのカラムが書かれている。segdataはStructureのアレイになっていて、それぞれdev, fp, mod, det, hitsが入っており、hitsがまたアレイになっていて、モジュールのgeo id, channel, value, edge がストアされている。 基本的にこれらの情報があれば、様々なVMEモジュールのデータを解析できる。波形デジタイザー等に関しては新たにデコーダを用意するつもりなので、ここではレガシーモジュールの解析ができれば良いと割り切っている。

Kafka ストリーミング

ファイルではなく、Kafkaのストリームからデータを読むために、従来のTArtFileDataSourceの変わりにTArtKafkaDataSourceを用意する。

以前の記事を元に、C++でKafka consumerを実装する。

基本的には初期化部分とRead()部分の改造になる。以下はread部分

TArtKafkaDataSource
int TArtKafkaDataSource::Read(char *buf, const int &size, const int &offset)
{
    int len = 0;
    if (fStatus == 0)
        std::cout << "TArtKafkaDataSource: waiting for the first event... " << std::endl;
    rkmessage_ = std::unique_ptr<rd_kafka_message_t, RdKafkaMessageDeleter>(rd_kafka_consumer_poll(rk_.get(), 1000));
    if (rkmessage_.get())
    {
        if (rkmessage_->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
        {
            len = 0; // The partition is empty
        }
        else if (rkmessage_->err)
        {
            std::cout << rd_kafka_err2str(rkmessage_->err) << std::endl;
            len = 0;
        }
        else
        {
            len = rkmessage_->len;
            if (len > 0)
            {
                std::memcpy(buf, rkmessage_->payload, len);
                fStatus = 1;
            }
        }
    }
    else
    {
        std::cout << "TArtKafkaDataSource: waiting for the Kafka stream... " << std::endl;
    }
    return len;
}

pollingしてデータをデコーダーのbufferにコピーするだけの簡単な実装。

以下はイベントデータのテーブルからstreamを作ってKafkaに送る部分

    auto table = ...// tableを作る

    auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
    arrow::TableBatchReader reader(table);
    std::shared_ptr<arrow::RecordBatch> batch;
    auto readState = reader.ReadNext(&batch);
    if (readState.ok() && batch.get())
    {
        auto writer = arrow::ipc::MakeStreamWriter(sink, schema_).ValueOrDie();
        writer->WriteRecordBatch(*batch.get());
        writer->Close();
    }
    auto stream = *sink->Finish();
    // stream をKafkaにproduce
    rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void*)stream->data, stream->size, NULL, 0, NULL);

実行

最終的なGithubのコードを実行してみる。

その前に、デコーダーからのストリームを確認するtestConsumer.pyを作る。
pip install kafka-python
執筆時にはpython3.12に対応していないようだったので、3.11を使うか、pip install kafka-python-ngの方をインストールする

testConsumer.py
from kafka import KafkaConsumer
import pyarrow as pa
import pyarrow.ipc as ipc
import io
import sys

if len(sys.argv) < 2:
    print("Usage: python testConsumer.py [topic_name]")

# Kafka consumer configuration
topic_name = sys.argv[1]
bootstrap_servers = ['loclhost:9092']

# Create a Kafka consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',
)

tables = []
count = 0

for message in consumer:
    buffer = io.BytesIO(message.value)
    reader = ipc.open_stream(buffer)
    batch = reader.read_next_batch()
    table = pa.Table.from_batches([batch])

    print(table.to_pandas())  # Convert to pandas DataFrame

consumer.close()

実行は python testConsumer.py decoded。

ターミナルを3つ開いてそれぞれ

python testConsumer.py [topic_name] (デコードデータのtestConsumer)

ridfstream_decoder -i [input_topic_name] -o [output_topic_name] -s [kafka_bootstrapserver] (rawdataのストリームデコーダ)

babicon
を起動。

sethdlist 2 path localhost:9092

で出力先にKafkaサーバを指定。

この状態でbabiconでstartコマンドを打つ。

一番左にデコードされたデータがprintされている。

おわりに

ということで、RIDFデータをデコードし、parquetファイルに保存したり、Arrow TableをKafkaにストリーミングしたりできるようになった。これでいよいよApache Sparkで解析ができるようになりそう。

Discussion