Open4
Google Cloud StorageのgzipをCloud Functionでチャンク毎に展開する
前提
Google Cloud Storage上のgzipファイル(2 GB程度 / 展開後 10GB程度)をCloud Functionを使って展開したい
- Cloud Storageのイベントトリガーでgzipファイルが置かれると関数が起動される
- イベントトリガーなので9分以内に処理を終わらせる必要がある
- httpトリガーにしたりCloud Run Jobsとかにすれば制限時間は伸ばせるが、元々がイベントトリガーで実装されてたので変更するのめんどくさい
- Cloud Function (Gen2)のメモリのマックスが32 GBなので展開中のメモリにも気を使う必要がありそう
通常実装
from typing import Any, List, Optional, Tuple
import gzip
from google.cloud import storage
def decompress_gzip(
source_bucket_name: str,
source_blob_name: str,
destination_bucket_name: Optional[str] = None,
decompressed_blob_name: Optional[str] = None,
)
storage_client = storage.Client()
source_bucket = storage_client.get_bucket(source_bucket_name)
compress_blob = source_bucket.blob(source_blob_name)
destination_bucket = storage_client.get_bucket(
destination_bucket_name if destination_bucket_name else source_bucket_name
)
if decompressed_blob_name is None:
decompressed_blob_name = source_blob_name.rstrip('.gz') # .gzを除去
decompressed_blob = destination_bucket.blob(decompressed_blob_name)
with decompressed_blob.open("wb") as decompressed_file:
with compress_blob.open("rb") as compressed_file:
decompressed_file.write(gzip.decompress(compressed_file.read()))
これだとメモリが32 GBでも足りなかった。
(事前にGoogle Clab ProのNotebook上で試したときは〜20 GB程度で問題なかったのだが、、、)
一度に展開するのは無理そうなのでチャンク毎に展開→ Cloud Storageに書き込み ができるようにした。↓
チャンク毎の処理に変更
from typing import Any, List, Optional, Tuple
import gzip
from google.cloud import storage
def decompress_large_gzip(
source_bucket_name: str,
source_blob_name: str,
destination_bucket_name: Optional[str] = None,
decompressed_blob_name: Optional[str] = None,
chunk_size: int = 256 * 1024 * 1024,
) -> Tuple[str, str]:
"""decompress gzip file and upload
Args:
source_bucket_name(str): ソースバケット名
source_blob_name(str): ソースバケットのファイル名
destination_bucket_name(str, optional): 展開先バケット名(未指定の場合はソースバケット)
decompressed_blob_name(str, optional): 展開後のファイル名(未指定の場合はソースファイル名から.gzを除去)
chunk_size(int): チャンクサイズ(64-256MB 推奨)
"""
source_bucket = storage_client.get_bucket(source_bucket_name)
compress_blob = source_bucket.blob(source_blob_name)
destination_bucket = storage_client.get_bucket(
destination_bucket_name if destination_bucket_name else source_bucket_name
)
if decompressed_blob_name is None:
decompressed_blob_name = source_blob_name.rstrip('.gz') # .gzを除去
decompressed_blob = destination_bucket.blob(decompressed_blob_name)
# 圧縮ファイルをチャンク毎に読み込み、解凍しファイルに書き込み
with compress_blob.open("rb") as compressed_file:
with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
with decompressed_blob.open("wb") as decompressed_file:
while True:
chunk = gzip_file.read(chunk_size)
if not chunk:
break
decompressed_file.write(chunk)
return destination_bucket.name, decompressed_blob_name
この実装でメモリ使用量を〜2 GB程度にまで落とすことができた。
また実行時間も問題ない(4 min 以内程度なので9 minの制限は余裕)ことも確認。
3重withブロックの中身について
最後のwithブロックの中でBlob.write(chunk)
しているのでwithブロック中はバッファに書き込まれ、ブロックが終了すると一気に対象の場所に書き込まれる…ようである。
当初上書きされないのか?と懸念を持っていたが実際の展開済ファイルを見てみると問題なく展開されていた。
まとめ
意外とこの手の微妙なさばきが見当たらなかったので投稿。
実行時間をもう少し切り詰めたかったらチャンクサイズを調整する余地があるかもしれない。
実際はCloud FunctionではなくCloud Run JobsとかComposerとかDataprocとかを使えるならそうしたほうがいいが、今回はめんどくささが勝ってしまったのでCloud Functionで頑張ることとした。