📌

AWS X-Ray SDK for JavaとCompletableFeatureを利用した並列処理の相性が良くない

2022/10/17に公開約9,900字

背景

SpringBootで書かれたとあるWEB-APIエンドポイントの応答が遅いのでボトルネックを調べて…ということで手段としてX-RayとJava側の実装としてAWS X-Ray SDK for Javaを使用したのですが実にハマりました。

とあるAPIエンドポイントは省略的な例として以下のような処理順序になっていました。

var res1 = callExternalApi1();
var res2 = callExternalApi2();
proceedSomething(res1, res2);

XRayを導入して可視化したところ、下図のようにcallExternalApi1(), callExternalApi2()の応答待ち時間が大部分占めていました。

外部APIの応答速度はどうにもならないので、せめてもの抵抗でexternal1, 2の呼び出しを並列化しようということになりました。(呼び出し順序に依存関係はないものとします)

CompletableFuture.supplyAsyncを用いてAPI呼び出し箇所を並列化して上図のような結果になることを期待したのですが…、意図通りに並列化されずに依然として直列処理されてしまうという問題に遭遇しました。

短い結論

AWS X-Ray SDK for Javaの仕様といえば仕様なのですが、誤解を招きやすく複数のIssueが起票されています。しかしながら、この誤解を生みやすい仕様を修正する計画は無いようで「当SDKの代わりにADOTの利用を薦める」という結論です。

https://github.com/aws/aws-xray-sdk-java/blob/master/README.md#mega-opentelemetry-java-with-aws-x-ray

AWS X-Ray recommends using AWS Distro for OpenTelemetry (ADOT) to instrument your application instead of this X-Ray SDK due to its wider range of features and instrumentations. See the AWS X-Ray docs on Working with Java for more help with choosing between ADOT and X-Ray SDK.

If you want additional features when tracing your Java applications, please open an issue on the OpenTelemetry Java Instrumentation repository.

名言はされていませんが、AWS X-Ray SDK for Javaはレガシーライブラリということで今後新たに使うべきではないということでしょう。

https://docs.aws.amazon.com/xray/latest/devguide/xray-java.html

上記のページで「There are two way...」と2つの手段が紹介されていますがADOTの選択肢にrecommendなどと優劣がわかるようにドキュメントを書いてもらえることを期待したいです。

本文

並列処理のためのCompletableFutureのsupplyAsync

Java8くらいから非同期処理ライブラリとしてCompletableFutureが提供されるようになったと記憶しています。CompletableFutureについては色々記事がある(例えばこちら)と思うのでここでは割愛しますが、2つの外部API呼び出しを並列化するためにsupplyAsyncメソッドを使って下記のように修正しました。

var future1 = CompletableFuture.supplyAsync(SomeProcesses::callExternalApi1);
var future2 = CompletableFuture.supplyAsync(SomeProcesses::callExternalApi2);
CompletableFuture.allOf(future1, future2)
    .thenAccept(nothing -> {
        SomeProcesses.proceedSomething(future1.join(), future2.join());
    }).join();

しかしこれは下記の説明通りSegmentNotFoundExceptionが発生することになります。
https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-sdk-java-multithreading.html

アプリケーションで新しいスレッドを作成すると、AWSXRayRecorder は現在のセグメントまたはサブセグメント Entity への参照を保持しません。実装されたクライアントを新しいスレッドで使用すると、SDK は存在しないセグメントに書き込みを試み、SegmentNotFoundException が発生します。

ここで解説されている通り、beginSegmentしたスレッドと別のスレッドからはCurrentSegmentへの参照ができない(内部的にはCurrentSegmentはThreadLocalで保持されているため)ことが原因です。

supplyAsyncとExecutor

さて、supplyAsyncメソッドは本来第二引数にExecutorを指定します。

supplyAsync(Supplier<U> supplier, Executor executor)
指定されたエグゼキュータで実行されているタスクが指定されたサプライヤを呼び出して取得した値を使用して非同期的に完了する新しいCompletableFutureを返します。

これはsupplyAsyncメソッドに限らずCompletableFutureの***Asyncメソッドはすべてそのようなインターフェースになっています。そしてそれは省略可能で、省略された場合にはForkJoinPool.commonPoolが指定されたものして暗黙の動作が定義されています。このことはCompletableFutureのJavaDocの冒頭にも書かれています。(非常にわかりにくいのですが)

明示的なExecutor引数を持たないすべての非同期メソッドは、ForkJoinPool.commonPool()を使用して実行されます(ただし、少なくとも2個の並列性レベルがサポートされない場合は、新しいスレッドが作成されてそれぞれのタスクを実行します)。モニタリング、デバッグおよび追跡を簡単にするため、生成される非同期タスクはすべてマーカー・インタフェースCompletableFuture.AsynchronousCompletionTaskのインスタンスです。

ForkJoinPool.commonPoolは一般的な非同期処理に適したデフォルトのExecutorで内部的なスレッドプールを使っていい感じに非同期実行をサポートしてくれるものです。(雑)
https://codechacha.com/ja/java-fork-join-pool/

SegmentContextExecutorをCompletableFutureに渡す

話をX-Rayのマルチスレッドに関する説明ドキュメントに戻すと以下のような例があります。

AWSドキュメント記載の例
client.getItem(request).thenComposeAsync(response -> {
    // If we did not provide the segment context executor, this request would not be traced correctly.
    return client.getItem(request2);
}, SegmentContextExecutors.newSegmentContextExecutor());

なるほど、これを真似てsupplyAsyncの第2引数にSegmentContextExecutors.newSegmentContextExecutor()を渡せば良いのか、と筆者は思いました。

var future1 = CompletableFuture.supplyAsync(SomeProcesses::callExternalApi1,
    SegmentContextExecutors.newSegmentContextExecutor());
var future2 = CompletableFuture.supplyAsync(SomeProcesses::callExternalApi2,
    SegmentContextExecutors.newSegmentContextExecutor());
CompletableFuture.allOf(future1, future2)
    .thenAccept(nothing -> {
        SomeProcesses.proceedSomething(future1.join(), future2.join());
    }).join();

これは例外を発生させること無く動作します。

別スレッドでの実行にならない

しかし・・・実行された結果をX-Rayコンソールで確認すると依然として直列実行のままではありませんか!

並列実行してもらうためにはcallExternalApi1(), callExternalApi2()をそれぞれ別のスレッドで実行する必要があるのですがそうなっていないことが原因でした。ログにスレッド名を出力してみるとそれぞれの処理が同じスレッドで実行されてしまっています。

api1 finished <- [thread=main][segment=async-process]
api2 finished <- [thread=main][segment=async-process]
proceeded (res1, res2) <- [thread=main][segment=async-process]

なぜこのような挙動をしてしまうのか? これは#132のIssueの中でsoftprops氏が丁寧に解説してくれています。
https://github.com/aws/aws-xray-sdk-java/issues/132#issuecomment-755063216

The default executor service CompletableFuture.supplyAsync will use is the CommonPool, a good default for most IO-bound applications. The second optional argument is indeed intended to implement the Executor interface, for asynchronous code, its typically an executor backed by a thread pool like those exposed by factory methods on the std library's Executors class.

  • suppyAsyncではデフォルトで(ForkJoinPoolの)commonPoolが使われる
  • 第二引数には明示的にExecutorを指定できる
    • 典型的には非同期実行のためにスレッドプールを使うようなもの

ここまではこの記事の前段で述べてきた内容です。

If one were to use the SegmentContextExecutor, calls would turn to blocking calls as SegmentContextExecutor does not execute a task within a thread pool It just happens on the current thread which in all irony means that no segment propagation is needed!

この一文が非常に的を射たものでした。

  • SegmentContextExecutorは非同期タスクをスレッドプールを使って実行しない
  • だから現在のスレッド(supplyAsyncを呼び出したスレッドと同じスレッドと解釈)で実行されてブロックされてしまう(1が終わるまで2が始まらないというような直列的な挙動になる)

SegmentContextExecutorのソースを読むと納得感しかありません。
https://github.com/aws/aws-xray-sdk-java/blob/fd93c8a938e635137dedc70edf9b709f039ba0c1/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/contexts/SegmentContextExecutors.java#L71-L84

どうすれば良いのか

現時点ではあまりしっくり来る方法がありません。そして前述通りAWS X-RayチームはADOTにフォーカスすることを表明しているので、これ以上AWS X-Ray SDK for Javaが改善されることはなさそうです。

その上で、#132で提案されている2つの方法を紹介します。

1. Entity#runで包む

willarmiros氏が述べている方法で処理をWrapします。

Entity entity = AWSXRay.getTraceEntity();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    var ret = new AtomicReference<String>();
    entity.run(() -> {
	ret.set(callExternalApi1());
    });
    return ret.get();
});

runメソッドがCallableではなくRunnableを受ける都合で戻り値のreturnが素直にできないのが少し気持ち悪いですね。

https://github.com/aws/aws-xray-sdk-java/blob/fd93c8a938e635137dedc70edf9b709f039ba0c1/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/entities/Entity.java#L39-L57

2. SegmentContextExecutorに別のExecutorを渡せるようにする

softprops氏が提案している方法で、SegmentContextExecutorをこの様に実装すると良いという案です。

var future1 = CompletableFuture.supplyAsync(SomeProcesses::callExternalApi1,
                 XraySegmentContextExecutors.newExecutor(ForkJoinPool.commonPool()));

XraySegmentContextExecutorは筆者が「こういうイメージだろう」と思って書いてみたものです。executeメソッドで処理を内部executorに委譲しています。
https://github.com/NewGyu/aws-xray-pallarel-sample/blob/e0216aa21544b0af625d226e070f89e6a7c76116/app/src/main/java/newgyu/xray/XraySegmentContextExecutors.java#L19-L34

こうなるとドキュメントで言われている「SegmentContextExecutorを使用して非同期処理を書いてくれ」というものに近くなります。

まとめ

  • ADOTを使いましょう
  • Javaの並列処理は難しすぎる

おまけ

結局AWSドキュメントの例示は正しいのか?

X-Rayのマルチスレッドに関する説明ドキュメント

AWSドキュメント記載の例
client.getItem(request).thenComposeAsync(response -> {
    // If we did not provide the segment context executor, this request would not be traced correctly.
    return client.getItem(request2);
}, SegmentContextExecutors.newSegmentContextExecutor());

これは結局のところ期待どおりに動作するのでしょうかというと、このケースはAmazonDynamoDBAsyncClientがgetItem(getItemAsyncが正しいメソッド名ではないかと思う)の内部でFixedThreadPoolを使用しているため、その後続であるthenComposeAsyncも前段階で指定されたスレッドで動作します。
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBAsyncClient.java

上記の例では確かに期待どおりに動作するけれど、多くの人が非同期処理に用いるsupplyAsyncを用いたユースケースとはかなりかけ離れているなんとも微妙な例でした。

やはり続々と勘違いする人が出てきている

#132以外にもsupplyAsync+X-Ray SDKで悩んでいる人がいるようで、やはり分かりづらいのでしょうね。
https://github.com/aws/aws-xray-sdk-java/issues/322
https://github.com/aws/aws-xray-sdk-java/issues/347

Discussion

ログインするとコメントできます