🫢

[ Future ] 並行処理の際のログについて考えてみた

2024/07/19に公開

python の futures を使用して並行処理を実施した際のログの扱い方についての備忘録として記事に残します。

では、初めて行きます。

今回やること

  • 以下をローカル(ターミナルでコマンドを叩いて)で実行する
    • ローカル(自身のPC)に存在するcsvファイルをparquet形式で S3 に保存する

要件

  • 1000個のデータに区切ってファイルを配置する → つまり1ファイル1000個のデータとなる
  • 同期処理では時間がかかるため、非同期処理で処理を行う

考えたこと

  • 非同期処理でどのようにログを表示させるか?
    • 1ファイル毎に以下の詳細を表示
      • upload処理が入る前
        • print(bucket_name, s3_path)
      • uploadが成功した後
        • print(f"File uploaded successfully: {s3_path}")
    • アップロードに成功したファイルの数を表示

困ったこと (2点)

  • 非同期処理のため、単に以下のようなコードにしても想定の順序通りに表示されない
upload_file.py
def upload_file(s3_client, file_path, bucket_name, s3_path):
  try:
    s3_client.upload_file(file_path, bucket_name, s3_path)
    # ログ2
    print(f"File uploaded successfully: {s3_path}")
    return True
  except FileNotFoundError:
    print(f"The file was not found: {file_path}")
    return False
  except NoCredentialsError:
    print("Credentials not available")
    return False
  except Exception as e:
    # その他全てのエラーに対する処理
    print(f"予期しないエラーが発生しました: {e}")
    return False
example.py
from concurrent.futures import ThreadPoolExecutor
from .upload_file import upload_file

with ThreadPoolExecutor() as executor:
....

chunk_size = 1000
count = 0

for i in range(0, len(result), chunk_size):
    ...
    formatted_date = convert_date_format(date_path)
    s3_path = refile_path.replace(parquet_path + "/", formatted_date)
    
    # ログ1
    print(bucket_name, s3_path)
    
    # 非同期でs3バケットにアップロードする
    future = executor.submit(upload_file, s3_client, refile_path, bucket_name, s3_path)

上記を実行すると、ターミナルに以下の順序で出力されることを想定

print(bucket_name, s3_path)
print(f"File uploaded successfully: {s3_path}")

だが、もちろん非同期なので順番通り表示されるはずもなく、実行すると以下のようになる

print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(bucket_name, s3_path)
print(f"File uploaded successfully: {s3_path}")
  • そもそもファイルのアップロードに成功した数をどうやって取得する?

解決策

ログ順序について

add_done_callback を使用して、非同期処理がdoneになったらログを吐き出すコールバック関数を使用する。
https://docs.python.org/ja/3/library/asyncio-future.html

こんな感じ

example.py
from concurrent.futures import ThreadPoolExecutor
from .upload_file import upload_file

with ThreadPoolExecutor() as executor:
....

chunk_size = 1000
count = 0

for i in range(0, len(result), chunk_size):
    ...
    formatted_date = convert_date_format(date_path)
    s3_path = refile_path.replace(parquet_path + "/", formatted_date)
    
    # 非同期でs3バケットにアップロードする
    future = executor.submit(upload_file, s3_client, refile_path, bucket_name, s3_path)

    # futureが done になったら、コールバック関数を実行
    future.add_done_callback( 👈 こちらを追加
                        # ログ1
                        lambda f: print(bucket_name, s3_path)
                    )

アップロードに成功したファイルの数を取得

以下の手順で実現可能です

  • 全てのタスクの完了を待つ
  • resultを使用して futureの結果(アップロードに成功した数)を取得

コードにすると以下のようになります

example.py
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from .upload_file import upload_file

futures = []

with ThreadPoolExecutor() as executor:
....

chunk_size = 1000
count = 0

for i in range(0, len(result), chunk_size):
    ...
    formatted_date = convert_date_format(date_path)
    s3_path = refile_path.replace(parquet_path + "/", formatted_date)
    
    # 非同期でs3バケットにアップロードする
    future = executor.submit(upload_file, s3_client, refile_path, bucket_name, s3_path)

    # futureが done になったら、コールバック関数を実行
    future.add_done_callback(
                        # ログ1
                        lambda f: print(bucket_name, s3_path)
                    )

👇 こちらを追加
# 全てのタスクの完了を待つ
wait(futures, return_when=ALL_COMPLETED)
# future.result()のTrueの数を数える
true_count = sum(f.result() for f in futures if f.result() == True)
print(f"Number of True results: {true_count}")

これで、無事、ログ出力の順序を制御でき、ファイルのアップロード数を取得できたので、
ローカルで実行した後に、awsのコンソール上(S3)と表示されたアップロード数でファイル数の整合が取れるようになりました。

まとめ

そもそも、ここまでログを出す必要があるのか?という疑問はありますが、
futuresの挙動が少しでも理解できたので、よしとしようと思います。

ですが、futuresのタスクの完了の数を取得するには、一度配列にタスクを入れる必要があるので、ライブラリの使用上仕方ないですが、配列に入れなくとも、結果を取得できたら尚嬉しいと感じました。

今回の記事が誰かのお役に立てれば幸いです。

NCDCエンジニアブログ

Discussion