Spring Batchでスレッド使ってステップを並列に処理する
Spring Batchで処理を非同期的に並列で処理したい場合にどうすれば実現できるか、ソースコードとその解説を書きます。
前提
Spring Batchの概要
ここではSpring Batchの概要は解説しません。公式リファレンスか、以前Qiitaにチュートリアル的記事を書いたのでよろしければそちらを参照してください。
使用するバージョン
今回紹介するコードは以下のバージョンで動作確認しています。
ライブラリ | バージョン |
---|---|
Spring Boot | 2.2.5.RELEASE |
Spring Batch | 4.2.1 |
Java | 11 |
lombok | 1.18.12 |
今回使用するジョブ概要
今回は簡単のため「Hello, World!」と出力する処理を並列で回すジョブを考えます。
タスクレットは以下のものを使用します。
@Component
@StepScope
public class HelloWorldTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("Hello, World!");
return RepeatStatus.FINISHED;
}
}
ジョブ定義
並列数3で「Hello, World!」と出力するジョブの定義は以下のようになります。
@Component
@EnableBatchProcessing
@RequiredArgsConstructor
public class ParallelJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job helloWorldParallelJob(
Flow helloWorldFlow1,
Flow helloWorldFlow2,
Flow helloWorldFlow3) {
return jobBuilderFactory.get("helloWorldParallelJob")
.start(helloWorldFlow1)
.split(new SimpleAsyncTaskExecutor())
.add(helloWorldFlow2, helloWorldFlow3)
.end()
.build();
}
@Bean
public Flow helloWorldFlow1(Step helloWorldStep) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(helloWorldStep)
.build();
}
@Bean
public Flow helloWorldFlow2(Step helloWorldStep) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(helloWorldStep)
.build();
}
@Bean
public Flow helloWorldFlow3(Step helloWorldStep) {
return new FlowBuilder<SimpleFlow>("flow3")
.start(helloWorldStep)
.build();
}
@Bean
public Step helloWorldStep(HelloWorldTasklet helloWorldTasklet) {
return stepBuilderFactory.get("helloWorldStep")
.tasklet(helloWorldTasklet)
.build();
}
}
解説
Flow
今回Flow型のBeanを3つ定義しています。Flowとはステップの流れを定義できるもので、例えばstep1の後にstep2を実行するようにしたい、といったことを定義できます。
Spring Batchではステップだけで並列処理を行うことはできませんが、このFlowを並列に行うことは可能です。
そこでステップを含んだFlowを複数個用意して並列で行うようにします。
今回はhelloWorldStepのみを行うFlowを複数個定義し、それらを並列に回すようにすることでステップの並列処理を実現させています。
split
helloWorldParallelJobメソッドではsplitメソッドを使用しています。
splitメソッドを呼ぶことで指定されたFlowを並列に処理するようになります。
splitメソッドの引数にはFlowの処理に使用するTaskExecutorを指定します。
今回は非同期で並列に処理したいため、SimpleAsyncTaskExecutorを指定しています。
SimpleAsyncTaskExecutorを指定することで非同期で1スレッド1Flowを処理します。
今回の場合、3つのスレッドが生成され、それぞれ並列に非同期で処理されます。
@Bean
Flow helloWorldFlow1,
Flow helloWorldFlow2,
Flow helloWorldFlow3) {
return jobBuilderFactory.get("helloWorldParallelJob")
.start(helloWorldFlow1)
.split(new SimpleAsyncTaskExecutor()) // ここ
.add(helloWorldFlow2, helloWorldFlow3)
.end()
.build();
}
ログ
以下実行ログです。それぞれ別々のスレッド名で並列に処理されていることが分かります。
2021-01-17 03:45:57.422 INFO 44824 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] launched with the following parameters: [{}]
2021-01-17 03:45:57.470 INFO 44824 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
2021-01-17 03:45:57.470 INFO 44824 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
2021-01-17 03:45:57.470 INFO 44824 --- [cTaskExecutor-3] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
Hello, World!
Hello, World!
Hello, World!
2021-01-17 03:45:57.497 INFO 44824 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 25ms
2021-01-17 03:45:57.497 INFO 44824 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 25ms
2021-01-17 03:45:57.497 INFO 44824 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 26ms
2021-01-17 03:45:57.502 INFO 44824 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 52m
まとめ
ということでSpring Batchで並列処理を行う方法を解説しました。
大事なのは
- Flowを定義すること
- splitに並列処理するためのTaskExecutor(今回はSimpleAsyncTaskExecutor)を指定すること
です。
発展編
スレッド同時実行数を制御したい
SimpleAsyncTaskExecutorはデフォルトだとFlowの数だけ同時にスレッドを生成して処理を行います。
これだとFlowの数が多い場合など状況によってはメモリリソースが枯渇して問題になるケースがあるかもしれません。
同時に実行する数を制御したい場合は以下のようにSimpleAsyncTaskExecutorを少しカスタマイズします。
@Bean
public Job helloWorldParallelJob(
Flow helloWorldFlow1,
Flow helloWorldFlow2,
Flow helloWorldFlow3,
SimpleAsyncTaskExecutor executor) {
return jobBuilderFactory.get("helloWorldParallelJob")
.start(helloWorldFlow1)
.split(executor)
.add(helloWorldFlow2, helloWorldFlow3)
.end()
.build();
}
@Bean
public SimpleAsyncTaskExecutor executor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(2); //同時実行数を指定
return taskExecutor;
}
SimpleAsyncTaskExecutorのsetConcurrencyLimitメソッドを用いて同時実行数を指定します。
こうすることで同時に実行されるのは2個まで、残りの処理は実行中の処理が終わるまで待機します。
今回は2で指定しています。
以下実行ログです。
最初に2ステップが処理された後、残りの1ステップが処理されていることが分かります。
2021-01-17 04:08:39.456 INFO 45369 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] launched with the following parameters: [{}]
2021-01-17 04:08:39.501 INFO 45369 --- [cTaskExecutor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
2021-01-17 04:08:39.501 INFO 45369 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
Hello, World!
Hello, World!
2021-01-17 04:08:39.521 INFO 45369 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 20ms
2021-01-17 04:08:39.521 INFO 45369 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 20ms
2021-01-17 04:08:39.549 INFO 45369 --- [cTaskExecutor-3] o.s.batch.core.job.SimpleStepHandler : Duplicate step [helloWorldStep] detected in execution of job=[helloWorldParallelJob]. If either step fails, both will be executed again on restart.
2021-01-17 04:08:39.551 INFO 45369 --- [cTaskExecutor-3] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
Hello, World!
2021-01-17 04:08:39.555 INFO 45369 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 4ms
2021-01-17 04:08:39.558 INFO 45369 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 76ms
スレッドを使いまわしたい
SimpleAsyncTaskExecutorで同時実行数の制御はできましたが、ただこれは毎回新しいスレッドを生成するため、処理数が多い場合にはスレッド生成コストが高くついてしまいます。
そこで今度は同時実行数を制御しつつもスレッドも使いまわしたい場合です。
この場合はThreadPoolTaskExecutorを使用します。
@Bean
public Job helloWorldParallelJob(
Flow helloWorldFlow1,
Flow helloWorldFlow2,
Flow helloWorldFlow3,
ThreadPoolTaskExecutor executor) {
return jobBuilderFactory.get("helloWorldParallelJob")
.start(helloWorldFlow1)
.split(executor)
.add(helloWorldFlow2, helloWorldFlow3)
.end()
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
executor.shutdown();
}
})
.build();
}
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
taskExecutor.setMaxPoolSize(2);
return taskExecutor;
}
今回変わった点が2点あります。
1つ目はTaskExecutorのBean定義がThreadPoolTaskExecutorに変わりました。
今回はスレッドプールに2スレッドを生成しておき、その2スレッドで処理を順々に実行していくという設定になっています。
ThreadPoolTaskExecutorの詳細は以下公式ドキュメントやブログを参照してください。
2つ目はジョブ定義でlistenerを設定するようになった点です。
listenerにはジョブ実行の前処理・後処理を定義することができます。具体的にはJobExecutionListenerインターフェースを実装したクラスのインスタンスを指定します。
今回は匿名クラスで実装したリスナーをそのまま指定しています。
では今回リスナーには何の処理を追加しているのかというと、後処理としてThreadPoolTaskExecutorのshutdownメソッドの呼び出しを行っています。
なぜこれを明示的に呼び出しているかというと、ジョブ終了後プロセスも終了させるためです。ThreadPoolTaskExecutorを使うとジョブ終了後もスレッドがスレッドプールに残り続けるためJVMがshutdownしません。つまりプロセスがいつまでたっても終了しないということが起こってしまいます。
そのため明示的にThreadPoolTaskExecutorをshutdownしてスレッドプール内のスレッドを終了させることでプロセスも終了するようにしています。
(これに気づかず少しハマってしまいました。。以下参考)
以下実行ログです。3つ目の処理をしているスレッド名が2つ目の処理をしているスレッド名と同じであり、スレッドを使いまわしていることが分かります。
2021-01-17 04:16:37.144 INFO 45571 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] launched with the following parameters: [{}]
2021-01-17 04:16:37.176 INFO 45571 --- [ executor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
2021-01-17 04:16:37.176 INFO 45571 --- [ executor-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
Hello, World!
Hello, World!
2021-01-17 04:16:37.193 INFO 45571 --- [ executor-1] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 17ms
2021-01-17 04:16:37.193 INFO 45571 --- [ executor-2] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 17ms
2021-01-17 04:16:37.213 INFO 45571 --- [ executor-2] o.s.batch.core.job.SimpleStepHandler : Duplicate step [helloWorldStep] detected in execution of job=[helloWorldParallelJob]. If either step fails, both will be executed again on restart.
2021-01-17 04:16:37.215 INFO 45571 --- [ executor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [helloWorldStep]
Hello, World!
2021-01-17 04:16:37.219 INFO 45571 --- [ executor-2] o.s.batch.core.step.AbstractStep : Step: [helloWorldStep] executed in 4ms
2021-01-17 04:16:37.220 INFO 45571 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'executor'
2021-01-17 04:16:37.222 INFO 45571 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=helloWorldParallelJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 63ms
Discussion