Python の Boto3(The AWS SDK for Python)でファイルアップロード中にキャンセルする
TransferManager
の shutdown()
を呼び出せばキャンセルできます → https://github.com/boto/s3transfer/blob/develop/s3transfer/manager.py
ただし、 shutdown()
から呼ぶときの _shutdown()
の引数が誤っているので、ここでは __exit__()
を呼ぶことにします。
引数が誤っている件はすでに issue になっています Use correct parameter list to _shutdown() by JasonDeArte · Pull Request #144 · boto/s3transfer · GitHub
upload_file
TransferManager はコンテキストマネージャーとしても実装されているので、以下のように利用したい気分ですが、 upload_file() は同期処理のようです( Support asyncio · Issue #458 · boto/botocore · GitHub )
with S3Transfer(client=client) as transfer:
transfer.upload_file(filepath, bucket_name, filepath)
つまり upload_file()
が完了しないと処理が戻ってきません。 upload_file()
そのものをキャンセルするには、別途スレッドを作り、そのスレッドからキャンセルする必要があります。あるよね?
実装
upload_file()
を呼ぶスレッドと、 __exit__()
を呼ぶスレッドを作ればキャンセルできます。エラー処理など省いてます。
import botocore
import boto3
from boto3.s3.transfer import TransferConfig, S3Transfer
from botocore.config import Config
import threading
import concurrent.futures
from time import sleep
MB = 1024 ** 2
GB = 1024 ** 3
region_name = '<region_name>'
bucket_name = '<bucket_name>'
filepath = 'largefile'
access_key = '<access_key>'
secret = '<secret>'
endpoint = '<endpoint>'
filepath = 'largefile'
transfer = None
def shutdown_thread():
global transfer
sleep(20)
print("go shutdown")
transfer.__exit__(RuntimeError, 'shutdown')
return 'shutdown end'
def upload_thread():
global transfer
client = boto3.client(
's3',
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret,
region_name=region_name
)
try:
transfer = S3Transfer(client=client)
transfer.upload_file(filepath, bucket_name, filepath)
except botocore.exceptions.ClientError as err:
print(err.response['Error']['Code'])
except Exception as err:
print(err)
return 'upload end'
def upload():
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures.append(executor.submit(upload_thread))
futures.append(executor.submit(shutdown_thread))
for f in concurrent.futures.as_completed(futures):
print(f.result())
upload()
ここでは sleep()
させましたが、 threading.Event
を用いて wait()
しておき、何かのトリガーで __exit__()
を呼ぶというのが一般的な使い方かと。
こんな感じで。動作未確認です。
ev = threading.Event()
def shutdown_thread():
global transfer
ev.wait()
print("go shutdown")
transfer.__exit__(RuntimeError, 'shutdown')
return 'shutdown end'
動作確認
ファイルを作ります。
% truncate -s 1GB largefile
スクリプトを実行します。
% python3 boto3.client.upload_file_shutdown.py
go shutdown
shutdown
upload end
shutdown end
ファイルはありません。
% aws s3 ls <bucket_name> --endpoint-url=<endpoint> --recursive --human-readable --summarize
Total Objects: 0
Total Size: 0 Bytes
マルチパートアップロード
なお upload_file()
はデフォルトではマルチパートアップロードしますが(※1)、 shutdown()
あるいは __exit__()
が呼ばれた場合はちゃんとマルチパートアップロードをアボートしてくれるようです(※2)。
ログレベルを DEBUG にしておくとどの API を呼んでいるのか出力されます。
boto3.set_stream_logger('', logging.DEBUG)
こんなログが出力されます。
botocore.hooks [DEBUG] Event before-parameter-build.s3.AbortMultipartUpload: calling handler
botocore.hooks [DEBUG] Event before-call.s3.AbortMultipartUpload: calling handler
list-multipart-uploads
を実行して、何も残っていないことが確認できます。
% aws --endpoint-url=<endpoint> s3api list-multipart-uploads --bucket <bucket_name>
%
※1 Uploading files — Boto3 Docs 1.17.57 documentation
The upload_file method accepts a file name, a bucket name, and an object name. The method handles large files by splitting them into smaller chunks and uploading each chunk in parallel.
※2 Amazon S3 へのマルチパートアップロードに AWS CLI を使用する
マルチパートアップロードに aws s3api コマンドを使用して、プロセスが中断した場合は、アップロードの不完全な部分を削除して、その部分を再度アップロードする必要があります。不完全な部分を削除するには、AbortIncompleteMultipartUpload ライフサイクルアクション を使用します。あるいは、aws s3api コマンドを使用して、以下の手順に従って不完全な部分を削除します。
Discussion