
How to import partitioned CSV files into Cloud SQL

Published 2023/07/18

Problem

From time to time you probably need to import partitioned CSV files into Cloud SQL.
Unfortunately, unlike BigQuery, Cloud SQL does not support wildcard imports. There does seem to be demand for it, though ↓

https://issuetracker.google.com/issues/132058570?pli=1

Importing the files one by one incurs the overhead on every single import, so it is not very practical speed-wise. What's more, an import is a type of operation, so imports cannot run in parallel either:

https://cloud.google.com/sql/docs/troubleshooting#import-export

HTTP Error 409: Operation failed because another operation was already in progress.
There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete.

So the more realistic solution is to concatenate the files first and import the result in one go.
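
For reference, once the partitioned files have been combined into a single CSV on GCS, the import itself is a single call. The following is only a rough sketch of mine, not part of the original article: it assumes the google-api-python-client package (not in the requirements.txt below) with default credentials, and the instance, database, and table names are placeholders.

# Rough sketch: trigger one Cloud SQL CSV import via the sqladmin API.
# Instance, database, and table names are placeholders.
from googleapiclient import discovery

sqladmin = discovery.build("sqladmin", "v1beta4")

body = {
    "importContext": {
        "uri": "gs://your_bucket_name/path/to/combined.csv",  # the combined CSV
        "fileType": "CSV",
        "database": "your_database",
        "csvImportOptions": {"table": "your_table"},
    }
}

# Starts a single import operation; remember that only one such operation
# can run on the instance at a time.
operation = (
    sqladmin.instances()
    .import_(project="your_project_id", instance="your_instance", body=body)
    .execute()
)
print(operation["name"])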

gsutil compose

With gsutil compose, you can concatenate multiple files stored in GCS.
The same functionality is provided not only by the CLI but also by the SDK (google.cloud.storage.Blob.compose).

https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries

https://cloud.google.com/storage/docs/gsutil/commands/compose
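
As a minimal sketch with the Python SDK (the project, bucket, and object names are made up), composing a handful of objects looks like this:

from google.cloud import storage

client = storage.Client(project="your_project_id")
bucket = client.bucket("your_bucket_name")

# Source objects, in the order their contents should be concatenated.
sources = [bucket.blob(f"data/part-{i:03d}.csv") for i in range(1, 4)]

# compose() writes the concatenation of the sources into the destination object.
destination = bucket.blob("data/merged.csv")
destination.compose(sources)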

However, there is a limitation: at most 32 files can be composed in a single operation.

There is a limit (currently 32) to the number of components that can be composed in a single operation.

Combining more than 32 files

So what should you do when you want to combine more than 32 files?
From the CLI, it can be done easily with the method described here:
https://nakano-tomofumi.hatenablog.com/entry/2021/02/16/190626

If you want to build the file-concatenation logic into a batch job or an application, using the SDK is the natural choice. A promising tutorial has already been posted in the GCP community:

https://cloud.google.com/community/tutorials/cloud-storage-infinite-compose

The tutorial introduces two approaches, the Accumulator Pattern and the Accumulator Tree. Considering execution efficiency when there are many files, the Accumulator Tree looks more practical: roughly speaking, instead of appending every slice to a single accumulator object one after another, it composes 31-file chunks in parallel and then composes the resulting intermediate accumulators.
However, while the tutorial explains the approach in detail, it does not include the full source code, so it cannot be used as-is.
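
To get a feel for why the tree scales well, here is a toy calculation (mine, not from the tutorial) of how many compose rounds are needed when every round merges objects in groups of up to 31:

import math

def tree_rounds(n_slices: int, fan_in: int = 31) -> int:
    """How many rounds of 31-way compose the accumulator tree needs."""
    rounds = 0
    while n_slices > 1:
        n_slices = math.ceil(n_slices / fan_in)
        rounds += 1
    return rounds

print(tree_rounds(100))   # 2 rounds: 100 -> 4 -> 1
print(tree_rounds(3000))  # 3 rounds: 3000 -> 97 -> 4 -> 1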

So I put the source code together myself, including some verification.
Once authentication is set up, just edit the variables project_id, bucket_name, and prefix in the main function and it can be run as-is. For example, to combine the following three files, prefix would be aaa/bbb/hoge.

aaa/bbb/hoge001.csv
aaa/bbb/hoge002.csv
aaa/bbb/hoge003.csv

requirements.txt

google-cloud-logging==2.7.1
google-cloud-storage==2.9.0

main.py

import logging
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from itertools import count
from time import sleep
from typing import Any, Iterable, List

import google.cloud.logging
from google.cloud import storage

client = google.cloud.logging.Client()
client.setup_logging()

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s %(message)s"
)
logger = logging.getLogger(__name__)


def generate_composition_chunks(slices: List, chunk_size: int = 31) -> Iterable[List]:
    """Given an indefinitely long list of blobs, return the list in 31-item chunks.

    Arguments:
        slices {List} -- A list of blobs, which are slices of a desired final blob.

    Returns:
        Iterable[List] -- An iteration of 31-item chunks of the input list.

    Yields:
        Iterable[List] -- A 31-item chunk of the input list.
    """
    while len(slices):
        chunk = slices[:chunk_size]
        yield chunk
        slices = slices[chunk_size:]


def generate_hex_sequence() -> Iterable[str]:
    """Generate an indefinite sequence of hexadecimal integers.

    Yields:
        Iterator[Iterable[str]]: The sequence of hex digits, as strings.
    """
    for i in count(0):
        yield hex(i)[2:]


def delete_objects_concurrent(
    blobs: List[storage.Blob], executor: Executor, client: storage.Client
) -> None:
    """Delete Cloud Storage objects concurrently.

    Args:
        blobs (List[storage.Blob]): The objects to delete.
        executor (Executor): An executor to schedule the deletions in.
        client (storage.Client): Cloud Storage client to use.
    """
    for blob in blobs:
        logger.info("Deleting slice {}".format(blob.name))
        executor.submit(blob.delete, client=client)
        sleep(0.005)  # quick and dirty ramp-up (Sorry, Dijkstra.)


def ensure_results(maybe_futures: List[Any]) -> List[Any]:
    """Pass in a list that may contain futures, and if so, wait for
    the result of the future; for all other types in the list,
    simply append the value.

    Args:
        maybe_futures (List[Any]): A list which may contain futures.

    Returns:
        List[Any]: A list with the values passed in, or Future.result() values.
    """
    results = []
    for mf in maybe_futures:
        if isinstance(mf, Future):
            results.append(mf.result())
        else:
            results.append(mf)
    return results


def compose_and_cleanup(
    blob: storage.Blob,
    chunk: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
):
    """Compose a blob and clean up its components. Cleanup tasks are
    scheduled in the provided executor and the composed blob immediately
    returned.

    Args:
        blob (storage.Blob): The blob to be composed.
        chunk (List[storage.Blob]): The component blobs.
        client (storage.Client): A Cloud Storage client.
        executor (Executor): An executor in which to schedule cleanup tasks.

    Returns:
        storage.Blob: The composed blob.
    """
    # wait on results if the chunk has any futures
    chunk = ensure_results(chunk)
    blob.compose(chunk, client=client)
    # clean up components, no longer need them
    delete_objects_concurrent(chunk, executor, client)
    return blob


def compose(
    object_path: str,
    slices: List[storage.Blob],
    client: storage.Client,
    executor: Executor,
) -> storage.Blob:
    """Compose an object from an indefinite number of slices. Composition is
    performed concurrently using a tree of accumulators. Cleanup is
    performed concurrently using the provided executor.

    Arguments:
        object_path {str} -- The path for the final composed blob.
        slices {List[storage.Blob]} -- A list of the slices that should
            compose the blob, in order.
        client {storage.Client} -- A Cloud Storage client to use.
        executor {Executor} -- A concurrent.futures.Executor to use for
            cleanup execution.

    Returns:
        storage.Blob -- The composed blob.
    """
    logger.info("Composing")
    chunks = generate_composition_chunks(slices)
    next_chunks = []
    identifier = generate_hex_sequence()

    while len(next_chunks) > 32 or not next_chunks:  # falsey empty list is ok
        next_chunks = []  # keep only the accumulators created in this round
        for chunk in chunks:
            # make intermediate accumulator
            intermediate_accumulator = storage.Blob.from_string(
                object_path + next(identifier)
            )
            logger.info("Intermediate composition: %s", intermediate_accumulator)
            future_iacc = executor.submit(
                compose_and_cleanup, intermediate_accumulator, chunk, client, executor
            )
            # store reference for next iteration
            next_chunks.append(future_iacc)
        # go again with intermediate accumulators
        chunks = generate_composition_chunks(next_chunks)

    # Now can do final compose
    final_blob = storage.Blob.from_string(object_path)
    # chunks is a list of lists, so flatten it
    final_chunk = [blob for sublist in chunks for blob in sublist]
    logger.info("Final composition: %s", final_blob)
    compose_and_cleanup(final_blob, final_chunk, client, executor)

    logger.info("Composition complete")

    return final_blob


def main():
    project_id = "your_project_id"
    bucket_name = "your_bucket_name"
    prefix = "your_prefix"
    final_blob_name = "result.csv"

    storage_client = storage.Client(project_id)
    blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))

    with ThreadPoolExecutor(max_workers=4) as executor:
        compose(
            f"gs://{bucket_name}/{prefix}/{final_blob_name}",
            blobs,
            storage_client,
            executor,
        )


if __name__ == "__main__":
    main()

I hope this is helpful.
