Open4

Google Cloud StorageのgzipをCloud Functionでチャンク毎に展開する

oxonoxon

前提

Google Cloud Storage上のgzipファイル(2 GB程度 / 展開後 10GB程度)をCloud Functionを使って展開したい

  • Cloud Storageのイベントトリガーでgzipファイルが置かれると関数が起動される
  • イベントトリガーなので9分以内に処理を終わらせる必要がある
  • httpトリガーにしたりCloud Run Jobsとかにすれば制限時間は伸ばせるが、元々がイベントトリガーで実装されてたので変更するのめんどくさい
  • Cloud Function (Gen2)のメモリのマックスが32 GBなので展開中のメモリにも気を使う必要がありそう
oxonoxon

通常実装

from typing import Any, List, Optional, Tuple
import gzip
from google.cloud import storage

def decompress_gzip(
    source_bucket_name: str,
    source_blob_name: str,
    destination_bucket_name: Optional[str] = None,
    decompressed_blob_name: Optional[str] = None,
)
    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(source_bucket_name)
    compress_blob = source_bucket.blob(source_blob_name)

    destination_bucket = storage_client.get_bucket(
        destination_bucket_name if destination_bucket_name else source_bucket_name
    )

    if decompressed_blob_name is None:
        decompressed_blob_name = source_blob_name.rstrip('.gz')  # .gzを除去
    decompressed_blob = destination_bucket.blob(decompressed_blob_name)

    with decompressed_blob.open("wb") as decompressed_file:
        with compress_blob.open("rb") as compressed_file:
            decompressed_file.write(gzip.decompress(compressed_file.read()))

これだとメモリが32 GBでも足りなかった。
(事前にGoogle Clab ProのNotebook上で試したときは〜20 GB程度で問題なかったのだが、、、)

一度に展開するのは無理そうなのでチャンク毎に展開→ Cloud Storageに書き込み ができるようにした。↓

oxonoxon

チャンク毎の処理に変更

from typing import Any, List, Optional, Tuple
import gzip
from google.cloud import storage

def decompress_large_gzip(
    source_bucket_name: str,
    source_blob_name: str,
    destination_bucket_name: Optional[str] = None,
    decompressed_blob_name: Optional[str] = None,
    chunk_size: int = 256 * 1024 * 1024,
) -> Tuple[str, str]:
    """decompress gzip file and upload

    Args:
        source_bucket_name(str): ソースバケット名
        source_blob_name(str): ソースバケットのファイル名
        destination_bucket_name(str, optional): 展開先バケット名(未指定の場合はソースバケット)
        decompressed_blob_name(str, optional): 展開後のファイル名(未指定の場合はソースファイル名から.gzを除去)
        chunk_size(int): チャンクサイズ(64-256MB 推奨)
    """
    source_bucket = storage_client.get_bucket(source_bucket_name)
    compress_blob = source_bucket.blob(source_blob_name)

    destination_bucket = storage_client.get_bucket(
        destination_bucket_name if destination_bucket_name else source_bucket_name
    )

    if decompressed_blob_name is None:
        decompressed_blob_name = source_blob_name.rstrip('.gz')  # .gzを除去
    decompressed_blob = destination_bucket.blob(decompressed_blob_name)

    # 圧縮ファイルをチャンク毎に読み込み、解凍しファイルに書き込み
    with compress_blob.open("rb") as compressed_file:
        with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
            with decompressed_blob.open("wb") as decompressed_file:
                while True:
                    chunk = gzip_file.read(chunk_size)
                    if not chunk:
                        break
                    decompressed_file.write(chunk)

    return destination_bucket.name, decompressed_blob_name

この実装でメモリ使用量を〜2 GB程度にまで落とすことができた。
また実行時間も問題ない(4 min 以内程度なので9 minの制限は余裕)ことも確認。

oxonoxon

3重withブロックの中身について

最後のwithブロックの中でBlob.write(chunk)しているのでwithブロック中はバッファに書き込まれ、ブロックが終了すると一気に対象の場所に書き込まれる…ようである。
当初上書きされないのか?と懸念を持っていたが実際の展開済ファイルを見てみると問題なく展開されていた。

まとめ

意外とこの手の微妙なさばきが見当たらなかったので投稿。
実行時間をもう少し切り詰めたかったらチャンクサイズを調整する余地があるかもしれない。
実際はCloud FunctionではなくCloud Run JobsとかComposerとかDataprocとかを使えるならそうしたほうがいいが、今回はめんどくささが勝ってしまったのでCloud Functionで頑張ることとした。