Zenn
🌏

Databricks Certified Developer for Spark 資格対策

に公開

Databricks 認定資格「Databricks Certified Developer for Apache Spark」は、Databricks Lakehouse Platform 上での Spark アプリケーション開発スキルを証明する公式資格です。本ブログでは、出題ドメインの全体像と各領域で重点的に学習すべきポイントを整理し、資格取得に向けた学習ガイドとして提供します。

  • 2025年3月時点の情報で整理しています。
  • こちらの内容で試験に合格できました(正解率:80%以上でした)。

資格のドメイン構成

  1. Apache Spark アーキテクチャの概念(体系的整理)(※)
  2. Apache Spark アーキテクチャ アプリケーション(用途・設計観点別)(※)
  3. Apache Spark DataFrame API アプリケーション(※)
  4. Spark SQL / Catalyst 最適化構文
  5. Spark Streaming API 活用

※公には1~3が資格のドメイン構成となっています。

1.Apache Spark アーキテクチャの概念(体系的整理)

【1】Spark の基本コンポーネント

コンポーネント 役割
Driver Program ユーザコードのエントリーポイント。DAG(有向非巡回グラフ)生成、タスクのスケジューリング
Spark Context クラスタとの接続口。リソース要求、ジョブ送信、RDD/DataFrame 操作等を管理
Executor タスクの実行単位。計算と中間データの保存、データのキャッシュ処理などを行う
Cluster Manager リソースの割り当て(YARN, Mesos, Kubernetes, Standalone)

【2】Spark の実行モデルとデータ処理の流れ

実行フロー
①アクションの呼び出しで実行がトリガーされる(Spark は遅延評価)
②DAG(有向非巡回グラフ)が構築され、ステージに分割
③各ステージ内でタスク(最小処理単位)が生成
④Driver → Executor にタスクが送信
⑤結果が Driver に返却される or データソースに保存

用語 説明
Transformation map, filter など。遅延評価で、DAG 上に論理的変換として記録
Action count, collect, show など。実際の計算をトリガー
Stage 論理DAG を narrow/wide 変換に応じて分割した単位。内部は複数タスク
Task パーティション単位に Executor に送られる処理単位

【3】Spark のリソース管理(デプロイメント・モード)

モード Driver の場所 利用ケース
Client モード 提出元ノード(Edge) 開発・テスト、インタラクティブ処理
Cluster モード クラスタ内部(Worker) 本番バッチ処理、スケーラビリティが必要な場合

【4】RDD・DataFrame・Dataset の概念的な違い(抽象度の違い)

概念 特徴 利点
RDD 低レベル API。型安全、分散制御が手動 柔軟だが手続き的
DataFrame RDD の構造化版。Spark SQL で最適化される Catalyst オプティマイザが有効
Dataset JVM 言語用の型安全な DataFrame(Scala/Java) 型安全 + 最適化の両立

【5】Spark のデータ処理と最適化

処理要素 内容
Catalyst Optimizer DataFrame/Dataset に対する論理・物理計画の最適化
Tungsten Engine メモリ管理とコード生成による実行速度の向上
AQE(Adaptive Query Execution) 実行時の統計情報に基づいて Join戦略やパーティション数などを動的変更

【6】シャッフル処理とパーティション制御

概念 説明
Shuffle データの再分配(例:groupByKey, join)で発生し、I/O コストが高い
Narrow Transformation 入力データが単一パーティションからのみ発生(例:map, filter)
Wide Transformation 複数パーティションの再結合が必要(例:groupByKey)
Partition Control repartition, coalesce によるパーティション数変更

【7】データ永続化(Storage Levels)

永続化モード 内容
MEMORY_ONLY メモリに格納。入りきらなければ再計算
MEMORY_AND_DISK メモリ不足時はディスクにスピル
MEMORY_ONLY_SER シリアライズしてメモリに格納(Java のみ)
MEMORY_AND_DISK_SER シリアライズしてメモリ+ディスク
DISK_ONLY ディスクのみに格納

総括

Apache Spark のアーキテクチャは、以下のように整理できます。

  • 論理構成(Driver / Executor / Cluster Manager)
  • 実行モデル(DAG → Stage → Task)
  • 抽象 API 層(RDD / DataFrame / Dataset)
  • 最適化エンジン(Catalyst, Tungsten, AQE)
  • シャッフルとパーティション戦略
  • 永続化・メモリ管理

2.Apache Spark アーキテクチャ アプリケーション(用途・設計観点別)

【1】基本用途カテゴリ(主要ユースケース)

アプリケーション領域 内容例・適用技術 主な特徴・設計観点
バッチ処理 ETL、ログ集計、ジョイン、ソートなど 高スループット、シャッフル制御、キャッシュ活用
ストリーム処理 IoTデータ、センサーデータ、クリックストリームなど マイクロバッチ (Structured Streaming)、低レイテンシ
インタラクティブ分析 BIツール連携(Databricks, Zeppelin, Jupyter) 遅延評価 + キャッシュで高速反応性
機械学習 MLlib、pandas-on-Spark 分散データ処理を活用したスケーラブル学習
グラフ処理 GraphFrames、Pregel API ノード・エッジ間の繰り返し処理

【2】アーキテクチャに基づく設計パターン

データ結合・統合処理パターン

シナリオ 推奨 API / 最適化方法
小さな表との結合(片方が小さい) broadcast join を明示指定 → メモリ上で高速化
列名が一致しない DataFrame の統合 unionByName()(列名で一致) or union()(順序で一致)
RDD ベースのカスタム処理 mapPartitions, flatMap, aggregateByKey などを活用

データ加工・抽出パターン

シナリオ 推奨手法/API
条件付きの絞り込み・選択 filter, where, select, drop, withColumn 等
型変換(例:int → string) withColumn("col", col("col").cast("string"))
日付処理、timestamp の文字列変換 from_unixtime, unix_timestamp, date_format など使用

グループ化・集約・ピボット処理

処理種類 使用 API / アーキテクチャ的配慮
基本的な集約 groupBy().agg() / count, avg, min, max など
ピボット集計 groupBy().pivot().agg()
シャッフル発生の最適化 AQE(自動の再パーティション)+ spark.sql.shuffle.partitions

データ永続化・ストレージ連携

操作内容 API・設計パターン
パーティション単位で書き出し .write.partitionBy("col").parquet(path)(※ partitionOn() は誤り)
スキーマの明示またはマージ .schema(schema).load(path) / .option("mergeSchema", "true")
不要行の削除・ファイル整形 .drop(), .repartition(), .coalesce() など活用

【3】モード・構成選択によるアプリケーション最適化

運用構成 特徴 / 活用されるアプリケーションパターン
Client モード インタラクティブ分析(UI 経由)、即時応答が必要なユースケース
Cluster モード バッチ処理やスケーラブルな分析ワークロード、運用安定性が重要なケース
Standalone モード 単一フレームワークでの構築。Spark アプリだけで閉じる小規模構成
YARN / K8s 複数ジョブ管理、リソース分離、Hadoop連携などが必要なエンタープライズ用途

【4】最適化戦略の実装ポイント

対象 手法
シャッフル最適化 AQE + spark.sql.shuffle.partitions 設定変更
メモリ管理 persist()/cache() の適用レベル、Broadcast、パーティション調整
大規模ジョインの高速化 Broadcast Hint (broadcast(df))、Partition Skew 対応
アクション最適化 Action トリガ時にだけ実行されるよう遅延評価設計、AQE 発動

総括

Apache Spark を使ったアプリケーション開発では、以下を軸にアーキテクチャの知識を適用することが重要です。

  • アプリケーション層からの設計観点
    • ワークロードの特性(バッチ/ストリーミング/分析)
    • ジョブの並列性、再利用性(キャッシュ・永続化)
    • フォーマット/スキーマ統一戦略(Parquet + schema)
  • 処理構成のアーキテクチャ観点
    • ジョブの DAG 分解とシャッフルポイントの把握
    • Driver / Executor の配置戦略(Client/Cluster)
    • Catalyst / AQE を生かす DataFrame 設計

3.Apache Spark DataFrame API アプリケーション

(=DataFrameベースで実装される処理パターンとそのAPI設計)

【1】基本操作カテゴリ(データ抽出・変形)

処理内容 API例・構文 備考
列の選択 select("col1", "col2") col()関数併用で表現力UP
行のフィルタ filter(col("value") > 100)
where(...)
filterとwhereは同等
列の追加・変換 withColumn("newCol", expr) 型変換や条件列もここで処理
列の削除 drop("colName") 不要列の削除に使う
列の名前変更 withColumnRenamed("old", "new") 複数回チェーンで複数列のリネーム対応
型変換 cast("int"), cast("string") withColumnと併用

【2】集約・統計処理(groupBy・agg)

処理内容 API例 備考
グループ単位の集計 groupBy("key").agg(avg("col").alias("avgVal")) count, sum, min, maxなど
単一列のカウント groupBy("col").count() 最も基本的な集計
複数集計 .agg(max("amount").alias("max"),
min("amount").alias("min"))
複数列同時に集計
ピボット処理 .groupBy("key").pivot("col", [val1, val2]).agg(...) 分類データを列に展開

【3】結合・統合処理(join・union)

処理内容 API例 備考
等値結合 .join(df2, df1.key == df2.key) デフォルトでinner join
外部結合/片側結合 .join(df2, on="key",
how="left_outer")
left, right, outer, left_semi, left_anti など
名前付き列での縦連結 .unionByName(df2) 列名に基づいて連結、欠損列にはnullを挿入
位置での縦連結(列名無視) .union(df2) 両者のスキーマ(列順)が一致している必要がある
ブロードキャスト結合 .join(broadcast(df2), "key") 片方が小さい場合に分散せず各ノードに展開され高速化

【4】UDF(ユーザー定義関数)の適用

処理内容 API例 備考
関数登録・SQLで使用 spark.udf.register("FUNC", myfunc)
SELECT FUNC(col)
SQL実行文内で使用可能
DataFrame APIで使用 df.withColumn("newCol",
udfFunc("colName"))
型指定が必要なことに注意
foreachで集約・カウント acc = spark.sparkContext.accumulator(0)
df.foreach(...)
非集計操作に向く(mapでは使えない)

【5】ファイルの読み書き

処理内容 API例 備考
CSVファイルの読み込み spark.read.csv("path", comment="#") コメント行除外、ヘッダ指定などオプション可能
JSONファイルの読み込み spark.read.json("path") キーマ指定可能
Parquet + スキーママージ spark.read.option("mergeSchema",
"true").parquet("path")
パーティションでスキーマ差異がある場合
パーティション書き出し df.write.partitionBy("col").parquet("path") 正しいAPIはpartitionBy
(partitionOnは×)

【6】時間/日付処理・フォーマット変換

処理内容 API例 備考
Unixタイム変換 withColumn("ts", from_unixtime("col")) yyyy-MM-dd HH:mm:ss形式など
日付の文字列変換 date_format("col", "MMM dd (EEEE)") Jan 20 (Thursday)
角度変換と数値処理 round(cos(degrees("value")), 2) 値を角度変換し、cos計算後に丸める

【7】サンプリング・分割・型チェックなど

処理内容 API例 備考
サンプリング .sample(fraction=0.1, seed=1234) 再現性のある部分抽出に最適
スキーマ確認 .printSchema() デバッグや開発時の確認に便利
列のデータ型確認 df.select("col").printSchema() 特定列の型だけ確認可能
型明示で作成 spark.createDataFrame(dataList, FloatType()) 明示的に型指定してDF作成

総括

設計観点 実装パターン例
柔軟な列操作 withColumn, cast, drop, rename, select で変換・整形
集約処理と可視化設計 groupBy + agg, pivot, orderBy を活用して BI 向け集約表を生成
結合/連結戦略 join(等値・外部・broadcast)と unionByName で動的な統合処理
形式変換・外部連携 read.json/csv/parquet + schema指定 + write.partitionBy()でデータIO整備
UDF / foreach 活用 複雑な処理・累積集計では UDF, accumulator, foreach による非標準処理も検討
構造管理と型確認 printSchema(), cast(), dropDuplicates(), sample() などでデータ整備&健全性チェック

4.Spark SQL / Catalyst 最適化構文

(=SQL文の構文理解 + Catalystによる内部最適化を意識した設計)

【1】Spark SQL の基本構文と実用記法

区分 主な構文例 内容と備考
テーブル参照 SELECT * FROM tableName
SELECT col1, col2 FROM df_view
createOrReplaceTempViewで DataFrame を仮想テーブル化
条件フィルタ WHERE col > 100
AND, OR, IN, LIKE, IS NOT NULL
標準SQL記法
集計・グルーピング GROUP BY key
COUNT(*), MAX(col), AVG(col)
HAVING句も使える
結合 INNER JOIN, LEFT JOIN, RIGHT JOIN, FULL OUTER JOIN 結合キーの明示は必須
並び替え ORDER BY col ASC, DESC NULLS FIRST / LASTでnull制御
制限・抽出 LIMIT 10, DISTINCT, DROP DUPLICATES PySparkの.dropDuplicates()相当も利用可能
関数利用 ROUND(...), COS(...), DATE_FORMAT(...) PySpark SQL関数と完全互換

【2】SQLとの連携実装パターン(PySparkとの橋渡し)

実装用途 PySpark記法例
DataFrame → SQL化 df.createOrReplaceTempView("myTable")
SQL実行 spark.sql("SELECT ... FROM myTable")
UDF登録 + SQLで呼出 spark.udf.register("FUNC", udf_func)
spark.sql("SELECT FUNC(col) FROM myTable")

【3】Catalyst Optimizer の最適化対象と構文上の考慮

カテゴリ 最適化内容 Spark SQLでの意識点
①プッシュダウン WHERE 句や LIMIT 条件を早期に適用 フィルタ・LIMITは早めに指定することで物理実行プランが改善される
②プロジェクション最適化 不要な列を除外する SELECT *を避けて明示列を指定するとパフォーマンス改善
③常にSQL→論理計画に変換 Catalyst内部でLogicalPlan生成 → 解析 → 最適化 DataFrame APIでもSQLでも裏側はCatalystで同様に最適化される
④統計情報による最適化 ANALYZE TABLE やファイルサイズ情報による最適join順 Adaptive Query Execution (AQE)と組み合わせると効果大
⑤条件式の簡約 不要なブール式の除外や整理 WHERE col > 5 OR col > 10→WHERE col > 5 に簡約など

【4】Adaptive Query Execution(AQE)との関係

内容 Spark SQLにおける意義
シャッフルパーティションの自動調整 実行時のデータサイズを見てspark.sql.shuffle.partitions を動的調整
ブロードキャスト結合の自動選択 小さなテーブルを自動的にbroadcastする
スキュー除去(Data Skew) 不均衡な分割をAQEが調整する
SQLにも適用される spark.sql.adaptive.enabled=true でSQL実行にも有効

【5】SQL/構文関連の注意すべき非標準API

誤用API 正しい代替 備考
sc.union(...) df1.union(df2) または
unionByName(df2)
SparkContextに対してDataFrame渡すのは不適切
partitionOn() partitionBy() DataFrameWriterでの書き出し時に使用
read().json(...) read.json(...) read()は関数ではない
"spark.schema(...)" df.printSchema() スキーマ表示はDataFrameメソッドで

総括

観点 推奨アプローチ 留意点
列・条件明示 SELECT col1, col2 + WHERE句 不要列・遅延フィルタは避ける
仮想テーブル化 .createOrReplaceTempView() SQLとDataFrame APIの橋渡しに活用
結合効率 SQLでも broadcast()ヒント適用可能 AQEとの組み合わせで効果最大化
UDF活用 必要最小限にとどめる Catalystによる最適化が効かなくなる可能性あり
統一スキーマ設計 mergeSchema活用 + SQLで一貫処理 マルチパーティション読み込みに有効

5.Spark Streaming API 活用

(=マイクロバッチ処理の基本構成・各種演算・ストレージ出力・設計留意点)

【1】Spark Streaming の基本構成

要素 説明
入力ソース Kafka、ソケット、ファイルディレクトリ、Kinesis など
マイクロバッチ構造 DStream / Structured Streaming によって時間単位で処理(例: 2秒ごと)
出力シンク コンソール、ファイル(Parquet, JSON 等)、Kafka、RDB など
トリガー Trigger.ProcessingTime("5 seconds") や Trigger.Once() などで制御
障害耐性 チェックポイントディレクトリを指定して状態の復元を可能にする

【2】Structured Streaming API の主要メソッド

API名 説明
readStream ストリーミングソースからの入力定義 spark.readStream.format("kafka")...
writeStream 出力ストリームの定義 df.writeStream.format("console")...
trigger 実行タイミングの定義 .trigger(Trigger.ProcessingTime("10 seconds"))
outputMode 出力モード(append / update / complete) .outputMode("append")
option Kafkaのオプション、checkpointLocation、パーティション制御など .option("checkpointLocation",
"/tmp/check")
start /
awaitTermination
ストリームの起動・停止制御 query.start(); query.awaitTermination()

【3】出力モード(outputMode)の選択肢と用途

モード名 説明 適用例
append 新規追加行のみ出力 ファイル出力、Kafka出力など
update 更新された集計値のみ出力 agg()等で中間更新される場合
complete 全体の集計状態を出力 状態を全件保持する集計(例:groupBy後のカウント)

【4】ウィンドウ処理と watermark(時間遅延処理)

機能 APIと意味
ウィンドウ関数 groupBy(window(col("timestamp"), "10 minutes"))
水位設定(遅延データ処理) .withWatermark("timestamp", "5 minutes")

【5】状態管理(stateful operation)

操作 説明
mapGroupsWithState Key単位で状態を保持しながら処理
flatMapGroupsWithState 複数行の出力も可能な状態管理処理
状態更新の保持要件 チェックポイントが必須。ステートが大きくなるとメモリ圧迫も考慮

【6】Kafkaとの統合例(典型)

Kafkaとの統合例

df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "input_topic")
.load()

decoded_df = df.selectExpr("CAST(value AS STRING)")

query = decoded_df.writeStream
.format("console")
.outputMode("append")
.start()

【7】チェックポイント(失敗復旧)

設定箇所 .writeStream.option("checkpointLocation", "/path/to/checkpoint")
効果 ジョブ失敗時にも処理を再開可能にするための状態記録

【8】開発・設計時の実践ポイント

設計観点 推奨
レイテンシ設計 バッチ間隔は use case に応じて選定(秒単位 or 分単位)
遅延データ処理 watermark を指定しないと重複・欠損が発生し得る
出力先の制約確認 Kafka / Delta Lake などで制限される outputMode に注意
スキーマ整合性 構造化JSONなどでは schema = StructType(...) の定義が必要

総括

項目 チェックポイント
入力ソース Kafka, ファイル, ソケット など
スキーマ整備 readStream.schema(schema).json(...)
トリガー頻度 Trigger.ProcessingTime("5s") など指定必須
出力モード append, update, complete 使い分け
状態管理 group集計やstateful処理ならcheckpointも設計に含める
再起動対応 チェックポイントパスで復旧性を担保する設計
出力の最適化 書込形式(Parquet等)のpartitionByやmodeに注意

Discussion

ログインするとコメントできます