🔄

GolangでCPUパワーを使い切る - 並行処理と並列処理で画像バッチ処理を高速化

に公開

こんにちは!今回は、Goで実装した画像バッチ処理システムを題材に、並行処理と並列処理について掘り下げてみたいと思います。多くのコア数を持つCPUが当たり前になった今、そのパワーを余すことなく活用するテクニックを身につけましょう。

はじめに:並行処理と並列処理の違い

プログラミングにおける「並行処理(Concurrency)」と「並列処理(Parallelism)」は、しばしば混同されますが、実は異なる概念です。

  • 並行処理(Concurrency) - 「複数のタスクを同時に扱う設計」

    • 一度に複数の処理を行うように設計すること
    • 実際には一つのCPUでも可能(タスク切り替えで実現)
    • I/O待ち時間を効率的に利用できる
  • 並列処理(Parallelism) - 「複数のタスクを物理的に同時に実行」

    • 複数のCPUコアで文字通り同時に処理を実行
    • 計算負荷の高いタスクを分散させる
    • CPUコア数に比例して性能向上が期待できる

Rob Pike(Goの開発者)の有名な言葉を借りると、

"Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once."

この記事では、ただ概念を説明するだけでなく、実際に動作する画像処理プログラムを通して両者の使い方を解説します。

作成するプログラム:画像バッチプロセッサー

今回作成する画像バッチプロセッサーは、指定したディレクトリ内の画像ファイルを一括で処理するツールです。

ソースコードは、image-processorを参照してください。

主な機能

  • 指定ディレクトリ内の画像を再帰的に検索します。
  • リサイズ、グレースケール変換、明るさ・コントラスト調整などの処理を行います。
  • ウォーターマーク(著作権表示など)を追加します。
  • マルチコアを活用した並列処理による高速化を行います。

まずはプログラムの全体像を把握しましょう。

プログラムの全体構造

プログラムは以下の4つの主要部分で構成されています。

  1. ファイルスキャン(並行処理) - ディレクトリを再帰的に探索して画像を見つける。
  2. ワーカープール(並列処理) - 複数のゴルーチンで同時に画像処理を実行する。
  3. 画像処理 - 実際の変換処理(リサイズ・フィルタ適用など)を行う。
  4. 結果収集(並行処理) - 処理結果の収集とログ/進捗表示を行う。

これらが連携して効率的な処理を実現します。

ステップ1:必要なパッケージと基本構造

まずは必要なパッケージをインポートし、基本的なデータ構造を定義します。

package main

import (
	"flag"
	"fmt"
	"image"
	"image/jpeg"
	"image/png"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/disintegration/imaging"
	// 他必要なパッケージ
)

// 処理オプション
type ProcessOptions struct {
	Resize        bool
	Width         int
	Height        int
	Watermark     bool
	WatermarkText string
	Grayscale     bool
	Brightness    float64
	Contrast      float64
}

// 画像情報
type ImageFile struct {
	Path      string
	FileName  string
	Extension string
}

// 処理結果
type ProcessResult struct {
	OriginalPath string
	OutputPath   string
	Success      bool
	Error        error
	ProcessTime  time.Duration
}

まず、この構造にいくつか注目ポイントがあります。

  1. runtimeパッケージ:CPUコア数などシステム情報の取得に使用しています。
  2. syncパッケージ:並行処理を安全に行うためのプリミティブ(Mutex、WaitGroupなど)を提供します。
  3. 独自の構造体:画像ファイルや処理結果を表現するための型定義です。

ステップ2:並行処理によるファイルスキャン

ファイルシステムを探索して画像を見つけ出す部分から解説します。ここでは並行処理のテクニックが活きてきます。

// 画像ファイルをスキャンする関数(並行処理)
func scanImageFiles(inputDir string, logger *log.Logger, verbose bool) []ImageFile {
	var files []ImageFile
	var mutex sync.Mutex  // スライスの保護用
	var wg sync.WaitGroup

	// 入力ディレクトリを絶対パスに変換
	absInputDir, err := filepath.Abs(inputDir)
	if err != nil {
		logger.Printf("入力ディレクトリパスの変換に失敗しました: %s", err)
		return files
	}

	// 各ディレクトリを並行処理するための関数
	var scanDir func(dir string)
	scanDir = func(dir string) {
		defer wg.Done()

		entries, err := os.ReadDir(dir)
		if err != nil {
			logger.Printf("ディレクトリの読み取りに失敗しました: %s - %v", dir, err)
			return
		}

		// ディレクトリ内の各エントリを処理
		for _, entry := range entries {
			path := filepath.Join(dir, entry.Name())

			// ディレクトリの場合は再帰的に処理
			if entry.IsDir() {
				wg.Add(1)
				go scanDir(path)  // 新しいゴルーチンで並行処理
				continue
			}

			// ファイル拡張子のチェック
			ext := strings.ToLower(filepath.Ext(entry.Name()))
			if ext == ".jpg" || ext == ".jpeg" || ext == ".png" {
				file := ImageFile{
					Path:      path,
					FileName:  entry.Name(),
					Extension: ext,
				}

				mutex.Lock()
				files = append(files, file)
				mutex.Unlock()

				if verbose {
					logger.Printf("画像ファイル検出: %s", path)
				}
			}
		}
	}

	// 初期ディレクトリから開始
	wg.Add(1)
	scanDir(absInputDir)

	// すべてのゴルーチンの完了を待機
	wg.Wait()

	return files
}

並行処理の実装ポイント

  1. 再帰的なゴルーチン起動

    • サブディレクトリごとに新しいゴルーチンを起動し、同時に探索します。
    • ディレクトリ構造が深く複雑な場合に効果的です。
  2. sync.WaitGroup

    • 生成したすべてのゴルーチンの完了を待つための仕組みです。
    • wg.Add(1) でカウンタを増やし、wg.Done() で減らします。
    • wg.Wait() ですべてのゴルーチンの終了を待機します。
  3. ミューテックスによるデータ保護

    • mutex.Lock()mutex.Unlock() で共有データ(filesスライス)へのアクセスを保護します。
    • 複数のゴルーチンからの同時更新によるデータ破損を防止します。

ファイルスキャンに並行処理を使うメリットは、I/O待ち時間が多い操作だからです。ディスクからの読み取りは比較的遅い操作なので、一つのディレクトリを読み込んでいる間に別のゴルーチンで別ディレクトリの処理を進められます。

ステップ3:並列処理によるワーカープール

ファイルスキャンで検出した画像ファイルを処理するため、並列処理の仕組みを実装します。ここがプログラムの核心部分です。

// メイン関数内のワーカープール部分
// チャネルの準備
filesCh := make(chan ImageFile)
resultsCh := make(chan ProcessResult)

// ワーカープールの作成(並列処理)
var wg sync.WaitGroup
for i := 0; i < *workers; i++ {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		processWorker(id, filesCh, resultsCh, absOutputDir, opts, logger, verbose)
	}(i + 1)
}

// 結果の収集(並行処理)
go func() {
	for result := range resultsCh {
		bar.Add(1)
		if verbose {
			if result.Success {
				logger.Printf("処理完了: %s -> %s (%s)",
					result.OriginalPath,
					result.OutputPath,
					result.ProcessTime)
			} else {
				logger.Printf("処理失敗: %s, エラー: %v (%s)",
					result.OriginalPath,
					result.Error,
					result.ProcessTime)
			}
		}
	}
}()

// 画像ファイルをチャネルに送信
go func() {
	for _, file := range files {
		filesCh <- file
	}
	close(filesCh)
}()

// すべてのワーカーの終了を待機
wg.Wait()
close(resultsCh)

実際のワーカー関数

// 画像処理ワーカー関数(並列処理)
func processWorker(id int, files <-chan ImageFile, results chan<- ProcessResult, 
                  outputDir string, opts ProcessOptions, logger *log.Logger, verbose bool) {
	for file := range files {
		startTime := time.Now()

		if verbose {
			logger.Printf("ワーカー %d: 処理開始 %s", id, file.Path)
		}

		// 出力パスの準備
		// ... パス関連の処理 ...

		// 画像処理の実行
		result := processImage(file, outputPath, opts)
		result.ProcessTime = time.Since(startTime)

		results <- result
	}
}

並列処理の実装ポイント

  1. ワーカープール

    • 指定数(通常はCPUコア数)のワーカーゴルーチンを事前に生成します。
    • 各ワーカーは独立して動作し、タスクをチャネルから取得します。
  2. チャネルによるタスク分配

    • filesCh - 処理すべきファイルを渡すチャネルです。
    • resultsCh - 処理結果を受け取るチャネルです。
    • チャネルは型付きの並行キューとして機能します。
  3. ブロッキングと負荷分散

    • チャネルの range 構文でタスクを待機します。
    • 処理の早いワーカーが自動的により多くのタスクを処理します(自然な負荷分散)。

ここで注目すべきは、チャネルを使った「パイプライン」のような構造です。あるゴルーチンがファイルをチャネルに送信し、複数のワーカーが並列でそれを処理し、別のゴルーチンが結果を収集します。これにより、データの流れがスムーズになり、CPUリソースを最大限に活用できます。

ステップ4:実際の画像処理関数

ワーカーから呼び出される画像処理関数の実装です。

// 画像処理関数
func processImage(file ImageFile, outputPath string, opts ProcessOptions) ProcessResult {
	// 画像の読み込み
	srcFile, err := os.Open(file.Path)
	if err != nil {
		return ProcessResult{
			OriginalPath: file.Path,
			OutputPath:   outputPath,
			Success:      false,
			Error:        fmt.Errorf("ファイルオープンエラー: %v", err),
		}
	}
	defer srcFile.Close()

	var img image.Image

	// 画像形式に基づく読み込み
	switch strings.ToLower(file.Extension) {
	case ".jpg", ".jpeg":
		img, err = jpeg.Decode(srcFile)
	case ".png":
		img, err = png.Decode(srcFile)
	default:
		return ProcessResult{
			OriginalPath: file.Path,
			OutputPath:   outputPath,
			Success:      false,
			Error:        fmt.Errorf("未対応の画像形式: %s", file.Extension),
		}
	}

	if err != nil {
		return ProcessResult{/* エラー結果 */}
	}

	// 画像処理の適用
	var processedImg image.Image = img

	// リサイズ
	if opts.Resize {
		processedImg = imaging.Resize(processedImg, opts.Width, opts.Height, imaging.Lanczos)
	}

	// グレースケール変換
	if opts.Grayscale {
		processedImg = imaging.Grayscale(processedImg)
	}

	// 明るさ調整
	if opts.Brightness != 0 {
		processedImg = imaging.AdjustBrightness(processedImg, opts.Brightness*100)
	}

	// コントラスト調整
	if opts.Contrast != 0 {
		processedImg = imaging.AdjustContrast(processedImg, opts.Contrast*100)
	}

	// ウォーターマーク追加
	if opts.Watermark {
		processedImg = addWatermark(processedImg, opts.WatermarkText)
	}

	// 処理済み画像の保存
	// ... 保存処理 ...

	return ProcessResult{
		OriginalPath: file.Path,
		OutputPath:   outputPath,
		Success:      true,
		Error:        nil,
	}
}

この関数は純粋な画像処理を行うもので、並行性や並列性の実装は含まれていません。しかし、複数のワーカーゴルーチンからこの関数が同時に呼び出されることで、実質的な並列処理が実現されます。

パフォーマンス検証:並列化の効果

実際に並列処理の効果を確認するため、異なるワーカー数での処理時間を比較してみましょう。以下は100枚の画像(各100MB)をリサイズする場合の例です。

ワーカー数 処理時間 速度向上率 平均処理時間/画像
1 (並列なし) 21.56秒 1.0x (基準) 245.0ms
4 7.98秒 2.7x 90.7ms
8 7.10秒 3.0x 80.8ms
12 6.84秒 3.2x 77.7ms

この結果から興味深い点が見えてきます。

  1. 並列化による大幅な高速化(4コアで2.7倍速)
  2. コア数増加に伴う性能向上の頭打ち
  3. 理想的な線形スケーリング(4倍のコアで4倍速)には届かない現実

なぜ理想的なスケーリングに届かないのでしょうか?それには以下の理由が考えられます。

  1. ディスクI/Oのボトルネック
  2. メモリ帯域の制限
  3. 並列処理のオーバーヘッド(スレッド管理や同期コスト)

実装上の注意点とベストプラクティス

Goで並行・並列プログラミングを行う際の注意点をまとめます。

1. 適切なワーカー数の選択

workers := flag.Int("workers", runtime.NumCPU(), "ワーカー数 (並列処理)")

基本的にはCPUコア数と同じにすることが推奨されますが、以下のような例外もあります。

  • I/O待ちが多い処理:CPU数より多めに設定しても効果的になる場合もある。
  • メモリ制約がある環境:少なめに設定してメモリ消費を抑制できる。

2. 共有リソースへのアクセス保護

複数のゴルーチンが同じデータにアクセスする場合、必ずミューテックスなどで保護します。

var mutex sync.Mutex

mutex.Lock()
// 共有データへのアクセス
mutex.Unlock()

3. 適切なエラーハンドリング

並行処理では特にエラー処理が重要です。

// ワーカーからのエラー結果も適切に収集
result := ProcessResult{
    OriginalPath: file.Path,
    OutputPath:   outputPath,
    Success:      false,
    Error:        err,  // エラー情報を保存
}
results <- result

4. コンテキストの活用

長時間実行される処理では、キャンセル可能な設計を検討します。

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// コンテキストを利用したキャンセル可能な処理
select {
case <-ctx.Done():
    return ProcessResult{Error: ctx.Err()}
case <-time.After(100 * time.Millisecond):
    // 実際の処理
}

まとめ:並行処理と並列処理の使い分け

最後に、並行処理と並列処理の使い分けについて整理します。

並行処理を使うべき状況

  • I/O待ち時間が多い処理(ファイル読み書き、ネットワーク通信)
  • 非同期イベントの処理
  • UIの応答性を維持する必要がある場合

並列処理を使うべき状況

  • CPU負荷の高い計算処理(画像・動画処理、数値計算)
  • 大量データの処理(バッチ処理、データ変換)
  • 独立したタスクで分割可能な処理

理想的なアプローチ

多くの実用的なプログラムでは、並行処理と並列処理を組み合わせることで最高のパフォーマンスを発揮します。今回の画像処理プログラムでも、

  • 並行処理 - ファイルスキャン、結果収集、進捗表示
  • 並列処理 - 複数ワーカーによる画像処理

この組み合わせにより、I/O待ち時間を有効活用しつつ、CPU計算能力も最大限に引き出しています。

Go言語は「Concurrency is not Parallelism」の哲学のもと、これらを自然に表現できる言語設計になっています。チャネル、ゴルーチン、select文などの基本機能を使いこなせば、複雑な並行・並列処理も明瞭に実装できます。

ぜひ、この記事の内容を参考に、あなた自身のプログラムでも効率的な並行・並列処理を実装してみてください。多コアCPUの力を最大限に引き出し、より高速で効率的なプログラムを作れるようになるでしょう。

参考資料

  1. Go言語による並行処理 - https://go.dev/blog/concurrency-is-not-parallelism
  2. Effective Go - https://golang.org/doc/effective_go.html#concurrency
  3. Go Concurrency Patterns - https://talks.golang.org/2012/concurrency.slide

Discussion