Open15

Reth読むスレ

adustadust

Rethの実装を確認し最終的には何かしらのPRを投げて年内にマージさせる。
https://x.com/uzumakihouse1/status/1920533446235206049

実装を読む前にissueやmilestoneを確認し、Rethの開発状況と自分がコミットできそうな部分を確認する。

https://github.com/paradigmxyz/reth

Labels
Labelsを見るとissue数が多いラベルは主に以下の4つである。

  • A-rpc:Related to the RPC implementation
  • A-sdk:Related to reth's use as a library
  • C-perf:A change motivated by improving speed, memory usage or disk footprint
  • M-prevent-stale:Prevents old inactive issues/PRs from being closed due to inactivity

ちなみに現在の開発状況を把握するにはReth Trackerを確認すると良い。

Milestone
Milestoneを見るとA-sdk関連のissueが積まれている。このことからもSDK周りの開発に集中していることがわかる。
その中でもC-enhancement と重複したissueが目立つので、SDKのパフォーマンス改善に寄与するようなPRが出せないか模索してみる。

調査方針
Rethのディレクトリの中でもコアな実装は[Crates](https://github.com/paradigmxyz/reth/tree/main/crates)に集約されている。
とりあえずこのディレクトリを上から下に幅優先探索で確認していく。

adustadust

chain-state crate

まずはchain-statecrate
このcrateでは文字通りchain, blockのstateを管理している。
補足:canonical(chain)とはネットワーク全体で「正当なチェーン」として認識されているブロックチェーンの分岐のこと

/// Container type for all chain info fields
#[derive(Debug)]
struct ChainInfoInner<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
    /// Timestamp when we received the last fork choice update.
    ///
    /// This is mainly used to track if we're connected to a beacon node.
    last_forkchoice_update: RwLock<Option<Instant>>,

    /// Tracks the number of the `canonical_head`.
    canonical_head_number: AtomicU64,
    /// The canonical head of the chain.
    canonical_head: RwLock<SealedHeader<N::BlockHeader>>,
    /// The block that the beacon node considers safe.
    safe_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
    /// The block that the beacon node considers finalized.
    finalized_block: watch::Sender<Option<SealedHeader<N::BlockHeader>>>,
}

これらの情報はIn-Memoryで管理される。

例えばBlockを更新する処理は以下のように定義されている。

    /// Append new blocks to the in memory state.
    ///
    /// This removes all reorged blocks and appends the new blocks to the tracked chain and connects
    /// them to their parent blocks.
    fn update_blocks<I, R>(&self, new_blocks: I, reorged: R)
    where
        I: IntoIterator<Item = ExecutedBlockWithTrieUpdates<N>>,
        R: IntoIterator<Item = ExecutedBlock<N>>,
    {
        {
            // acquire locks, starting with the numbers lock
            let mut numbers = self.inner.in_memory_state.numbers.write();
            let mut blocks = self.inner.in_memory_state.blocks.write();

            // we first remove the blocks from the reorged chain
            for block in reorged {
                let hash = block.recovered_block().hash();
                let number = block.recovered_block().number();
                blocks.remove(&hash);
                numbers.remove(&number);
            }

            // insert the new blocks
            for block in new_blocks {
                let parent = blocks.get(&block.recovered_block().parent_hash()).cloned();
                let block_state = BlockState::with_parent(block, parent);
                let hash = block_state.hash();
                let number = block_state.number();

                // append new blocks
                blocks.insert(hash, Arc::new(block_state));
                numbers.insert(number, hash);
            }

            // remove the pending state
            self.inner.in_memory_state.pending.send_modify(|p| {
                p.take();
            });
        }
        self.inner.in_memory_state.update_metrics();
    }
adustadust

config crate

Rethのpipeline(CI/CD?)における各stageの設定が定義されている。
StageConfigでは12stageに分割されている。

/// Configuration for each stage in the pipeline.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct StageConfig {
    /// Header stage configuration.
    pub headers: HeadersConfig,
    /// Body stage configuration.
    pub bodies: BodiesConfig,
    /// Sender Recovery stage configuration.
    pub sender_recovery: SenderRecoveryConfig,
    /// Execution stage configuration.
    pub execution: ExecutionConfig,
    /// Prune stage configuration.
    pub prune: PruneStageConfig,
    /// Account Hashing stage configuration.
    pub account_hashing: HashingConfig,
    /// Storage Hashing stage configuration.
    pub storage_hashing: HashingConfig,
    /// Merkle stage configuration.
    pub merkle: MerkleConfig,
    /// Transaction Lookup stage configuration.
    pub transaction_lookup: TransactionLookupConfig,
    /// Index Account History stage configuration.
    pub index_account_history: IndexHistoryConfig,
    /// Index Storage History stage configuration.
    pub index_storage_history: IndexHistoryConfig,
    /// Common ETL related configuration.
    pub etl: EtlConfig,
}

以下はdocs/crates/stages.mdをGPTに要約させたもの。
pipeline は、RethクライアントがEthereumチェーンと同期し、データベースを更新するための中心的な仕組みです。この仕組みは複数の「ステージ」で構成され、それぞれが特定の役割を果たしています。以下に各ステージの役割を説明します。


1. HeaderStage

HeaderStage はブロックヘッダーを同期するステージです。このステージでは、以下を行います:

  • ブロックヘッダーをダウンロード
  • ヘッダーの整合性を検証
  • ヘッダーをデータベースに書き込む

ヘッダーの検証では、親ブロックとの正当性を確認します。このプロセスを繰り返し、すべてのヘッダーがダウンロードされると、チェーンの総困難度(Total Difficulty)が更新されます。


2. BodyStage

HeaderStage の完了後、BodyStage が開始されます。このステージでは、以下を行います:

  • 新しいブロックヘッダーに対応するブロックボディをダウンロード
  • トランザクションルートやOmmer(叔父ブロック)のハッシュを検証

ブロックボディは、starting_block から target_block まで順次ダウンロードされます。各ブロックボディは事前検証を行い、問題がなければデータベースに保存されます。


3. SenderRecoveryStage

このステージでは、トランザクションの送信者(署名者)情報を復元します。具体的には:

  • ECDSA署名から送信者アドレスを復元
  • 復元した送信者情報をデータベースに保存

4. ExecutionStage

ExecutionStage は、トランザクションを実行し、ステート(アカウントやコントラクトの状態)を更新します。以下の処理が行われます:

  • トランザクションの実行
  • アカウントバランスやステートの更新
  • 実行結果をデータベースに反映

5. MerkleUnwindStage

このステージは、リオーグ(チェーンの巻き戻し)やロールバックが必要な場合に、Merkle Patricia Trie(ステートツリー)の状態を元に戻します。


6. AccountHashingStage

アカウントの暗号化ハッシュを計算します。これにより、状態ツリーの構築に必要なアカウントデータのハッシュが生成されます。


7. StorageHashingStage

スマートコントラクトのストレージスロットに対してハッシュを計算します。AccountHashingStage と似ていますが、対象がストレージデータです。


8. MerkleExecuteStage

このステージでは、Merkle Patricia Trieの構築と更新を行います。トランザクション実行後のステート変更を反映します。


9. TransactionLookupStage

トランザクションのインデックスを作成し、効率的な検索を可能にします。これにより、トランザクションのハッシュやブロック位置でのクエリが高速化されます。


10. IndexStorageHistoryStage

コントラクトストレージの履歴を追跡するインデックスを作成します。これにより、過去の状態をクエリできるようになります。


11. IndexAccountHistoryStage

アカウントの履歴を記録するインデックスを作成します。アカウントの残高、ノンス、コードなどの変化を追跡します。


12. FinishStage

最後のステージでは、すべての処理が正しく完了したことを検証し、クリーンアップを行います。このステージが完了すると、ノードの状態がチェーンの最新状態と同期します。


このように、pipeline は各ステージを順番に処理し、Ethereumチェーンとの同期を効率的に行います。それぞれのステージが担当する役割を順次実行することで、一貫したデータベースの更新とチェーンとの整合性を維持します。

Execution stageに注目すると以下のようなconfigが確認できる

/// Execution stage configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(default))]
pub struct ExecutionConfig {
    /// The maximum number of blocks to process before the execution stage commits.
    pub max_blocks: Option<u64>,
    /// The maximum number of state changes to keep in memory before the execution stage commits.
    pub max_changes: Option<u64>,
    /// The maximum cumulative amount of gas to process before the execution stage commits.
    pub max_cumulative_gas: Option<u64>,
    /// The maximum time spent on blocks processing before the execution stage commits.
    #[cfg_attr(
        feature = "serde",
        serde(
            serialize_with = "humantime_serde::serialize",
            deserialize_with = "deserialize_duration"
        )
    )]
    pub max_duration: Option<Duration>,
}
adustadust

consensus crate

  • validation.rsにはblock validationに関するmethodsが実装されている。
  • validationにはYellow Paperで定義されたHolistic Validator およびBlock Header ValidiityそしてOmmer Validationが含まれる。
  • validate_shanghai_withdrawals(for EIP4895)やvalidate_cancun_gas(for EIP5844)など各EIPに関連したvalidationも見つかるので面白い。

参考: https://ethereum.github.io/yellowpaper/paper.pdf

adustadust

engine crate

Engin APIはCLとELのやり取りを行うためのAPIである。

engine/primitives/src

まずはengine/primitives/srcから見ていく。
lib.rsを覗くとExecution Payload の存在を確認できる。現在はV4まで存在する。
各versionの違いは後で調べる
code jumpしてpayload cratesに飛ぶと以下のtraitを確認できる。

/// An execution payload.
pub trait ExecutionPayload:
    Serialize + DeserializeOwned + Debug + Clone + Send + Sync + 'static
{
    /// Returns the parent hash of the block.
    fn parent_hash(&self) -> B256;

    /// Returns the hash of the block.
    fn block_hash(&self) -> B256;

    /// Returns the number of the block.
    fn block_number(&self) -> u64;

    /// Returns the withdrawals for the payload, if it exists.
    fn withdrawals(&self) -> Option<&Vec<Withdrawal>>;

    /// Return the parent beacon block root for the payload, if it exists.
    fn parent_beacon_block_root(&self) -> Option<B256>;

    /// Returns the timestamp to be used in the payload.
    fn timestamp(&self) -> u64;

    /// Gas used by the payload
    fn gas_used(&self) -> u64;
}

PayloadValidator traitとEngineValidator traitの存在も確認した。
PayloadValidator:payloadのフォーマットが違反していないか確認する。この際にpayloadはexcutable blockに変換される。
EngineValidator:engineで処理されたpayloadに対しての検証を定義している。例えばresponse payloadのversionやheaderの検証が含まれる

次にengine/primitives/src/config.rsではengine treeの設定が定義されている。てかengine treeとは?state treeという理解でおけ?

/// The configuration of the engine tree.
#[derive(Debug)]
pub struct TreeConfig {
    /// Maximum number of blocks to be kept only in memory without triggering
    /// persistence.
    persistence_threshold: u64,
    /// How close to the canonical head we persist blocks. Represents the ideal
    /// number of most recent blocks to keep in memory for quick access and reorgs.
    ///
    /// Note: this should be less than or equal to `persistence_threshold`.
    memory_block_buffer_target: u64,
    /// Number of pending blocks that cannot be executed due to missing parent and
    /// are kept in cache.
    block_buffer_limit: u32,
    /// Number of invalid headers to keep in cache.
    max_invalid_header_cache_length: u32,
    /// Maximum number of blocks to execute sequentially in a batch.
    ///
    /// This is used as a cutoff to prevent long-running sequential block execution when we receive
    /// a batch of downloaded blocks.
    max_execute_block_batch_size: usize,
    /// Whether to use the legacy state root calculation method instead of the
    /// new state root task.
    legacy_state_root: bool,
    /// Whether to always compare trie updates from the state root task to the trie updates from
    /// the regular state root calculation.
    always_compare_trie_updates: bool,
    /// Whether to disable cross-block caching and parallel prewarming.
    disable_caching_and_prewarming: bool,
    /// Whether to enable state provider metrics.
    state_provider_metrics: bool,
    /// Cross-block cache size in bytes.
    cross_block_cache_size: u64,
    /// Whether the host has enough parallelism to run state root task.
    has_enough_parallelism: bool,
    /// Maximum number of concurrent proof tasks
    max_proof_task_concurrency: u64,
    /// Number of reserved CPU cores for non-reth processes
    reserved_cpu_cores: usize,
}

max_proof_task_concurrencyはtxのinclusion proofを最大何個処理できるかを表していそう。

payloadを処理するために並列化していることがわかった。ホストマシンに対して5スレッド以上の並列化を求めている。また、コメントからengine tree = state treeという理解で良いこともわかった。

/// Determines if the host has enough parallelism to run the payload processor.
///
/// It requires at least 5 parallel threads:
/// - Engine in main thread that spawns the state root task.
/// - Multiproof task in payload processor
/// - Sparse Trie task in payload processor
/// - Multiproof computation spawned in payload processor
/// - Storage root computation spawned in trie parallel proof
pub fn has_enough_parallelism() -> bool {
    #[cfg(feature = "std")]
    {
        std::thread::available_parallelism().is_ok_and(|num| num.get() >= 5)
    }
    #[cfg(not(feature = "std"))]
    false
}

pub const DEFAULT_MAX_PROOF_TASK_CONCURRENCY: u64 = 256;となっているのでexcutionのたびに256個のinclusion proofを5スレッド以上で並列処理しているっぽい。

forkchoice.rsにはCLからForkchoiceに関するstate,statusを受け取るようになっている。

/// Represents a forkchoice update and tracks the status we assigned to it.
#[derive(Debug, Clone)]
pub(crate) struct ReceivedForkchoiceState {
    state: ForkchoiceState,
    status: ForkchoiceStatus,
}

invalid_block_hook.rsには受け取ったpayloadのvalidationが失敗した場合のhookがある。
engine/invalid-block-hooks/witness.rsに具体的なロジックが定義されているので後で読む。

service.rsではEngineServiceが定義されている。
このnewメソッドはnode crateのlaunch_nodeで実行される。ELの起動時にEngineが起動されるわけね

    /// Constructor for `EngineService`.
    #[expect(clippy::too_many_arguments)]
    pub fn new<V, C>(
        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
        chain_spec: Arc<N::ChainSpec>,
        client: Client,
        incoming_requests: EngineMessageStream<N::Payload>,
        pipeline: Pipeline<N>,
        pipeline_task_spawner: Box<dyn TaskSpawner>,
        provider: ProviderFactory<N>,
        blockchain_db: BlockchainProvider<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        payload_builder: PayloadBuilderHandle<N::Payload>,
        payload_validator: V,
        tree_config: TreeConfig,
        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
        sync_metrics_tx: MetricEventsSender,
        evm_config: C,
    ) -> Self
    ...

config crateで見つけたpipelineがここで現れた。stages crateで定義されているようなので詳細は後で確認しよう。

adustadust

era crate

eraが何を意味するのか分からない。
調べてみると.eraというファイルフォーマットを定義し、ステートとそれに至るブロックからなるグループが含めるらしい。他にも.e2sと呼ばれるより簡易的なフォーマットも定義されている。
参考:E2store Format Specifications

具体的なフォーマットは後から確認する

adustadust

errors crate

コアなエラーのenumが定義されている。以下の5つに大別される。

#[derive(Debug, thiserror::Error)]
pub enum RethError {
    /// Error encountered during block execution.
    #[error(transparent)]
    Execution(#[from] BlockExecutionError),

    /// Consensus-related errors.
    #[error(transparent)]
    Consensus(#[from] ConsensusError),

    /// Database-related errors.
    #[error(transparent)]
    Database(#[from] DatabaseError),

    /// Errors originating from providers.
    #[error(transparent)]
    Provider(#[from] ProviderError),

    /// Any other error.
    #[error(transparent)]
    Other(Box<dyn core::error::Error + Send + Sync>),
}

adustadust

evm crate

ここにexcution自体が定義されている。
これを拡張してexecute_one_with_state_hookやexecute_batchに応用されている。

execute_transaction_with_result_closure 関数の処理フローは以下の通り:
1. トランザクションの ガス制限チェック。
2. Deposit トランザクションであれば デポジットアカウントのキャッシュ。
3. トランザクションの ハッシュ計算。
4. トランザクションの 実行と結果の取得。
5. トランザクション結果をクロージャ f に渡して処理。
6. ガス使用量の取得と累積更新。
7. Receipt の生成と保存。
8. ステートのコミット。
9. 最終的に 消費ガス量を返却。

まずトランザクションが Deposit トランザクションかどうか を判定し、is_deposit に格納する。

  • 現在のブロックで使用可能なガス量 block_available_gas を計算する。
  • トランザクションの gas_limit が block_available_gas を超えている場合、エラーを返す。
let is_deposit = tx.tx().ty() == DEPOSIT_TRANSACTION_TYPE;

let block_available_gas = self.evm.block().gas_limit - self.gas_used;
if tx.tx().gas_limit() > block_available_gas && (self.is_regolith || !is_deposit) {
    return Err(BlockValidationError::TransactionGasLimitMoreThanAvailableBlockGas {
        transaction_gas_limit: tx.tx().gas_limit(),
        block_available_gas,
    }.into());
}

次にデポジット用のアカウント情報をキャッシュする。load_cache_account() によりアカウント情報を取得し、その nonce を取得する。

let depositor = (self.is_regolith && is_deposit)
    .then(|| {
        self.evm
            .db_mut()
            .load_cache_account(*tx.signer())
            .map(|acc| acc.account_info().unwrap_or_default())
    })
    .transpose()
    .map_err(BlockExecutionError::other)?;

トランザクションを実行し、結果 (result_and_state) を取得し、トランザクションのステート変更 (state) を system_caller に通知する。

let result_and_state =
    self.evm.transact(tx).map_err(move |err| BlockExecutionError::evm(err, hash))?;
    self.system_caller
    .on_state(StateChangeSource::Transaction(self.receipts.len()), &result_and_state.state);

消費されたガス量を取得しブロック全体のガス使用量を更新する。

let gas_used = result.gas_used();
self.gas_used += gas_used;

次にReceiptBuilderCtx を使って Receipt を構築し、正常なトランザクションの場合は通常の Receipt を作成する。デポジットトランザクションの場合、OpDepositReceipt を生成し、nonce および version を設定する。

self.receipts.push(
    match self.receipt_builder.build_receipt(ReceiptBuilderCtx {
        tx: tx.tx(),
        result,
        cumulative_gas_used: self.gas_used,
        evm: &self.evm,
        state: &state,
    }) {
        Ok(receipt) => receipt,
        Err(ctx) => {
            let receipt = alloy_consensus::Receipt {
                status: Eip658Value::Eip658(ctx.result.is_success()),
                cumulative_gas_used: self.gas_used,
                logs: ctx.result.into_logs(),
            };

            self.receipt_builder.build_deposit_receipt(OpDepositReceipt {
                inner: receipt,
                deposit_nonce: depositor.map(|account| account.nonce),
                deposit_receipt_version: (is_deposit
                    && self.spec.is_canyon_active_at_timestamp(self.evm.block().timestamp))
                .then_some(1),
            })
        }
    }
);

commit() を呼ぶことで、トランザクションの結果がデータベースに反映される。

self.evm.db_mut().commit(state);
adustadust

exexcrate

exesはexcution extentionの略称でrollupやindexers向けの効率的なブロック通知と処理が実装されている。exexはReth固有の機能であり、以下のような使用例が挙げられている。
https://github.com/paradigmxyz/reth-exex-examples
ここが前提知識が必要そうなのでスキップする。exexにバグがあるとrollup側で問題が起きそう。auditとかされてるのかな?

adustadust

net crate

このcrateにはP2Pネットワークに関する実装が含まれている。
banlistなるものがあり、ネットワークのブラックリストっぽい。
discv4はnodeの探索ライブラリであり、これはkadmiliaと呼ばれるアルゴリズムをdevp2p向けにアップデートしたものになる。

ここら辺はすごい面白そうなんだけど実装を読む以前の前提知識が必要なのでスキップする
https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv4.md

adustadust

node crate

RethではMDBXと呼ばれるKV databeseを採用している。GethはLevel DBだったはず。

nodeの設定は以下のようなパラメータで設定される。各argumentについてはnode/core/src/argsに定義されている。

#[derive(Debug)]
pub struct NodeConfig<ChainSpec> {
    /// All data directory related arguments
    pub datadir: DatadirArgs,

    /// The path to the configuration file to use.
    pub config: Option<PathBuf>,

    /// The chain this node is running.
    ///
    /// Possible values are either a built-in chain or the path to a chain specification file.
    pub chain: Arc<ChainSpec>,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    pub metrics: Option<SocketAddr>,

    /// Add a new instance of a node.
    ///
    /// Configures the ports of the node to avoid conflicts with the defaults.
    /// This is useful for running multiple nodes on the same machine.
    ///
    /// Max number of instances is 200. It is chosen in a way so that it's not possible to have
    /// port numbers that conflict with each other.
    ///
    /// Changes to the following port numbers:
    /// - `DISCOVERY_PORT`: default + `instance` - 1
    /// - `DISCOVERY_V5_PORT`: default + `instance` - 1
    /// - `AUTH_PORT`: default + `instance` * 100 - 100
    /// - `HTTP_RPC_PORT`: default - `instance` + 1
    /// - `WS_RPC_PORT`: default + `instance` * 2 - 2
    /// - `IPC_PATH`: default + `instance`
    pub instance: Option<u16>,

    /// All networking related arguments
    pub network: NetworkArgs,

    /// All rpc related arguments
    pub rpc: RpcServerArgs,

    /// All txpool related arguments with --txpool prefix
    pub txpool: TxPoolArgs,

    /// All payload builder related arguments
    pub builder: PayloadBuilderArgs,

    /// All debug related arguments with --debug prefix
    pub debug: DebugArgs,

    /// All database related arguments
    pub db: DatabaseArgs,

    /// All dev related arguments with --dev prefix
    pub dev: DevArgs,

    /// All pruning related arguments
    pub pruning: PruningArgs,

    /// All engine related arguments
    pub engine: EngineArgs,
}

nodeのbuild方法に関して様々な設定が可能である。
../../../examplesに例がたくさんあるので確認するとよろし

engine.rsにはlaunch_nodeメソッドが存在する。ここを起点にnodeが立ち上がる。
このメソッドの中でconfig crateで見かけたpipelineが作成されていた。(CI/CD関係なかった..)

        // create pipeline
        let network_client = ctx.components().network().fetch_client().await?;
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        let node_config = ctx.node_config();

        let max_block = ctx.max_block(network_client.clone()).await?;

        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        info!(target: "reth::cli", "StaticFileProducer initialized");

        let consensus = Arc::new(ctx.components().consensus().clone());

        // Configure the pipeline
        let pipeline_exex_handle =
            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().evm_config().clone(),
            pipeline_exex_handle,
        )?;
adustadust

payload crate

ここではCLとenging APIを介して会話する時のデータ(payload)に関する実装が集約されている。

impl ExecutionPayload for ExecutionData {
    fn parent_hash(&self) -> B256 {
        self.payload.parent_hash()
    }

    fn block_hash(&self) -> B256 {
        self.payload.block_hash()
    }

    fn block_number(&self) -> u64 {
        self.payload.block_number()
    }

    fn withdrawals(&self) -> Option<&Vec<Withdrawal>> {
        self.payload.withdrawals()
    }

    fn parent_beacon_block_root(&self) -> Option<B256> {
        self.sidecar.parent_beacon_block_root()
    }

    fn timestamp(&self) -> u64 {
        self.payload.timestamp()
    }

    fn gas_used(&self) -> u64 {
        self.payload.as_v1().gas_used
    }
}

CLから受信するときはAttributes eventが発火する。逆にCLに対してexecution payloadを送信するときはBuildPayload eventが発火する。後者は Reth ノードがバリデーターである場合にのみ発火する。

何をもってbestとするのだ?

    /// Returns the best payload for the given identifier that has been built so far.
    fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
        let res = self
            .payload_jobs
            .iter()
            .find(|(_, job_id)| *job_id == id)
            .map(|(j, _)| j.best_payload().map(|p| p.into()));
        if let Some(Ok(ref best)) = res {
            self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
        }

以下はlib.rsをgptに要約させたもの
モジュール:

  • error: ペイロード構築中に発生する可能性のあるエラー型を定義します。
  • traits: ペイロード属性型を抽象化するためのトレイトと、Ethereum メインネットおよび Optimism タイプに対する [PayloadAttributes] トレイトのデフォルト実装が含まれています。
  • payload: [ExecutionPayload] トレイトと、[PayloadOrAttributes] 列挙型を定義します。
    PayloadTypes トレイト: エンジン API で使用される型を定義します。
  • ExecutionData: 入力として提供される実行ペイロード型。
  • BuiltPayload: 構築されたペイロード型。
  • PayloadAttributes: CL ノードがエンジン API 経由で出力する RPC ペイロード属性型。
  • PayloadBuilderAttributes: 実行中のペイロードジョブに関する情報を含むペイロード属性型。
  • block_to_payload: ブロックを実行ペイロードに変換します。
    検証関数:
  • validate_payload_timestamp: バージョンに応じてタイムスタンプを検証します。
    V2 の場合、ペイロードタイムスタンプが Cancun より前であることを確認します。
    V3 の場合、ペイロードタイムスタンプが Cancun タイムスタンプ内であることを確認します。
    V4 の場合、ペイロードタイムスタンプが Prague タイムスタンプ内であることを確認します。
  • validate_withdrawals_presence: ペイロードタイムスタンプに従って withdrawals フィールドの存在を検証します。
    Shanghai 以降、withdrawals フィールドは Some である必要があります。
    Shanghai より前では、withdrawals フィールドは None である必要があります。
    validate_parent_beacon_block_root_presence: 指定されたタイムスタンプに従って、parentBeaconBlockRoot フィールドの存在を検証します。
    Cancun 以降、parentBeaconBlockRoot フィールドは Some である必要があります。
    Cancun より前では、parentBeaconBlockRoot フィールドは None である必要があります。
  • validate_version_specific_fields: Ethereum 実行ペイロードまたはペイロード属性、およびメッセージバージョンに基づいて、フォーク固有のフィールドの存在または除外を検証します。
  • validate_execution_requests: 実行リクエストが Engine API 仕様に従って有効であることを検証します。
    EngineApiMessageVersion 列挙型: エンジン API メッセージのバージョンを定義します。
    V1: バージョン 1
    V2: バージョン 2 (Shanghai ハードフォークで追加)
    V3: バージョン 3 (Cancun ハードフォークで追加、デフォルト)
    V4: バージョン 4 (Prague ハードフォークで追加)
  • PayloadKind 列挙型: 返すペイロードの選択方法を決定します。
    Earliest: 次の最適な利用可能なペイロード(利用可能な最も早いペイロード)を返します。
  • WaitForPending: 少なくとも 1 つの構築されたペイロードがある場合にのみ返します。
    テスト:
  • version_ord: EngineApiMessageVersion 列挙型の順序が正しいことを確認します。
    execution_requests_validation: 実行リクエストの検証が正しく機能することを確認します。

cancun.rsにはCancunハードフォーク後の新しいペイロードに関する検証ルールを定義している。これはEIP-4844 (blob トランザクション)に関するもの。
これにより無効なブロックがチェーンに追加されるのを防ぐ。

ensure_matching_blob_versioned_hashes 関数: EIP-4844 トランザクションの blob バージョン付きハッシュの数が、サイドカーの block_versioned_hashes の数と一致することを確認する。また、それぞれのハッシュが一致することも確認する。
ここでsidecarとはEIP-4844に関連する追加データ(blob自体)を指す。
blob自体はオンチェーンに含まれないのでpayloadと並行するsidecarという表現

adustadust

stages crate

excution.rsのexcute stageでは全てのtxを実行する。
各blockに対して前述のexecute_blockが呼び出される.
providerからblock情報を受け取り、excution結果はstateとして再びproviderに書き込まれる

        for block_number in start_block..=max_block {
            // Fetch the block
            let fetch_block_start = Instant::now();

            // we need the block's transactions but we don't need the transaction hashes
            let block = provider
                .recovered_block(block_number.into(), TransactionVariant::NoHash)?
                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

            fetch_block_duration += fetch_block_start.elapsed();

            cumulative_gas += block.header().gas_used();

            // Configure the executor to use the current state.
            trace!(target: "sync::stages::execution", number = block_number, txs = block.body().transactions().len(), "Executing block");

            // Execute the block
            let execute_start = Instant::now();

            let result = self.metrics.metered_one(&block, |input| {
                executor.execute_one(input).map_err(|error| StageError::Block {
                    block: Box::new(block.block_with_parent()),
                    error: BlockErrorKind::Execution(error),
                })
            })?;

            if let Err(err) = self.consensus.validate_block_post_execution(&block, &result) {
                return Err(StageError::Block {
                    block: Box::new(block.block_with_parent()),
                    error: BlockErrorKind::Validation(err),
                });
            }
            results.push(result);

            execution_duration += execute_start.elapsed();
                        stage_progress = block_number;
            stage_checkpoint.progress.processed += block.header().gas_used();

            // If we have ExExes we need to save the block in memory for later
            if self.exex_manager_handle.has_exexs() {
                blocks.push(block);
            }

            // Check if we should commit now
            if self.thresholds.is_end_of_batch(
                block_number - start_block,
                executor.size_hint() as u64,
                cumulative_gas,
                batch_start.elapsed(),
            ) {
                break;
            }

hashing_account stageはTransactions Trie用のhashing, hashing_strageはstate tree用のhasingと思われる。
ブロック数が閾値(clean_threshold)を超える場合、または最初のブロック(genesis)から始まる場合は、全データを再ハッシュ化。それ以外の場合は、変更されたストレージのみを処理している。

rayon を使用して並列処理を実装している。各ストレージエントリのアドレスとキーを Keccak256 でハッシュ化し結果をチャンネルを通じて送信する流れ

rayon::spawn(move || {
    for (address, slot) in chunk {
        let mut addr_key = Vec::with_capacity(64);
        addr_key.put_slice(keccak256(address).as_slice());
        addr_key.put_slice(keccak256(slot.key).as_slice());
        let _ = tx.send((addr_key, CompactU256::from(slot.value)));
    }
});

最終的にハッシュ化されたデータはProviderを通じて保存される。

header stageではブロックヘッダーの同期を担当している。

  1. ブロックヘッダーをネットワークからダウンロード
  2. ローカルの最高ブロックからネットワークの最高ブロックまでのヘッダーを同期
  3. ヘッダーデータを静的ファイルとデータベースに保存

並列処理はもちろん逆順(新しいブロックから古いブロックへ)でヘッダーをダウンロードするなど最適化が図られていた。

index_accouint_history 及び index_storage_history stageではaccountとstorageの変更履歴をインデックス化し効率的なクエリを可能にするための処理が実行される。
例えばindex_accouint_historyでは以下のような流れ

  1. アカウントの変更セットを処理
  2. 変更履歴をインデックス化
  3. DBのシャーディングを使用して効率的にデータを保存
    古いデータを削除するpruningも実装されてた
if let Some((target_prunable_block, prune_mode)) = self.prune_mode.map(|mode| {
    mode.prune_target_block(
        input.target(),
        PruneSegment::AccountHistory,
        PrunePurpose::User,
    )
})

この辺は大量のデータを効率的にバッチ処理したりメモリ使用量を最適化することでパフォーマンスの向上が図られる印象

merkle stageはmerkle patricia trie(state tree)を管理するステージである。
普段は前のstateの差分に対してのみアカウントとストレージのハッシュからstate treeを構築するが、5000ブロックごとに再構築していることがわかった。

pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 5_000;

計算された状態ルートが期待値と一致するか検証し、不一致の場合はエラーを返す

fn validate_state_root<H: BlockHeader + Sealable + Debug>(
    got: B256,
    expected: SealedHeader<H>,
    target_block: BlockNumber,
) -> Result<(), StageError>

また、状態ツリー構築の進捗を保存し中断時の再開を可能にする機能も存在した

fn get_execution_checkpoint(&self, provider: &impl StageCheckpointReader) -> Result<Option<MerkleCheckpoint>, StageError>
fn save_execution_checkpoint(&self, provider: &impl StageCheckpointWriter, checkpoint: Option<MerkleCheckpoint>) -> Result<(), StageError>
adustadust

気になった点

exex

Reth固有の機能。ここの改善をしてもプロトコルへのコミットにはならない

MDBX

state DBとして機能するKVS。GethやBitcoin coreで使われるLevel DBとは異なるなので改善の余地がありそう。あまり名前を聞かないし、セキュリティホールもあるのではないか? ioの改善を図るのが現実的かな

discv4

node 探索アルゴリズム。
これはプロトコル全体の仕様なので理解したいし、EIP側で提案できるかもしれない。
Reth実装はクライアントのバグバウンティの対象なのでそこを掘ってPR投げるのもあり