😎

Rustを使って大きなファイルを分割してGCSにアップロードする

2024/09/29に公開

背景

前回の記事: https://zenn.dev/harui/articles/9143558098c454 では、GCEのVMに溜まっていくファイルを定期的にGCSにアップロードしたうえで削除するプログラムをRustで書いたのでした。

しかし、そのプログラムではアップロードするファイルをメモリに一度に読み込むためメモリに載らないほど大きなファイルを扱うことはできませんでした。

この記事では、大きなファイルを小さなチャンクに分けてアップロードすることで、大きなファイルでもGCSにアップロードできるようなプログラムにしていきます。

実現方法

前回と同じく、google-cloud-storageというクレートを使っていきます。

Google Cloud Storageには「再開可能なアップロード(Resumable Upload)」をするAPIが提供されており、そのAPIを使い大きなファイルを複数のチャンクに分けてアップロードすることで、やりたいことが実現できそうです。
(参考: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=ja)

親切なことに、google-cloud-storageは再開可能なアップロードをサポートしており、データを分割してアップロードする場合のExampleまでドキュメントに載っています。これを参考にしてやっていきます。
https://docs.rs/google-cloud-storage/latest/google_cloud_storage/client/struct.Client.html#method.prepare_resumable_upload

コードは前回の記事[1]をベースに改変していく形で進めていきます。

upload_gcsをいじる

前回のプログラムでは、

let data = fs::read(&path_str).unwrap();

としている部分で、ファイルを一度にすべて読んでいる大変なことになっているのでした。その行は削除しておきます。

GCSの再開可能なアップロードは

  1. 再開可能なアップロードのセッションを開始する
  2. 1で作ったセッションを使って、(データを複数のチャンクに分けたり分けなかったりしながら) チャンクのデータをアップロードする
  3. 2をすべてのデータをアップロードするまで繰り返す

という流れで進んでいきます。 google-cloud-storageでも同じようにまずはセッションを作る必要があります。

let upload_client = client.prepare_resumable_upload(&req, &upload_type).await?;

などとするとアップロードセッションをクレートが作ってくれます。(たぶん)

データを分けるチャンクのサイズを設定します。ここで、チャンクのサイズは256KiBの倍数である必要があります。(GCS側の制約によるもの: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=ja#chunked-upload)

let chunk_size_num: u64 = 32 * 1024 * 1024;

こ こではデバッグのしやすさなども考えて32MiBにしました。

色々定義します。使うからです。

let mut last_r: Option<u64> = None;
let filesz = fs::metadata(&path_str).unwrap().len();
let mut file = fs::File::open(&path_str).expect("could not open the file");

Google Cloud Storageとgoogle-cloud-storageはアップロードするチャンクの範囲などを閉区間で表すことに注意してください。

ここでlast_rは最後にアップロードしたチャンクの終端のバイトを表すのですが、最後にアップロードした区間ががない場合(最初のチャンクの場合)もきれいに表せるようにOption<u64>にしてます。Rustっぽいですね~~~

(もしu64i64を気軽に変換できる言語だったら、last_r = -1;と初期化すればいいだけな気もします)

        while last_r.is_none() || last_r.unwrap() + 1 < filesz {
            let l;

            match last_r {
                Some(r) => {
                    l = r + 1;
                }
                None => {
                    l = 0;
                }
            }
            let r = std::cmp::min((l as i64+ chunk_size_num as i64 - 1) as u64, (filesz as i64 - 1) as u64);

            assert!(l <= r);

            log::debug!("reading bytes {}-{}/{}", l, r, filesz);
            let chunk_size_data= ChunkSize::new(l, r, Some(filesz));
            let chunk_data = read_file_chunk(&mut file, l, r);
            let status = upload_client.upload_multiple_chunk(chunk_data, &chunk_size_data).await?;
            log::info!("status : {:?}", status);

            match status {
                UploadStatus::Ok(object) => {
                    log::debug!("partialy uploaded : {:?}", object);

                    last_r = Some(r);
                },

                UploadStatus::NotStarted => {
                    log::debug!("upload not started");
                },

                UploadStatus::ResumeIncomplete(uploaded_range) => {
                    log::debug!("this range has been uploaded : {:?}", uploaded_range);

                    last_r = Some(uploaded_range.last_byte);
                }
            }

        }

これが、実際にチャンクごとに分けて処理する部分です。。

google-cloud-storageではチャンクを(チャンクの始まりの位置, チャンクの終わりの位置, 全体のファイルサイズ)で指定して作り、それとデータをuploadmultiple_chunkに渡すことでチャンクに分割したアップロードができます。

アップロードしようとした結果に応じてlast_rの更新処理を変えているのは(当たり前ですが)ポイント高いです。

ここで、チャンクのデータを読み込む処理はread_file_chunkで行っています。

read_file_chunk

// read file in [l, r] range
fn read_file_chunk(file: &mut fs::File, l: u64, r: u64) -> Vec<u8> {
    let mut buf = vec![0; (r as i64 - l as i64 + 1) as usize];
    file.seek(std::io::SeekFrom::Start(l)).expect("failed to seek");


    // https://doc.rust-lang.org/std/io/trait.Read.html#method.read_exact
    file.read_exact(&mut buf).expect("failed to read");

    buf
}

このようにファイルの一部分だけ読むというのが今回の記事の主題でしたが、主題は意外と簡単に書けてしまえるんですね。

ファイルの指定位置から読みたい場合には上のようにシークを使うことが(Rustに限らず)一般的なようで、シークというのはファイル操作における一般的な概念だそうです。
知りませんでした。

これですべてです。データを複数のチャンクに分けてアップロードすると聞くとめちゃめちゃ複雑な処理になるかもしれないと思うのですが、意外とシンプル(主観!)な感じに収まりました。

出来上がったコードの全体像

今まではパートごとに分けて説明してきましたが、最終的に次のようになります。

use chrono::{Datelike, Local};
use env_logger;
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
use google_cloud_storage::http::resumable_upload_client::{ChunkSize, UploadStatus};
use google_cloud_storage::http::Error;
use log::LevelFilter;
use regex::Regex;
use std::fs;
use std::io::{Read, Seek};
use std::path::PathBuf;
use tokio::runtime::Runtime;
use std::env;

fn is_valid_date(year: i32, month: u32, day: u32) -> bool {
    if month < 1 || month > 12 {
        return false;
    }

    if day < 1 || day > 31 {
        return false;
    }

    match month {
        4 | 6 | 9 | 11 => {
            if day > 30 {
                return false;
            }
            true
        }
        2 => {
            if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) {
                if day > 29 {
                    return false;
                }
                true
            } else {
                if day > 28 {
                    return false;
                }
                true
            }
        }

        _ => true,
    }
}

// read file in [l, r] range
fn read_file_chunk(file: &mut fs::File, l: u64, r: u64) -> Vec<u8> {
    let mut buf = vec![0; (r as i64 - l as i64 + 1) as usize];
    file.seek(std::io::SeekFrom::Start(l)).expect("failed to seek");


    // https://doc.rust-lang.org/std/io/trait.Read.html#method.read_exact
    file.read_exact(&mut buf).expect("failed to read");

    buf
}

fn upload_gcs(entry: PathBuf) -> Result<(), Error> {
    let rt = Runtime::new().unwrap();

    rt.block_on(async {
        let config = ClientConfig::default().with_auth().await.unwrap();
        let client = Client::new(config);

        let path_str = entry.to_str().unwrap().to_string();
        let file_name = entry.file_name().unwrap().to_str().unwrap();

        let req = UploadObjectRequest {
            bucket: "YOURBUCKETNAMEEEE".to_string(),
            ..Default::default()
        };
        let upload_type = UploadType::Simple(Media::new(format!("YOURDIRECTORYINGCS/{}", file_name)));
        let upload_client = client.prepare_resumable_upload(&req, &upload_type).await?;

        //      32MiB   = 32 * 1Ki * 1KiB
        let chunk_size_num:u64 = 32 * 1024 * 1024;

        // chunk size shoud be multiple of 256KiB
        //ref: https://cloud.google.com/storage/docs/performing-resumable-uploads?hl=ja#chunked-upload
        assert!(chunk_size_num % (256*1024) == 0);
        
        // [l,r]
        let mut last_r: Option<u64> = None;
        let filesz = fs::metadata(&path_str).unwrap().len();
        
        let mut file = fs::File::open(&path_str).expect("cloud not open the file"); // just opening file, not reading the whole file.

        while last_r.is_none() || last_r.unwrap() + 1 < filesz {
            let l;

            match last_r {
                Some(r) => {
                    l = r + 1;
                }
                None => {
                    l = 0;
                }
            }
            let r = std::cmp::min((l as i64+ chunk_size_num as i64 - 1) as u64, (filesz as i64 - 1) as u64);

            assert!(l <= r);

            log::debug!("reading bytes {}-{}/{}", l, r, filesz);
            let chunk_size_data= ChunkSize::new(l, r, Some(filesz));
            let chunk_data = read_file_chunk(&mut file, l, r);
            let status = upload_client.upload_multiple_chunk(chunk_data, &chunk_size_data).await?;
            log::info!("status : {:?}", status);

            match status {
                UploadStatus::Ok(object) => {
                    log::debug!("partialy uploaded : {:?}", object);

                    last_r = Some(r);
                },

                UploadStatus::NotStarted => {
                    log::debug!("upload not started");
                },

                UploadStatus::ResumeIncomplete(uploaded_range) => {
                    log::debug!("this range has been uploaded : {:?}", uploaded_range);

                    last_r = Some(uploaded_range.last_byte);
                }
            }

        }

        // verify whether upload has been completed
        let status =  upload_client.status(Some(filesz)).await;
        
        match status {
            Ok(upload_status) => {
                log::info!("upload completed : {:?}", file_name);
                log::debug!("upload_status : {:?}", upload_status);
                Ok(())
            },
            Err(er) => {
                log::error!("upload failed : {:?}", er);

                Err(er)
            }
        }
    })

}

fn main() {
    env_logger::builder()
        .filter_level(LevelFilter::Debug)
        .init();

    let args = env::args().collect::<Vec<String>>();

    if args.len() != 2 {
        log::error!("Usage: {} <directory>", args[0]);
        std::process::exit(1);
    }
    assert_eq!(args.len(), 2);

    let dir_path = args[1].clone();
    let re = Regex::new(r"(\d{4})(\d{2})(\d{2})\.txt$").unwrap();

    let today_date = Local::now();
    let today_year = today_date.year();
    let today_month = today_date.month();
    let today_day = today_date.day();

    for entry in fs::read_dir(dir_path).unwrap() {
        match entry {
            Ok(entry) => {
                let path = entry.path();
                let file_name = path.file_name().unwrap().to_str().unwrap();

                if let Some(caps) = re.captures(file_name) {
                    let year: i32 = caps[1].parse().unwrap();
                    let month: u32 = caps[2].parse().unwrap();
                    let day: u32 = caps[3].parse().unwrap();

                    if is_valid_date(year, month, day) {
                        if year < today_year
                            || (year == today_year && month < today_month)
                            || (year == today_year && month == today_month && day < today_day)
                        {
                            log::info!("we should replace : {}", file_name);
                            let result = upload_gcs(entry.path());

                            match result {
                                Ok(_) => {
                                    fs::remove_file(entry.path())
                                        .expect(format!("failed to remove file : {}", file_name).as_str());
                                    log::info!("removed : {}", file_name);
                                }
                                Err(e) => {
                                    log::error!("err : {}", e);
                                }
                            }

                        } else {
                            log::info!("we should not replace : {}", file_name);
                        }
                    } else {
                        log::debug!("Invalid : {}", file_name);
                    }
                } else {
                    log::debug!("not captured: {}", file_name);
                }
            }
            Err(e) => {
                log::error!("err : {}", e);
            }
        }
    }
}

最後に

書いていて思ったのですが、コードを手元で書いているときはrust-anylizerが働いているので変数の型が簡単にわかるものの、Zennの記事で型を省略するのは読者フレンドリーではなかったかもしれません。次回以降改善していきたいです。

最後まで読んでいただきありがとうございます!

私は初心者Rustaceanなので、「ここはこう書くとよりRustらしい」や「実はその書き方は冗長で、~と書ける」などのコメント・アドバイスもお待ちしております!

良ければ、左側のハートマークからいいねを押していってほしいです!

脚注
  1. 前回の記事はこちら https://zenn.dev/harui/articles/9143558098c454 ↩︎

Discussion