Wednesday 18 November 2020

Spring Batch: Flows

A flow is a reusable collection of steps. For example, we can create a flow that contains the steps common to several jobs.

 

For example, you can create a flow that reads all the subscribers of a job and sends them an email saying that Job X has started. Since this sequence is common to many jobs, we can define it once as a flow and reuse it in multiple jobs.
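
To make the reuse idea concrete before looking at the real API, here is a minimal framework-free sketch. It is not Spring Batch code: the class FlowConceptSketch, the Step interface and the job names are all hypothetical, and a flow is modelled simply as an ordered list of steps that two jobs share.

```java
import java.util.ArrayList;
import java.util.List;

public class FlowConceptSketch {

	/** Hypothetical stand-in for a step: a unit of work that reports what it did. */
	public interface Step {
		String run();
	}

	/** The shared "flow": an ordered group of steps defined once. */
	public static List<Step> notificationFlow() {
		List<Step> flow = new ArrayList<>();
		flow.add(() -> "read subscribers");
		flow.add(() -> "send email");
		return flow;
	}

	/** A job runs the shared flow first, then its own steps, and returns a log. */
	public static List<String> runJob(String jobName, List<Step> flow, List<Step> ownSteps) {
		List<String> log = new ArrayList<>();
		for (Step step : flow) {
			log.add(jobName + ": " + step.run());
		}
		for (Step step : ownSteps) {
			log.add(jobName + ": " + step.run());
		}
		return log;
	}

	public static void main(String[] args) {
		List<Step> shared = notificationFlow();

		List<Step> reportSteps = new ArrayList<>();
		reportSteps.add(() -> "build report");

		// Two different jobs reuse the same flow definition.
		runJob("Job X", shared, reportSteps).forEach(System.out::println);
		runJob("Job Y", shared, new ArrayList<>()).forEach(System.out::println);
	}
}
```

Spring Batch's Flow plays the role of the shared list here, with the framework handling execution, status tracking and restartability.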

 


 

How to create a flow?

Using FlowBuilder, you can create a flow.

@Bean
public Flow sendNotification() {

	// "notification" is the name of the flow.
	FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("notification");

	// Run readSubscribers first, then sendEmail.
	flowBuilder.start(readSubscribers()).next(sendEmail()).end();

	return flowBuilder.build();
}


Once you define a flow, you can use it anywhere in a job's sequence of steps. In the following job, the flow is the starting step.

@Bean
public Job job(Flow sendNotification) {
	return jobBuilderFactory.get("My-Job")
			.start(sendNotification)
			.next(step1())
			.next(step2())
			.next(step3())
			.end()
			.build();
}
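
A flow does not have to come first. As a sketch of reusing the same flow in a second job, the configuration below runs a step and then hands over to the sendNotification flow once that step completes. It assumes the Spring Batch 4.x builder factories used throughout this post and the sendNotification bean defined above; the class name NotifyAtEndJobConfiguration, the archiveReport step and the job name are hypothetical and only for illustration.

```java
package com.sample.app.configuration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical second job that reuses the same flow, this time as its last stage.
@Configuration
public class NotifyAtEndJobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	// Illustrative step; its name and message are assumptions.
	@Bean
	public Step archiveReport() {
		return this.stepBuilderFactory.get("Archive-Report")
				.tasklet((contribution, chunkContext) -> {
					System.out.println("Archive report: Finished Execution");
					return RepeatStatus.FINISHED;
				}).build();
	}

	@Bean
	public Job notifyAtEndJob(Flow sendNotification) {
		return jobBuilderFactory.get("Notify-At-End-Job")
				.start(archiveReport())
				// Transition to the shared flow only when the step ends with COMPLETED.
				.on("COMPLETED").to(sendNotification)
				.end()
				.build();
	}
}
```

Because the job starts with a plain step, the transition to the flow is expressed with on(...).to(...), and end() closes the flow definition before build() creates the job.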



The complete working application is given below.

 

Step 1: Create a new Maven project ‘flow-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>flow-demo</artifactId>
	<version>1</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.6.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>

		<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>

		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>
	</dependencies>
</project>


Step 3: Create a new package ‘com.sample.app.configuration’ and define EmailFlowConfiguration.

 

EmailFlowConfiguration.java

package com.sample.app.configuration;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class EmailFlowConfiguration {
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Bean
	public Step readSubscribers() {
		return this.stepBuilderFactory.get("Read-subscribers")
				.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
					System.out.println("Flow Step 1: Reading all the subscribers for this job");

					return RepeatStatus.FINISHED;

				}).build();
	}

	@Bean
	public Step sendEmail() {
		return this.stepBuilderFactory.get("Send-Email")
				.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
					System.out.println("Flow Step 2: Sending email to all subscribers");

					return RepeatStatus.FINISHED;

				}).build();
	}

	@Bean
	public Flow sendNotification() {

		FlowBuilder<Flow> flowBuilder = new FlowBuilder<Flow>("notification");

		flowBuilder.start(readSubscribers()).next(sendEmail()).end();

		return flowBuilder.build();
	}
}


Step 4: Define JobConfiguration.

 

JobConfiguration.java

package com.sample.app.configuration;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Bean
	public Step step1() {
		return this.stepBuilderFactory.get("My-Step1")
				.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
					System.out.println("Step1:  Finished Execution");

					return RepeatStatus.FINISHED;

				}).build();
	}

	@Bean
	public Step step2() {
		return this.stepBuilderFactory.get("My-Step2")
				.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
					System.out.println("Step2:  Finished Execution");

					return RepeatStatus.FINISHED;

				}).build();
	}

	@Bean
	public Step step3() {
		return this.stepBuilderFactory.get("My-Step3")
				.tasklet((StepContribution contribution, ChunkContext chunkContext) -> {
					System.out.println("Step3:  Finished Execution");

					return RepeatStatus.FINISHED;

				}).build();
	}

	@Bean
	public Job job(Flow sendNotification) {
		return jobBuilderFactory.get("My-Job")
				.start(sendNotification)
				.next(step1())
				.next(step2())
				.next(step3())
				.end()
				.build();
	}

}


Step 5: Define the main application class.

 

App.java

package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}
}


Step 6: Create the application.properties file under the src/main/resources folder.

 

application.properties


logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2

spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;

spring.datasource.username=krishna
spring.datasource.password=password123

spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create

spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.tomcat.max-wait=10000

# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3


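The complete project structure is as follows.

flow-demo
	pom.xml
	src/main/java
		com/sample/app
			App.java
		com/sample/app/configuration
			EmailFlowConfiguration.java
			JobConfiguration.java
	src/main/resources
		application.properties
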
Run App.java. You will see the following messages in the console.


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.6.RELEASE)

Flow Step 1: Reading all the subscribers for this job
Flow Step 2: Sending email to all subscribers
Step1:  Finished Execution
Step2:  Finished Execution
Step3:  Finished Execution


You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/batch/flow-demo





