RIDFファイルをparquetファイルにする/Kafkaにストリーミングする
RIDFフォーマット
RIDFは、理化学研究所の仁科加速器研究センターで主に使われているデータフォーマットである。詳細はこちらを参照[https://ribf.riken.jp/RIBFDAQ/index.php?DAQ%2FManual%2FDataformat]。
基本的にはBlocks[Events[Segments[]]]の順に入れ子になった構造の繰り返しで、segmentデータの中身はVMEモジュールのバッファから読み出してきたバイトデータである。ただし、どのVMEモジュールから読み出してきたかを特定するためのIDが複数定義されていて、
- Device: 実験デバイスの大分類
- Focal plane: 加速器ビームラインの焦点面
- Detector: 検出器の種類
- Module: VMEモジュールの種類
の4つがある。
Segmentデータの中身はVMEモジュールの種類によって異なるため、それぞれデコーダを用意しなくてはならない。標準の解析ソフトとしてANAROOT[https://ribf.riken.jp/RIBFDAQ/index.php?Tools%2FAnalysis%2FANAROOT%2FInstallation]というものが公開されている。
デコーダの改造
デコーダの改造要件として、以下がある。
- CERN ROOTに依存しない。
- 最低限のrawdataをparquetファイルに書き出せる。
- 同様にKafkaにストリーミングできる。
- rawdataのEventBlockをKafkaからストリーミングで受け取れる。
ANAROOTを使ってデコードするのは簡単で、TArtEventStore
を定義してファイルをOpenし、whileループでGetNextEvent()
を呼んでいくだけである。デコードされたデータには、あらかじめTArtRawEventObject
のポインターをGetしておいて、そこからアクセスできる。
/// Open the input ridf file using anaroot
TArtEventStore *estore = new TArtEventStore();
estore->Open("filename.ridf");
TArtRawEventObject *rawevent = estore->GetRawEventObject();
/// event loop
while (estore->GetNextEvent())
{
rawevent->必要なデータにアクセス
}
以前書いた記事のようにしてraweventをApache Arrowに詰める。
スキーマはTArtRawEventObject
に従う。とりあえず以下のメンバーだけ書き出せれば良いということにする。
// run number
int run_number;
// event number
int event_number;
// time stamp information
unsigned long long int time_stamp;
// array of data segments
std::vector<TArtRawSegmentObject *> segment_array;
// device id, BigRIPS, ZDC, Sharaq
int device;
// focal plane
int fp;
// detector id, such as DALI, PPAC ...
int detector;
// module id, such as CAEN-V490
int module;
int address; // supposed to be same as EFN in DAQ
int nmodule; // number of modules
int ngoodmodule; // number of good modules
int size; // data size
// array of data
std::vector<TArtRawDataObject *> data_array;
// geometry code slot # in a crate
int geo;
// channel in the (CAMAC/VME) module
int channel;
// data value
unsigned int value;
// edge: -1:undefined, 0: rising, 1: falling
int edge;
構造としては以下のような感じ。
int: runnumber,
u_int64_t: event_number,
u_int64_t: timestamp,
list:
- int: device,
- int: focal,
- int: module,
- int: detector,
- list: [int: geo, int: channel, int: value, int: edge]
edgeというのは、頻繁に使われているV1X90 TDCのために、入力ロジック信号のrising edgeかfalling edgeかのフラグをストアする場所である。
parquet出力
最終的なコードはGithubを参照
実際にridfファイルをparquetに変換してみた。
>>> import pandas as pd
>>> df = pd.read_parquet("test.parquet")
>>> pd.set_option('display.max_colwidth',150)
>>> df.head(1)
event_id runnumber ts segdata
0 0 1029 20507102988732 [{'dev': 0, 'fp': 63, 'mod': 24, 'det': 1, 'hits': [{'geo': 7, 'ch': 0, 'value': 6190, 'edge': 0}, {'geo': 7, 'ch': 0, 'value': 6736, 'edge': 1}]}...
>>>
event_id, runnumber, timestamp, segdataのカラムが書かれている。segdataはStructureのアレイになっていて、それぞれdev, fp, mod, det, hitsが入っており、hitsがまたアレイになっていて、モジュールのgeo id, channel, value, edge がストアされている。 基本的にこれらの情報があれば、様々なVMEモジュールのデータを解析できる。波形デジタイザー等に関しては新たにデコーダを用意するつもりなので、ここではレガシーモジュールの解析ができれば良いと割り切っている。
Kafka ストリーミング
ファイルではなく、Kafkaのストリームからデータを読むために、従来のTArtFileDataSource
の変わりにTArtKafkaDataSource
を用意する。
以前の記事を元に、C++でKafka consumerを実装する。
基本的には初期化部分とRead()
部分の改造になる。以下はread部分
int TArtKafkaDataSource::Read(char *buf, const int &size, const int &offset)
{
int len = 0;
if (fStatus == 0)
std::cout << "TArtKafkaDataSource: waiting for the first event... " << std::endl;
rkmessage_ = std::unique_ptr<rd_kafka_message_t, RdKafkaMessageDeleter>(rd_kafka_consumer_poll(rk_.get(), 1000));
if (rkmessage_.get())
{
if (rkmessage_->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
len = 0; // The partition is empty
}
else if (rkmessage_->err)
{
std::cout << rd_kafka_err2str(rkmessage_->err) << std::endl;
len = 0;
}
else
{
len = rkmessage_->len;
if (len > 0)
{
std::memcpy(buf, rkmessage_->payload, len);
fStatus = 1;
}
}
}
else
{
std::cout << "TArtKafkaDataSource: waiting for the Kafka stream... " << std::endl;
}
return len;
}
pollingしてデータをデコーダーのbufferにコピーするだけの簡単な実装。
以下はイベントデータのテーブルからstreamを作ってKafkaに送る部分
auto table = ...// tableを作る
auto sink = arrow::io::BufferOutputStream::Create().ValueOrDie();
arrow::TableBatchReader reader(table);
std::shared_ptr<arrow::RecordBatch> batch;
auto readState = reader.ReadNext(&batch);
if (readState.ok() && batch.get())
{
auto writer = arrow::ipc::MakeStreamWriter(sink, schema_).ValueOrDie();
writer->WriteRecordBatch(*batch.get());
writer->Close();
}
auto stream = *sink->Finish();
// stream をKafkaにproduce
rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void*)stream->data, stream->size, NULL, 0, NULL);
実行
最終的なGithubのコードを実行してみる。
その前に、デコーダーからのストリームを確認するtestConsumer.pyを作る。
pip install kafka-python
執筆時にはpython3.12に対応していないようだったので、3.11を使うか、pip install kafka-python-ng
の方をインストールする
from kafka import KafkaConsumer
import pyarrow as pa
import pyarrow.ipc as ipc
import io
import sys
if len(sys.argv) < 2:
print("Usage: python testConsumer.py [topic_name]")
# Kafka consumer configuration
topic_name = sys.argv[1]
bootstrap_servers = ['loclhost:9092']
# Create a Kafka consumer
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='latest',
)
tables = []
count = 0
for message in consumer:
buffer = io.BytesIO(message.value)
reader = ipc.open_stream(buffer)
batch = reader.read_next_batch()
table = pa.Table.from_batches([batch])
print(table.to_pandas()) # Convert to pandas DataFrame
consumer.close()
実行は python testConsumer.py decoded。
ターミナルを3つ開いてそれぞれ
python testConsumer.py [topic_name]
(デコードデータのtestConsumer)
ridfstream_decoder -i [input_topic_name] -o [output_topic_name] -s [kafka_bootstrapserver]
(rawdataのストリームデコーダ)
babicon
を起動。
sethdlist 2 path localhost:9092
で出力先にKafkaサーバを指定。
この状態でbabiconでstartコマンドを打つ。
一番左にデコードされたデータがprintされている。
おわりに
ということで、RIDFデータをデコードし、parquetファイルに保存したり、Arrow TableをKafkaにストリーミングしたりできるようになった。これでいよいよApache Sparkで解析ができるようになりそう。
Discussion