📖

Google DriveのファイルをGoogle Cloud Storageに移動させる

2023/11/08に公開

Google Driveにある動画ファイルをGoogle Cloud Storageに移動させます。毎週手作業で行っており、毎週行うのは大変に面倒臭いのでコーディングの練習も兼ねて作りました。初めはGAS(Google Apps Script)でやろうとしましたが、gas言語という私にはあまり馴染みがない言語で書かれているので、結局みんな大好きpythonで書くことにしました。

https://github.com/IDOShun/transferFile

環境づくり

コーディングする前に、諸々の必要なものを揃えます。面倒臭いことは早めにしておきます。

必要なもの

  1. Google Drive API OAUTH2.0 Client ID
  2. Bucketに対するobject書き込み権限を持ったサービスアカウントのcredential

1. Google Drive API OAUTH 認証

Google DriveにアクセスするためにOAuthを有効にします。

  • google drive api 有効化
    gCloudで、google drive apiが使えるように有効化します。

  • OAuth 2.0有効化
    続いて、OAuthのClient IDを発行します。
    APIs&ServicesページのOAuth consent screenより、clientのアクセススコープを設定します。

    今回の用途の場合、読み取り(とダウンロード)ができれば良いので、scopeはauth/drive.readonlyを選択します。
    スコープが設定できたら、次はClient IDを発行します。
    Credentials より、+create credentialsを選択。OAuth Client IDを選択。application typeは desktop appにし、適当な名前をつけて発行します。Client IDが発行できたら、認証情報をダウンロードしておきます。

2. SAの発行

gCloudでバケットを作成して、オブジェクトの書き込み権限を持ったservice accountを作っておきます。RoleはStorage Object Creatorで作成しました。

コーディング

さて、作っていきます。
devcontainer上で開発していきます。

OAuth 認証

def authorizeApi(scopes, credential_path):
    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets(credential_path, scopes)
        creds = tools.run_flow(flow, store)
    return build('drive', 'v3', http=creds.authorize(Http()))

APIを使ってDriveの情報を操作するクライアントを作ります。

scopesは先ほどここで指定したスコープと同じものを指定します。今回の場合はこちらです。

scopes = ['https://www.googleapis.com/auth/drive.readonly']

https://googleapis.github.io/google-api-python-client/docs/epy/index.html
https://zenn.dev/wtkn25/articles/python-googledriveapi-operation

Google Drive 操作

先ほど作成したクライアントで、Google Driveを操作します。

def getFilesFromGDrive(drive_service, folder_id, filter_by_filename):
    filters = [
        f"'{folder_id}' in parents",
        f"name contains '{file_name}'",
        "mimeType != 'application/vnd.google-apps.folder'"
    ]
    query = " and ".join(filters)

    folders = drive_service.files().list(
        q=query,
        spaces= "drive",
        includeItemsFromAllDrives= False if isMyDrive else True,
        supportsAllDrives=True,
        fields='nextPageToken, files(id, name)'
        ).execute()
    files_list = folders.get('files', [])

    if not files_list:
        print(f'No files found in folder with ID {folder_id}.')
    else:
        print(f"Files in the folder with ID {folder_id}:")
        for file in files_list:
            print(f"{file['name']} ({file['id']})")

Queryについて

Queryに、Google Drive上のファイル(フォルダ)を検索する条件を指定します。

f"'{folder_id}' in parents"

親フォルダがfolder_idであるファイル/フォルダを検索します。

f"name contains '{filter_by_filename}'"

filter_by_filenameで指定した名前を含むファイルを検索します。

"mimeType != 'application/vnd.google-apps.folder'"

フォルダを検索対象から除外して、ファイルのみを検索対象にします。
詳しくはこちらに書いてあります。
https://developers.google.com/drive/api/guides/search-files

ここで、ひとまずきちんとフォルダの中のファイルが取得できているかを確認します。

Files in the folder with ID folder_id:
file_name.mp4 (file_id)
file_name.vtt (file_id)

きちんと検索されているみたいです。

Stream

次に、取得したファイルIDで、Google DriveよりStreamでファイルをダウンロードします。

def makeArchiveOfAFolder(folder, drive_service):
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zip_archive:
        file_list = getFilesFromGDrive(drive_service, folder['id'], "")
        archive_list = []
        for index, file in enumerate(file_list):
            tmp = io.BytesIO()
            downloader = MediaIoBaseDownload(tmp, drive_service.files().get_media(fileId=file['id']))
            done = False
            # progress bar
            with tqdm(total=100, desc=f"Downloading {file['name']}") as pbar:
                while not done:
                    status, done = downloader.next_chunk()
                    pbar.update(status.progress() * 100 - pbar.n)
            tmp.seek(0)
            print("creating archive...")
            archive_list.append(zipfile.ZipInfo(file['name']))
            zip_archive.writestr(archive_list[index], tmp.read())
            print("done.")
    archive.seek(0)
    return archive

streamでデータを取得するために、MediaIoBaseDownload関数を使用するみたいです。現在のダウンロードステータスがわからないのがストレスだったので、progress barもつけてみました。
ここで、お気づきの方も多いと思いますが、zip(archive)に変換しています。これは一回作成した後に気がついたのですが、GCSにアップロードするときにその後に行う処理の関係上、zipで格納しないといけないという仕様だったのでこの形にしました。

zipについてはこちらのサイトを参考にしました。そこそこ難しく、理解して作成するのに1日強かかってしまいました。

コード解説

downloader = MediaIoBaseDownload(tmp,drive_service.files().get_media(fileId=file['id']))
            done = False
            # progress bar
            with tqdm(total=100, desc=f"Downloading {file['name']}") as pbar:
                while not done:
                    status, done = downloader.next_chunk()
                    pbar.update(status.progress() * 100 - pbar.n)

ここで、MediaIoBaseDownload関数で、Bytes型でファイルの情報を受け取っています。

tmp.seek(0)
            print("creating archive...")
            archive_list.append(zipfile.ZipInfo(file['name']))
            zip_archive.writestr(archive_list[index], tmp.read())
            print("done.")

ここで、zipにするためのarchiveを作成しています。今回は、1つのフォルダに2つ以上のファイルを紐付けたかったのでこのようなロジックにしました。(ただ、ファイルが大きすぎたらメモリが足りなくなるかも・・・)

Upload

先ほど作成したarchiveをGCSにアップロードします。

def upload(credential_for_gcs, bucket_name, archive):
        archive.seek(0)
        storage_client = storage.Client.from_service_account_json(credential_for_gcs)
        bkt = storage_client.bucket(bucket_name)
        blob = bkt.blob(str(getCurrentTime('Asia/Tokyo'))+'.zip')
        print("uploading to gcs...")
        blob.upload_from_file(archive, content_type='application/zip')
        print("done.")

zipファイルの名前はアップロードした日付にします。

実行してみると、、、

Files in the folder with ID folder_id:
file_name.mp4 (file_id)
file_name.vtt (file_id)
Downloading filename.mp4: 100%|███████████████████████████████████████████████| 100.0/100 [02:59<00:00,  1.79s/it]
creating archive...
done.
Downloading filename.vtt: 100%|██████████████████████████████████████████████| 100.0/100 [00:01<00:00, 68.45it/s]
creating archive...
done.
uploading to gcs...
done.


無事にアップロードできています!

終わりに

今回一番大変だった部分は、zipファイルの仕組みを理解することでした。今まで何気なく使用していましたが、下層レイヤーの部分を勉強すると面白いですね。考案した先人は偉大です。
ここまでのステップで半自動化(アップロードまで)はできたので、今後はschedulerのcronなどを使って1週間に1回の定期実行をする必要がありますがとりあえず今回はここまで。

ありがとうございました。

Discussion