重い初期化、並列処理, Singletonの罠, そしてInstancePoolへ
はじめに
初期化処理が重いインスタンスってありますよね。代表的な例だとJDBCコネクションとか。Factoryパターンで初期化を隠蔽するような奴は概ね重いのはどの言語でも一緒だと思います。
こういった重い処理を例えばループなどで大量に初期化してしまうのはかなりのコストになってしまい、特にバッチのようなチリツモな処理では無視できないほど実行時間に影響を与えてしまう恐れがあります。
そういった場合はSingletonなどを使ってインスタンスの生成コストを隠蔽しますが、マルチスレッド環境下だと影響が出る場合もあります。以前、そういうケースでバグってるのに出くわした事があるので、記憶の整理がてらまとめておこうと思います。
コード例はJavaですが、あまりJavaかどうかには依存しない考え方になっていると思うので、他の言語が得意な人は疑似コード程度に思っておいてください。
なお、JDBCを始めとした一般的に重いとされているインスタンスには枯れたPoolの実装が通常はあるので、そちらを使う方が良いでしょう。
一番最初の素朴な実装
さて、まずはベースとなるサンプルコードを示しましょう。1から100までのループの中でCounterクラスを初期化して、2回ほどインクリメントした後に合計値を出しています。Counterの初期値は0なので合計値は200を返します。一応計測用に三回処理を回してる感じです。
private static void invoke() {
int total = IntStream.range(0, 100)
.map(i -> {
var counter = new Counter();
counter.increment();
counter.increment();
return counter.get();
}).reduce((xs, x) -> xs + x).getAsInt();
System.out.println("Total: " + total);
}
public static void main(String[] args) {
System.out.println("param-size: " + ForkJoinPool.commonPool().getParallelism());
for (int i = 0; i < 3; i++) {
long s = System.nanoTime();
System.out.println("Loop: " + i);
invoke();
long e = System.nanoTime();
System.out.println(((e - s) / 1000 / 1000) + " ms");
}
}
呼び出しているCounterクラスはこんな感じ。実際は何か色々してて初期化とincrement
が重いのですが、とりあえずSleepを入れています。ファイルアクセスとかDBアクセスとか外部APIとかそういうのです。たぶん。
public class Counter {
private int count = 0;
public Counter() {
try {
System.out.println("Counter:create:" + this.hashCode());
Thread.sleep(300L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
public void increment() {
try {
Thread.sleep(10L);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
count++;
}
public void clear(){
count = 0;
}
public int get() {
return count;
}
}
それではこのクラスを実行してみましょう。単純な処理ですが初期化とincrement
にそれなりに時間が掛かるので結構時間を使ってしまいます。
Loop: 0
Counter:create:245257410
Counter:create:1705736037
Counter:create:455659002
~中略~
Total: 200
34199 ms
Loop: 1
~中略~
Total: 200
34187 ms
Loop: 2
~中略~
Total: 200
34150 ms
どの処理も34秒程度で終わり、合計値が200ですね。中略していますが、毎回インスタンスが生成されているのが分かると思います。
それでは、この処理を高速化していきましょう。
並列処理による高速化
まず最初に考えるのはマルチスレッドを使った並列処理による高速化ですよね? 時代はメニーコア! 1CPU/1Coreなんて今は昔です。我々もフリーランチに甘えるばかりではありません。まあ、若干クラウド環境とかだとコア数先祖返りしてるケースもあるけど、それはそれ。
Javaには多くの並列化の仕組みがありますが、今回はもっともお手軽な並列ストリームを使います。
int total = IntStream.range(0, 100)
.parallel()
.map(i -> {
var counter = new Counter();
counter.increment();
counter.increment();
return counter.get();
}).reduce((xs, x) -> xs + x).getAsInt();
System.out.println("Total: " + total);
parallel
って足すだけだから簡単ですね!この書き方だと並列度は実行環境に依存しますが、私のマシンでは11並列になります。実行結果は以下の通り。
param-size: 11
Loop: 0
Counter:create:1364922542
Counter:create:1048596955
Counter:create:1178848130
...
~中略~
Total: 200
3048 ms
Loop: 1
~中略~
Total: 200
3044 ms
Loop: 2
~中略~
Total: 200
3396 ms
3秒前後とかなり速くなりましたね! スリープ処理だから並列化の影響をダイレクトに受けて気持ちが良いですねー。実際はこんなにスケールしないケースも多いですが、それでも並列化の効果は大きいですよね。
それでも、まだ物足りない。もっと高速化を、というあなたは本題のインスタンス生成コストの省略を次の章で試しましょう。
シングルトン、並列処理、そしてバグ
そもそもインスタンスの生成に300msもかかっているので、これを毎回生成するのが何よりの無駄ですよね? 無駄を省くのは改善の基本です。インスタンスの生成の数を抑えるテクニックと言えば定番はSingletonですね。さっそく実装してみましょう。
以下のようにCounterインスタンスを保持するSingletonを作ります。良くある書き方ですね?
public class CounterSource {
private static Counter counter = null;
private CounterSource() {}
public static Counter get() {
if (counter == null) {
counter = new Counter();
}
return counter;
}
}
続いて、直接newするのではなく、CounterSourceを経由して取得するようにコードを変更します。
int total = IntStream.range(0, 100)
.parallel()
.map(i -> {
var counter = CounterSource.get();
counter.increment();
counter.increment();
return counter.get();
}).reduce((xs, x) -> xs + x).getAsInt();
System.out.println("Total: " + total);
そして実行結果は以下になります。
param-size: 11
Loop: 0
Counter:create:781607405
Counter:create:1955827078
Counter:create:62803695
Counter:create:1283928880
Counter:create:1364922542
Counter:create:582939092
Counter:create:1048596955
Counter:create:1184496519
Counter:create:1060245958
Counter:create:761384723
Counter:create:552016626
Counter:create:121354012
Total: 7798
606 ms
Loop: 1
Total: 26265
280 ms
Loop: 2
Total: 45361
294 ms
280ms~600ms前後と爆速になりましたが... 奇妙な所がいくつかありますね? まずトータル値がおかしいです。200のはずが26265とか良く分からない値になっています。こんなとんでもないバグがあってはリリース出来ませんね。さあ、デバックの時間だ!
バグの原因を追え!
ここで勘の良い人は思うかもしれません。 「あ、シングルトンだから状態を保持してるのでループ毎にcounterをクリアしなきゃダメなんだ!」 と。確かにそれは怪しそうですよね? 早速修正してみましょう。counter.clear()
でCounterSourceから取得する度に初期化しています。計算前に毎回初期化してるから必ずincrementの前はcount=0だと思いますよね? よし、勝ったな。風呂入ってこよう。
int total = IntStream.range(0, 100)
.parallel()
.map(i -> {
var counter = CounterSource.get();
counter.clear();
counter.increment();
counter.increment();
return counter.get();
}).reduce((xs, x) -> xs + x).getAsInt();
System.out.println("Total: " + total);
さて、お風呂に行ってる間の実行結果がこちらとなります。
param-size: 11
Loop: 0
Counter:create:1184496519
Counter:create:1364922542
Counter:create:1955827078
Counter:create:664380515
Counter:create:295530567
Counter:create:1878446100
Counter:create:781607405
Counter:create:552016626
Counter:create:1060245958
Counter:create:62803695
Counter:create:1048596955
Counter:create:121354012
Total: 583
597 ms
Loop: 1
Total: 707
311 ms
Loop: 2
Total: 639
322 ms
値がバグってるままですね!そんな、毎回初期化してるのに… こんなの絶対おかしいよ! と思ってしまうのも無理は無いかもしれません。
実は、counter.clear()
が必要なこと自体はあっています。ただ、一手足りないのです。ここで注目したいのは画面に出ているCounter:create:...
です。シングルトンなのに、なんで複数のインスタンスが出来ているのでしょう??? この原因はCounterSourceがスレッドセーフでは無いためです。
if (counter == null) {
counter = new Counter();
}
return counter;
上記のコードはnullの時に初期化という実装でシングルスレッドでは問題無いのですが、マルチスレッドでは複数スレッドから同時にアクセスしたときにnullを見たのにその後にそれぞれのスレッドが別のインスタンスを初期化するという現象が発生します。そのため、マルチスレッドで状態を変更するコードは必ずスレッドセーフに実装する必要があります。
という分けでスレッドセーフに対応させましょう。Javaではsynchronized
を使う事で並列アクセスの処理をシリアライズできます。利用側のコードは一切変更が要りません。簡単ですね。
public class CounterSource {
private static Counter counter = null;
private CounterSource() { }
public static synchronized Counter get() {
if (counter == null) {
counter = new Counter();
}
return counter;
}
}
さて、それでは結果を見ていきましょう。Counter:create:2003749087
と想定通り、インスタンスが一つだけ生成されていますね。いやー、ちょっと苦労したけど楽勝でしたね! これで今日は旨い酒が飲めそうだ...って良く見たらトータル値がやっぱりバグってるというツラい現実が目に入ってきますね。ふう、今夜は長い夜になりそうだ....
param-size: 11
Loop: 0
Counter:create:2003749087
Total: 423
617 ms
Loop: 1
Total: 755
280 ms
Loop: 2
Total: 852
283 ms
インスタンスプール実装への道
さて、先ほどのバグの原因は何でしょうか? 処理毎にclearも付けたし、スレッドセーフにもしたのにいったいどこに問題があったのでしょうか。実は原因はやはりスレッドセーフに対応していない事です。CounterSourceはスレッドセーフに実装したけど、Counter自体はスレッドセーフに対応してないままですね? synchronized
はあくまでそのメソッドをスレッドセーフにするだけで、その後の一連の処理の保証をするものではありません。
単純に考えると、Counterクラスをスレッドセーフに修正してしまうという案が思いつきます。それも一つの手ですが、これが自前のクラスでは無く標準ライブラリやサードパーティのライブラリだとどうでしょうか? 修正できませんよね? こういったケースで役立つのがInstance Poolです。
Instance PoolはSingletonあるいはFlyweightパターンのように特定の数のインスタンスをstaticにプールしておいて、それを利用者の要求に応じて払い出す仕組みです。これによって、都度インスタンスの生成をする事を避けつつ、インスタンスが複数スレッドで同時に共有されることを防ぐのです。
Instance Poolで最も代表的なものはJDBC Connectionですね。所謂コネクションプールがこれにあたります。他にもEJBなんかでも使われていますし、他の言語でもそれなりによく使われるテックニックだと思います。
今回はかなりシンプルに実装してみました。まずCounterクラスを包むCounterWrapperを以下のように実装します。isActive
メソッドを持ち、この状態でプールから払い出されているか否かを判定します。take
でコネクションを取得して、release
で解放ですね。Closeableの実装は必須では無いのですが入れとくと閉じ忘れを防げて便利です。
public class CounterWrapper implements Closeable {
private boolean isActive;
private Counter counter;
public CounterWrapper() {
this.isActive = false;
this.counter = new Counter();
}
public void take() {
this.isActive = true;
counter.clear();
}
public void release() {
this.isActive = false;
}
public Counter get() {
return this.counter;
}
@Override
public void close() throws IOException {
this.release();
}
}
つづいて、CounterSourceを以下のように変更します。変更というかもはや原型がほぼ無いですが...
Counterでは無く、CounterWrapperをプールとしてSet型で持ちます。そして、maxPoolSize
の数まではインスタンスを作成しますが、それ以上はしません。CounterSource#get()が呼ばれたら、poolに存在するアクティブでは無いインスタンスを返します。見つかるまで無限ループをさせているので、maxPoolSizeを超えたリクエストを受けた場合はリリースされるまで待ちます。
public class CounterSource {
private static Set<CounterWrapper> pool = new HashSet<>();
private static int maxPoolSize = 3;
public static void setMaxPoolSize(int maxPoolSize) {
CounterSource.maxPoolSize = maxPoolSize;
}
public static synchronized CounterWrapper get() {
if (pool.size() < maxPoolSize) {
var wrap = new CounterWrapper();
pool.add(wrap);
};
while (true) {
for (var x : pool) {
if (x.isActive == false) {
x.take();
return x;
}
}
}
}
}
最後に呼び出し側も一部修正が必要になります。まずCounterSourceからCounterWrapperを取得して、その後takeメソッドで実際のCounterインスタンスを取得しています。リリース漏れを防ぐためにtry-catch-resourceの構文でCounterWrapperの取得を行っていますが、それ以外はほぼ同じコードですね。
int total = IntStream.range(0, 100)
.parallel()
.map(i -> {
try ( var wrap = CounterSource.get()) {
var counter = wrap.get();
counter.increment();
counter.increment();
return counter.get();
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}).reduce((xs, x) -> xs + x).getAsInt();
System.out.println("Total: " + total);
それでは実行してみましょう。CounterインスタンスはCounterSourceのデフォルト値の3で作成され、性能が単なる並列処理の時より改善しつつも、トータル値が正しい事が分かります。
param-size: 11
Loop: 0
Counter:create:1324119927
Counter:create:857327643
Counter:create:1799308595
Total: 200
1953 ms
Loop: 1
Total: 200
1037 ms
Loop: 2
Total: 200
1049 ms
プールサイズを最適化してみましょう。今回のコードは11並列なのでプルーサイズが3だと待ち時間が発生してしまいます。そのため同数の11にプール数も変更する事で性能改善が期待できます。
CounterSource.setMaxPoolSize(11);
int total = IntStream.range(0, 100)
.parallel()
.map(i -> {
...
以下が実行結果です。かなり速くなりましたね!
param-size: 11
Loop: 0
中略
Total: 200
3666 ms
Loop: 1
Total: 200
314 ms
Loop: 2
Total: 200
312 ms
ただベンチマークの作りの都合上、最初のテスト実行にだけ初期化が入ってるので実際の性能改善の具合が少し分かりづらいので、ループ数を1万回に増やし有意差が分かりやすいようにしてみました。
結果は以下の通り。プール化だけでだいたい10倍以上の性能アップですね。これはループ回数、つまり一般的なバッチだとデータ量が増えれば増えるほど影響が大きくなります。なお、素朴なシーケンシャル処理での1万回は事前のテストから10倍程度の差と分かっているので想定値を入れています。終わるの待ってられないので...
実装 | 時間(sec) |
---|---|
素朴な実装 | 3000(推定) |
並列化 | 321 |
プール化 | 34 |
表: 1万回当たりの実装別実行時間
グラフにするとより際立ちますね?
図: 1万回当たりの実装別実行時間
まとめ
とりあえず初期化が重いインスタンスを含むバッチの高速化をユースケースに、並列処理でSingletonを使うときの注意点とPoolingによる解決をまとめてみました。スレッドセーフの話はマルチスレッドが基本リアルタイム系ではお約束で、だからこそFWとかがサポートしてるんですが、バッチはそういうケースが少ないのでうっかり考慮漏れのコードが紛れ込む事があります。
特にシングルスレッドを想定していたコードを性能改善で並列処理に書き換えたりするケースでは、こういったどっかにシングルトンが紛れてて謎の挙動をするってのはありがちなので注意をしたいですね。共通ライブラリでSingletonとかも。
この手の処理を自分で組む事は少ないと思いますが、ナイーブな実装でも知っておくと有名なライブラリを使うときも挙動理解の助けになって良いかなー、と思います。
それでは、Happy Hacking!
Discussion