🦁

GCS に配置された gzip 圧縮ファイルをストリーミング処理する方法

2024/08/05に公開

はじめに

Google Cloud Storage (GCS) に保存されている gzip 圧縮ファイルを BigQuery などの別のシステムで利用する場合、ファイルを編集する必要があることがあります。
例えば、外部システム (例: ログ収集システム) から出力されたファイルのフォーマットを BigQuery に合うように変更したり、個人情報などの秘匿情報をマスクする必要があるかもしれません。
この記事は、Golang を使用してこの問題に効率的に対処する方法を紹介します。

最近、 GCS に配置された gzip 圧縮ファイルの編集方法について相談を何度か受けました。
相談相手が検討していた方法は以下の2つの方法のどちらかでした。

  1. ファイル全体をローカルのストレージにダウンロードして保存してから処理する。
  2. ファイル全体を一度メモリに展開してから処理する。

しかし、これらの方法はファイルが大きい場合にリソースを大量に消費してしまいます。そこで、すべてを一度にダウンロードせずにストリーミングで処理する方法を紹介します。

概要とアプローチ

本記事では、以下の操作を実現する方法を説明します:

  1. GCS から gzip 圧縮されたファイルをストリームで展開する。
  2. ファイルの中身を加工する。
  3. 再度 gzip 圧縮して GCS にアップロードする。

具体的には、 Golang を使用して特定の文字列 ("hoge") を別の文字列 ("fuga") に置換する例を通じて説明します。このアプローチにより、メモリ使用量を抑えつつ、大規模なファイルも効率的に処理することが可能になります。

実装

まず、必要なライブラリをインポートし、 GCS クライアントを初期化します。この実装には cloud.google.com/go/storage ライブラリを使用します。
以下が実際の Golang コードです:

package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"log"
	"strings"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()

	// クライアントの初期化
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	bucketName := "your-bucket-name"
	srcObjectName := "path/to/source-file.gz"
	dstObjectName := "path/to/destination-file.gz"

	err = processGCSFile(ctx, client, bucketName, srcObjectName, dstObjectName)
	if err != nil {
		log.Fatalf("Failed to process file: %v", err)
	}
}

func processGCSFile(ctx context.Context, client *storage.Client, bucketName, srcObjectName, dstObjectName string) error {
	// GCS からファイルを読み込み
	bucket := client.Bucket(bucketName)
	srcObject := bucket.Object(srcObjectName)
	reader, err := srcObject.NewReader(ctx)
	if err != nil {
		return err
	}
	defer reader.Close()

	gzipReader, err := gzip.NewReader(reader)
	if err != nil {
		return err
	}
	defer gzipReader.Close()

	// GCS に書き出すためのwriterを作成
	dstObject := bucket.Object(dstObjectName)
	writerGCS := dstObject.NewWriter(ctx)
	defer writerGCS.Close()

	gzipWriter := gzip.NewWriter(writerGCS)
	defer gzipWriter.Close()

	scanner := bufio.NewScanner(gzipReader)
	writer := bufio.NewWriter(gzipWriter)

	// 各行を読み込んで変更を加え、ストリーミングで書き出し
	for scanner.Scan() {
		line := scanner.Text()
		modifiedLine := strings.ReplaceAll(line, "hoge", "fuga")
		_, err := writer.WriteString(modifiedLine + "\n")
		if err != nil {
			return err
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}

	writer.Flush()

	return nil
}

コードの詳細説明

コードはサンプルのため、あまり綺麗なコードになっていませんが、以下のような処理を実施しています。

  1. GCSクライアントの初期化:

    • storage.NewClient で GCS クライアントを初期化します。
  2. ファイルの読み込み:

    • bucket.Object(srcObjectName).NewReader(ctx) で GCS から gzip 圧縮されたファイルを読み込みます。
    • gzip.NewReader で gzip 圧縮を解凍します。
  3. データの変換とストリーミング書き出し:

    • bufio.Scanner で行ごとに読み込み、 strings.ReplaceAll で "hoge" を "fuga" に置換します。
    • 変更されたデータを直接 GCS にストリーミングで書き出します。

このアプローチにより、ファイル全体をメモリに読み込むことなく、効率的に処理を行うことができます。

注意点

大規模なファイルを処理する場合、 bufio.Scanner のデフォルトのバッファサイズ(64KB)が小さすぎる可能性があります。
その場合は、 scanner.Buffer()を使用してバッファサイズを調整する必要があります。

Tips

今回紹介したような流れで処理することで他の言語でも同じように実装することが出来ると思います。
例えば、 Bash などのシェルで実施する場合は以下のように実施します。

gsutil cp gs://your-bucket-name/path/to/source-file.gz - | gzip -d | sed 's/hoge/fuga/' | gzip | gsutil cp - gs://your-bucket-name/path/to/destination-file.gz

まとめ

この方法を用いることで、ファイル全体をローカルストレージやメモリに展開することなく、ストリーミングで効率的に gzip 圧縮ファイルを編集し、再アップロードすることができます。大規模なデータ処理や、リソースが制限された環境での作業に特に有効です。

この手法の主な利点は以下の通りです:

  • メモリ使用量の削減
  • 処理時間の短縮(特に大規模ファイルの場合)
  • スケーラビリティの向上

ただし、複雑な処理や全体のデータに依存する操作には適さない場合があるため、ユースケースに応じて適切な方法を選択することが重要です。

このブログ記事が、 GCS 上の gzip 圧縮ファイルの編集に関するあなたの問題解決に役立てば幸いです。

※ データ分析、データ基盤構築、及び AI 活用に関するご相談は、以下よりお気軽にお問い合わせください。
お問い合わせフォーム

Hogetic Lab

Discussion