😺

GCEのVM上のファイルを定期的にGCSへアップロードして削除する with Rust

2024/09/28に公開

想定する読者

  • GCEのVM上のファイルを定期的にGCSへアップロードしたい!という人

  • 仮想通貨取引所の約定履歴や板情報を全部保存するなんてストレージがいくらあっても足りないから無理だよ~~~と思っている人

背景(botter向け)

botterの皆さんは当然取引所から配信されるデータをすべて保存したいと思っているはずです。
しかし、実際にすべて保存していると、大変な量のストレージを圧迫することになってしまいます。ということで、この記事では日々ストレージを圧迫するファイルを(日ごとに分けられているとする)、Google Cloud StorageへアップロードするプログラムをRustで書くことで問題を解決しました。

当初は、https://zenn.dev/mtkn1/articles/bigquery-subscription の記事で紹介されている方法のように、BigQueryにPub/Subでぶち込むことを考えていましたが、ぶち込んだデータをSQLで分析することはどうせないことや、Googleの公式RustSDKがなく、google-cloud-pubsubというクレートを使う場合でもPub/Subのセッションが24hほどで切れてしまい、それの検知や再接続するメソッドは提供されていないようだったことや、Pub/Subではお金がかかったりすることから、保存コストが安いGCSに保存することを考えました。

Cloud Run 関数を使うことも考えましたが、ランタイムでRustが提供されていないこととかが微妙だったり、DockerでやるにしてもDocker上からGCEのインスタンスにアクセスするステップが面倒くさそうだったのでやめました。

やりたいこと

GCEのVM上に毎日溜まっていく、***_20240927.txtのようなsuffixを持つファイルを定期的にチェックして、前日以前のファイルであったらGCSにアップロードしたうえで削除する

実現方法

Cloud Run 関数を使ったほうがイケてる感は増すものの、なんだか(Rustでやるには特に)面倒くさそうだったので、以下のように泥臭くcronでやることにしました。

  1. ***_20240927.txtのようにyyyymmdd.txtで終わるファイルを見つけたら、前日以前かどうか判定して、そうであればGCSにアップロードして削除するプログラムをRustで書く
  2. 1のプログラムのバイナリをVMインスタンス上のcronで定期的に実行する

Step0: (Google Cloud Storageを使っていない場合はセットアップする)

Google Cloud Storageにバケットを作りましょう。無料枠が適用されるなかでは一番日本に近いオレゴンが安牌だと思われます。

TODO: 書く

Step1: Rustで前日以前のファイルをチェックしてGCSにアップロードするプログラムを書く

現時点(2024-09-28)では、GCPの公式SDKではRust用のものは公開されていません。なのでgoogle-cloud-storageというクレートを使うことにします。
2022年頃から今まで継続的に更新されているクレートです。https://crates.io/crates/google-cloud-storage

TODO: 説明を書く

use chrono::{Datelike, Local};
use env_logger;
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
use google_cloud_storage::http::objects::Object;
use google_cloud_storage::http::Error;
use log::LevelFilter;
use regex::Regex;
use std::fs;
use std::path::PathBuf;
use tokio::runtime::Runtime;
use std::env;

fn is_valid_date(year: i32, month: u32, day: u32) -> bool {
    if month < 1 || month > 12 {
        return false;
    }

    if day < 1 || day > 31 {
        return false;
    }

    match month {
        4 | 6 | 9 | 11 => {
            if day > 30 {
                return false;
            }
            true
        }
        2 => {
            if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) {
                if day > 29 {
                    return false;
                }
                true
            } else {
                if day > 28 {
                    return false;
                }
                true
            }
        }

        _ => true,
    }
}

fn upload_gcs(entry: PathBuf) -> Result<Object, Error> {
    let rt = Runtime::new().unwrap();

    rt.block_on(async {
        let config = ClientConfig::default().with_auth().await.unwrap();
        let client = Client::new(config);

        let path_str = entry.to_str().unwrap().to_string();
        let file_name = entry.file_name().unwrap().to_str().unwrap();

        let upload_type = UploadType::Simple(Media::new(format!("DIR/{}", file_name)));

        let data = fs::read(&path_str).unwrap();

        let upload_result = client.upload_object(
            &UploadObjectRequest {
                bucket: "BUCKETNAME".to_string(),
                ..Default::default()
            },
            data,
            &upload_type,
        ).await;

        match upload_result {
            Ok(_) => {
                log::info!("upload success : {}", path_str);
            }
            Err(e) => {
                log::error!("upload failed : {}", e);
                return Err(e);
            }
        }

        upload_result
    })

}

fn main() {
    env_logger::builder()
        .filter_level(LevelFilter::Debug)
        .init();

    let args = env::args().collect::<Vec<String>>();

    assert_eq!(args.len(), 2);
    let dir_path = args[1].clone();
    let re = Regex::new(r"(\d{4})(\d{2})(\d{2})\.txt$").unwrap();

    let today_date = Local::now();
    let today_year = today_date.year();
    let today_month = today_date.month();
    let today_day = today_date.day();

    for entry in fs::read_dir(dir_path).unwrap() {
        match entry {
            Ok(entry) => {
                let path = entry.path();
                let file_name = path.file_name().unwrap().to_str().unwrap();

                if let Some(caps) = re.captures(file_name) {
                    let year: i32 = caps[1].parse().unwrap();
                    let month: u32 = caps[2].parse().unwrap();
                    let day: u32 = caps[3].parse().unwrap();

                    if is_valid_date(year, month, day) {
                        if year < today_year
                            || (year == today_year && month < today_month)
                            || (year == today_year && month == today_month && day < today_day)
                        {
                            log::info!("we should replace : {}", file_name);
                            let result = upload_gcs(entry.path());

                            match result {
                                Ok(_) => {
                                    fs::remove_file(entry.path())
                                        .expect(format!("failed to remove file : {}", file_name).as_str());
                                    log::info!("removed : {}", file_name);
                                }
                                Err(e) => {
                                    log::error!("err : {}", e);
                                }
                            }

                        } else {
                            log::info!("we should not replace : {}", file_name);
                        }
                    } else {
                        log::debug!("Invalid : {}", file_name);
                    }
                } else {
                    log::debug!("not captured: {}", file_name);
                }
            }
            Err(e) => {
                log::error!("err : {}", e);
            }
        }
    }
}

上のように、引数でディレクトリのパスを受け取って、そのディレクトリ内のtxtファイルであって前日以前の日付で終わるものをGCSにアップロードして削除するようにしてみました。

実際に動かす際は、プログラム中のBUCKETNAMEDIRをそれぞれ実際のGCSのバケット名や、GCSで保存したいディレクトリに変更してください。

使うクレートをcargo addしていただけではOpenSSL関係のリンカのエラーが出てビルドできなかったので、ビルドがうまくいったときのCargo.tomlも載せておきます。

[package]
name = "daily-upload-and-remove"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4.38"
env_logger = "0.11.5"
google-cloud-storage = "0.22.1"
log = "0.4.22"
regex = "1.10.6"
tokio = {version="1.40.0", features=["rt-multi-thread"]}
reqwest = { version = "0.11", features = ["default-tls", "rustls-tls"] } # Or "openssl-tls" if explicitly using OpenSSL
openssl = { version = "0.10", features = ["vendored"] } 

Step2. cronの設定

GCEのVMインスタンスに接続して、
crontab -e

として開かれたエディタ上で

0 * * * * (Rustプログラムのバイナリのあるpath) (Rustプログラムに渡す引数(ディレクトリのパス)) >> (ログの出力先のファイルのパス) 2>&1

と記入することで、x時00分になるたびに、Rustのプログラムを呼んで、ログを指定したファイルに書き足してくれるようになります。

問題点

上のプログラムは、ファイルをメモリに読み込んでからアップロードしているため、メモリに載らないほど大きなファイルをアップロードすることはできません。
GCEの無料枠のVMインスタンスのメモリは1GBしかないため、かなり厳しい制約になっています。

この制約は再開可能なアップロードを使って、ファイルを分割しつつアップロードすることで改善できる見込みです。
その改善についても追々記事にしていく予定です。

TODO

費用の見積もりなどを書く (BigQueryなどと比較する)

Discussion