😀

言語別のファイルダウンロード方法 - Node.js Python Rust Go

2022/09/10に公開

Node.js、Python、Go、Rustそれぞれでのファイル・ダウンロードの方法になります。

前提条件

お堅く書きますと、
膨大な数の巨大なzipファイルを効率よく(低予算で)ダウンロードするクライアント・プログラム
という要件になります。

補足事項

サーバは無限にスケールすると想定
ペイロードの中身を開けて演算処理はしない。そのためI/Oバウンドになる。
ファイルディスクリプタ(ソケット)は無限にある

基本設計指針

  1. ストリーミングを使用し、メモリの消費を少なくする
  2. コンカレンシー(非同期orマルチスレッド)でスループットを向上する

Node.js

Node.jsはシングルスレッドのイベントループであり、非同期プログラミングをします。
pipeを使用して少しづつファイルに書き出しています。

Node.js
import axios from "axios";
import * as fs from 'fs'
import * as stream from 'stream';
import {promisify} from 'util';
import path from 'path'


console.log('started...')
const finishedDownload = promisify(stream.finished);

const downloadZip = async (url: string) => {
    try {
        const response = await axios({
            method: "get",
            url,
            responseType: "stream", // ペイロードをストリーミング処理します
        })
        const outputStream = fs.createWriteStream(path.parse(url).base)
        await response.data.pipe(outputStream)
        await finishedDownload(outputStream); // これがないとファイル書き込み完了せずに抜けてしまいます
    } catch (error) {
        console.error(error)
    }
}

const URLs = ['https://github.com/facebook/react/archive/refs/heads/main.zip',
    'https://github.com/facebook/react/archive/refs/heads/0.3-stable.zip']

const tasks = URLs.map(url => downloadZip(url))

Promise.all(tasks).then((values) => {
    console.log(values);
});

promisifyを使用してプログラムがすぐに終了しないようにしているところがカッコ悪いです。
ペイロードに対してCPUヘビィな処理する場合はworker-threadsモジュールを使い別のCPUコアで実行することで、メインスレッドのイベントループをブロックしないようにします。

https://www.digitalocean.com/community/tutorials/how-to-use-multithreading-in-node-js

Python

Node.jsと同様にシングルスレッドを使用した非同期プログラミングになります。
aioを使用して非同期にネットワークとファイルにアクセスします。
また、チャンクで少しづつ読み書きすることで巨大なファイルをメモリに読み込まないようにします。

Pythonのコードでdownload_file()を実行した時点ではFutureが返されるだけでまだ実行されません。

import asyncio
import pathlib

import aiofiles
import aiohttp

URLs = ['https://github.com/facebook/react/archive/refs/heads/main.zip',
        'https://github.com/facebook/react/archive/refs/heads/0.3-stable.zip']

DEFAULT_CHUNK_SIZE = 1048576  # = 1 Mb


async def download_file(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        file_name = pathlib.Path(url).name
        async with aiofiles.open(file_name, mode='wb') as f:
            while chunk := await response.content.read(DEFAULT_CHUNK_SIZE):
                await f.write(chunk)
        return url, file_name


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in URLs:
            t = download_file(session, url)
            tasks.append(t)

        for fut in asyncio.as_completed(tasks):
            result = await fut
            print(result)


if __name__ == '__main__':
    asyncio.run(main())

Pythonでも条件付きで別スレッドを立ち上げてメインのイベントループをブロックしないようにすることは可能です。ただし、Python(CPython)ではスレッドはGILの制約があるためマルチコアでPythonコードを実行できません。ネットワークI/Oを呼び出したり、numpyなどのCコードの実行などでGILをリリースする必要があります。

https://superfastpython.com/threadpoolexecutor-vs-gil/

Rust

RustのTokioを使い、マルチスレッドのイベントループで非同期プログラミングをします。

tokio::spawnはマルチコア上で走るグリーンスレッドになります。つまり、遠慮なくたくさんのスレッドを生成できます。ただし、Node.jsと同じでイベントループを使用してグリーンスレッドを処理します。そのため、ブロック呼び出しをするとイベントループをブロックしてしまうのでご注意ください。
Node.jsとは違い、Tokioはマルチコア上でマルチスレッド(OSスレッド)になります。Node.jsが論理コアの数だけ起動しているイメージです。

use std::path::Path;
use tokio::io;
use anyhow::Result;
use futures::TryStreamExt;
use tokio::io::AsyncWriteExt;
use tokio_util::io::StreamReader;
use futures::{stream::FuturesUnordered, StreamExt};


async fn fetch_url(url: String) -> Result<(), Box<dyn std::error::Error>> {
    let tokens: Vec<&str> = url.split("/").collect();

    let mut response = reqwest::get(&url).await?;
    let mut file = tokio::fs::File::create(tokens.last().unwrap()).await?;
    while let Some(chunk) = response.chunk().await? {
        file.write_all(&*chunk).await?; // チャンクで書き出します
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec!["https://github.com/facebook/react/archive/refs/heads/main.zip",
                    "https://github.com/facebook/react/archive/refs/heads/0.3-stable.zip"];
    let mut tasks = FuturesUnordered::new();
    for url in urls {
        let handle = tokio::spawn(async move { // goroutineみたいな感じ
            fetch_url(url.to_string()).await.unwrap();
        });
	// この時点でスレッドは走っています
        tasks.push(handle);
    }

    // join_allは遅いよ。https://github.com/tokio-rs/tokio/issues/2401
    while let Some(item) = tasks.next().await { // 完了を待ちます
        let () = item.unwrap();
    }
    Ok(())
}

Go

さて、Goだけは他の3つの言語と異なり、非同期ではなく同期的なプログラミングになります。
goroutineはとても軽量なスレッドであり、OSスレッドではありません。そして、プリエンプティブなタスクスケジューラのため、ランタイムがいい感じにCPUコアにスケジューリングしてくれます。

package main

import (
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"
)

func main() {
	urls := []string{
		"https://github.com/facebook/react/archive/refs/heads/main.zip",
		"https://github.com/facebook/react/archive/refs/heads/0.3-stable.zip",
	}

	var wg sync.WaitGroup
	wg.Add(len(urls))

	for _, url := range urls {
		go func(url string) {
			defer wg.Done()
			t := strings.Split(url, "/")

			output, err := os.Create(t[len(t)-1])
			if err != nil {
				log.Fatalf("%v", err)
			}
			defer output.Close()

			res, err := http.Get(url)
			if err != nil {
				log.Fatalf("%v", err)
			}

			defer res.Body.Close()
			// ペイロードをメモリに全部読み込まないようにします
			_, err = io.Copy(output, res.Body)
			if err != nil {
				log.Fatalf("%v", err)
			}
		}(url)
	}

	wg.Wait()
}

Go1.14に導入されたAsync Preemptionについて中の人の説明
YouTubeのvideoIDが不正ですhttps://youtu.be/1I1WmeSjRSw?t=882

総評

やはり、Goは分かりやすいですね。シンプルなキーワード、同期プログラミングのためソースコードが直感的で分かりやすいです。一方で、非同期プログラミングは基本的なOSやネットワークの仕組みを知ってないとパフォーマンスを悪化させるコードをうっかり仕込んでしまいます。
またGoの凄さはCPUバウンドな処理にも対応できるランタイムの強さでしょう。

もちろん、インタプリター言語のJSとPythonでは例外処理しなくても良いため短期間で開発できるフットワークの軽さがあります。

Discussion