'ValidatingItemProcessor' validates each input item and returns it without modification. With this processor, an item that does not meet the validation criteria can either be skipped or cause the job to fail.
How can items that do not meet the validation criteria be skipped?
Call 'ValidatingItemProcessor#setFilter(true)'. Invalid items are then filtered out and never reach the writer.
How can a ValidationException be thrown instead?
Call 'ValidatingItemProcessor#setFilter(false)', which is the default. The processor then throws a ValidationException as soon as it finds an item that does not meet the validation criteria. Unless the step is configured to skip that exception, the entire job ends in the FAILED state.
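To make the difference between the two settings concrete, here is a minimal plain-Java sketch of the decision described above. It does not use Spring Batch: SimpleValidationException, SimpleValidator, FilterSketch and FilterSketchDemo are hypothetical stand-ins invented for this illustration, and only the filter-or-rethrow logic is modelled.

```java
import java.util.Objects;

// Hypothetical stand-in for Spring Batch's ValidationException.
class SimpleValidationException extends RuntimeException {
    SimpleValidationException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for Spring Batch's Validator<T>.
interface SimpleValidator<T> {
    void validate(T item) throws SimpleValidationException;
}

// Models only the filter-or-rethrow decision; this is not the real class.
class FilterSketch<T> {
    private final SimpleValidator<T> validator;
    private final boolean filter;

    FilterSketch(SimpleValidator<T> validator, boolean filter) {
        this.validator = Objects.requireNonNull(validator);
        this.filter = filter;
    }

    // Returns the item unchanged when it is valid. For an invalid item,
    // returns null when filter is true and rethrows when filter is false.
    T process(T item) {
        try {
            validator.validate(item);
        } catch (SimpleValidationException e) {
            if (filter) {
                return null;
            }
            throw e;
        }
        return item;
    }
}

class FilterSketchDemo {
    // Rejects null strings, loosely mirroring the null checks used in this post.
    static final SimpleValidator<String> NOT_NULL = s -> {
        if (s == null) {
            throw new SimpleValidationException("value must not be null");
        }
    };

    public static void main(String[] args) {
        FilterSketch<String> filtering = new FilterSketch<>(NOT_NULL, true);
        System.out.println(filtering.process("Latha")); // prints Latha
        System.out.println(filtering.process(null));    // prints null

        FilterSketch<String> failing = new FilterSketch<>(NOT_NULL, false);
        try {
            failing.process(null);
        } catch (SimpleValidationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In a Spring Batch step, a null returned by an item processor means the item is not handed to the writer, which is why returning null amounts to skipping the item.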
Step 1: Create Validator by implementing Validator interface.
public class EmployeeValidator implements Validator<Employee> {

    @Override
    public void validate(Employee emp) throws ValidationException {
        if (emp.getFirstName() == null) {
            throw new ValidationException("firstName must not be null");
        }
        if (emp.getLastName() == null) {
            throw new ValidationException("lastName must not be null");
        }
    }
}
Step 2: Define an instance of ValidatingItemProcessor using EmployeeValidator created in step 1.
@Bean
public ValidatingItemProcessor<Employee> validatingItemProcessor() {
    ValidatingItemProcessor<Employee> itemProcessor = new ValidatingItemProcessor<>(new EmployeeValidator());
    itemProcessor.setFilter(true);
    return itemProcessor;
}
The complete working application follows.
Step 1: Create a new Maven project ‘validatingItemProcessor-demo’.
Step 2: Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sample.app</groupId>
    <artifactId>validatingItemProcessor-demo</artifactId>
    <version>1</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>
</project>
Step 3: Create application.properties file under src/main/resources folder.
application.properties
logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2
spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
spring.datasource.username=krishna
spring.datasource.password=password123
spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000
# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3
Step 4: Create data.sql file under src/main/resources folder.
data.sql
INSERT INTO employee (id, first_name, last_name) VALUES
(1, 'Sowmya', null),
(2, 'Mridhu', 'Latha'),
(3, 'Gowthami','Prasad'),
(4, 'Sailu', 'Venkat'),
(5, 'chamu', null),
(6, 'Rama', 'Lakshman'),
(7, 'Saranya', 'Nataraj'),
(8, null, 'Surendra');
Step 5: Create package ‘com.sample.app.entity’ and define Employee class.
Employee.java
package com.sample.app.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "employee")
public class Employee {

    @Id
    @GeneratedValue
    @Column(name = "id")
    private int id;

    @Column(name = "first_name")
    private String firstName;

    @Column(name = "last_name")
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Employee [id=").append(id).append(", firstName=").append(firstName).append(", lastName=")
                .append(lastName).append("]");
        return builder.toString();
    }
}
Step 6: Create package ‘com.sample.app.mappers’ and define EmployeeRowMapper.
EmployeeRowMapper.java
package com.sample.app.mappers;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.sample.app.entity.Employee;

public class EmployeeRowMapper implements RowMapper<Employee> {

    @Override
    public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
        Employee emp = new Employee();
        emp.setId(rs.getInt("id"));
        emp.setFirstName(rs.getString("first_name"));
        emp.setLastName(rs.getString("last_name"));
        return emp;
    }
}
Step 7: Create package ‘com.sample.app.lineaggregators’ and define EmployeeJsonLineAggregator.
EmployeeJsonLineAggregator.java
package com.sample.app.lineaggregators;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sample.app.entity.Employee;

public class EmployeeJsonLineAggregator implements LineAggregator<Employee> {

    // Pretty-prints each employee as an indented JSON object.
    private final ObjectMapper objectMapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);

    @Override
    public String aggregate(Employee emp) {
        try {
            return objectMapper.writeValueAsString(emp);
        } catch (JsonProcessingException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("Error occurred while serializing Employee instance: " + emp, e);
        }
    }
}
Step 8: Create package ‘com.sample.app.validators’ and define EmployeeValidator.
EmployeeValidator.java
package com.sample.app.validators;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;

import com.sample.app.entity.Employee;

public class EmployeeValidator implements Validator<Employee> {

    @Override
    public void validate(Employee emp) throws ValidationException {
        if (emp.getFirstName() == null) {
            throw new ValidationException("firstName must not be null");
        }
        if (emp.getLastName() == null) {
            throw new ValidationException("lastName must not be null");
        }
    }
}
Step 9: Create package ‘com.sample.app.configuration’ and define JobConfiguration.
JobConfiguration.java
package com.sample.app.configuration;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import com.sample.app.entity.Employee;
import com.sample.app.lineaggregators.EmployeeJsonLineAggregator;
import com.sample.app.mappers.EmployeeRowMapper;
import com.sample.app.validators.EmployeeValidator;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcCursorItemReader<Employee> jdbcCursorItemReader() {
        JdbcCursorItemReader<Employee> cursorItemReader = new JdbcCursorItemReader<>();
        cursorItemReader.setSql("SELECT id, first_name, last_name FROM employee ORDER BY first_name");
        cursorItemReader.setDataSource(dataSource);
        cursorItemReader.setRowMapper(new EmployeeRowMapper());
        return cursorItemReader;
    }

    @Bean
    public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
        FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();
        flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
        String outFilePath = "/Users/Shared/result.json";
        flatFileItemWriter.setResource(new FileSystemResource(outFilePath));
        flatFileItemWriter.afterPropertiesSet();
        return flatFileItemWriter;
    }

    @Bean
    public ValidatingItemProcessor<Employee> validatingItemProcessor() {
        ValidatingItemProcessor<Employee> itemProcessor = new ValidatingItemProcessor<>(new EmployeeValidator());
        itemProcessor.setFilter(true);
        return itemProcessor;
    }

    @Bean
    public Step step1() throws Exception {
        return this.stepBuilderFactory.get("step1")
                .<Employee, Employee>chunk(3)
                .reader(jdbcCursorItemReader())
                .processor(validatingItemProcessor())
                .writer(jsonFileItemWriter())
                .build();
    }

    @Bean
    public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager)
            throws Exception {
        return jobBuilderFactory.get("My-First-Job").start(step1()).build();
    }
}
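The step1 bean above reads in chunks of three and passes every item through the filtering processor before the writer sees it. As a rough mental model only, the flow can be sketched in plain Java as follows. ChunkSketch and its Row type are hypothetical names, Spring Batch is not used, and transactions, restarts and listeners are ignored. The rows are fed in data.sql insertion order for simplicity, not in the ORDER BY first_name order that the reader's query produces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkSketch {
    // Hypothetical minimal row type standing in for the Employee entity.
    static final class Row {
        final int id;
        final String firstName;
        final String lastName;

        Row(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Same rule as EmployeeValidator: both names must be non-null.
    static boolean isValid(Row row) {
        return row.firstName != null && row.lastName != null;
    }

    // Takes up to chunkSize rows at a time, drops invalid rows (filter = true),
    // and returns the ids that would be handed to the writer, one list per chunk.
    static List<List<Integer>> run(List<Row> rows, int chunkSize) {
        List<List<Integer>> written = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, rows.size());
            List<Integer> chunk = new ArrayList<>();
            for (Row row : rows.subList(start, end)) {
                if (isValid(row)) {
                    chunk.add(row.id);
                }
            }
            written.add(chunk);
        }
        return written;
    }

    // The eight rows from data.sql.
    static List<Row> sampleRows() {
        return Arrays.asList(
                new Row(1, "Sowmya", null),
                new Row(2, "Mridhu", "Latha"),
                new Row(3, "Gowthami", "Prasad"),
                new Row(4, "Sailu", "Venkat"),
                new Row(5, "chamu", null),
                new Row(6, "Rama", "Lakshman"),
                new Row(7, "Saranya", "Nataraj"),
                new Row(8, null, "Surendra"));
    }

    public static void main(String[] args) {
        System.out.println(run(sampleRows(), 3)); // prints [[2, 3], [4, 6], [7]]
    }
}
```

The chunk size counts items read, so a chunk that contains filtered items hands fewer than three items to the writer.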
Step 10: Define main application class.
App.java
package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
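The complete project structure, assuming the standard Maven layout, looks like this:
validatingItemProcessor-demo
    pom.xml
    src/main/java/com/sample/app
        App.java
        configuration/JobConfiguration.java
        entity/Employee.java
        lineaggregators/EmployeeJsonLineAggregator.java
        mappers/EmployeeRowMapper.java
        validators/EmployeeValidator.java
    src/main/resources
        application.properties
        data.sql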
Run App.java.
Open the ‘/Users/Shared/result.json’ file. You can see that the records containing null values have been skipped.
$ cat /Users/Shared/result.json
{
  "id" : 3,
  "firstName" : "Gowthami",
  "lastName" : "Prasad"
}
{
  "id" : 2,
  "firstName" : "Mridhu",
  "lastName" : "Latha"
}
{
  "id" : 6,
  "firstName" : "Rama",
  "lastName" : "Lakshman"
}
{
  "id" : 4,
  "firstName" : "Sailu",
  "lastName" : "Venkat"
}
{
  "id" : 7,
  "firstName" : "Saranya",
  "lastName" : "Nataraj"
}
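As a quick cross-check of the listing above, the following plain-Java sketch applies the same two null checks to the eight rows of data.sql and sorts the survivors by first name. ExpectedOutputSketch is a hypothetical helper written for this illustration and does not run the batch job. It sorts after filtering, whereas the application sorts in the SQL query and filters afterwards; for these rows both approaches should leave the same ids in the same order.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class ExpectedOutputSketch {
    // The rows from data.sql as {id, first_name, last_name}.
    static final String[][] ROWS = {
            {"1", "Sowmya", null},
            {"2", "Mridhu", "Latha"},
            {"3", "Gowthami", "Prasad"},
            {"4", "Sailu", "Venkat"},
            {"5", "chamu", null},
            {"6", "Rama", "Lakshman"},
            {"7", "Saranya", "Nataraj"},
            {"8", null, "Surendra"}
    };

    // Keeps rows where both names are non-null, then orders them by first name.
    static List<String> expectedIds() {
        return Arrays.stream(ROWS)
                .filter(r -> r[1] != null && r[2] != null)
                .sorted(Comparator.comparing((String[] r) -> r[1]))
                .map(r -> r[0])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = expectedIds();
        System.out.println("written ids: " + ids);                     // written ids: [3, 2, 6, 4, 7]
        System.out.println("filtered: " + (ROWS.length - ids.size())); // filtered: 3
    }
}
```

Eight rows are read, three fail validation (ids 1, 5 and 8), and the remaining five match the ids in result.json.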
You can download the complete working application from this link.
https://github.com/harikrishna553/springboot/tree/master/batch/validatingItemProcessor-demo