ブロック一覧
データの読み込み系
| ブロック名 |
説明 |
| ParquetFileScan |
Parquet ファイルのスキャン |
| CSVScan |
CSV ファイルのスキャン |
| JSONScan |
JSON ファイルのスキャン |
| OrcScan |
ORC ファイルのスキャン |
| HiveTableScan |
Hive テーブルのスキャン |
| JDBCRelation |
JDBC 経由でデータベースからデータ取得 |
| MemoryTableScan |
メモリ上のデータフレームからデータ取得 |
| KafkaSource |
Kafka からデータ取得 |
| FileSourceScanExec |
HDFS や S3 などのファイルソースをスキャン |
| DataSourceV2ScanExec |
DataSource V2 API を使ったデータのスキャン |
| Range |
range(n) で生成されたデータの処理 |
データ変換系
| ブロック名 |
説明 |
| Project |
select によるカラムの選択・計算(式の適用) |
| Filter |
filter() によるデータの絞り込み |
| Join |
join() による結合(内部で SortMergeJoin や BroadcastHashJoin になる) |
| Aggregate |
groupBy().agg() による集約処理 |
| Sort |
orderBy() や sort() による並び替え |
| Expand |
explode() によるデータの展開 |
| Generate |
flatMap() のような処理 |
| Window |
ウィンドウ関数の適用 |
| Repartition |
repartition() によるパーティションの変更 |
| Coalesce |
coalesce() によるパーティションの減少 |
| MapElements |
map() によるデータ変換 |
| FlatMapGroupsInPandas |
grouped_df.applyInPandas() の処理 |
| PythonUDF |
Python UDF を適用(Catalyst の恩恵を受けにくい) |
| ScalarSubquery |
スカラーサブクエリの評価 |
| SubqueryBroadcastExec |
サブクエリのブロードキャスト |
| DeserializeToObject |
RDD から DataFrame に変換 |
| SerializeFromObject |
DataFrame から RDD に変換 |
データの分散処理系
| ブロック名 |
説明 |
| Exchange |
shuffle() や join() の際にデータがシャッフルされる |
| SortMergeJoin |
ソート・マージ・ジョイン(大規模データの結合) |
| BroadcastExchange |
小さなデータをブロードキャストする(broadcast() で最適化) |
| BroadcastHashJoin |
ブロードキャストされたデータをハッシュ結合 |
| ShuffledHashJoin |
ハッシュ結合(シャッフルが発生) |
| Union |
union() によるデータの結合 |
| SubqueryAlias |
サブクエリの処理 |
| LocalShuffleReader |
Adaptive Query Execution (AQE) でシャッフルされたデータの最適化 |
| ShuffleExchangeExec |
データシャッフルのオペレーション |
| SortAggregateExec |
groupBy().agg() の処理(ソートベース) |
| HashAggregateExec |
groupBy().agg() の処理(ハッシュベース) |
| SortBasedAggregation |
データのソートを前提にした集約 |
| CoGroupedMap |
cogroup() の適用 |
| ReusedSubqueryExec |
同じサブクエリ結果を再利用 |
データの出力(アクション)系
| ブロック名 |
説明 |
| Collect |
collect() で全データをドライバに取得 |
| TakeOrderedAndProject |
take() や head() でデータを取得 |
| Write |
write().parquet() などでデータを書き出し |
| SaveIntoDataSourceCommand |
データベースなどにデータを保存 |
| Show |
show() でデータの表示 |
| TakeOrderedExec |
takeOrdered(n) によるデータ取得 |
| GlobalLimitExec |
limit(n) によるデータ取得(グローバル適用) |
| LocalLimitExec |
limit(n) によるデータ取得(各パーティション単位) |
| CollectLimitExec |
collect() で取得するデータ量を制限 |
| WriteToDataSourceV2 |
DataSource V2 API を使ったデータ書き込み |
実行時最適化系
| ブロック名 |
説明 |
| **AdaptiveSparkPlan |
AQE による実行計画の適応的最適化 |
| **QueryStage |
クエリのステージ分割 |
| ReusedExchange |
同じシャッフル結果を再利用 |
| SkewJoin |
AQE によるスキュージョインの最適化 |
| CoalescedShuffleRead |
小さなシャッフルパーティションを統合 |
| CustomShuffleReaderExec |
AQE のシャッフル最適化の実行ノード |
| BroadcastNestedLoopJoinExec |
ブロードキャスト・ネストループ・ジョイン |
最適化のポイント系
- withColumn を多用すると Project が増え、DAG が深くなる
- join() で Exchange や SortMergeJoin が発生しやすい
- repartition() を無駄に使うと Exchange のオーバーヘッドが増える
- AQE (AdaptiveSparkPlan) を有効化すると localShuffleReader などで最適化される
このブロック一覧を理解してDAGを見れば、ば、パフォーマンスを改善するためにどこを最適化すべきかが分かります
具体的な最適化例
1. ジョイン処理の最適化
- 問題: ジョイン処理が遅い場合
- ブロック: Exchange、SortMergeJoin、BroadcastExchange、ShuffledHashJoin
- 最適化方法① BroadcastHashJoin の利用: 一方のテーブルが小さい場合、ブロードキャストジョイン(broadcast())を使用すると、シャッフルのオーバーヘッドを削減でき、パフォーマンスが向上します。
- 最適化方法② SortMergeJoin と ShuffledHashJoin の選択: 大きなデータを結合する際に、どちらの方法が最適かを調整することで、ジョインの速度を向上させることができます。ソートとハッシュのどちらを使うべきかは、データの分布とサイズによって決まります。
- 最適化方法③スキュー(Skew)問題の解消: SkewJoin を利用して、データが偏っている場合でも均等に分散できるように調整する。
2. シャッフル(データ移動)の最適化
- 問題: シャッフルのオーバーヘッドが大きい場合
- ブロック: ShuffleExchangeExec、LocalShuffleReader、BroadcastExchange
- 最適化方法① シャッフルの最小化: シャッフルはデータの移動に関するコストが高いため、可能な限り repartition() や coalesce() を使ってシャッフルを減らすことが重要です。特に、大規模なデータを扱う場合、repartition() の使いすぎに注意が必要です。
- 最適化方法② CoalescedShuffleRead: シャッフル後にパーティション数が多すぎる場合、coalesce() を利用してパーティションを減らし、シャッフルの回数を削減できます。
LocalShuffleReader を使用して、ローカルシャッフルを行うことで、リモートノードへのアクセスを減らすことが可能です。
3. UDF(ユーザー定義関数)の最適化
- 問題: UDF がパフォーマンスを低下させている場合
- ブロック: PythonUDF
- 最適化方法① UDF の回避: UDF は Catalyst Optimizer の最適化を受けられないため、できるだけ利用を避け、Spark の組み込み関数(fn)を使うようにしましょう。
- 最適化方法② pandas_udf の利用: pandas_udf を利用することで、Spark は Pandas の並列処理を活用でき、パフォーマンスが向上する場合があります。
4. 集約処理の最適化
- 問題: 集約処理が遅い場合
- ブロック: HashAggregateExec、SortAggregateExec
- 最適化方法① HashAggregateExec vs SortAggregateExec: 集約処理の方法としてハッシュ集約とソート集約があります。データサイズやパーティションの分布に応じて、最適な方法を選ぶことが重要です。
- 最適化方法② 集約の事前フィルタリング: 集約を行う前に filter() や project() で必要ないデータを削除して、集約処理を効率化します。
5. データのスキャンと読み込みの最適化
- 問題: ファイルの読み込み処理が遅い場合
- ブロック: FileSourceScanExec、ParquetFileScan、JDBCRelation
- 最適化方法① パーティションプルーニング: parquet や ORC などのカラム型ファイルの場合、必要なカラムだけを読み込むことで、I/O コストを削減できます。これを プルーニング と呼びます。
- 最適化方法② 分散処理の最適化: repartition() や coalesce() を使って、ファイル読み込みの際のパーティション数を調整することで、処理時間を短縮できます。
6. メモリ使用の最適化
- 問題: メモリ不足やガーベジコレクションの問題
- ブロック: PythonUDF、Repartition、Aggregate
- 最適化方法① メモリ制御: repartition() を使ってメモリ分散を調整したり、必要に応じて persist() や cache() を使って計算結果をメモリにキャッシュすることが重要です。
- 最適化方法② スパークのメモリ設定: spark.executor.memory や spark.driver.memory の設定を調整し、より多くのメモリを使って処理を効率化する。
7. Adaptive Query Execution (AQE) の活用
- 問題: 実行計画が動的に最適化されない場合
- ブロック: AdaptiveSparkPlanExec、AdaptiveExecution
- 最適化方法① AQE の有効化: AQE を有効にすることで、クエリの実行時に最適なシャッフルパーティション数やジョイン戦略を動的に変更することができます。これにより、実行計画が改善され、パフォーマンスが向上します。
- 最適化方法②スキュー処理の自動調整: AQE によってスキュー(データの偏り)が自動で最適化され、パフォーマンスの問題を解決できます。
8. スカラサブクエリと非効率なクエリの最適化
- 問題: 非効率なサブクエリ処理
- ブロック: ScalarSubquery、SubqueryAlias
- 最適化方法① サブクエリの展開: スカラサブクエリや非効率なサブクエリを展開し、必要のない計算を削減することで、パフォーマンスが向上します。
Discussion