Using ‘split’ function, we can execute the flows in parallel.
For example, as you see above diagram, job has N flows executed parallely.
Following snippet creates a job with 3 flows which will execute parallely.
@Bean
public Job job() {
return jobBuilderFactory.get("My-Job").start(flow1()).split(new SimpleAsyncTaskExecutor()).add(flow2(), flow3()).end().build();
}
Find the below working application.
Step 1: Create new maven project 'flow-parallel-exec-demo'.
Step 2: Update pom.xml with maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>flow-parallel-exec-demo</artifactId>
<version>1</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Create a package ‘com.sample.app.configuration’ and define JobConfiguration.
JobConfiguration.java
package com.sample.app.configuration;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@Configuration
@EnableBatchProcessing
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private void sleep(int timeToSleep) {
try {
System.out.println(Thread.currentThread().getName() + " going to sleep for " + timeToSleep + " seconds");
TimeUnit.SECONDS.sleep(timeToSleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("My-Step1")
.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
System.out.println(chunkContext.getStepContext().getStepName() + " getting executed by "
+ Thread.currentThread().getName());
sleep(2);
System.out.println(chunkContext.getStepContext().getStepName() + " executed by "
+ Thread.currentThread().getName());
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Step step2() {
return this.stepBuilderFactory.get("My-Step2")
.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
System.out.println(chunkContext.getStepContext().getStepName() + " getting executed by "
+ Thread.currentThread().getName());
sleep(2);
System.out.println(chunkContext.getStepContext().getStepName() + " executed by "
+ Thread.currentThread().getName());
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Flow flow1() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow1");
flowBuilder.start(step1()).next(step2()).end();
return flowBuilder.build();
}
@Bean
public Flow flow2() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow2");
flowBuilder.start(step1()).next(step2()).end();
return flowBuilder.build();
}
@Bean
public Flow flow3() {
FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("flow3");
flowBuilder.start(step1()).next(step2()).end();
return flowBuilder.build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("My-Job").start(flow1()).split(new SimpleAsyncTaskExecutor()).add(flow2(), flow3())
.end().build();
}
}
Step 4: Define App.java.
App.java
package com.sample.app;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableBatchProcessing
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Step 5: Create ‘application.properties’ file under src/main/resources folder.
application.properties
logging.level.root=ERROR
logging.level.org.hibernate=ERROR
## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2
spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
spring.datasource.username=krishna
spring.datasource.password=password123
spring.datasource.driverClassName=org.h2.Driver
## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false
## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000
# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3
Total project structure looks like below.
Run App.java, you will see below messages in console.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.6.RELEASE)
My-Step1 getting executed by SimpleAsyncTaskExecutor-2
My-Step1 getting executed by SimpleAsyncTaskExecutor-3
SimpleAsyncTaskExecutor-3 going to sleep for 2 seconds
My-Step1 getting executed by SimpleAsyncTaskExecutor-1
SimpleAsyncTaskExecutor-1 going to sleep for 2 seconds
SimpleAsyncTaskExecutor-2 going to sleep for 2 seconds
My-Step1 executed by SimpleAsyncTaskExecutor-3
My-Step1 executed by SimpleAsyncTaskExecutor-2
My-Step1 executed by SimpleAsyncTaskExecutor-1
My-Step2 getting executed by SimpleAsyncTaskExecutor-3
SimpleAsyncTaskExecutor-3 going to sleep for 2 seconds
My-Step2 getting executed by SimpleAsyncTaskExecutor-2
SimpleAsyncTaskExecutor-2 going to sleep for 2 seconds
My-Step2 getting executed by SimpleAsyncTaskExecutor-1
SimpleAsyncTaskExecutor-1 going to sleep for 2 seconds
My-Step2 executed by SimpleAsyncTaskExecutor-3
My-Step2 executed by SimpleAsyncTaskExecutor-1
My-Step2 executed by SimpleAsyncTaskExecutor-2
You can download complete working application from this link.
https://github.com/harikrishna553/springboot/tree/master/batch/flow-parallel-exec-demo
No comments:
Post a Comment