Databricks 認定資格「Databricks Certified Developer for Apache Spark」は、Databricks Lakehouse Platform 上での Spark アプリケーション開発スキルを証明する公式資格です。本ブログでは、出題ドメインの全体像と各領域で重点的に学習すべきポイントを整理し、資格取得に向けた学習ガイドとして提供します。
- 2025年3月時点の情報で整理しています。
- こちらの内容で試験に合格できました(正解率:80%以上でした)。
資格のドメイン構成
- Apache Spark アーキテクチャの概念(体系的整理)(※)
- Apache Spark アーキテクチャ アプリケーション(用途・設計観点別)(※)
- Apache Spark DataFrame API アプリケーション(※)
- Spark SQL / Catalyst 最適化構文
- Spark Streaming API 活用
※公には1~3が資格のドメイン構成となっています。
1.Apache Spark アーキテクチャの概念(体系的整理)
【1】Spark の基本コンポーネント
コンポーネント |
役割 |
Driver Program |
ユーザコードのエントリーポイント。DAG(有向非巡回グラフ)生成、タスクのスケジューリング |
Spark Context |
クラスタとの接続口。リソース要求、ジョブ送信、RDD/DataFrame 操作等を管理 |
Executor |
タスクの実行単位。計算と中間データの保存、データのキャッシュ処理などを行う |
Cluster Manager |
リソースの割り当て(YARN, Mesos, Kubernetes, Standalone) |
【2】Spark の実行モデルとデータ処理の流れ
実行フロー
①アクションの呼び出しで実行がトリガーされる(Spark は遅延評価)
②DAG(有向非巡回グラフ)が構築され、ステージに分割
③各ステージ内でタスク(最小処理単位)が生成
④Driver → Executor にタスクが送信
⑤結果が Driver に返却される or データソースに保存
用語 |
説明 |
Transformation |
map, filter など。遅延評価で、DAG 上に論理的変換として記録 |
Action |
count, collect, show など。実際の計算をトリガー |
Stage |
論理DAG を narrow/wide 変換に応じて分割した単位。内部は複数タスク |
Task |
パーティション単位に Executor に送られる処理単位 |
【3】Spark のリソース管理(デプロイメント・モード)
モード |
Driver の場所 |
利用ケース |
Client モード |
提出元ノード(Edge) |
開発・テスト、インタラクティブ処理 |
Cluster モード |
クラスタ内部(Worker) |
本番バッチ処理、スケーラビリティが必要な場合 |
【4】RDD・DataFrame・Dataset の概念的な違い(抽象度の違い)
概念 |
特徴 |
利点 |
RDD |
低レベル API。型安全、分散制御が手動 |
柔軟だが手続き的 |
DataFrame |
RDD の構造化版。Spark SQL で最適化される |
Catalyst オプティマイザが有効 |
Dataset |
JVM 言語用の型安全な DataFrame(Scala/Java) |
型安全 + 最適化の両立 |
【5】Spark のデータ処理と最適化
処理要素 |
内容 |
Catalyst Optimizer |
DataFrame/Dataset に対する論理・物理計画の最適化 |
Tungsten Engine |
メモリ管理とコード生成による実行速度の向上 |
AQE(Adaptive Query Execution) |
実行時の統計情報に基づいて Join戦略やパーティション数などを動的変更 |
【6】シャッフル処理とパーティション制御
概念 |
説明 |
Shuffle |
データの再分配(例:groupByKey, join)で発生し、I/O コストが高い |
Narrow Transformation |
入力データが単一パーティションからのみ発生(例:map, filter) |
Wide Transformation |
複数パーティションの再結合が必要(例:groupByKey) |
Partition Control |
repartition, coalesce によるパーティション数変更 |
【7】データ永続化(Storage Levels)
永続化モード |
内容 |
MEMORY_ONLY |
メモリに格納。入りきらなければ再計算 |
MEMORY_AND_DISK |
メモリ不足時はディスクにスピル |
MEMORY_ONLY_SER |
シリアライズしてメモリに格納(Java のみ) |
MEMORY_AND_DISK_SER |
シリアライズしてメモリ+ディスク |
DISK_ONLY |
ディスクのみに格納 |
総括
Apache Spark のアーキテクチャは、以下のように整理できます。
-
論理構成(Driver / Executor / Cluster Manager)
-
実行モデル(DAG → Stage → Task)
-
抽象 API 層(RDD / DataFrame / Dataset)
-
最適化エンジン(Catalyst, Tungsten, AQE)
- シャッフルとパーティション戦略
- 永続化・メモリ管理
2.Apache Spark アーキテクチャ アプリケーション(用途・設計観点別)
【1】基本用途カテゴリ(主要ユースケース)
アプリケーション領域 |
内容例・適用技術 |
主な特徴・設計観点 |
バッチ処理 |
ETL、ログ集計、ジョイン、ソートなど |
高スループット、シャッフル制御、キャッシュ活用 |
ストリーム処理 |
IoTデータ、センサーデータ、クリックストリームなど |
マイクロバッチ (Structured Streaming)、低レイテンシ |
インタラクティブ分析 |
BIツール連携(Databricks, Zeppelin, Jupyter) |
遅延評価 + キャッシュで高速反応性 |
機械学習 |
MLlib、pandas-on-Spark |
分散データ処理を活用したスケーラブル学習 |
グラフ処理 |
GraphFrames、Pregel API |
ノード・エッジ間の繰り返し処理 |
【2】アーキテクチャに基づく設計パターン
データ結合・統合処理パターン
シナリオ |
推奨 API / 最適化方法 |
小さな表との結合(片方が小さい) |
broadcast join を明示指定 → メモリ上で高速化 |
列名が一致しない DataFrame の統合 |
unionByName()(列名で一致) or union()(順序で一致) |
RDD ベースのカスタム処理 |
mapPartitions, flatMap, aggregateByKey などを活用 |
データ加工・抽出パターン
シナリオ |
推奨手法/API |
条件付きの絞り込み・選択 |
filter, where, select, drop, withColumn 等 |
型変換(例:int → string) |
withColumn("col", col("col").cast("string")) |
日付処理、timestamp の文字列変換 |
from_unixtime, unix_timestamp, date_format など使用 |
グループ化・集約・ピボット処理
処理種類 |
使用 API / アーキテクチャ的配慮 |
基本的な集約 |
groupBy().agg() / count, avg, min, max など |
ピボット集計 |
groupBy().pivot().agg() |
シャッフル発生の最適化 |
AQE(自動の再パーティション)+ spark.sql.shuffle.partitions |
データ永続化・ストレージ連携
操作内容 |
API・設計パターン |
パーティション単位で書き出し |
.write.partitionBy("col").parquet(path)(※ partitionOn() は誤り) |
スキーマの明示またはマージ |
.schema(schema).load(path) / .option("mergeSchema", "true") |
不要行の削除・ファイル整形 |
.drop(), .repartition(), .coalesce() など活用 |
【3】モード・構成選択によるアプリケーション最適化
運用構成 |
特徴 / 活用されるアプリケーションパターン |
Client モード |
インタラクティブ分析(UI 経由)、即時応答が必要なユースケース |
Cluster モード |
バッチ処理やスケーラブルな分析ワークロード、運用安定性が重要なケース |
Standalone モード |
単一フレームワークでの構築。Spark アプリだけで閉じる小規模構成 |
YARN / K8s |
複数ジョブ管理、リソース分離、Hadoop連携などが必要なエンタープライズ用途 |
【4】最適化戦略の実装ポイント
対象 |
手法 |
シャッフル最適化 |
AQE + spark.sql.shuffle.partitions 設定変更 |
メモリ管理 |
persist()/cache() の適用レベル、Broadcast、パーティション調整 |
大規模ジョインの高速化 |
Broadcast Hint (broadcast(df))、Partition Skew 対応 |
アクション最適化 |
Action トリガ時にだけ実行されるよう遅延評価設計、AQE 発動 |
総括
Apache Spark を使ったアプリケーション開発では、以下を軸にアーキテクチャの知識を適用することが重要です。
-
アプリケーション層からの設計観点
- ワークロードの特性(バッチ/ストリーミング/分析)
- ジョブの並列性、再利用性(キャッシュ・永続化)
- フォーマット/スキーマ統一戦略(Parquet + schema)
-
処理構成のアーキテクチャ観点
- ジョブの DAG 分解とシャッフルポイントの把握
- Driver / Executor の配置戦略(Client/Cluster)
- Catalyst / AQE を生かす DataFrame 設計
3.Apache Spark DataFrame API アプリケーション
(=DataFrameベースで実装される処理パターンとそのAPI設計)
【1】基本操作カテゴリ(データ抽出・変形)
処理内容 |
API例・構文 |
備考 |
列の選択 |
select("col1", "col2") |
col()関数併用で表現力UP |
行のフィルタ |
filter(col("value") > 100) where(...) |
filterとwhereは同等 |
列の追加・変換 |
withColumn("newCol", expr) |
型変換や条件列もここで処理 |
列の削除 |
drop("colName") |
不要列の削除に使う |
列の名前変更 |
withColumnRenamed("old", "new") |
複数回チェーンで複数列のリネーム対応 |
型変換 |
cast("int"), cast("string") |
withColumnと併用 |
【2】集約・統計処理(groupBy・agg)
処理内容 |
API例 |
備考 |
グループ単位の集計 |
groupBy("key").agg(avg("col").alias("avgVal")) |
count, sum, min, maxなど |
単一列のカウント |
groupBy("col").count() |
最も基本的な集計 |
複数集計 |
.agg(max("amount").alias("max"), min("amount").alias("min")) |
複数列同時に集計 |
ピボット処理 |
.groupBy("key").pivot("col", [val1, val2]).agg(...) |
分類データを列に展開 |
【3】結合・統合処理(join・union)
処理内容 |
API例 |
備考 |
等値結合 |
.join(df2, df1.key == df2.key) |
デフォルトでinner join |
外部結合/片側結合 |
.join(df2, on="key", how="left_outer") |
left, right, outer, left_semi, left_anti など |
名前付き列での縦連結 |
.unionByName(df2) |
列名に基づいて連結、欠損列にはnullを挿入 |
位置での縦連結(列名無視) |
.union(df2) |
両者のスキーマ(列順)が一致している必要がある |
ブロードキャスト結合 |
.join(broadcast(df2), "key") |
片方が小さい場合に分散せず各ノードに展開され高速化 |
【4】UDF(ユーザー定義関数)の適用
処理内容 |
API例 |
備考 |
関数登録・SQLで使用 |
spark.udf.register("FUNC", myfunc) SELECT FUNC(col) |
SQL実行文内で使用可能 |
DataFrame APIで使用 |
df.withColumn("newCol", udfFunc("colName")) |
型指定が必要なことに注意 |
foreachで集約・カウント |
acc = spark.sparkContext.accumulator(0) df.foreach(...) |
非集計操作に向く(mapでは使えない) |
【5】ファイルの読み書き
処理内容 |
API例 |
備考 |
CSVファイルの読み込み |
spark.read.csv("path", comment="#") |
コメント行除外、ヘッダ指定などオプション可能 |
JSONファイルの読み込み |
spark.read.json("path") |
キーマ指定可能 |
Parquet + スキーママージ |
spark.read.option("mergeSchema", "true").parquet("path") |
パーティションでスキーマ差異がある場合 |
パーティション書き出し |
df.write.partitionBy("col").parquet("path") |
正しいAPIはpartitionBy (partitionOnは×) |
【6】時間/日付処理・フォーマット変換
処理内容 |
API例 |
備考 |
Unixタイム変換 |
withColumn("ts", from_unixtime("col")) |
yyyy-MM-dd HH:mm:ss形式など |
日付の文字列変換 |
date_format("col", "MMM dd (EEEE)") |
Jan 20 (Thursday) |
角度変換と数値処理 |
round(cos(degrees("value")), 2) |
値を角度変換し、cos計算後に丸める |
【7】サンプリング・分割・型チェックなど
処理内容 |
API例 |
備考 |
サンプリング |
.sample(fraction=0.1, seed=1234) |
再現性のある部分抽出に最適 |
スキーマ確認 |
.printSchema() |
デバッグや開発時の確認に便利 |
列のデータ型確認 |
df.select("col").printSchema() |
特定列の型だけ確認可能 |
型明示で作成 |
spark.createDataFrame(dataList, FloatType()) |
明示的に型指定してDF作成 |
総括
設計観点 |
実装パターン例 |
柔軟な列操作 |
withColumn, cast, drop, rename, select で変換・整形 |
集約処理と可視化設計 |
groupBy + agg, pivot, orderBy を活用して BI 向け集約表を生成 |
結合/連結戦略 |
join(等値・外部・broadcast)と unionByName で動的な統合処理 |
形式変換・外部連携 |
read.json/csv/parquet + schema指定 + write.partitionBy()でデータIO整備 |
UDF / foreach 活用 |
複雑な処理・累積集計では UDF, accumulator, foreach による非標準処理も検討 |
構造管理と型確認 |
printSchema(), cast(), dropDuplicates(), sample() などでデータ整備&健全性チェック |
4.Spark SQL / Catalyst 最適化構文
(=SQL文の構文理解 + Catalystによる内部最適化を意識した設計)
【1】Spark SQL の基本構文と実用記法
区分 |
主な構文例 |
内容と備考 |
テーブル参照 |
SELECT * FROM tableName SELECT col1, col2 FROM df_view |
createOrReplaceTempViewで DataFrame を仮想テーブル化 |
条件フィルタ |
WHERE col > 100 AND, OR, IN, LIKE, IS NOT NULL |
標準SQL記法 |
集計・グルーピング |
GROUP BY key COUNT(*), MAX(col), AVG(col) |
HAVING句も使える |
結合 |
INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN |
結合キーの明示は必須 |
並び替え |
ORDER BY col ASC, DESC |
NULLS FIRST / LASTでnull制御 |
制限・抽出 |
LIMIT 10, DISTINCT, DROP DUPLICATES |
PySparkの.dropDuplicates()相当も利用可能 |
関数利用 |
ROUND(...), COS(...), DATE_FORMAT(...) |
PySpark SQL関数と完全互換 |
【2】SQLとの連携実装パターン(PySparkとの橋渡し)
実装用途 |
PySpark記法例 |
DataFrame → SQL化 |
df.createOrReplaceTempView("myTable") |
SQL実行 |
spark.sql("SELECT ... FROM myTable") |
UDF登録 + SQLで呼出 |
spark.udf.register("FUNC", udf_func) spark.sql("SELECT FUNC(col) FROM myTable") |
【3】Catalyst Optimizer の最適化対象と構文上の考慮
カテゴリ |
最適化内容 |
Spark SQLでの意識点 |
①プッシュダウン |
WHERE 句や LIMIT 条件を早期に適用 |
フィルタ・LIMITは早めに指定することで物理実行プランが改善される |
②プロジェクション最適化 |
不要な列を除外する |
SELECT *を避けて明示列を指定するとパフォーマンス改善 |
③常にSQL→論理計画に変換 |
Catalyst内部でLogicalPlan生成 → 解析 → 最適化 |
DataFrame APIでもSQLでも裏側はCatalystで同様に最適化される |
④統計情報による最適化 |
ANALYZE TABLE やファイルサイズ情報による最適join順 |
Adaptive Query Execution (AQE)と組み合わせると効果大 |
⑤条件式の簡約 |
不要なブール式の除外や整理 |
WHERE col > 5 OR col > 10→WHERE col > 5 に簡約など |
【4】Adaptive Query Execution(AQE)との関係
内容 |
Spark SQLにおける意義 |
シャッフルパーティションの自動調整 |
実行時のデータサイズを見てspark.sql.shuffle.partitions を動的調整 |
ブロードキャスト結合の自動選択 |
小さなテーブルを自動的にbroadcastする |
スキュー除去(Data Skew) |
不均衡な分割をAQEが調整する |
SQLにも適用される |
spark.sql.adaptive.enabled=true でSQL実行にも有効 |
【5】SQL/構文関連の注意すべき非標準API
誤用API |
正しい代替 |
備考 |
sc.union(...) |
df1.union(df2) または unionByName(df2) |
SparkContextに対してDataFrame渡すのは不適切 |
partitionOn() |
partitionBy() |
DataFrameWriterでの書き出し時に使用 |
read().json(...) |
read.json(...) |
read()は関数ではない |
"spark.schema(...)" |
df.printSchema() |
スキーマ表示はDataFrameメソッドで |
総括
観点 |
推奨アプローチ |
留意点 |
列・条件明示 |
SELECT col1, col2 + WHERE句 |
不要列・遅延フィルタは避ける |
仮想テーブル化 |
.createOrReplaceTempView() |
SQLとDataFrame APIの橋渡しに活用 |
結合効率 |
SQLでも broadcast()ヒント適用可能 |
AQEとの組み合わせで効果最大化 |
UDF活用 |
必要最小限にとどめる |
Catalystによる最適化が効かなくなる可能性あり |
統一スキーマ設計 |
mergeSchema活用 + SQLで一貫処理 |
マルチパーティション読み込みに有効 |
5.Spark Streaming API 活用
(=マイクロバッチ処理の基本構成・各種演算・ストレージ出力・設計留意点)
【1】Spark Streaming の基本構成
要素 |
説明 |
入力ソース |
Kafka、ソケット、ファイルディレクトリ、Kinesis など |
マイクロバッチ構造 |
DStream / Structured Streaming によって時間単位で処理(例: 2秒ごと) |
出力シンク |
コンソール、ファイル(Parquet, JSON 等)、Kafka、RDB など |
トリガー |
Trigger.ProcessingTime("5 seconds") や Trigger.Once() などで制御 |
障害耐性 |
チェックポイントディレクトリを指定して状態の復元を可能にする |
【2】Structured Streaming API の主要メソッド
API名 |
説明 |
例 |
readStream |
ストリーミングソースからの入力定義 |
spark.readStream.format("kafka")... |
writeStream |
出力ストリームの定義 |
df.writeStream.format("console")... |
trigger |
実行タイミングの定義 |
.trigger(Trigger.ProcessingTime("10 seconds")) |
outputMode |
出力モード(append / update / complete) |
.outputMode("append") |
option |
Kafkaのオプション、checkpointLocation、パーティション制御など |
.option("checkpointLocation", "/tmp/check") |
start / awaitTermination |
ストリームの起動・停止制御 |
query.start(); query.awaitTermination() |
【3】出力モード(outputMode)の選択肢と用途
モード名 |
説明 |
適用例 |
append |
新規追加行のみ出力 |
ファイル出力、Kafka出力など |
update |
更新された集計値のみ出力 |
agg()等で中間更新される場合 |
complete |
全体の集計状態を出力 |
状態を全件保持する集計(例:groupBy後のカウント) |
【4】ウィンドウ処理と watermark(時間遅延処理)
機能 |
APIと意味 |
ウィンドウ関数 |
groupBy(window(col("timestamp"), "10 minutes")) |
水位設定(遅延データ処理) |
.withWatermark("timestamp", "5 minutes") |
【5】状態管理(stateful operation)
操作 |
説明 |
mapGroupsWithState |
Key単位で状態を保持しながら処理 |
flatMapGroupsWithState |
複数行の出力も可能な状態管理処理 |
状態更新の保持要件 |
チェックポイントが必須。ステートが大きくなるとメモリ圧迫も考慮 |
【6】Kafkaとの統合例(典型)
Kafkaとの統合例
df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input_topic")
.load()
decoded_df = df.selectExpr("CAST(value AS STRING)")
query = decoded_df.writeStream
.format("console")
.outputMode("append")
.start()
【7】チェックポイント(失敗復旧)
設定箇所 |
.writeStream.option("checkpointLocation", "/path/to/checkpoint") |
効果 |
ジョブ失敗時にも処理を再開可能にするための状態記録 |
【8】開発・設計時の実践ポイント
設計観点 |
推奨 |
レイテンシ設計 |
バッチ間隔は use case に応じて選定(秒単位 or 分単位) |
遅延データ処理 |
watermark を指定しないと重複・欠損が発生し得る |
出力先の制約確認 |
Kafka / Delta Lake などで制限される outputMode に注意 |
スキーマ整合性 |
構造化JSONなどでは schema = StructType(...) の定義が必要 |
総括
項目 |
チェックポイント |
入力ソース |
Kafka, ファイル, ソケット など |
スキーマ整備 |
readStream.schema(schema).json(...) |
トリガー頻度 |
Trigger.ProcessingTime("5s") など指定必須 |
出力モード |
append, update, complete 使い分け |
状態管理 |
group集計やstateful処理ならcheckpointも設計に含める |
再起動対応 |
チェックポイントパスで復旧性を担保する設計 |
出力の最適化 |
書込形式(Parquet等)のpartitionByやmodeに注意 |
Discussion