🆗

【Python・PySparkで学ぶ!】チートシート【DAGブロック一覧】

2025/03/07に公開

ブロック一覧

データの読み込み系

ブロック名 説明
ParquetFileScan Parquet ファイルのスキャン
CSVScan CSV ファイルのスキャン
JSONScan JSON ファイルのスキャン
OrcScan ORC ファイルのスキャン
HiveTableScan Hive テーブルのスキャン
JDBCRelation JDBC 経由でデータベースからデータ取得
MemoryTableScan メモリ上のデータフレームからデータ取得
KafkaSource Kafka からデータ取得
FileSourceScanExec HDFS や S3 などのファイルソースをスキャン
DataSourceV2ScanExec DataSource V2 API を使ったデータのスキャン
Range range(n) で生成されたデータの処理

データ変換系

ブロック名 説明
Project select によるカラムの選択・計算(式の適用)
Filter filter() によるデータの絞り込み
Join join() による結合(内部で SortMergeJoin や BroadcastHashJoin になる)
Aggregate groupBy().agg() による集約処理
Sort orderBy() や sort() による並び替え
Expand explode() によるデータの展開
Generate flatMap() のような処理
Window ウィンドウ関数の適用
Repartition repartition() によるパーティションの変更
Coalesce coalesce() によるパーティションの減少
MapElements map() によるデータ変換
FlatMapGroupsInPandas grouped_df.applyInPandas() の処理
PythonUDF Python UDF を適用(Catalyst の恩恵を受けにくい)
ScalarSubquery スカラーサブクエリの評価
SubqueryBroadcastExec サブクエリのブロードキャスト
DeserializeToObject RDD から DataFrame に変換
SerializeFromObject DataFrame から RDD に変換

データの分散処理系

ブロック名 説明
Exchange shuffle() や join() の際にデータがシャッフルされる
SortMergeJoin ソート・マージ・ジョイン(大規模データの結合)
BroadcastExchange 小さなデータをブロードキャストする(broadcast() で最適化
BroadcastHashJoin ブロードキャストされたデータをハッシュ結合
ShuffledHashJoin ハッシュ結合(シャッフルが発生)
Union union() によるデータの結合
SubqueryAlias サブクエリの処理
LocalShuffleReader Adaptive Query Execution (AQE) でシャッフルされたデータの最適化
ShuffleExchangeExec データシャッフルのオペレーション
SortAggregateExec groupBy().agg() の処理(ソートベース)
HashAggregateExec groupBy().agg() の処理(ハッシュベース)
SortBasedAggregation データのソートを前提にした集約
CoGroupedMap cogroup() の適用
ReusedSubqueryExec 同じサブクエリ結果を再利用

データの出力(アクション)系

ブロック名 説明
Collect collect() で全データをドライバに取得
TakeOrderedAndProject take() や head() でデータを取得
Write write().parquet() などでデータを書き出し
SaveIntoDataSourceCommand データベースなどにデータを保存
Show show() でデータの表示
TakeOrderedExec takeOrdered(n) によるデータ取得
GlobalLimitExec limit(n) によるデータ取得(グローバル適用)
LocalLimitExec limit(n) によるデータ取得(各パーティション単位)
CollectLimitExec collect() で取得するデータ量を制限
WriteToDataSourceV2 DataSource V2 API を使ったデータ書き込み

実行時最適化系

ブロック名 説明
**AdaptiveSparkPlan AQE による実行計画の適応的最適化
**QueryStage クエリのステージ分割
ReusedExchange 同じシャッフル結果を再利用
SkewJoin AQE によるスキュージョインの最適化
CoalescedShuffleRead 小さなシャッフルパーティションを統合
CustomShuffleReaderExec AQE のシャッフル最適化の実行ノード
BroadcastNestedLoopJoinExec ブロードキャスト・ネストループ・ジョイン

最適化のポイント系

  1. withColumn を多用すると Project が増え、DAG が深くなる
  2. join() で Exchange や SortMergeJoin が発生しやすい
  3. repartition() を無駄に使うと Exchange のオーバーヘッドが増える
  4. AQE (AdaptiveSparkPlan) を有効化すると localShuffleReader などで最適化される

このブロック一覧を理解してDAGを見れば、ば、パフォーマンスを改善するためにどこを最適化すべきかが分かります

具体的な最適化例

1. ジョイン処理の最適化

  • 問題: ジョイン処理が遅い場合
  • ブロック: Exchange、SortMergeJoin、BroadcastExchange、ShuffledHashJoin
  • 最適化方法① BroadcastHashJoin の利用: 一方のテーブルが小さい場合、ブロードキャストジョイン(broadcast())を使用すると、シャッフルのオーバーヘッドを削減でき、パフォーマンスが向上します。
  • 最適化方法② SortMergeJoin と ShuffledHashJoin の選択: 大きなデータを結合する際に、どちらの方法が最適かを調整することで、ジョインの速度を向上させることができます。ソートとハッシュのどちらを使うべきかは、データの分布とサイズによって決まります。
  • 最適化方法③スキュー(Skew)問題の解消: SkewJoin を利用して、データが偏っている場合でも均等に分散できるように調整する。

2. シャッフル(データ移動)の最適化

  • 問題: シャッフルのオーバーヘッドが大きい場合
  • ブロック: ShuffleExchangeExec、LocalShuffleReader、BroadcastExchange
  • 最適化方法① シャッフルの最小化: シャッフルはデータの移動に関するコストが高いため、可能な限り repartition() や coalesce() を使ってシャッフルを減らすことが重要です。特に、大規模なデータを扱う場合、repartition() の使いすぎに注意が必要です。
  • 最適化方法② CoalescedShuffleRead: シャッフル後にパーティション数が多すぎる場合、coalesce() を利用してパーティションを減らし、シャッフルの回数を削減できます。
    LocalShuffleReader を使用して、ローカルシャッフルを行うことで、リモートノードへのアクセスを減らすことが可能です。

3. UDF(ユーザー定義関数)の最適化

  • 問題: UDF がパフォーマンスを低下させている場合
  • ブロック: PythonUDF
  • 最適化方法① UDF の回避: UDF は Catalyst Optimizer の最適化を受けられないため、できるだけ利用を避け、Spark の組み込み関数(fn)を使うようにしましょう。
  • 最適化方法② pandas_udf の利用: pandas_udf を利用することで、Spark は Pandas の並列処理を活用でき、パフォーマンスが向上する場合があります。

4. 集約処理の最適化

  • 問題: 集約処理が遅い場合
  • ブロック: HashAggregateExec、SortAggregateExec
  • 最適化方法① HashAggregateExec vs SortAggregateExec: 集約処理の方法としてハッシュ集約とソート集約があります。データサイズやパーティションの分布に応じて、最適な方法を選ぶことが重要です。
  • 最適化方法② 集約の事前フィルタリング: 集約を行う前に filter() や project() で必要ないデータを削除して、集約処理を効率化します。

5. データのスキャンと読み込みの最適化

  • 問題: ファイルの読み込み処理が遅い場合
  • ブロック: FileSourceScanExec、ParquetFileScan、JDBCRelation
  • 最適化方法① パーティションプルーニング: parquet や ORC などのカラム型ファイルの場合、必要なカラムだけを読み込むことで、I/O コストを削減できます。これを プルーニング と呼びます。
  • 最適化方法② 分散処理の最適化: repartition() や coalesce() を使って、ファイル読み込みの際のパーティション数を調整することで、処理時間を短縮できます。

6. メモリ使用の最適化

  • 問題: メモリ不足やガーベジコレクションの問題
  • ブロック: PythonUDF、Repartition、Aggregate
  • 最適化方法① メモリ制御: repartition() を使ってメモリ分散を調整したり、必要に応じて persist() や cache() を使って計算結果をメモリにキャッシュすることが重要です。
  • 最適化方法② スパークのメモリ設定: spark.executor.memory や spark.driver.memory の設定を調整し、より多くのメモリを使って処理を効率化する。

7. Adaptive Query Execution (AQE) の活用

  • 問題: 実行計画が動的に最適化されない場合
  • ブロック: AdaptiveSparkPlanExec、AdaptiveExecution
  • 最適化方法① AQE の有効化: AQE を有効にすることで、クエリの実行時に最適なシャッフルパーティション数やジョイン戦略を動的に変更することができます。これにより、実行計画が改善され、パフォーマンスが向上します。
  • 最適化方法②スキュー処理の自動調整: AQE によってスキュー(データの偏り)が自動で最適化され、パフォーマンスの問題を解決できます。

8. スカラサブクエリと非効率なクエリの最適化

  • 問題: 非効率なサブクエリ処理
  • ブロック: ScalarSubquery、SubqueryAlias
  • 最適化方法① サブクエリの展開: スカラサブクエリや非効率なサブクエリを展開し、必要のない計算を削減することで、パフォーマンスが向上します。

Discussion