Google DriveのファイルをGoogle Cloud Storageに移動させる
Google Driveにある動画ファイルをGoogle Cloud Storageに移動させます。毎週手作業で行っており、毎週行うのは大変に面倒臭いのでコーディングの練習も兼ねて作りました。初めはGAS(Google Apps Script)でやろうとしましたが、gas言語という私にはあまり馴染みがない言語で書かれているので、結局みんな大好きpythonで書くことにしました。
環境づくり
コーディングする前に、諸々の必要なものを揃えます。面倒臭いことは早めにしておきます。
必要なもの
- Google Drive API OAUTH2.0 Client ID
- Bucketに対するobject書き込み権限を持ったサービスアカウントのcredential
1. Google Drive API OAUTH 認証
Google DriveにアクセスするためにOAuthを有効にします。
-
google drive api 有効化
gCloudで、google drive apiが使えるように有効化します。
-
OAuth 2.0有効化
続いて、OAuthのClient IDを発行します。
APIs&ServicesページのOAuth consent screen
より、clientのアクセススコープを設定します。
今回の用途の場合、読み取り(とダウンロード)ができれば良いので、scopeはauth/drive.readonly
を選択します。
スコープが設定できたら、次はClient IDを発行します。
Credentials より、+create credentials
を選択。OAuth Client ID
を選択。application typeは desktop appにし、適当な名前をつけて発行します。Client IDが発行できたら、認証情報をダウンロードしておきます。
2. SAの発行
gCloudでバケットを作成して、オブジェクトの書き込み権限を持ったservice accountを作っておきます。RoleはStorage Object Creator
で作成しました。
コーディング
さて、作っていきます。
devcontainer上で開発していきます。
OAuth 認証
def authorizeApi(scopes, credential_path):
store = file.Storage('token.json')
creds = store.get()
if not creds or creds.invalid:
flow = client.flow_from_clientsecrets(credential_path, scopes)
creds = tools.run_flow(flow, store)
return build('drive', 'v3', http=creds.authorize(Http()))
APIを使ってDriveの情報を操作するクライアントを作ります。
scopes
は先ほどここで指定したスコープと同じものを指定します。今回の場合はこちらです。
scopes = ['https://www.googleapis.com/auth/drive.readonly']
Google Drive 操作
先ほど作成したクライアントで、Google Driveを操作します。
def getFilesFromGDrive(drive_service, folder_id, filter_by_filename):
filters = [
f"'{folder_id}' in parents",
f"name contains '{file_name}'",
"mimeType != 'application/vnd.google-apps.folder'"
]
query = " and ".join(filters)
folders = drive_service.files().list(
q=query,
spaces= "drive",
includeItemsFromAllDrives= False if isMyDrive else True,
supportsAllDrives=True,
fields='nextPageToken, files(id, name)'
).execute()
files_list = folders.get('files', [])
if not files_list:
print(f'No files found in folder with ID {folder_id}.')
else:
print(f"Files in the folder with ID {folder_id}:")
for file in files_list:
print(f"{file['name']} ({file['id']})")
Queryについて
Queryに、Google Drive上のファイル(フォルダ)を検索する条件を指定します。
f"'{folder_id}' in parents"
親フォルダがfolder_id
であるファイル/フォルダを検索します。
f"name contains '{filter_by_filename}'"
filter_by_filename
で指定した名前を含むファイルを検索します。
"mimeType != 'application/vnd.google-apps.folder'"
フォルダを検索対象から除外して、ファイルのみを検索対象にします。
詳しくはこちらに書いてあります。
ここで、ひとまずきちんとフォルダの中のファイルが取得できているかを確認します。
Files in the folder with ID folder_id:
file_name.mp4 (file_id)
file_name.vtt (file_id)
きちんと検索されているみたいです。
Stream
次に、取得したファイルIDで、Google DriveよりStreamでファイルをダウンロードします。
def makeArchiveOfAFolder(folder, drive_service):
archive = io.BytesIO()
with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zip_archive:
file_list = getFilesFromGDrive(drive_service, folder['id'], "")
archive_list = []
for index, file in enumerate(file_list):
tmp = io.BytesIO()
downloader = MediaIoBaseDownload(tmp, drive_service.files().get_media(fileId=file['id']))
done = False
# progress bar
with tqdm(total=100, desc=f"Downloading {file['name']}") as pbar:
while not done:
status, done = downloader.next_chunk()
pbar.update(status.progress() * 100 - pbar.n)
tmp.seek(0)
print("creating archive...")
archive_list.append(zipfile.ZipInfo(file['name']))
zip_archive.writestr(archive_list[index], tmp.read())
print("done.")
archive.seek(0)
return archive
streamでデータを取得するために、MediaIoBaseDownload
関数を使用するみたいです。現在のダウンロードステータスがわからないのがストレスだったので、progress barもつけてみました。
ここで、お気づきの方も多いと思いますが、zip
(archive)に変換しています。これは一回作成した後に気がついたのですが、GCSにアップロードするときにその後に行う処理の関係上、zipで格納しないといけないという仕様だったのでこの形にしました。
zipについてはこちらのサイトを参考にしました。そこそこ難しく、理解して作成するのに1日強かかってしまいました。
コード解説
downloader = MediaIoBaseDownload(tmp,drive_service.files().get_media(fileId=file['id']))
done = False
# progress bar
with tqdm(total=100, desc=f"Downloading {file['name']}") as pbar:
while not done:
status, done = downloader.next_chunk()
pbar.update(status.progress() * 100 - pbar.n)
ここで、MediaIoBaseDownload
関数で、Bytes型でファイルの情報を受け取っています。
tmp.seek(0)
print("creating archive...")
archive_list.append(zipfile.ZipInfo(file['name']))
zip_archive.writestr(archive_list[index], tmp.read())
print("done.")
ここで、zipにするためのarchiveを作成しています。今回は、1つのフォルダに2つ以上のファイルを紐付けたかったのでこのようなロジックにしました。(ただ、ファイルが大きすぎたらメモリが足りなくなるかも・・・)
Upload
先ほど作成したarchiveをGCSにアップロードします。
def upload(credential_for_gcs, bucket_name, archive):
archive.seek(0)
storage_client = storage.Client.from_service_account_json(credential_for_gcs)
bkt = storage_client.bucket(bucket_name)
blob = bkt.blob(str(getCurrentTime('Asia/Tokyo'))+'.zip')
print("uploading to gcs...")
blob.upload_from_file(archive, content_type='application/zip')
print("done.")
zipファイルの名前はアップロードした日付にします。
実行してみると、、、
Files in the folder with ID folder_id:
file_name.mp4 (file_id)
file_name.vtt (file_id)
Downloading filename.mp4: 100%|███████████████████████████████████████████████| 100.0/100 [02:59<00:00, 1.79s/it]
creating archive...
done.
Downloading filename.vtt: 100%|██████████████████████████████████████████████| 100.0/100 [00:01<00:00, 68.45it/s]
creating archive...
done.
uploading to gcs...
done.
無事にアップロードできています!
終わりに
今回一番大変だった部分は、zipファイルの仕組みを理解することでした。今まで何気なく使用していましたが、下層レイヤーの部分を勉強すると面白いですね。考案した先人は偉大です。
ここまでのステップで半自動化(アップロードまで)はできたので、今後はschedulerのcronなどを使って1週間に1回の定期実行をする必要がありますがとりあえず今回はここまで。
ありがとうございました。
Discussion