📚

Python の Boto3(The AWS SDK for Python)でファイルアップロード中にキャンセルする

2021/04/26に公開

TransferManagershutdown() を呼び出せばキャンセルできます → https://github.com/boto/s3transfer/blob/develop/s3transfer/manager.py

ただし、 shutdown() から呼ぶときの _shutdown() の引数が誤っているので、ここでは __exit__() を呼ぶことにします。

引数が誤っている件はすでに issue になっています Use correct parameter list to _shutdown() by JasonDeArte · Pull Request #144 · boto/s3transfer · GitHub

upload_file

TransferManager はコンテキストマネージャーとしても実装されているので、以下のように利用したい気分ですが、 upload_file() は同期処理のようです( Support asyncio · Issue #458 · boto/botocore · GitHub )

with S3Transfer(client=client) as transfer:
    transfer.upload_file(filepath, bucket_name, filepath)

つまり upload_file() が完了しないと処理が戻ってきません。 upload_file() そのものをキャンセルするには、別途スレッドを作り、そのスレッドからキャンセルする必要があります。あるよね?

実装

upload_file() を呼ぶスレッドと、 __exit__() を呼ぶスレッドを作ればキャンセルできます。エラー処理など省いてます。

import botocore
import boto3
from boto3.s3.transfer import TransferConfig, S3Transfer
from botocore.config import Config
import threading
import concurrent.futures
from time import sleep

MB = 1024 ** 2
GB = 1024 ** 3

region_name = '<region_name>'
bucket_name = '<bucket_name>'
filepath = 'largefile'
access_key = '<access_key>'
secret = '<secret>'
endpoint = '<endpoint>'

filepath = 'largefile'
transfer = None


def shutdown_thread():
    global transfer
    sleep(20)
    print("go shutdown")
    transfer.__exit__(RuntimeError, 'shutdown')
    return 'shutdown end'


def upload_thread():
    global transfer
    client = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret,
        region_name=region_name
    )

    try:
        transfer = S3Transfer(client=client)
        transfer.upload_file(filepath, bucket_name, filepath)
    except botocore.exceptions.ClientError as err:
        print(err.response['Error']['Code'])
    except Exception as err:
        print(err)
    return 'upload end'


def upload():
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures.append(executor.submit(upload_thread))
        futures.append(executor.submit(shutdown_thread))
        for f in concurrent.futures.as_completed(futures):
            print(f.result())


upload()

ここでは sleep() させましたが、 threading.Event を用いて wait() しておき、何かのトリガーで __exit__() を呼ぶというのが一般的な使い方かと。

こんな感じで。動作未確認です。

ev = threading.Event()

def shutdown_thread():
    global transfer
    ev.wait()
    print("go shutdown")
    transfer.__exit__(RuntimeError, 'shutdown')
    return 'shutdown end'

動作確認

ファイルを作ります。

% truncate -s 1GB largefile 

スクリプトを実行します。

% python3 boto3.client.upload_file_shutdown.py
go shutdown
shutdown
upload end
shutdown end

ファイルはありません。

% aws s3 ls <bucket_name> --endpoint-url=<endpoint> --recursive --human-readable --summarize

Total Objects: 0
   Total Size: 0 Bytes

マルチパートアップロード

なお upload_file() はデフォルトではマルチパートアップロードしますが(※1)、 shutdown() あるいは __exit__() が呼ばれた場合はちゃんとマルチパートアップロードをアボートしてくれるようです(※2)。

ログレベルを DEBUG にしておくとどの API を呼んでいるのか出力されます。

boto3.set_stream_logger('', logging.DEBUG)

こんなログが出力されます。

botocore.hooks [DEBUG] Event before-parameter-build.s3.AbortMultipartUpload: calling handler
botocore.hooks [DEBUG] Event before-call.s3.AbortMultipartUpload: calling handler

list-multipart-uploads を実行して、何も残っていないことが確認できます。

% aws --endpoint-url=<endpoint> s3api list-multipart-uploads --bucket <bucket_name>
%

※1 Uploading files — Boto3 Docs 1.17.57 documentation

The upload_file method accepts a file name, a bucket name, and an object name. The method handles large files by splitting them into smaller chunks and uploading each chunk in parallel.

※2 Amazon S3 へのマルチパートアップロードに AWS CLI を使用する

マルチパートアップロードに aws s3api コマンドを使用して、プロセスが中断した場合は、アップロードの不完全な部分を削除して、その部分を再度アップロードする必要があります。不完全な部分を削除するには、AbortIncompleteMultipartUpload ライフサイクルアクション を使用します。あるいは、aws s3api コマンドを使用して、以下の手順に従って不完全な部分を削除します。

Discussion