🆗
【Python・PySparkで学ぶ!】チートシート【DAGブロック一覧】
ブロック一覧
データの読み込み系
ブロック名 | 説明 |
---|---|
ParquetFileScan | Parquet ファイルのスキャン |
CSVScan | CSV ファイルのスキャン |
JSONScan | JSON ファイルのスキャン |
OrcScan | ORC ファイルのスキャン |
HiveTableScan | Hive テーブルのスキャン |
JDBCRelation | JDBC 経由でデータベースからデータ取得 |
MemoryTableScan | メモリ上のデータフレームからデータ取得 |
KafkaSource | Kafka からデータ取得 |
FileSourceScanExec | HDFS や S3 などのファイルソースをスキャン |
DataSourceV2ScanExec | DataSource V2 API を使ったデータのスキャン |
Range | range(n) で生成されたデータの処理 |
データ変換系
ブロック名 | 説明 |
---|---|
Project | select によるカラムの選択・計算(式の適用) |
Filter | filter() によるデータの絞り込み |
Join | join() による結合(内部で SortMergeJoin や BroadcastHashJoin になる) |
Aggregate | groupBy().agg() による集約処理 |
Sort | orderBy() や sort() による並び替え |
Expand | explode() によるデータの展開 |
Generate | flatMap() のような処理 |
Window | ウィンドウ関数の適用 |
Repartition | repartition() によるパーティションの変更 |
Coalesce | coalesce() によるパーティションの減少 |
MapElements | map() によるデータ変換 |
FlatMapGroupsInPandas | grouped_df.applyInPandas() の処理 |
PythonUDF | Python UDF を適用(Catalyst の恩恵を受けにくい) |
ScalarSubquery | スカラーサブクエリの評価 |
SubqueryBroadcastExec | サブクエリのブロードキャスト |
DeserializeToObject | RDD から DataFrame に変換 |
SerializeFromObject | DataFrame から RDD に変換 |
データの分散処理系
ブロック名 | 説明 |
---|---|
Exchange | shuffle() や join() の際にデータがシャッフルされる |
SortMergeJoin | ソート・マージ・ジョイン(大規模データの結合) |
BroadcastExchange | 小さなデータをブロードキャストする(broadcast() で最適化) |
BroadcastHashJoin | ブロードキャストされたデータをハッシュ結合 |
ShuffledHashJoin | ハッシュ結合(シャッフルが発生) |
Union | union() によるデータの結合 |
SubqueryAlias | サブクエリの処理 |
LocalShuffleReader | Adaptive Query Execution (AQE) でシャッフルされたデータの最適化 |
ShuffleExchangeExec | データシャッフルのオペレーション |
SortAggregateExec | groupBy().agg() の処理(ソートベース) |
HashAggregateExec | groupBy().agg() の処理(ハッシュベース) |
SortBasedAggregation | データのソートを前提にした集約 |
CoGroupedMap | cogroup() の適用 |
ReusedSubqueryExec | 同じサブクエリ結果を再利用 |
データの出力(アクション)系
ブロック名 | 説明 |
---|---|
Collect | collect() で全データをドライバに取得 |
TakeOrderedAndProject | take() や head() でデータを取得 |
Write | write().parquet() などでデータを書き出し |
SaveIntoDataSourceCommand | データベースなどにデータを保存 |
Show | show() でデータの表示 |
TakeOrderedExec | takeOrdered(n) によるデータ取得 |
GlobalLimitExec | limit(n) によるデータ取得(グローバル適用) |
LocalLimitExec | limit(n) によるデータ取得(各パーティション単位) |
CollectLimitExec | collect() で取得するデータ量を制限 |
WriteToDataSourceV2 | DataSource V2 API を使ったデータ書き込み |
実行時最適化系
ブロック名 | 説明 |
---|---|
**AdaptiveSparkPlan | AQE による実行計画の適応的最適化 |
**QueryStage | クエリのステージ分割 |
ReusedExchange | 同じシャッフル結果を再利用 |
SkewJoin | AQE によるスキュージョインの最適化 |
CoalescedShuffleRead | 小さなシャッフルパーティションを統合 |
CustomShuffleReaderExec | AQE のシャッフル最適化の実行ノード |
BroadcastNestedLoopJoinExec | ブロードキャスト・ネストループ・ジョイン |
最適化のポイント系
- withColumn を多用すると Project が増え、DAG が深くなる
- join() で Exchange や SortMergeJoin が発生しやすい
- repartition() を無駄に使うと Exchange のオーバーヘッドが増える
- AQE (AdaptiveSparkPlan) を有効化すると localShuffleReader などで最適化される
このブロック一覧を理解してDAGを見れば、ば、パフォーマンスを改善するためにどこを最適化すべきかが分かります
具体的な最適化例
1. ジョイン処理の最適化
- 問題: ジョイン処理が遅い場合
- ブロック: Exchange、SortMergeJoin、BroadcastExchange、ShuffledHashJoin
- 最適化方法① BroadcastHashJoin の利用: 一方のテーブルが小さい場合、ブロードキャストジョイン(broadcast())を使用すると、シャッフルのオーバーヘッドを削減でき、パフォーマンスが向上します。
- 最適化方法② SortMergeJoin と ShuffledHashJoin の選択: 大きなデータを結合する際に、どちらの方法が最適かを調整することで、ジョインの速度を向上させることができます。ソートとハッシュのどちらを使うべきかは、データの分布とサイズによって決まります。
- 最適化方法③スキュー(Skew)問題の解消: SkewJoin を利用して、データが偏っている場合でも均等に分散できるように調整する。
2. シャッフル(データ移動)の最適化
- 問題: シャッフルのオーバーヘッドが大きい場合
- ブロック: ShuffleExchangeExec、LocalShuffleReader、BroadcastExchange
- 最適化方法① シャッフルの最小化: シャッフルはデータの移動に関するコストが高いため、可能な限り repartition() や coalesce() を使ってシャッフルを減らすことが重要です。特に、大規模なデータを扱う場合、repartition() の使いすぎに注意が必要です。
- 最適化方法② CoalescedShuffleRead: シャッフル後にパーティション数が多すぎる場合、coalesce() を利用してパーティションを減らし、シャッフルの回数を削減できます。
LocalShuffleReader を使用して、ローカルシャッフルを行うことで、リモートノードへのアクセスを減らすことが可能です。
3. UDF(ユーザー定義関数)の最適化
- 問題: UDF がパフォーマンスを低下させている場合
- ブロック: PythonUDF
- 最適化方法① UDF の回避: UDF は Catalyst Optimizer の最適化を受けられないため、できるだけ利用を避け、Spark の組み込み関数(fn)を使うようにしましょう。
- 最適化方法② pandas_udf の利用: pandas_udf を利用することで、Spark は Pandas の並列処理を活用でき、パフォーマンスが向上する場合があります。
4. 集約処理の最適化
- 問題: 集約処理が遅い場合
- ブロック: HashAggregateExec、SortAggregateExec
- 最適化方法① HashAggregateExec vs SortAggregateExec: 集約処理の方法としてハッシュ集約とソート集約があります。データサイズやパーティションの分布に応じて、最適な方法を選ぶことが重要です。
- 最適化方法② 集約の事前フィルタリング: 集約を行う前に filter() や project() で必要ないデータを削除して、集約処理を効率化します。
5. データのスキャンと読み込みの最適化
- 問題: ファイルの読み込み処理が遅い場合
- ブロック: FileSourceScanExec、ParquetFileScan、JDBCRelation
- 最適化方法① パーティションプルーニング: parquet や ORC などのカラム型ファイルの場合、必要なカラムだけを読み込むことで、I/O コストを削減できます。これを プルーニング と呼びます。
- 最適化方法② 分散処理の最適化: repartition() や coalesce() を使って、ファイル読み込みの際のパーティション数を調整することで、処理時間を短縮できます。
6. メモリ使用の最適化
- 問題: メモリ不足やガーベジコレクションの問題
- ブロック: PythonUDF、Repartition、Aggregate
- 最適化方法① メモリ制御: repartition() を使ってメモリ分散を調整したり、必要に応じて persist() や cache() を使って計算結果をメモリにキャッシュすることが重要です。
- 最適化方法② スパークのメモリ設定: spark.executor.memory や spark.driver.memory の設定を調整し、より多くのメモリを使って処理を効率化する。
7. Adaptive Query Execution (AQE) の活用
- 問題: 実行計画が動的に最適化されない場合
- ブロック: AdaptiveSparkPlanExec、AdaptiveExecution
- 最適化方法① AQE の有効化: AQE を有効にすることで、クエリの実行時に最適なシャッフルパーティション数やジョイン戦略を動的に変更することができます。これにより、実行計画が改善され、パフォーマンスが向上します。
- 最適化方法②スキュー処理の自動調整: AQE によってスキュー(データの偏り)が自動で最適化され、パフォーマンスの問題を解決できます。
8. スカラサブクエリと非効率なクエリの最適化
- 問題: 非効率なサブクエリ処理
- ブロック: ScalarSubquery、SubqueryAlias
- 最適化方法① サブクエリの展開: スカラサブクエリや非効率なサブクエリを展開し、必要のない計算を削減することで、パフォーマンスが向上します。
Discussion