第4章 MapReduce計算フレームワーク-2
ここでは、MapReduceの実行原理について説明する。
第7節 MapReduceの原理分析
MapReduce全体の処理流れは、MapTaskとReduceTaskに分けて考えることができ、本章ではまずMapTask側から順に解説を進めていく。
7.1 MapTask運行仕組み

Read段階
- まず、対象ファイルは読み込まれる前に、ローカルでブロック(block)に分割されて各ノードに割り当てられる。通常、128MBの固定サイズに従って分割し、残る部分が128MBに満たない場合でも、1つのブロックとして処理される。
- データを読み込む時には、
getSplits()メソッドを使用して、ブロックを論理的な単位であるス切片(split)に分割する。この分割は物理的ではなく理論上の切り分けであり、デフォルトでは切片のサイズは 128MBに設定されており、HDFSのブロックサイズと同じです。そのため、通常の設定では切片とブロックは一対一関係です。そして、切片の数に応じて、同数のMapTaskが後続の段階で起動される。 -
InputFormatクラスを継承するFileInputFormatが、getSplits()メソッドを実装して、入力ファイルを切り分ける。この切り分けは論理的なものであり、物理的に分割するわけではない。
- 切り分けされたsplit情報、実行対象のコードを含むJARファイル、さらにジョブ(Job)の実行に必要な設定情報(XML設定ファイルなど)は、1つのジョブパッケージとして纏められ、最終的に YARN に提出される。
- YARNは、この提出されたジョブに対して、実行に必要なリソース(メモリやCPUなど)を割り当てる。次にMrAppMaster(MapReduce Application Master)と呼ばれるプロセスを起動し、ジョブの生命周期を管理する。MrAppMasterは、処理の実行順序を決定し、その制御のもとでMapTaskおよびReduceTaskが順次実行され、ジョブ処理を完了する。
Map段階 - Map段階に実行されるロジックは上書きされた
map()メソッドが担当する。 - 入力の
<key/value>ペアは、行番号(key)とその行の文字列(Value)です。出力の<key/value>ペアは単語(key) と その単語の出現回数1(Value)です。つまり、入力の<key: 行番号 / value: 行の文字列>が、<key: 単語 / value: 出現回数1>という形式に変換される。
- map処理が完了すると、各mapの結果は
context.write()を通じてフレームワークに渡され、一時バッファに格納される。 - バッファへ転送する時に分類処理がある。それはデータのkeyをハッシュ化(Hash)し、その値をReduceTask数で割って余りを求める。同じ余りを持つ
<key: 単語 / value: 出現回数1>ペアは、同じパーティションになる。
partition = hash(key) % numReducers
- このときの数式における分母はReduceTask数であり、MapTask 数ではない。したがって、バッファ格納の直前にパーティション分割が決定され、各データには対応するパーティション番号が付与される。
- その一時キャッシュが環状キャッシュと呼ばれる。本質には2つ配列から構成されており、1つ目の配列には
<key:単語/value:出現回数>ペアを格納する。2つ目の配列にはそれらのペアに対応する索引が格納されている。図の1つ目に示されている、2つの半円形の矢印が組み合わさったような記号が、この環状構造を表している。
- 環状バッファは、デフォルト100MBサイズ制限がある。使用率が閾値(80%)に達すると、その領域はロックされ、その領域に書き込むが止まる。
- ロックされたデータが新しいプロセスに取り出されてディスクへ転送する。その同時に、書き込み処理が配列の末尾から逆方向に残りの20MBバッファから続ける。再び80%閾値に達すると、逆側の領域が書き出される、という動作を繰り返す。
- 環状バッファュが上述の流れを抽象化する構造である。ディスク書き出しと新規データ追加が並行処理され、バッファは再利用され続ける。結果として 効率的にデータを処理できるになる。
Spill段階 - Spillとは、上述の環状バッファからデータを書き出す処理のことです。環状バッファの使用率80%閾値に達すると、その領域がロックされ、Spill用スレッドを起動する。このスレッドは、ロックされたデータをバッファから取り出し、一時ファイル(temporary file)に書き込む。
spillは漢字で表すと「溢出」や「溢写」といった表記になり、意味をイメージしやすくなる。いずれも「容量がいっぱいになったデータを外部へ書き出す」という語感を持っている。
- 臨時ファイルへ出力する前に、データの並べ替えがある。同じ余りを持つ
<key:単語/value:単語数>のペアが混在した状態で環状バッファに格納られている。赤枠で示した流れは、データをKeyを基準にして並べ替える処理です。 - その処理が終わると、同じkeyを持つ
<key:単語/value:単語数>のペアが集約される(⑧)。この段階では、パーティション(分区)が形成される。 - 並べ替えられたデータはパーティション単位で臨時ファイルに書き込む(⑨)。Map出力が大きい場合は複数の一時ファイルが生成される。
Merge段階 - MapTaskごとに生成される臨時ファイルをマージて大きな出力ファイルに纏めて書き込む。
- 最終的に1つノードに1つの統合ファイルだけがある。

ここまでMapTaskの流れが終わった。
7.2 MapTask並列処理
MapTaskの並列処理とは、同時に実行されるMapTaskのタスク数を指し、Mapper並列処理能力を測る指標となる。
この並列度を決定する要素は、read段階でFileInputFormatクラスのgetSplits()メソッドが呼び出され、データブロックを論理的に分割して生成された切片(スプリット)数です。デフォルト場合で切片のサイズとデータブロックのサイズが一致しているため、次の等式が成り立つ。
データブロックの数 = 切片の数 = MapTaskの数

図の上には、切片のサイズとブロックが同じで、各ブロック全体はそのまま1つのタスクに割り当てられて処理される。
下は、切片サイズを 100MB とした場合、128Mのグロックが100Mと28Mに分割される。残りの28Mデータが100Mに揃えるため次のブロックに移動する必要があり、この過程で最終的なタスク数が変わってないものの、データ移動による余分な資源の消費が発生する。
その例で伝えたいのは、切片サイズを小さくしてタスク数を増やしても、ノードを跨ぐデータ転送が発生するなら必ずしも効率化には繋がらないという点です。一方で、ブロックと切片サイズの両方を適切に縮小することで、タスク数の増加を抑えつつ適度に並列度を高め、効率的な処理を実現できる。
ブロックサイズの変更は設定ファイルによってできる。
# hdfs-default.xml設定
dfs.blocksize=128MB
# mapred-default.xml配置ファイル
mapreduce.input.fileinputformat.split.maxsize=256MB
mapreduce.input.fileinputformat.split.minsize=64MB
次に、ソースコードを見ながら切片サイズの判定ルールについて紹介する。
ブロックサイズは設定ファイルから直接取得られ、デフォルト値が128Mです。setBlockSize()のようなメソッドがないため、コードレベルで一時的に変更することができない。

切片サイズには、ブロック、切片の最大値と切片の最小値を比べて1つを選んで決める。minSize未満ならminSize、maxSize超ならmaxSizeに調整する。

computeSplitSize()メソッドを見ると、ブロックサイズは必ずminSize以上、maxSize以下 の範囲に収まるように制御されており、その範囲を外れることはだめです。

その仕組みによって、切片サイズを適切な範囲に収めることで、タスク数が多すぎたり少なすぎたりによる効率が下がることを防ぐ。ただし、その際にデータの移動が発生する可能があり、例えば、maxSize<blockSize場合で最後に返す切片サイズがブロックサイズより小さい。
極端な状況として、大規模データや大量の小さなファイルを処理する場合、タスク数が過剰に増えてしまうことがある。このような時は、block、minSize、maxSize三つを同時に変えて、タスク数を適度に減らしつつ、データ移動も起こってない。

ソースコードを見ると、bytesRemaining が splitSize(切片サイズ)の 1.1倍より大きい場合 にのみ、新しい切片として分割される仕組みになっています。したがって、たとえ 129MB のファイルであっても、残りの1MBが新しい切片を生成せず、1つのブロックとして扱われる。
ここまでMapReduceの前半であるMap流れについて解説しました。
次は、後半Reduce流れの流れです。よろしくお願いいたします。

Discussion