📝

GoogleDriveに保存された動画ファイルを利用してCloudFront + S3での署名付きURLでの動画配信を行う

2024/01/08に公開

概要

WHY

自社の動画コンテンツをYoutube上にアップロードし、その動画をYoutubeのAPIとiFrameを用いて配信していたものを、Youtubeに依存せずに自社リソースにコンテンツを置いてそこから配信する形に切り替える必要が出てきたため。

TODO

  • CloudFront及びS3を用いての動画配信を署名付きURLで行うためのリソース設計・構築
  • コンテンツを自社リソースで管理するための仕組みづくり

要件とネックとどう解決するか

要件

  • コストの大幅な増加はNG
  • 動画コンテンツ自体を運用・保守・管理するのは非エンジニアである
    • コンテンツ管理に対し、複雑なシステムやアプリケーションの作成はNG
  • 動画コンテンツの保守・管理は既存のものから変更しない
    • 上述の理由から既存のものから変更した際に運用ができなくなる可能性があるため
  • Youtubeの利用は廃止する
  • 動画コンテンツは限られたユーザーのみ閲覧できる

既存の運用・保守・管理方法

  1. GoogleDriveに動画ファイル保存(バックアップ及び親データ扱い)
  2. 1に保存した動画と同じものをYoutubeチャンネルにアップ
  3. Youtube APIでリンクを取得し、iFrameで配信
  4. 動画のメタ情報等と当該動画のYoutubeへのリンクを管理用のスプレッドシートに記載
  5. 動画情報の更新や新しい動画の追加を検知するため、日次のバッチ処理でスプレッドシートの情報をMySQL(AWSのRDS)に保存

ネック

  • 動画コンテンツ自体を運用・保守・管理するのは非エンジニアである
  • 動画コンテンツの運用・保守・管理は既存のものから大きく動かさないこと

本来は動画コンテンツの運用・保守・管理をスプレッドシートではなく、新しい社内システムを作成しそこで賄うのがベストと考えていたが、現場から諸般の事情でどうしてもスプレッドシートから変えないでくれと念押しをされてしまったので断念し、妥協点を探ることに。

新しい運用・保守・管理

リソースはCloudFront+S3で配信をすることに決定。
あとは結果的に以下の通り変更することとなった。

  1. GoogleDriveに動画ファイル保存(バックアップ及び親データ扱い)
  2. 動画管理のスプレッドシートに当該ファイルのGoogleDriveのPathを記載
  3. 日次のバッチ処理でスプレッドシートの情報キャッチ
  4. 3で変更があった場合や新規に追加された動画があった場合はGoogleDriveから対象の動画ファイルをS3へアップロード
  5. CloudFrontの署名付きURLを用いて対象の動画を配信

運用・保守・管理において非エンジニアが関わる領域に関しては新しいことはGoogleDriveのPathの記載だけにして、コンテンツの保存や更新は日次処理において自動化することで対応。
問題が起きたときには

  1. スプレッドシートの運用がルール通りなされているか
  2. コンテンツの保存・更新・配信が正常であるか

の2点において非エンジニアの領域とエンジニアの領域とで明確に切り分けもできるので、まずまずの妥協点にはなった。

設計

ざっくりとした構成図は以下の通り。

なお、今回のリソースの構築と管理はTerraformで行った。

S3+CloudFrontでの署名付きURL配信

参考

上述の記事が非常に参考になりました。
今回自分は以下の手順でリソースの作成を行った。

  1. キーペアの準備(Localで秘密鍵と公開鍵を作成する)
  2. 公開鍵・秘密鍵をSecrets Managerに保存
  3. 各リソース作成

以下個別のTips。
なお、Secrets ManagerもTerraformで管理できなくはないが、セキュリティ上分離するほうが良いため管理から外している。

公開鍵と秘密鍵をSecretsMangerで扱うときの注意点

参考: 「AWS Secrets MangerとParameter Storeに登録した公開鍵をTerraformで取得する」

CloudFrontでキーグループを作成する場合、公開鍵をSecretsMangerに保存する場合は、それに対応する必ずbinaryの形式にする。
SecretsMangerでは改行はスペースと見なされるため、textは値を取得する際に
InvalidArgument: Your request contains empty/invalid/out of limits RSA Encoded Keyでエラーとなってしまう。
逆に秘密鍵は--secret-stringで保存しないと署名付きURL発行時にエラーになる。

AWS CLIでSecretsMangerからシークレットを取得する(値も含む)

Secrets Managerにきちんと鍵を登録できているか確認する用。

aws --profile リージョンが定義されたプロファイル名 secretsmanager get-secret-value \
    --secret-id シークレット名

AWS CLIでSecretsMangerのシークレットの値を更新する

もし仮に間違えて登録してしまった場合に更新したいときに。

aws --profile リージョンが定義されたプロファイル名 secretsmanager put-secret-value \
    --secret-id シークレット名 \
    --secret-binary fileb://path

--secret-binaryを使うのであれば必ずfileb://を使う。
逆にfile読み出しで文字列の値で更新するなら--secret-text file://pathとする。

Terraform例

  • streaming_videoはTerraform上での識別子のようなもので任意にわかりやすいものを設定する
    • 今回自分は全て同じ識別子にしたが、あまり良くなかったかもしれない
  • S3は何も指定しないとStandardで作成されてしまうが、常にIntelligent-Tieringにしたい場合はruleで設定する(daysは設定しなくてよし)
  • S3のアクセスブロックはTerraformで作成する場合、依存関係に注意する
    • depends_onを適切に設定しないとリソースの構築に失敗する
      • パブリックアクセスブロックとオーナーシップコントロールを作成してから、アカウントパブリックアクセスブロックを作成し、バケットポリシーを作成する
# バケット作成
resource "aws_s3_bucket" "streaming_video" {
  bucket        = var.streaming_video_bucket_name
  force_destroy = "false"
  tags = {
    env = var.env
  }
}

# CORSの設定
resource "aws_s3_bucket_cors_configuration" "streaming_video" {
  bucket = aws_s3_bucket.streaming_video.id

  cors_rule {
    allowed_headers = ["*"]
    allowed_methods = ["GET", "HEAD"]
    allowed_origins = var.streaming_video_target_origin
    expose_headers  = []
    max_age_seconds = 3000
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "streaming_video_bucket_config" {
  bucket = var.streaming_video_bucket_name

  # バケット配下全てに適用するためfilterは定義しない
  rule {
    id     = "streaming_video_bucket_archive_rule"
    status = "Enabled"

    # デフォルトでIntelligent-Tieringにする(days=0の状態)
    transition {
      storage_class = "INTELLIGENT_TIERING"
    }
  }
}

# 暗号化の設定が必要ならresource "aws_s3_bucket_server_side_encryption_configuration"を定義
# ただし、設定しなくても現在はデフォルトでAES256で暗号化してくれる。providerのバージョン指定に注意

resource "aws_s3_bucket_ownership_controls" "streaming_video" {
  bucket = aws_s3_bucket.streaming_video.id

  rule {
    object_ownership = "BucketOwnerEnforced" # ACLを無効にする、現在のデフォルトだが念のため明示的に指定しておく
  }
}

# パブリックアクセスブロックの設定
resource "aws_s3_bucket_public_access_block" "streaming_video" {
  bucket                  = aws_s3_bucket.streaming_video.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# アカウントパブリックアクセスブロックの設定
resource "aws_s3_account_public_access_block" "streaming_video" {
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true

  depends_on = [
    aws_s3_bucket_public_access_block.streaming_video,
    aws_s3_bucket_ownership_controls.streaming_video
  ]

}

# バケットポリシーの設定
resource "aws_s3_bucket_policy" "streaming_video" {
  bucket = var.streaming_video_bucket_name
  policy = data.aws_iam_policy_document.bucket_streaming_video.json
  depends_on = [
    aws_s3_bucket_public_access_block.streaming_video,
    aws_s3_account_public_access_block.streaming_video
  ]
}

# バケットポリシーの実体を定義
data "aws_iam_policy_document" "bucket_streaming_video" {
  version = "2012-10-17" # 新規にポリシーを作る場合はこのバージョンを指定
  statement {
    principals {
      type        = "Service"
      identifiers = ["cloudfront.amazonaws.com"]
    }
    actions   = ["s3:GetObject"]
    resources = ["${aws_s3_bucket.streaming_video.arn}/*"]
    # ポリシーの評価
    condition {
      test     = "StringEquals"                             # 旧ポリシーのStringEquals
      variable = "aws:SourceArn"                            # 旧ポリシーのAWS:SourceArn
      values   = [aws_cloudfront_distribution.streaming_video.arn] # 旧ポリシーのAWS:SourceArnのvalue
    }
  }
}

  • ディストリビューションにはOAC(オリジンアクセスコントロール)を必ず設定する
  • キャッシュポリシーは必ず設定する(設定しないと自動でLegacyの設定を行われてしまう)
  • オリジンリクエストポリシー及びレスポンスヘッダーポリシーは必要に応じて設定する
    • 動画の再生においては必要なCORSの設定をしておけば問題ないが、動画に対して何かブラウザの拡張機能を適用したいと言った場合は上記のポリシーを設定しないとCORSに引っかかってしまう
      • 例えば動画のある場面をスクリーンショットにするという拡張機能を使用した場合、動画を静止画に描画しようとする際にCORS的にはAccess-Control-Allow-Origin及びAccess-Control-Allow-Headersが明示的にリクエストに含まれていないとNGということらしい
      • なお、HTML側のvideoタグ側でcrossorigin="anonymous"を追加することも必要

# ディストリビューションの作成
resource "aws_cloudfront_distribution" "streaming_video" {

  origin {
    domain_name = aws_s3_bucket.streaming_video.bucket_regional_domain_name
    origin_id   = aws_s3_bucket.streaming_video.id
    # OAC
    origin_access_control_id = aws_cloudfront_origin_access_control.streaming_video.id
  }

  tags = {
    env = var.env
  }

  viewer_certificate {
    cloudfront_default_certificate = true
  }

  default_cache_behavior {
    target_origin_id       = aws_s3_bucket.streaming_video.id
    viewer_protocol_policy = "https-only"
    allowed_methods        = ["GET", "HEAD", "OPTIONS"]
    cached_methods         = ["GET", "HEAD", "OPTIONS"]
    # キーグループの指定
    trusted_key_groups = [aws_cloudfront_key_group.streaming_video.id]
    default_ttl        = "0" # キャッシュの有効期限、0はキャッシュしない
    compress           = "true"

    #  使用するポリシー群
    cache_policy_id            = data.aws_cloudfront_cache_policy.streaming_video.id
    origin_request_policy_id   = data.aws_cloudfront_origin_request_policy.streaming_video.id
    response_headers_policy_id = aws_cloudfront_response_headers_policy.streaming_video.id
  }
  # コンテンツのエンドユーザーリクエストを受け付けるディストリビューションが有効かどうか。
  enabled         = "true"
  # ディストリビューションでIPv6が有効かどうか
  is_ipv6_enabled = "true"


  restrictions {
    geo_restriction {
      # ホワイトリスト、ブラックリスト形式にするかの設定、noneはどちらもしない
      restriction_type = "none"
    }
  }
}

# 公開鍵。先にSecretsMangerに登録したものを取得して、使用する(バイナリ形式で保存しておく)
data "aws_secretsmanager_secret" "streaming_video" {
  # シークレットの名前を指定
  name = var.streaming_video_public_key_secret
}

data "aws_secretsmanager_secret_version" "streaming_video" {
  secret_id = data.aws_secretsmanager_secret.streaming_video.id
}

# 取得した公開鍵をCloudFrontに登録する
resource "aws_cloudfront_public_key" "streaming_video" {
  encoded_key = data.aws_secretsmanager_secret_version.streaming_video.secret_binary
  name        = var.aws_cloudfront_public_key
}

# CloudFrontのキーグループを設定
resource "aws_cloudfront_key_group" "streaming_video" {
  items = [aws_cloudfront_public_key.streaming_video.id]
  name  = var.aws_cloudfront_key_group
}

# Cache Policy(設定しないとLegacyの設定になる)
# 参考: https://docs.aws.amazon.com/ja_jp/AmazonCloudFront/latest/DeveloperGuide/using-managed-cache-policies.html
data "aws_cloudfront_cache_policy" "streaming_video" {
  name = "Managed-CachingOptimized" # キャッシュ効率を最適化
}

# オリジンリクエストポリシー
data "aws_cloudfront_origin_request_policy" "streaming_video" {
  name = "Managed-CORS-S3Origin" # AWSのマネージドのpolicy
}

# レスポンスヘッダーポリシー
resource "aws_cloudfront_response_headers_policy" "streaming_video" {
  name    = "${var.env}-streaming_video-response-headers-policy"

  cors_config {
    access_control_allow_credentials = false

    access_control_allow_headers {
      items = ["*"]
    }

    access_control_allow_methods {
      items = ["GET", "HEAD"]
    }

    access_control_allow_origins {
      items = var.streaming_video_target_origin
    }

    origin_override = true
  }
}

resource "aws_cloudfront_origin_access_control" "streaming_video" {
  name                              = var.aws_cloudfront_origin_access_control_name
  origin_access_control_origin_type = "s3"
  # CloudFrontが署名するリクエスト、特別指定がなければalways
  signing_behavior                  = "always"
  # CloudFrontがどのようにリクエストに署名(認証)するかの指定。sigv4のみ。
  signing_protocol                  = "sigv4"

}

Terraformで気をつけることとか

  • provider.tfにおけるTerraformの設定バージョンとrequired_providersで設定したプロバイダーのバージョンには注意する
    • どちらもバージョンによって書式が違っていたりするため
      • Terraformのバージョンとプロバイダーのバージョンとが乖離しすぎるのも難あり
    • AWSの場合はこちらで自分が使用するバージョンに応じたドキュメントを確認しつつ記載することをおすすめ
      • S3の暗号化のデフォルトに対応しているのは4系からなのでそれ以上を使用する
  • tfstateファイルはGitの管理に載せない
    • tfbackendファイルを作成して、任意のリソースに保存する
      • S3だと例えば以下のように記載することで任意のバケットに置ける
bucket  = バケット名
key     = キー名(ex. terraform/stream_video/stream_video.tfstate)
region = "ap-northeast-1"
profile = 使用しているプロファイル名

GoogleDriveからS3にファイルをアップロードする

boto3とGoogleDriveAPIのv3を使用して行う。
署名付きURLは動画へのアクセス時に再度boto3でリクエストをして、動画のアクセス期限等の必要なパラメータを取得しないといけないのでここではその雛形となるURLを返すようにする。


import boto3
import io
import re
import time
import traceback

from logging import getLogger

from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload


def get_video_url(url: str):
    """
    Google Driveに保存してある動画を配信で用いるS3バケットへ保存する。
    署名付きURLのクエリパラメーターを除いた配信用URLを作成し、返り値として返す。

    また、transaction.atomic下でtry-exceptを行っているため例外をキャッチした場合その部分のロールバックは行われないため、
    以下のケースの場合は処理を中断して上記と同様に空文字を返す。
    - GoogleDriveにて対象のファイルが存在しない、またはファイルを取得するのに失敗した場合
    - ファイルのダウンロードに失敗した場合
    - s3へのアップロード時に何らかのエラーが起きた場合
    - s3へのアップロード時にエラーが起きた場合のリトライが全て失敗した場合

    return: str(処理中止の際は空文字)
    """
    # 処理時間計測開始
    process_start = time.time()
    # Google Drive API v3準拠でClientを作成
    drive_service = _get_official_client()
    # 読み込まれたスプレッドシートからURLを取得できなかった場合は空文字を返す
    if not url:
        return ""
    else:
        # URLからid部分を抽出する
        file_id_match = re.search(r'file/d/([^/]+)/view', url)
        if file_id_match is None:
            logger.info('指定されたURLからファイルのidを抽出できません \n'
                        f'URL: {url}')
            return ""
        file_id = file_id_match.group(1)
        try:
            # 存在確認、ファイル取得とは別にやる必要がある
            file_meta = drive_service.files().get(
                fileId=file_id, supportsAllDrives=True).execute()
            logger.info(f'file_name: {file_meta["name"]} をGoogleDriveからダウンロードします')
            # リクエストをそのまま渡す必要があるのでexecute()しない
            request = drive_service.files().get_media(fileId=file_id, supportsAllDrives=True)
        except HttpError as error:
            if error.resp.status == 404:
                logger.info('ファイルが存在しません \n'
                            f'URL: {url}')
            else:
                logger.info('Google Drive APIへのリクエストに失敗しました')
            return ""

        # オンメモリに保存
        file_byte_object = io.BytesIO()
        downloader = MediaIoBaseDownload(file_byte_object, request)
        done = False
        # ダウンロード終わるまで待つ
        while done is False:
            try:
                # next_chunk()のリトライ回数は3
                status, done = downloader.next_chunk(3)
                logger.info("Download %d%%." % int(status.progress() * 100))
            except HttpError as error:
                logger.info(f'ファイルのダウンロードに失敗しました {error}')
                return ""

        # ファイルデータを抽出
        file_data = file_byte_object.getvalue()
        file_size = round(len(file_data) / (1024 * 1024), 1)
        logger.info(f"ダウンロードしたファイルのサイズは: {file_size}MB")
        dl_from_g_drive_elapsed_time = time.time() - process_start
        # GoogleDriveからファイルデータを抽出するまでかかったタイム
        logger.info(
            f"dl_from_g_drive_elapsed_time:{dl_from_g_drive_elapsed_time}")
        # S3へアップロード
        s3_upload_start = time.time()
        # boto3でタイムアウト時に自動でエクスポネンシャルバックオフでリトライするための設定
        config = Config(
            retries={
                # リトライ回数
                'max_attempts': 3,
                'mode': 'standard'
            }
        )
        s3 = boto3.client(
            's3',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            config=config)
        mime_type = 'video/mp4'
        content_disposition = 'inline'
        # 任意のものを設定
        file_name = f"{url}.mp4"
        # 任意のものを設定(バケット配下のどこに置くかのPathを書く)
        save_directory = "stream_video/"
        s3_obj_key = save_directory + file_name
        try:
            s3.put_object(
                Body=file_data,
                Bucket=S3_BUCKET_NAME_CLOUDFRONT,
                Key=save_directory + file_name,
                ContentType=mime_type,
                ContentDisposition=content_disposition,
                # Intelligent-Tieringを強制する
                StorageClass='INTELLIGENT_TIERING'
            )
        except BotoCoreError as e:
            logger.info(f"BotoCoreError: {e} \n"
                        f'URL: {url}')
            return ""
        except ClientError as e:
            http_status_code = e.response["ResponseMetadata"]["HTTPStatusCode"]
            error_code = e.response["Error"]["Code"]
            logger.info(
                f"put_objectに失敗しました: エラーコード: {http_status_code}, エラー内容: {error_code} \n"
                f'URL: {url}')
            return ""

        s3_upload_elapsed_time = time.time() - s3_upload_start
        logger.info(f"s3_upload_elapsed_time:{s3_upload_elapsed_time}")

        # 署名付きURLのパラメーターを除いた配信用URLを作成
        video_url = f"https://{CLOUDFRONT_AWS_DISTRIBUTION_DOMAIN_NAME}/{s3_obj_key}"
        # GoogleDriveからダウンロード ~ S3アップロード完了までにかかった時間
        elapsed_time = time.time() - process_start
        logger.info(f"{file_name} total elapsed_time :{elapsed_time}")

        return video_url

あとは動画を開くときのAPIで


import datetime
from botocore.signers import CloudFrontSigner
import rsa

def rsa_signer(message):
  session = boto3.session.Session()
  client = session.client(
    service_name='secretsmanager',
    region_name=aws_region_name,
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_access_key
  )

  # 秘密鍵のSecrets Managerでのシークレット名を指定する
  __secret_value = client.get_secret_value(
    SecretId=video_url_secret_name
  )
  cloudfront_private_key_bytes = str.encode(self.__secret_value, encoding='ascii')
  cloudfront_private_key = serialization.load_pem_private_key(cloudfront_private_key_bytes, password=None, backend=default_backend())
  return rsa.sign(message, cloudfront_private_key, 'SHA-1')

key_id = "Your CloudFront Key Pair ID"
url = "Your CloudFront distribution URL for the object"
expiration = datetime.datetime.utcnow() + datetime.timedelta(hours=1)

signer = CloudFrontSigner(key_id, rsa_signer)
signed_url = signer.generate_presigned_url(url, date_less_than=expiration)

といったように取得すれば良いということになる。
(あくまでサンプルです)

Tips

Intelligent-Tieringを強制する

先程Terraformでリソース自体にIntelligent-Tieringを強制させたが、どうやらboto3でのPUT時にも明示的に指定をする必要があり、StorageClass='INTELLIGENT_TIERING'put_object()で引数に設定した。
(設定しないとStandard扱いになる)

MediaIoBaseDownload

GoogleDriveからファイルをダウンロードするベーシックな書き方は以下の通り


# オンメモリに保存
file_byte_object = io.BytesIO()
downloader = MediaIoBaseDownload(file_byte_object, request)
done = False
# ダウンロード終わるまで待つ
while done is False:
    try:
        # next_chunk()のリトライ回数は3
        status, done = downloader.next_chunk(3)
        logger.info("Download %d%%." % int(status.progress() * 100))
    except HttpError as error:
        logger.info(f'ファイルのダウンロードに失敗しました {error}')
        return ""

MediaIoBaseDownloadchunksize=1024*1024といったようにチャンクサイズを指定すればマルチパート的にダウンロード処理が可能。
引数に指定がない場合は100MB刻みで行う。
downloader.next_chunk(3)とするとエクスポネンシャルバックオフなリトライをしてくれる。

参考

Google Drive APIのバージョン問題

GoogleDriveAPIにおいてv2はすでにLegacyであり、v3への移行がGoogleからは推奨されている。
(バージョン自体はv4まである)
今回の対応にあたりシステムではv2を使用していたのでv3への移行をしたが、pythonでGoogleDriveAPIを扱う場合、google-api-python-clientというライブラリがあるのだが、v2.97.0(2023/08/17時点の最新)においてはv3のメソッドであるfiles.get()でファイルデータを取得しようとしてもメタデータが返るというバグがあるため、やむなくv2のメソッドであるget_media()を併用した。
参考

GoogleDriveのAPIを利用するにはCredentialが必要なので以下のように取得する。


def _get_official_client():
    """
    CredentialからClientを作成する。
    scopesにはそのClientに対して許可する権限を指定する。
    """
    drive_scope = [
        'https://spreadsheets.google.com/feeds',
        'https://www.googleapis.com/auth/drive',
        'https://www.googleapis.com/auth/drive.file',
        "https://www.googleapis.com/auth/spreadsheets"
    ]

    credentials = Credentials.from_service_account_info(
        GCP_CREDENTIAL_V3, scopes=drive_scope
    )
    return build('drive', 'v3', credentials=credentials)

終わり

最初に話を受けたときにはTerraformもGoogleDriveAPIも全くのゼロベースだったのでイメージが全然湧かず、どうしたものかなと頭を抱えそうになったけれどもリソース構築(=Terraform)の方は先駆者の方々がかなり詳細に情報を共有してくれていたのでなんとかなりました。

1番の問題はGoogleDriveのAPIでバージョンでの仕様の違いとfiles.get()のバグに気づくのがドキュメントの情報が薄くかつAPIに関しては情報が散財しているのと、似たような事例が存在しなかったのもあって実装に少し手間取ってしまったことは少し悔しかったです。

この記事がどなたかの参考になれば幸いです。

Discussion