Monday 30 November 2020

Spring Batch: Maintain State while reading

 

In this post, I show how to maintain state while reading data, so that a failed job can resume from where it stopped.

 

ItemStream Interface

The ItemStream interface provides access to the ExecutionContext, where the state is maintained.

 

The following table summarizes the methods defined by the ItemStream interface.

 

Method

Description

void open(ExecutionContext executionContext)

Opens the stream for the provided ExecutionContext. When the job is executed for the first time, a new ExecutionContext object is created and passed to the open method.

 

When the job is restarted, the previously persisted ExecutionContext object is passed to the open method instead. Using the persisted ExecutionContext, we can resume the job from where it left off.

void update(ExecutionContext executionContext)

Indicates that the execution context provided during open is about to be saved. If any state is remaining, but has not been put in the context, it should be added here.

 

This method is called once per chunk, before the chunk's transaction is committed. We can use this method to save the current state.

void close()

If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been called all other methods (except open) may throw an exception.

 

 


 

As shown in the above figure, when a job starts executing, the open method of the reader is called. When the job is executed for the first time, a new ExecutionContext object is created and passed to the open method. When the job is restarted, the previously persisted ExecutionContext object is passed to the open method instead. Using the persisted ExecutionContext, we can resume the job from where it left off.

 

The ‘update’ method is called once per chunk, that is, once per transaction (after read -> process -> write). We can use this method to save the current state.

 

The ‘close’ method is called once, when the step finishes, whether it completes or fails. Any resources the stream needed to operate should be released here.
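The call order described above can be reproduced without Spring on the classpath. The sketch below is a plain-Java simulation, not Spring Batch code: `LifecycleDemo`, `run`, and the `Map`-based context are hypothetical stand-ins for the framework's step loop and the `ExecutionContext`. It assumes, as the console output later in this post suggests, that update is called once right after open and then once after each chunk, and that close is called once when the step ends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the open/update/close call order.
// Nothing here is Spring Batch API; the names are illustrative only.
final class LifecycleDemo {

    /** Runs a simulated step and returns the sequence of lifecycle calls. */
    static List<String> run(int itemCount, int chunkSize) {
        List<String> calls = new ArrayList<>();
        Map<String, Object> context = new HashMap<>(); // stand-in for ExecutionContext
        int index = 0;

        calls.add("open");                       // once, when the step starts
        context.put("currentItemIndex", index);
        calls.add("update");                     // initial state is saved right after open

        boolean exhausted = false;
        while (!exhausted) {
            int readInChunk = 0;
            while (readInChunk < chunkSize) {    // read up to chunkSize items
                calls.add("read");
                if (index >= itemCount) {        // a real reader would return null here
                    exhausted = true;
                    break;
                }
                index++;
                readInChunk++;
            }
            // (the chunk would be written here)
            context.put("currentItemIndex", index);
            calls.add("update");                 // once per chunk, before the commit
        }

        calls.add("close");                      // once, when the step ends
        return calls;
    }

    public static void main(String[] args) {
        // 3 items with a chunk size of 2
        System.out.println(run(3, 2));
        // prints [open, update, read, read, update, read, read, update, close]
    }
}
```

Calling `LifecycleDemo.run(25, 5)` yields the same shape as a full run of the demo job: one open, an update after every chunk, a final read that finds no more input, and a single close.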

 

How to create a stateful reader?

You can create a stateful item reader by implementing the 'ItemStreamReader' interface, which combines ItemReader and ItemStream.

 

public class MyStatefulItemReader implements ItemStreamReader<String> {

         ......

         ......

}

 

The complete working application is built step by step below.

 

Step 1: Create a new Maven project 'stateful-read-demo'.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>stateful-read-demo</artifactId>
	<version>1</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.6.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>

		<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>

		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>
	</dependencies>
</project>


Step 3: Create a new package ‘com.sample.app.readers’ and define MyStatefulItemReader.

 

MyStatefulItemReader.java

package com.sample.app.readers;

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class MyStatefulItemReader implements ItemStreamReader<String> {

	private List<String> items;
	private int currentIndex;
	private boolean firstRun;

	public MyStatefulItemReader(List<String> items) {
		this.items = items;
		currentIndex = 0;
		firstRun = true;
	}

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("In open method");
		if (executionContext.containsKey("currentItemIndex")) {
			this.currentIndex = executionContext.getInt("currentItemIndex");
			this.firstRun = false;
		} else {
			this.currentIndex = 0;
			executionContext.put("currentItemIndex", this.currentIndex);
		}

	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("In update method");
		executionContext.put("currentItemIndex", this.currentIndex);
	}

	@Override
	public void close() throws ItemStreamException {
		System.out.println("In close method");

	}

	@Override
	public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		System.out.println("In read method");
		String item = null;

		if (currentIndex == 9 && firstRun) {
			throw new RuntimeException("Error Occuured while processing the data");
		}

		if (currentIndex < items.size()) {
			item = items.get(currentIndex);
			currentIndex++;
		}

		return item;
	}

}


As you can see in the ‘read’ method, I throw an exception on the first run when the reader reaches index 9 (that is, "Item 9").

 

As you can see in the ‘open’ method, when the application is restarted I read the ‘currentItemIndex’ value back from the ExecutionContext.
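The save-and-restore idea behind open and update can be tried in isolation. The following is a minimal plain-Java sketch, not the reader from this post: `ResumableReader` is a hypothetical class, and a `Map<String, Object>` stands in for Spring Batch's `ExecutionContext`. It only illustrates that a new reader instance, opened with a context saved by an earlier instance, continues from the saved index.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free reader that mirrors the open/update/read idea.
final class ResumableReader {
    private static final String KEY = "currentItemIndex";

    private final List<String> items;
    private int currentIndex;

    ResumableReader(List<String> items) {
        this.items = items;
    }

    /** Restores the saved index if the context has one, otherwise starts at 0. */
    void open(Map<String, Object> context) {
        Object saved = context.get(KEY);
        currentIndex = (saved instanceof Integer) ? (Integer) saved : 0;
    }

    /** Copies the current index into the context so it can be persisted. */
    void update(Map<String, Object> context) {
        context.put(KEY, currentIndex);
    }

    /** Returns the next item, or null when the input is exhausted. */
    String read() {
        return currentIndex < items.size() ? items.get(currentIndex++) : null;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("Item 0", "Item 1", "Item 2", "Item 3");
        Map<String, Object> context = new HashMap<>(); // stand-in for ExecutionContext

        ResumableReader first = new ResumableReader(items);
        first.open(context);      // empty context: start at index 0
        first.read();             // Item 0
        first.read();             // Item 1
        first.update(context);    // context now holds currentItemIndex=2

        ResumableReader second = new ResumableReader(items);
        second.open(context);     // saved context: resume at index 2
        System.out.println(second.read()); // prints "Item 2"
    }
}
```

In the real application the framework, not the caller, decides when open and update run, and it persists the context in the job repository rather than in an in-memory map.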

 

Step 4: Create the package ‘com.sample.app.configuration’ and define JobConfiguration.

 

JobConfiguration.java

package com.sample.app.configuration;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import com.sample.app.readers.MyStatefulItemReader;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Bean
	public MyStatefulItemReader myStatefulItemReader() {
		List<String> items = new ArrayList<>();

		for (int i = 0; i < 25; i++) {
			items.add("Item " + i);
		}

		return new MyStatefulItemReader(items);
	}

	@Bean
	public ItemWriter<String> itemWriter() {
		return new ItemWriter<String>() {

			@Override
			public void write(List<? extends String> items) throws Exception {
				for (String item : items) {
					System.out.println("Writing : " + item);
				}
			}

		};
	}

	@Bean
	public Step step1() {
		// Type the chunk explicitly so that the reader and writer are checked as String-based.
		return this.stepBuilderFactory.get("step1").<String, String>chunk(5).reader(myStatefulItemReader())
				.writer(itemWriter()).build();
	}

	@Bean
	public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {

		return jobBuilderFactory.get("My-First-Job").start(step1()).build();
	}

}


Step 5: Define App.java

 

App.java 

package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}
}

 

Step 6: Create an ‘application.properties’ file under the src/main/resources folder.

 

application.properties

 

logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2

spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;

spring.datasource.username=krishna
spring.datasource.password=password123

spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop

spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.tomcat.max-wait=10000

# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3

 

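The complete project structure looks like this.

stateful-read-demo
    pom.xml
    src/main/java/com/sample/app/App.java
    src/main/java/com/sample/app/configuration/JobConfiguration.java
    src/main/java/com/sample/app/readers/MyStatefulItemReader.java
    src/main/resources/application.properties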

 


Run App.java; you will see the following messages in the console.

In open method
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 0
Writing : Item 1
Writing : Item 2
Writing : Item 3
Writing : Item 4
In update method
In read method
In read method
In read method
In read method
In read method
2020-04-07 15:45:09.774 ERROR 70982 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step step1 in job My-First-Job

java.lang.RuntimeException: Error Occuured while processing the data
	at com.sample.app.readers.MyStatefulItemReader.read(MyStatefulItemReader.java:55) ~[classes/:na]
	at com.sample.app.readers.MyStatefulItemReader.read(MyStatefulItemReader.java:1) ~[classes/:na]
	at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
	.....
	.....
In close method

Open the URL 'http://localhost:8080/h2' to see the H2 console.

 

 

Set the JDBC URL to the one from application.properties ('jdbc:h2:file:~/db/myOrg.db'), enter the username 'krishna' and the password 'password123', and log in to the H2 console.



Look at the data in the 'BATCH_STEP_EXECUTION_CONTEXT' table.

 


 

Take the value of the 'SHORT_CONTEXT' column; it contains the following information.

{
	"batch.taskletType": "org.springframework.batch.core.step.item.ChunkOrientedTasklet",
	"batch.stepType": "org.springframework.batch.core.step.tasklet.TaskletStep",
	"currentItemIndex": 5
}


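As the JSON above shows, currentItemIndex is set to 5. The second chunk (items 5 to 9) failed before it could be committed, so the last saved state is the one written after the first chunk. That means items up to index 4 were processed, and when we restart the application, processing should resume from index 5.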
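The reason the stored value is 5 and not 9 can be worked through with a small simulation. This is a plain-Java sketch under stated assumptions, not Spring Batch code: `ChunkCommitDemo` and `runOnce` are hypothetical names, a `Map` plays the role of the persisted ExecutionContext, and a "commit" is modelled simply as copying the reader's index into that map after a chunk has been written. A failure in the middle of a chunk skips that copy, so the map still holds the index from the last completed chunk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of chunk-oriented processing with a failure on the first run.
// Illustrative only; none of these names come from Spring Batch.
final class ChunkCommitDemo {
    static final String KEY = "currentItemIndex";

    /**
     * Processes items in chunks, starting from the index stored in 'persisted'.
     * Throws when 'failAt' is reached (pass -1 to disable the failure).
     * Items of completed chunks are appended to 'written'.
     */
    static void runOnce(int itemCount, int chunkSize, int failAt,
                        Map<String, Integer> persisted, List<String> written) {
        int index = persisted.getOrDefault(KEY, 0);
        while (index < itemCount) {
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && index < itemCount) {
                if (index == failAt) {
                    // The current chunk is discarded and nothing is persisted.
                    throw new IllegalStateException("failure at index " + index);
                }
                chunk.add("Item " + index);
                index++;
            }
            written.addAll(chunk);        // write the chunk
            persisted.put(KEY, index);    // "commit": state saved at the chunk boundary
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> persisted = new HashMap<>();
        List<String> written = new ArrayList<>();

        try {
            runOnce(25, 5, 9, persisted, written);   // first run fails at index 9
        } catch (IllegalStateException e) {
            System.out.println("persisted after failure: " + persisted.get(KEY)); // 5
        }

        runOnce(25, 5, -1, persisted, written);      // restart without the failure
        System.out.println("items written in total: " + written.size());          // 25
    }
}
```

With 25 items and a chunk size of 5, the first run writes items 0 to 4 and then fails inside the second chunk; the restart picks up at index 5 and writes the remaining 20 items.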

 

Stop and restart the application; you will see the following messages in the console.

In open method
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 5
Writing : Item 6
Writing : Item 7
Writing : Item 8
Writing : Item 9
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 10
Writing : Item 11
Writing : Item 12
Writing : Item 13
Writing : Item 14
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 15
Writing : Item 16
Writing : Item 17
Writing : Item 18
Writing : Item 19
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 20
Writing : Item 21
Writing : Item 22
Writing : Item 23
Writing : Item 24
In update method
In read method
In update method
In close method


You can download the complete working application from this link.

https://github.com/harikrishna553/springboot/tree/master/batch/stateful-read-demo

 



 






