👏

AWS SDK Java で S3 に少しずつ来たデータをバッファリングしつつ一つのファイルとしてアップロードするサンプルコード

2021/10/23に公開2

AWS SDK Java には v1 と v2 があります。
v1 で TransferManager を使ってできていたことが記事執筆時点で v2 ではまだ高レベル API が preview 扱いです。
また TransferManager だと、事前に全体のファイルサイズがわかっている必要があるとか ファイルサイズを指定せずに InputStream を使うと全部メモリに載せようとするとかいくつか制限がありました。以下のドキュメントを読むと低レベル API を使うとできそうなことが書いてあったのでやってみました。
ドキュメントの例だと AWS SDK Java v1 ですが、この記事では v1 と v2 の両方を使ってみました。

https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/mpu-upload-object.html

何らかのデータストアからデータを取得して一行ずつ CSV に変換してある程度バッファに溜まったらマルチパートアップロードの一つのパートとしてアップロードするようなことを想定しています。
サンプルコードでは単純な for ループでデータを生成しています。
このコードそのままだと動きませんが AWS Lambda 上で動かすことを想定しています。

build.gradle

まずは build.gradle です。ふつうの Java アプリケーションとして作ります。
AWS SDK Java を v1 と v2 両方使うようにしているのは v1 と v2 の比較用に両方のコードを書くからです。それぞれ別のクラスに実装します。

plugins {
    id 'application'
    id 'java'
    id 'com.github.johnrengelman.shadow' version '6.0.0'
}

group 'net.okkez'
version '1.0-SNAPSHOT'

jar {
    manifest {
        attributes('Main-Class': 'net.okkez.uploadcsv.UploadCsv')
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation platform('com.amazonaws:aws-java-sdk-bom:1.12.79')
    implementation 'com.amazonaws:aws-java-sdk-s3'
    implementation platform('software.amazon.awssdk:bom:2.17.61')
    implementation 'software.amazon.awssdk:s3:2.17.61'
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.13.0'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
}

test {
    useJUnitPlatform()
}

v1 を使ったコード

package net.okkez.uploadcsv;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UploadCsv {
  public static void main(String[] args) {
    var a = new UploadCsv();
    try {
      a.generate1_bytebuffer();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


  public void generate1_bytebuffer() throws IOException {
    final var s3 = AmazonS3ClientBuilder.defaultClient();
    final var bucket = "example";
    final var key = "test-bytebuffer.csv";

    var schema = buildSchema();
    var mapper = new CsvMapper();
    var writer = mapper.writer(schema);

    List<PartETag> eTags = new ArrayList<>();
    InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucket, key);
    InitiateMultipartUploadResult initResult = s3.initiateMultipartUpload(initRequest);

     // オーバーフローしないようにちょっと大きめに確保しておく
    var buffer = ByteBuffer.allocate(10 * 1024 * 1024);
    var header = List.of("id", "age", "email", "sequence");
    var headerBytes = String.join(",", header).getBytes(StandardCharsets.UTF_8);
    buffer.put(headerBytes);
    int partNumber = 0;

    for (var i = 0; i < 1000000; i++) {
      var bytes = writer.writeValueAsBytes(List.of("123", "45", "user@example.com", i));
      buffer.put(bytes);

      System.out.println("position: " + buffer.position());
      // buffer に 5MiB 溜まったらパートとしてアップロードする
      if (buffer.position() > 5 * 1024 * 1024) {
        System.out.println("Flushing...");
        var path = Files.createTempFile("temp-", ".csv");
        var ch = new FileOutputStream(path.toString()).getChannel();
        try (ch) {
          buffer.flip();
          var writtenBytes = ch.write(buffer);
          partNumber++;

          UploadPartRequest request =
              new UploadPartRequest()
                  .withBucketName(bucket)
                  .withKey(key)
                  .withFile(path.toFile())
                  .withFileOffset(0)
                  .withUploadId(initResult.getUploadId())
                  .withPartNumber(partNumber)
                  .withPartSize(writtenBytes)
                  .withGeneralProgressListener(
                      event -> {
                        System.out.println(
                            "progress: " + event.getBytesTransferred() + "/" + event.getBytes());
                      });
          UploadPartResult result = s3.uploadPart(request);
          System.out.println(result.getPartETag());
          eTags.add(result.getPartETag());
          buffer.clear();
        } finally {
          Files.deleteIfExists(path);
        }
      }
    }

    // for ループが終わったあと buffer に残っているデータをアップロードする
    if (buffer.position() > 0) {
      var path = Files.createTempFile("temp-", ".csv");
      var ch = new FileOutputStream(path.toString()).getChannel();
      try (ch) {
        buffer.flip();
        int writternBytes = ch.write(buffer);
        partNumber++;
        UploadPartRequest request =
                new UploadPartRequest()
                        .withBucketName(bucket)
                        .withKey(key)
                        .withFile(path.toFile())
                        .withFileOffset(0)
                        .withUploadId(initResult.getUploadId())
                        .withPartNumber(partNumber)
                        .withPartSize(writternBytes);
        UploadPartResult result = s3.uploadPart(request);
        eTags.add(result.getPartETag());
        buffer.clear();
      } finally {
        Files.deleteIfExists(path);
      }
    }

    // 全パートのアップロードが完了したらマルチパートアップロードの完了処理を行う
    CompleteMultipartUploadRequest completeRequest =
            new CompleteMultipartUploadRequest(bucket, key, initResult.getUploadId(), eTags);
    s3.completeMultipartUpload(completeRequest);
    System.out.println("done");
  }

  private CsvSchema buildSchema() {
    return CsvSchema.builder()
        .addColumn("id")
        .addColumn("age")
        .addColumn("email")
        .addColumn("sequence")
        .build();
  }
}

PoC 用のコードなので、大体上から読めばわかるようにしてあります。
ほぼマルチパートアップロードのサンプルコード通りです。

ByteBuffer に 5MiB 溜まったら一時ファイルに書き出して一つのパートとしてアップロードします。
最後に余った分も忘れずにアップロードします。

v2 を使ったコード

package net.okkez.uploadcsv;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class UploadCsv2 {

  private final ExecutorService executorService;
  private final S3Client s3Client;

  public static void main(String[] args) {
    var a = new UploadCsv2();
    try {
      a.generate2_bytes();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public UploadCsv2() {
    // スレッドプールのサイズはよしなに決めた(Runtimeから取れるけど)
    executorService = Executors.newFixedThreadPool(16);
    s3Client = S3Client.create();
  }

  public void generate2_bytes() throws JsonProcessingException {
    final var bucket = "example";
    final var key = "test-bytebuffer2.csv";

    final var schema = buildSchema();
    final var mapper = new CsvMapper();
    final var writer = mapper.writer(schema);

    // マルチパートアップロードの開始
    final var response = s3Client.createMultipartUpload(builder -> builder.bucket(bucket).key(key));
    final var uploadId = response.uploadId();

    // オーバーフローしないように buffer のサイズは大きめにしておく
    final var buffer = ByteBuffer.allocate(10 * 1024 * 1024);
    final var header = List.of("id", "age", "email", "sequence");
    final var headerBytes = String.join(",", header).getBytes(StandardCharsets.UTF_8);
    buffer.put(headerBytes);
    buffer.put("\n".getBytes(StandardCharsets.UTF_8));
    int partNumber = 0;
    List<Future<CompletedPart>> futures = new ArrayList<>();
    for (var i = 0; i < 1000000; i++) {
      var bytes = writer.writeValueAsBytes(List.of("123", "45", "user@example.dom", i));
      buffer.put(bytes);
      if (buffer.position() > 5 * 1024 * 1024) {
        long contentLength = buffer.position();
        buffer.flip();
        partNumber++;
	// ExecutorService にタスクを登録して Future<Completedpart> を集める
        var future =
            uploadMultipartUploadPart(
                bucket, key, uploadId, partNumber, contentLength, buffer.array());
        futures.add(future);
        buffer.clear();
      }
    }

    // for ループが終わったあと余ったデータを buffer から書き出して処理する
    if (buffer.position() > 0) {
      long contentLength = buffer.position();
      buffer.flip();
      partNumber++;
      var future =
          uploadMultipartUploadPart(
              bucket, key, uploadId, partNumber, contentLength, buffer.array());
      futures.add(future);
      buffer.clear();
    }

    // 全部のパートをアップロードするまで待つ
    while (!futures.stream().allMatch(Future::isDone)) {
      try {
        TimeUnit.MILLISECONDS.sleep((100));
      } catch (InterruptedException ignore) {
        // NOP
      }
    }

    // CompletedPart を集める
    List<CompletedPart> parts = new ArrayList<>();
    for (var future : futures) {
      try {
        parts.add(future.get());
      } catch (ExecutionException | InterruptedException e) {
        // NOP
      }
    }

    // マルチパートアップロードの完了処理を行う
    final var completedMultipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
    s3Client.completeMultipartUpload(
        builder ->
            builder
                .bucket(bucket)
                .key(key)
                .uploadId(uploadId)
                .multipartUpload(completedMultipartUpload));
    executorService.shutdown();
  }

  private Future<CompletedPart> uploadMultipartUploadPart(
      final String bucket,
      final String key,
      final String uploadId,
      final int partNumber,
      final long contentLength,
      final byte[] bytes) {
    return executorService.submit(
        () -> {
          var request =
              UploadPartRequest.builder()
                  .bucket(bucket)
                  .key(key)
                  .uploadId(uploadId)
                  .contentLength(contentLength)
                  .partNumber(partNumber)
                  .build();
          var response = s3Client.uploadPart(request, RequestBody.fromBytes(bytes));
          var part = CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build();
          return part;
        });
  }

  private CsvSchema buildSchema() {
    return CsvSchema.builder()
        .addColumn("id")
        .addColumn("age")
        .addColumn("email")
        .addColumn("sequence")
        .build();
  }
}

ExecutorService を使ってマルチスレッド化している以外はほぼ v1 を使ったコードと同じです。
エラー処理はサボっています。

あと ByteBuffer はスレッドセーフではないので ExecutorService にタスクを登録するときに byte[] として ByteBuffer から読み出すようにしています。

各パートのアップロードを並列化しているので v2 を使ったサンプルの方が速いです。300MiB くらいのデータを 5MiB に分割してアップロードした場合で 10 倍くらい速度差がありました。
たぶん v1 を使った方も同じようにすれば並列化できると思うのですが、めんどうだったのでやってません。Aws Lambda だと /tmp に書き込める量にも 500MiB の制限があるので、なるべくメモリ上で処理を行なった方が考えることが減ってよいと思いました。

まとめ

AWS SDK Java の v1 と v2 で低レベル API を使って、マルチパートアップロードを試してみました。
あまりコード量は変わりませんでしたが v1 だと uploadPart するときに File か InputStream しか指定できなかったので ByteBuffer から File にするか InputStream にするかでちょっと考えました。サンプルコードでは一時ファイルにしましたが byte[] を経由して InputStream を作れば InputStream でもできそうな気がします。
v2 だと uploadPart するときに ByteBuffer や byte[] を指定できるので、少し便利でした。

全体的に v2 の方が v1 よりも API が洗練されていて統一感があるので、これからは v2 を使っていくのがよいのではないでしょうか。

参考

Discussion

山根正大山根正大

okkezさん
とても参考になってます。助かります。

1点質問なのですが、v2側のコードに対してユニットテストを書きたいとなった時、okkezさんなら何をAssertされますか?localstack等でmockS3を立ててgenerate2_bytesメソッドを1回動かした後に、s3から一旦DLして中身のAssert、といった感じでしょうか。

※そもそもPoCなのでテストとか考えてねーよ、と言う場合はその回答が欲しいですm(_ _)m

okkezokkez

4ヶ月の前のコメントにいまさらですが。

最終的にファイルがアップロードできていればよいので generate2_bytes() を呼び出した後、ダウンロードして SHA256 ハッシュとかを Assert する気がします。
アップロード先には localstack を使うと思います。
ファイルの中身も気になる場合は、先頭の数行だけ Assert するか小さいファイルをアップロードして全行 Assert するかしそうです。