How to import partitioned CSV files into CloudSQL
Problem
From time to time you may need to import partitioned CSV files into CloudSQL.
Unfortunately, unlike BigQuery, CloudSQL does not support importing with wildcards. There does seem to be demand for it, though ↓
Importing the files one by one incurs the operation overhead every time, so it is not very practical in terms of speed. On top of that, an import is a kind of operation, and only one operation can run on an instance at a time, so imports cannot be parallelized:
HTTP Error 409: Operation failed because another operation was already in progress.
There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete.
So combining the files first and then importing the result is a more realistic solution.
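For reference, once everything has been combined into a single CSV, the import itself can also be triggered programmatically through the CloudSQL Admin API. The following is only a rough sketch, assuming google-api-python-client is installed; your_database and your_table are placeholders, and operation polling and error handling are omitted.

from googleapiclient import discovery


def import_csv(project_id: str, instance: str, gcs_uri: str) -> None:
    """Kick off a CSV import of a single GCS object into a CloudSQL instance."""
    sqladmin = discovery.build("sqladmin", "v1beta4")
    body = {
        "importContext": {
            "fileType": "CSV",
            "uri": gcs_uri,  # e.g. the combined gs://.../result.csv
            "database": "your_database",  # placeholder
            "csvImportOptions": {"table": "your_table"},  # placeholder
        }
    }
    # This starts a long-running operation; only one operation per instance
    # can run at a time, which is exactly why importing many small files
    # one by one is slow.
    sqladmin.instances().import_(
        project=project_id, instance=instance, body=body
    ).execute()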
gsutil compose
Using gsutil compose, you can combine multiple files stored in GCS.
The same functionality is available not only from the CLI but also from the SDK (google.cloud.storage.Blob.compose).
However, there is a restriction: at most 32 files can be combined in a single operation.
There is a limit (currently 32) to the number of components that can be composed in a single operation.
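As a minimal sketch of the SDK call (the bucket and object names below are just placeholders), combining up to 32 objects looks roughly like this:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your_bucket_name")  # placeholder

# compose() accepts at most 32 source objects per call.
sources = [bucket.blob(f"aaa/bbb/part-{i:03d}.csv") for i in range(1, 4)]
destination = bucket.blob("aaa/bbb/combined.csv")  # placeholder output name
destination.compose(sources)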
Combining more than 32 files
So what should we do when we want to combine more than 32 files?
From the CLI, you can do it easily with this method.
If you want to add the combining logic to a batch job or an application, using the SDK is the natural way to go. A promising tutorial has already been posted in the GCP community.
It introduces two approaches, the Accumulator Pattern and the Accumulator Tree. Considering execution efficiency when there are many files, the Accumulator Tree looks more practical: slices are composed into intermediate accumulator blobs in chunks of 31, concurrently, and those intermediates are then composed into the final object.
However, while the tutorial explains the approach in detail, it does not include the full source code, so it cannot be used as-is.
So, after verifying it myself, I put together the following source code.
Once authentication has been set up correctly, you can run it as-is after editing the variables project_id, bucket_name, and prefix in the main function. For example, to combine the three files below, prefix would be aaa/bbb/hoge.
aaa/bbb/hoge001.csv
aaa/bbb/hoge002.csv
aaa/bbb/hoge003.csv
requirements.txt
google-cloud-logging==2.7.1
google-cloud-storage==2.9.0
import logging
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from itertools import count
from time import sleep
from typing import Any, Iterable, List
import google.cloud.logging
from google.cloud import storage
client = google.cloud.logging.Client()
client.setup_logging()
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s:%(name)s %(message)s"
)
logger = logging.getLogger(__name__)
def generate_composition_chunks(slices: List, chunk_size: int = 31) -> Iterable[List]:
"""Given an indefinitely long list of blobs, return the list in 31-item chunks.
Arguments:
slices {List} -- A list of blobs, which are slices of a desired final blob.
Returns:
Iterable[List] -- An iteration of 31-item chunks of the input list.
Yields:
Iterable[List] -- A 31-item chunk of the input list.
"""
while len(slices):
chunk = slices[:chunk_size]
yield chunk
slices = slices[chunk_size:]
def generate_hex_sequence() -> Iterable[str]:
"""Generate an indefinite sequence of hexadecimal integers.
Yields:
Iterator[Iterable[str]]: The sequence of hex digits, as strings.
"""
for i in count(0):
yield hex(i)[2:]
def delete_objects_concurrent(blobs, executor, client) -> None:
"""Delete Cloud Storage objects concurrently.
Args:
blobs (List[storage.Blob]): The objects to delete.
executor (Executor): An executor to schedule the deletions in.
client (storage.Client): Cloud Storage client to use.
"""
for blob in blobs:
logger.info("Deleting slice {}".format(blob.name))
executor.submit(blob.delete, client=client)
sleep(0.005) # quick and dirty ramp-up (Sorry, Dijkstra.)
def ensure_results(maybe_futures: List[Any]) -> List[Any]:
"""Pass in a list that may contain futures, and if so, wait for
the result of the future; for all other types in the list,
simply append the value.
Args:
maybe_futures (List[Any]): A list which may contain futures.
Returns:
List[Any]: A list with the values passed in, or Future.result() values.
"""
results = []
for mf in maybe_futures:
if isinstance(mf, Future):
results.append(mf.result())
else:
results.append(mf)
return results
def compose_and_cleanup(
blob: storage.Blob,
chunk: List[storage.Blob],
client: storage.Client,
executor: Executor,
):
"""Compose a blob and clean up its components. Cleanup tasks are
scheduled in the provided executor and the composed blob immediately
returned.
Args:
blob (storage.Blob): The blob to be composed.
chunk (List[storage.Blob]): The component blobs.
client (storage.Client): A Cloud Storage client.
executor (Executor): An executor in which to schedule cleanup tasks.
Returns:
storage.Blob: The composed blob.
"""
# wait on results if the chunk has any futures
chunk = ensure_results(chunk)
blob.compose(chunk, client=client)
# clean up components, no longer need them
delete_objects_concurrent(chunk, executor, client)
return blob
def compose(
object_path: str,
slices: List[storage.Blob],
client: storage.Client,
executor: Executor,
) -> storage.Blob:
"""Compose an object from an indefinite number of slices. Composition is
performed concurrently using a tree of accumulators. Cleanup is
performed concurrently using the provided executor.
Arguments:
object_path {str} -- The path for the final composed blob.
slices {List[storage.Blob]} -- A list of the slices that should
compose the blob, in order.
client {storage.Client} -- A Cloud Storage client to use.
executor {Executor} -- A concurrent.futures.Executor to use for
cleanup execution.
Returns:
storage.Blob -- The composed blob.
"""
logger.info("Composing")
chunks = generate_composition_chunks(slices)
next_chunks = []
identifier = generate_hex_sequence()
while len(next_chunks) > 32 or not next_chunks: # falsey empty list is ok
for chunk in chunks:
# make intermediate accumulator
intermediate_accumulator = storage.Blob.from_string(
object_path + next(identifier)
)
logger.info("Intermediate composition: %s", intermediate_accumulator)
future_iacc = executor.submit(
compose_and_cleanup, intermediate_accumulator, chunk, client, executor
)
# store reference for next iteration
next_chunks.append(future_iacc)
# go again with intermediate accumulators
chunks = generate_composition_chunks(next_chunks)
# Now can do final compose
final_blob = storage.Blob.from_string(object_path)
# chunks is a list of lists, so flatten it
final_chunk = [blob for sublist in chunks for blob in sublist]
logger.info("Final composition: %s", final_blob)
compose_and_cleanup(final_blob, final_chunk, client, executor)
logger.info("Composition complete")
return final_blob
def main():
    project_id = "your_project_id"
    bucket_name = "your_bucket_name"
    prefix = "your_prefix"
    final_blob_name = "result.csv"
    storage_client = storage.Client(project_id)
    blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))
    with ThreadPoolExecutor(max_workers=4) as executor:
        compose(
            f"gs://{bucket_name}/{prefix}/{final_blob_name}",
            blobs,
            storage_client,
            executor,
        )


if __name__ == "__main__":
    main()
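One thing to note: compose_and_cleanup deletes its component objects after each composition, so the original slice files under the prefix are removed as the script runs; if you still need the individual CSVs, point the script at a copy. When it finishes, a single result.csv is left at gs://your_bucket_name/your_prefix/result.csv, ready to be imported into CloudSQL.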
I hope you find this helpful.