In this post, I am going to show how to maintain a reader's state while reading data, so that a failed job can be restarted from where it left off.
ItemStream Interface
The ItemStream interface provides access to the ExecutionContext, where the state is maintained.
The following table summarizes the methods defined by the ItemStream interface.
Method | Description
void open(ExecutionContext executionContext) | Opens the stream for the provided ExecutionContext. When the job is executed for the first time, a new ExecutionContext object is created and passed to open. When the job is restarted, the previously persisted ExecutionContext is passed instead, so the job can resume where it left off.
void update(ExecutionContext executionContext) | Indicates that the execution context provided during open is about to be saved. Any state that has not yet been put into the context should be added here. This method is called once per chunk (that is, once per transaction), so we use it to record the current state.
void close() | Releases any resources the stream needed to operate. Once this method has been called, all other methods (except open) may throw an exception.
As shown in the figure above, when a job starts executing, the open method of the ItemReader is called. The first time the job runs, a new ExecutionContext object is created and passed to open. When the job is restarted, the already persisted ExecutionContext is passed instead, and the reader uses it to resume where it left off.
The ‘update’ method is called once per chunk (that is, once per transaction), after the items of the chunk have been read, processed, and written. We use this method to save the current state.
The ‘close’ method is called once, when the step finishes (whether it completes or fails). Any resources the stream needed to operate should be released here.
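To make this call order concrete, here is a plain-Java sketch that mimics how a chunk-oriented step drives a stateful reader. It is a simplified, hypothetical model, not Spring Batch's actual implementation: the names LifecycleSketch, SimpleReader and run are made up for illustration, and a HashMap stands in for ExecutionContext. It reproduces the open, update, (read × chunk size, write, update)..., close order that appears in the console output of this post.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a chunk-oriented step driving a stateful reader.
// A HashMap stands in for Spring Batch's ExecutionContext; this is NOT Spring Batch code.
final class LifecycleSketch {

    // Minimal reader that remembers its position, in the spirit of an ItemStreamReader.
    static final class SimpleReader {
        private final List<String> items;
        private int currentIndex;

        SimpleReader(List<String> items) {
            this.items = items;
        }

        void open(Map<String, Object> context) {
            // Resume from the saved index if present, otherwise start at 0.
            currentIndex = (Integer) context.getOrDefault("currentItemIndex", 0);
        }

        void update(Map<String, Object> context) {
            context.put("currentItemIndex", currentIndex);
        }

        String read() {
            // Returning null signals that the input is exhausted.
            return currentIndex < items.size() ? items.get(currentIndex++) : null;
        }

        void close() {
            // Nothing to release in this sketch.
        }
    }

    // Runs the reader to completion and returns the sequence of lifecycle calls.
    static List<String> run(int itemCount, int chunkSize) {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            items.add("Item " + i);
        }
        SimpleReader reader = new SimpleReader(items);
        Map<String, Object> context = new HashMap<>();
        List<String> events = new ArrayList<>();

        reader.open(context);
        events.add("open");
        reader.update(context); // state is saved once right after open
        events.add("update");

        boolean exhausted = false;
        while (!exhausted) {
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize) {
                String item = reader.read();
                events.add("read");
                if (item == null) {
                    exhausted = true;
                    break;
                }
                chunk.add(item);
            }
            if (!chunk.isEmpty()) {
                events.add("write " + chunk.size()); // the writer receives the whole chunk
            }
            reader.update(context); // once per chunk, before the transaction commits
            events.add("update");
        }

        reader.close(); // once, when the step ends
        events.add("close");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(12, 5));
    }
}
```

With 25 items and a chunk size of 5, the list ends with read, update, close: the final read returns null, the empty last chunk still triggers an update, and close runs exactly once, which matches the tail of the restart log later in this post.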
How to create a stateful reader?
You can create a stateful item reader by implementing the 'ItemStreamReader' interface, which combines the ItemReader and ItemStream interfaces.
public class MyStatefulItemReader implements ItemStreamReader<String> {
......
......
}
Below is a complete working application.
Step 1: Create a new Maven project 'stateful-read-demo'.
Step 2: Update pom.xml with the following Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>stateful-read-demo</artifactId>
<version>1</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Create a new package ‘com.sample.app.readers’ and define MyStatefulItemReader.
MyStatefulItemReader.java
package com.sample.app.readers;
import java.util.List;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
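public class MyStatefulItemReader implements ItemStreamReader<String> {
	// Key under which the reader's position is stored in the ExecutionContext
	private static final String CURRENT_ITEM_INDEX = "currentItemIndex";

	private List<String> items;
	private int currentIndex;
	// true until open() finds a saved position; used to simulate a failure only on the first run
	private boolean firstRun;

	public MyStatefulItemReader(List<String> items) {
		this.items = items;
		currentIndex = 0;
		firstRun = true;
	}

	@Override
	public void open(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("In open method");
		if (executionContext.containsKey(CURRENT_ITEM_INDEX)) {
			// Restart: resume from the position saved by the previous (failed) execution
			this.currentIndex = executionContext.getInt(CURRENT_ITEM_INDEX);
			this.firstRun = false;
		} else {
			// First run: start from the beginning and record the initial position
			this.currentIndex = 0;
			executionContext.put(CURRENT_ITEM_INDEX, this.currentIndex);
		}
	}

	@Override
	public void update(ExecutionContext executionContext) throws ItemStreamException {
		System.out.println("In update method");
		// Record the current position; it is persisted when the chunk's transaction commits
		executionContext.put(CURRENT_ITEM_INDEX, this.currentIndex);
	}

	@Override
	public void close() throws ItemStreamException {
		System.out.println("In close method");
	}

	@Override
	public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		System.out.println("In read method");
		String item = null;
		// Simulate a failure at index 9, but only on the first run, so that the restart succeeds
		if (currentIndex == 9 && firstRun) {
			throw new RuntimeException("Error Occuured while processing the data");
		}
		// Return the next item and advance the position; null tells Spring Batch the input is exhausted
		if (currentIndex < items.size()) {
			item = items.get(currentIndex);
			currentIndex++;
		}
		return item;
	}
}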
In the ‘read’ method, I throw an exception when the reader reaches index 9 (Item 9), but only on the first run.
In the ‘open’ method, I read the ‘currentItemIndex’ value from the ExecutionContext when the application is restarted, so the reader continues from the saved position.
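The resume and fault-injection logic of open and read can be exercised without starting Spring at all. The sketch below is a hypothetical, Spring-free copy of that logic: the class name MapBackedReader and the helper sampleItems are made up for illustration, and a plain Map replaces ExecutionContext. On an empty context it starts at index 0 and fails at index 9; on a context that already holds currentItemIndex it resumes from the saved index and skips the injected failure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spring-free copy of MyStatefulItemReader's open/read logic.
// A plain Map stands in for ExecutionContext.
final class MapBackedReader {
    static final String KEY = "currentItemIndex";

    private final List<String> items;
    private int currentIndex;
    private boolean firstRun = true;

    MapBackedReader(List<String> items) {
        this.items = items;
    }

    void open(Map<String, Object> context) {
        if (context.containsKey(KEY)) {
            // Restart: resume from the saved position and disable the injected failure.
            currentIndex = (Integer) context.get(KEY);
            firstRun = false;
        } else {
            // First run: start from the beginning and record the initial position.
            currentIndex = 0;
            context.put(KEY, currentIndex);
        }
    }

    String read() {
        if (currentIndex == 9 && firstRun) {
            throw new RuntimeException("Simulated failure at index 9");
        }
        return currentIndex < items.size() ? items.get(currentIndex++) : null;
    }

    // Same 25 items as the JobConfiguration bean: "Item 0" ... "Item 24".
    static List<String> sampleItems() {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            items.add("Item " + i);
        }
        return items;
    }

    public static void main(String[] args) {
        // Restart scenario: the context already holds index 5.
        Map<String, Object> saved = new HashMap<>();
        saved.put(KEY, 5);
        MapBackedReader reader = new MapBackedReader(sampleItems());
        reader.open(saved);
        System.out.println(reader.read()); // Item 5
    }
}
```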
Step 4: Create the package ‘com.sample.app.configuration’ and define JobConfiguration.
JobConfiguration.java
package com.sample.app.configuration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.sample.app.readers.MyStatefulItemReader;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;
	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	// Reader over 25 in-memory items: "Item 0" ... "Item 24"
	@Bean
	public MyStatefulItemReader myStatefulItemReader() {
		List<String> items = new ArrayList<>();
		for (int i = 0; i < 25; i++) {
			items.add("Item " + i);
		}
		return new MyStatefulItemReader(items);
	}

	// Writer that prints every item of the chunk it receives
	@Bean
	public ItemWriter<String> itemWriter() {
		return new ItemWriter<String>() {
			@Override
			public void write(List<? extends String> items) throws Exception {
				for (String item : items) {
					System.out.println("Writing : " + item);
				}
			}
		};
	}

	// Chunk-oriented step: read 5 items, then write them together in one transaction
	@Bean
	public Step step1() {
		return this.stepBuilderFactory.get("step1").<String, String>chunk(5)
				.reader(myStatefulItemReader())
				.writer(itemWriter())
				.build();
	}

	@Bean
	public Job myJob() {
		return jobBuilderFactory.get("My-First-Job").start(step1()).build();
	}
}
Step 5: Define App.java
App.java
package com.sample.app;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableBatchProcessing
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Step 6: Create an ‘application.properties’ file under the src/main/resources folder.
application.properties
logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2
spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
spring.datasource.username=krishna
spring.datasource.password=password123
spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000
# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3
The complete project structure looks like this.
Run App.java; you will see the following messages in the console.
In open method
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 0
Writing : Item 1
Writing : Item 2
Writing : Item 3
Writing : Item 4
In update method
In read method
In read method
In read method
In read method
In read method
2020-04-07 15:45:09.774 ERROR 70982 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step1 in job My-First-Job
java.lang.RuntimeException: Error Occuured while processing the data
	at com.sample.app.readers.MyStatefulItemReader.read(MyStatefulItemReader.java:55) ~[classes/:na]
	at com.sample.app.readers.MyStatefulItemReader.read(MyStatefulItemReader.java:1) ~[classes/:na]
	at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94) ~[spring-batch-core-4.1.2.RELEASE.jar:4.1.2.RELEASE]
	.....
	.....
In close method
Open the URL 'http://localhost:8080/h2' to see the H2 console.
Set the JDBC URL to the one from application.properties (jdbc:h2:file:~/db/myOrg.db), give the username as 'krishna' and the password as 'password123', and log in to the H2 console.
Look at the data in the 'BATCH_STEP_EXECUTION_CONTEXT' table.
The 'SHORT_CONTEXT' column contains the following information.
{
"batch.taskletType": "org.springframework.batch.core.step.item.ChunkOrientedTasklet",
"batch.stepType": "org.springframework.batch.core.step.tasklet.TaskletStep",
"currentItemIndex": 5
}
As the JSON above shows, currentItemIndex is set to 5. That means items up to index 4 were processed and committed (the failing second chunk was rolled back before its state could be saved), so when we restart the application, processing should resume from index 5.
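The resume index depends only on where the chunk boundaries fall. Below is a small hypothetical helper (ResumePoint, resumeIndex and itemsReadAgain are illustrative names, not Spring Batch API) that computes it, assuming processing starts at index 0 with a fixed chunk size and no skip or retry policy: the chunk containing the failing item is rolled back, so the last saved state is the first index of that chunk.

```java
// Hypothetical helper: where does a chunk-oriented step resume after a failure?
// Assumes processing started at index 0, a fixed chunk size, and no skips or retries.
final class ResumePoint {

    // The failing chunk is rolled back, so the last saved state is the
    // start index of the chunk that contains the failing item.
    static int resumeIndex(int failingIndex, int chunkSize) {
        return (failingIndex / chunkSize) * chunkSize; // integer division
    }

    // Items read in the rolled-back chunk that will be read again on restart.
    static int itemsReadAgain(int failingIndex, int chunkSize) {
        return failingIndex - resumeIndex(failingIndex, chunkSize);
    }

    public static void main(String[] args) {
        int failingIndex = 9; // MyStatefulItemReader throws when currentIndex == 9
        int chunkSize = 5;    // chunk(5) in JobConfiguration
        System.out.println("resume at index " + resumeIndex(failingIndex, chunkSize));
        System.out.println("items read again " + itemsReadAgain(failingIndex, chunkSize));
    }
}
```

For this job it gives a resume index of 5 and 4 items read again: Item 5 to Item 8 were read in the failed chunk but never written, so they are read once more after the restart.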
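Stop and restart the application. Because the previous execution of My-First-Job failed, launching it again with the same job parameters restarts the same job instance, and Spring Batch passes the saved step ExecutionContext to the reader's open method. You will see the following messages in the console.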
In open method
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 5
Writing : Item 6
Writing : Item 7
Writing : Item 8
Writing : Item 9
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 10
Writing : Item 11
Writing : Item 12
Writing : Item 13
Writing : Item 14
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 15
Writing : Item 16
Writing : Item 17
Writing : Item 18
Writing : Item 19
In update method
In read method
In read method
In read method
In read method
In read method
Writing : Item 20
Writing : Item 21
Writing : Item 22
Writing : Item 23
Writing : Item 24
In update method
In read method
In update method
In close method
You can download the complete working application from this link.
https://github.com/harikrishna553/springboot/tree/master/batch/stateful-read-demo