boto3
boto3チートシート
boto3
S3
# Credentials are read from ~/.aws/credentials (default boto3 credential chain).
s3 = boto3.client('s3')
function
def s3_download_df(bucket: str, prefix: str, is_header: bool, dest='', is_rm_tmp=True) -> pd.DataFrame:
    """Download a CSV object from S3 and load it into a DataFrame.

    Args:
        bucket: S3 bucket name.
        prefix: Object key of the CSV file to download.
        is_header: True if the first CSV row is a header row.
        dest: Local file name to download to; when empty, a unique name is
            derived from the key plus a JST timestamp.
        is_rm_tmp: Remove the local file after loading when True.

    Returns:
        The CSV contents as a pandas DataFrame.
    """
    if dest:
        dest_name = dest
    else:
        # Build a unique temp name only when needed: key with '/' flattened
        # to '_', suffixed with a JST timestamp.
        stamp = datetime.now().astimezone(timezone('Asia/Tokyo')).strftime("%y%m%d%H%M%S")
        dest_name = '{}.{}'.format(prefix.replace('/', '_'), stamp)
    boto3.resource('s3').Bucket(bucket).download_file(prefix, dest_name)
    df = pd.read_csv(dest_name, header=(0 if is_header else None))
    if is_rm_tmp:
        os.remove(dest_name)
    return df
def s3_upload_df(df: pd.DataFrame, bucket: str, prefix, header=False, index=False):
    """Serialize a DataFrame to CSV in memory and store it as an S3 object.

    Args:
        df: DataFrame to upload.
        bucket: Destination S3 bucket name.
        prefix: Object key to write.
        header: Forwarded to DataFrame.to_csv (include column names).
        index: Forwarded to DataFrame.to_csv (include the row index).
    """
    buffer = StringIO()
    df.to_csv(buffer, header=header, index=index)
    target = boto3.resource('s3').Object(bucket, prefix)
    target.put(Body=buffer.getvalue())
    return None
def s3_upload_file(filepath: str, bucket, prefix):
    """Upload a local file to S3 under the given object key.

    Args:
        filepath: Path of the local file to upload.
        bucket: Destination S3 bucket name.
        prefix: Object key to write.
    """
    destination = boto3.resource('s3').Bucket(bucket).Object(prefix)
    destination.upload_file(filepath)
    return None
test function
# s3_download_df(bucket = "bucket_name", prefix = "yasumura/csv_test/test.csv",is_header = True)
# os.getcwd()
# s3_upload_file(filepath="./test_2.csv", bucket = "bucket_name", prefix = "yasumura/csv_test/test_2.csv")
# df = s3_download_df(bucket = "bucket_name", prefix = "yasumura/csv_test/test.csv",is_header = True)
# s3_upload_df(df,bucket = "bucket_name", prefix = "yasumura/csv_test/test_3.csv")
list_object
オブジェクトパスを出力
# Print the keys of all objects in a bucket whose key contains `prefix`.
import boto3

s3 = boto3.client('s3')
bucket = 'bucket_name'
# BUG FIX: `prefix` was used below but never defined (NameError as written).
prefix = 'prefix_first/prefix_second'

# NOTE: list_objects_v2 returns at most 1000 keys per call; use a
# paginator (get_paginator('list_objects_v2')) for larger buckets.
response = s3.list_objects_v2(Bucket=bucket)
for obj in response['Contents']:  # 'obj' avoids shadowing the builtin `object`
    if prefix in obj['Key']:
        object_path = obj['Key']
        print(f'object_path: {object_path}')
pyspark出力オブジェクトをリネーム
項目名 segment で分割出力されたオブジェクトを、segment に入っている値でリネームする
# Rename pyspark partitioned output ("<prefix>/segment=VALUE/part-....csv")
# so each partition's file is named after the segment value instead.
# S3 has no rename, so this is copy-to-new-key + delete-old-key.
#
# As originally written this script could not run: it referenced the
# undefined names `survey_prefix` and `suegment` (typos for the variables
# defined just above them). Those are fixed here.
import boto3

s3 = boto3.client('s3')
bucket = 'bucket_name'
prefix = 'prefix_first/prefix_second'

response = s3.list_objects_v2(Bucket=bucket)
for obj in response['Contents']:
    if prefix not in obj['Key']:
        continue
    object_path = obj['Key']
    print(f'object_path: {object_path}')
    # Locate the "segment=VALUE" partition directory in the key.
    # NOTE(review): the original indexed split('/')[1] unconditionally;
    # searching for the 'segment=' component is more robust — confirm the
    # expected key layout against the real pyspark output.
    parts = object_path.split('/')
    seg_index = next((i for i, p in enumerate(parts) if p.startswith('segment=')), None)
    if seg_index is None:
        # No partition directory (e.g. the Spark _SUCCESS marker object).
        print('skip')
        continue
    segment = parts[seg_index].replace('segment=', '')
    # _SUCCESS markers are not data files — leave them untouched.
    if segment == '_SUCCESS':
        print('skip')
        continue
    print(f'segment: {segment}')
    # Rename the part file inside its partition directory to "<VALUE>.csv".
    new_object_path = '/'.join(parts[:seg_index + 1]) + f'/{segment}.csv'
    print(f'new_object_path: {new_object_path}')
    print('copy to rename...')
    s3.copy_object(Bucket=bucket, Key=new_object_path, CopySource={'Bucket': bucket, 'Key': object_path})
    print('delete old object...')
    s3.delete_object(Bucket=bucket, Key=object_path)
    print('renamed complete\n')
billing
サービス別日次料金推移
# Daily AWS cost per service (Cost Explorer), exported to CSV.
import pandas as pd
from datetime import datetime, timedelta


def build_period_strs(first_start: datetime, num_days: int):
    """Build parallel lists of one-day Cost Explorer period boundaries.

    Period i covers [first_start + i days, first_start + i+1 days), each
    boundary formatted as "YYYY-MM-DD" as the API requires.

    Args:
        first_start: Start date of the first one-day period.
        num_days: Number of consecutive one-day periods.

    Returns:
        (start_strs, end_strs) — two lists of date strings of length num_days.
    """
    starts = [first_start + timedelta(days=i) for i in range(num_days)]
    start_strs = [d.strftime("%Y-%m-%d") for d in starts]
    # End boundaries are derived from the starts (originally two lists were
    # built independently with a duplicated hard-coded length — fragile).
    end_strs = [(d + timedelta(days=1)).strftime("%Y-%m-%d") for d in starts]
    return start_strs, end_strs


def fetch_daily_costs(start_strs, end_strs) -> pd.DataFrame:
    """Query Cost Explorer for each one-day period, grouped by service.

    Args:
        start_strs: Period start dates ("YYYY-MM-DD").
        end_strs: Matching period end dates ("YYYY-MM-DD").

    Returns:
        DataFrame with one row per period: a "date" column plus one column
        per AWS service holding the AmortizedCost amount (string, as the
        API returns it).
    """
    # Imported lazily so this module can be imported (and build_period_strs
    # used) without boto3 installed.
    import boto3

    client = boto3.client("ce", region_name="ap-northeast-1")
    data = []
    for start, end in zip(start_strs, end_strs):
        response = client.get_cost_and_usage(
            TimePeriod={"Start": start, "End": end},
            Granularity="DAILY",
            Metrics=["AmortizedCost"],
            GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
        )
        groups = response["ResultsByTime"][0]["Groups"]
        # NOTE(review): the row is labeled with the period *end* date, as in
        # the original script — confirm that labeling is intended (the cost
        # is for the day starting at `start`).
        row = {"date": end}
        for group in groups:
            row[group["Keys"][0]] = group["Metrics"]["AmortizedCost"]["Amount"]
        data.append(row)
    return pd.json_normalize(data)


if __name__ == "__main__":
    # Guarded so importing this module does not fire 104 API calls.
    start_strs, end_strs = build_period_strs(datetime(2021, 12, 31), 104)
    print(start_strs)
    print(end_strs)
    df = fetch_daily_costs(start_strs, end_strs)
    df.to_csv("daily_cost_aws.csv", index=False, encoding="utf-8")
    print("done")