Sunday 20 December 2020

Spring Batch: CompositeItemProcessor: Chain multiple Item Processors

CompositeItemProcessor is used to chain multiple item processors. When a CompositeItemProcessor receives an item, the item will be passed through a sequence of injected ItemProcessors.

 

 


For example, the following snippet defines a CompositeItemProcessor that is composed of three item processors.

/**
 * Builds the composite processor from three delegates: the validating
 * processor, the upper-casing processor and the name-filtering processor,
 * handed over in exactly that order.
 *
 * @return the initialised composite processor
 * @throws Exception propagated from {@code afterPropertiesSet()}
 */
@Bean
public CompositeItemProcessor<Employee, Employee> compositeItemProcessor() throws Exception {
  // The list order is the order in which the delegates are registered.
  List<ItemProcessor<Employee, Employee>> delegates = Arrays.asList(
      validatingItemProcessor(),
      upperCaseProcessor,
      nameFilterProcessor);

  CompositeItemProcessor<Employee, Employee> composite = new CompositeItemProcessor<>();
  composite.setDelegates(delegates);
  composite.afterPropertiesSet();
  return composite;
}


Find the working application below.

 

Step 1: Create a new Maven project ‘compositeItemProcessor-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <!-- Coordinates of this demo project. -->
  <groupId>com.sample.app</groupId>
  <artifactId>compositeItemProcessor-demo</artifactId>
  <version>1</version>

  <!-- Spring Boot parent POM. None of the dependencies below declares its own
       version; presumably they are all managed by this parent. -->
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <!-- Spring Batch core, used by the job configuration of this demo. -->
    <!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-core</artifactId>
    </dependency>

    <!-- Spring Boot web starter. NOTE(review): presumably needed to serve the
         H2 console that application.properties enables; confirm. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- JPA support; the Employee class is mapped with javax.persistence annotations. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- H2 database; application.properties points the datasource at an H2 file database. -->
    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
    </dependency>


  </dependencies>
</project>


Step 3: Create application.properties file under src/main/resources folder.

 

application.properties

# Log errors only, both globally and for Hibernate.
logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
# Enable the H2 web console and expose it under /h2.
spring.h2.console.enabled=true
spring.h2.console.path=/h2

# File-based H2 database stored under ~/db.
spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;

spring.datasource.username=krishna
spring.datasource.password=password123

spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop

spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Spring Boot 2.x selects HikariCP as the connection pool when it is on the
# classpath (it comes with spring-boot-starter-data-jpa), so pool settings must
# live under spring.datasource.hikari.*. The former spring.datasource.max-wait
# and spring.datasource.tomcat.* keys are not applied to a Hikari pool.

# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.hikari.connection-timeout=10000

# Maximum number of connections (idle and in use) that the pool may hold at the same time.
spring.datasource.hikari.maximum-pool-size=10
# Minimum number of idle connections the pool tries to keep available.
# HikariCP has no counterpart to a "max idle" setting; idle connections above
# this minimum are retired by the pool's idle timeout.
spring.datasource.hikari.minimum-idle=3


Step 4: Create data.sql file in src/main/resources folder.

 

data.sql

-- Seed data for the employee table, inserted as one multi-row statement.
-- Rows 1 and 5 have no last name and row 8 has no first name.
INSERT INTO employee (id, first_name, last_name) VALUES
  (1, 'Sowmya', null),
  (2, 'Mridhu', 'Latha'),
  (3, 'Gowthami','Prasad'),
  (4, 'Sailu', 'Venkat'),
  (5, 'chamu', null),
  (6, 'Rama', 'Lakshman'),
  (7, 'Saranya', 'Nataraj'),
  (8, null, 'Surendra');


Step 5: Create package ‘com.sample.app.entity’ and define Employee class.

 

Employee.java

package com.sample.app.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

/**
 * JPA entity mapped to the {@code employee} table. Carries a numeric id plus
 * a first name and a last name, each mapped to its own column.
 */
@Entity
@Table(name = "employee")
public class Employee {

  /** Primary key, stored in the {@code id} column. */
  @Id
  @GeneratedValue
  @Column(name = "id")
  private int id;

  /** Value of the {@code first_name} column; may be {@code null}. */
  @Column(name = "first_name")
  private String firstName;

  /** Value of the {@code last_name} column; may be {@code null}. */
  @Column(name = "last_name")
  private String lastName;

  /** @return the employee id */
  public int getId() {
    return this.id;
  }

  /** @param id the employee id to store */
  public void setId(int id) {
    this.id = id;
  }

  /** @return the first name, possibly {@code null} */
  public String getFirstName() {
    return this.firstName;
  }

  /** @param firstName the first name to store */
  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  /** @return the last name, possibly {@code null} */
  public String getLastName() {
    return this.lastName;
  }

  /** @param lastName the last name to store */
  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  /**
   * Renders the entity as {@code Employee [id=..., firstName=..., lastName=...]}.
   */
  @Override
  public String toString() {
    return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
  }

}


Step 6: Create package ‘com.sample.app.mappers’ and define EmployeeRowMapper.

 

EmployeeRowMapper.java

package com.sample.app.mappers;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.sample.app.entity.Employee;

/**
 * Row mapper that turns one row of an employee query into an {@link Employee}.
 */
public class EmployeeRowMapper implements RowMapper<Employee> {

  /**
   * Reads the {@code id}, {@code first_name} and {@code last_name} columns
   * and copies them into a new {@link Employee}.
   *
   * @param rs     result set to read the column values from
   * @param rowNum number of the row being mapped (not used here)
   * @return a new employee populated from the three columns
   * @throws SQLException if reading any of the columns fails
   */
  @Override
  public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
    // Columns are read in the same order as they are written to the entity.
    final int id = rs.getInt("id");
    final String firstName = rs.getString("first_name");
    final String lastName = rs.getString("last_name");

    final Employee employee = new Employee();
    employee.setId(id);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);
    return employee;
  }

}


Step 7: Create package ‘com.sample.app.lineaggregators’ and define EmployeeJsonLineAggregator.

 

EmployeeJsonLineAggregator.java

package com.sample.app.lineaggregators;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sample.app.entity.Employee;

/**
 * Line aggregator that renders an {@link Employee} as an indented JSON string.
 */
public class EmployeeJsonLineAggregator implements LineAggregator<Employee> {

  // Created and configured once; INDENT_OUTPUT gives the multi-line JSON layout.
  private final ObjectMapper objectMapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);

  /**
   * Serialises the given employee to JSON.
   *
   * @param emp the employee to serialise
   * @return the JSON text produced by the object mapper
   * @throws RuntimeException if serialisation fails; the underlying
   *                          {@link JsonProcessingException} is attached as the cause
   */
  @Override
  public String aggregate(Employee emp) {

    try {
      return objectMapper.writeValueAsString(emp);
    } catch (JsonProcessingException e) {
      // Pass the original exception along as the cause so the real failure is not lost.
      throw new RuntimeException("Error Occured while serializing Employee instance : " + emp, e);
    }
  }

}


Step 8: Create package ‘com.sample.app.itemprocessor’ and define EmployeeNameFilterItemProcessor.

 

EmployeeNameFilterItemProcessor.java

package com.sample.app.itemprocessor;

import java.util.Arrays;
import java.util.List;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.sample.app.entity.Employee;

/**
 * Item processor that drops employees whose first or last name is on the
 * exclusion list.
 *
 * <p>Names are compared case-insensitively. In this application the processor
 * runs after {@code EmployeeNameUpperCaseItemProcessor}, so the names it sees
 * are already upper-cased ("RAMA", "SAILU"); a case-sensitive comparison
 * against "Rama"/"Sailu" would never match and nothing would be filtered.
 */
@Component
public class EmployeeNameFilterItemProcessor implements ItemProcessor<Employee, Employee> {

  private static final List<String> NAMES_TO_EXCLUDE = Arrays.asList("Rama", "Sailu");

  /**
   * Filters out employees with an excluded first or last name.
   *
   * @param emp the employee to check
   * @return {@code null} when the employee must be skipped, otherwise the
   *         same employee instance
   * @throws Exception declared by the {@link ItemProcessor} contract; not
   *                   thrown by this implementation itself
   */
  @Override
  public Employee process(Employee emp) throws Exception {

    if (isExcluded(emp.getFirstName()) || isExcluded(emp.getLastName())) {
      return null;
    }

    return emp;
  }

  /** Returns true when {@code name} equals an excluded name, ignoring case; a null name never matches. */
  private static boolean isExcluded(String name) {
    for (String excluded : NAMES_TO_EXCLUDE) {
      // equalsIgnoreCase(null) is false, so null names fall through safely.
      if (excluded.equalsIgnoreCase(name)) {
        return true;
      }
    }
    return false;
  }

}


Step 9: Define EmployeeNameUpperCaseItemProcessor.

 

EmployeeNameUpperCaseItemProcessor.java

package com.sample.app.itemprocessor;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.sample.app.entity.Employee;

/**
 * Item processor that produces a copy of an employee with the first and last
 * name converted to upper case.
 */
@Component
public class EmployeeNameUpperCaseItemProcessor implements ItemProcessor<Employee, Employee> {

  /**
   * Builds a new employee with the same id and upper-cased names. The input
   * instance is not modified. A {@code null} name stays {@code null} instead
   * of causing a {@link NullPointerException}.
   *
   * @param emp the employee to copy
   * @return a new employee carrying the upper-cased names
   * @throws Exception declared by the {@link ItemProcessor} contract; not
   *                   thrown by this implementation itself
   */
  @Override
  public Employee process(Employee emp) throws Exception {

    Employee newEmp = new Employee();

    newEmp.setId(emp.getId());
    newEmp.setFirstName(upperCaseOrNull(emp.getFirstName()));
    newEmp.setLastName(upperCaseOrNull(emp.getLastName()));

    return newEmp;
  }

  /** Upper-cases {@code value} independently of the JVM's default locale; null stays null. */
  private static String upperCaseOrNull(String value) {
    // Locale.ROOT keeps the result stable on machines with e.g. a Turkish default locale.
    return value == null ? null : value.toUpperCase(java.util.Locale.ROOT);
  }

}


Step 10: Create package ‘com.sample.app.validators’ and define EmployeeValidator.

 

EmployeeValidator.java

package com.sample.app.validators;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;

import com.sample.app.entity.Employee;

/**
 * Validator that rejects employees lacking a first name or a last name.
 */
public class EmployeeValidator implements Validator<Employee> {

  /**
   * Checks the first name and then the last name of the given employee.
   *
   * @param emp the employee to validate
   * @throws ValidationException if the first name is {@code null}, or
   *                             otherwise if the last name is {@code null}
   */
  @Override
  public void validate(Employee emp) throws ValidationException {
    // The first name is checked first, so its message wins when both are missing.
    requirePresent(emp.getFirstName(), "firstName must not be null");
    requirePresent(emp.getLastName(), "lastName must not be null");
  }

  /** Throws a {@link ValidationException} carrying {@code message} when {@code value} is null. */
  private static void requirePresent(String value, String message) {
    if (value == null) {
      throw new ValidationException(message);
    }
  }

}


Step 11: Create package ‘com.sample.app.configuration’ and define JobConfiguration.

 

JobConfiguration.java 

package com.sample.app.configuration;

import java.util.Arrays;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import com.sample.app.entity.Employee;
import com.sample.app.itemprocessor.EmployeeNameFilterItemProcessor;
import com.sample.app.itemprocessor.EmployeeNameUpperCaseItemProcessor;
import com.sample.app.lineaggregators.EmployeeJsonLineAggregator;
import com.sample.app.mappers.EmployeeRowMapper;
import com.sample.app.validators.EmployeeValidator;

/**
 * Batch configuration of the demo: employees are read through a JDBC cursor,
 * passed through a composite processor and written as JSON to a file.
 */
@Configuration
@EnableBatchProcessing
public class JobConfiguration {
  @Autowired
  private JobBuilderFactory jobBuilderFactory;

  @Autowired
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  private DataSource dataSource;

  @Autowired
  private EmployeeNameUpperCaseItemProcessor upperCaseProcessor;

  @Autowired
  private EmployeeNameFilterItemProcessor nameFilterProcessor;

  /**
   * Reader that selects all employees, ordered by first name, and maps every
   * row with an {@link EmployeeRowMapper}.
   *
   * @return the configured cursor reader
   */
  @Bean
  public JdbcCursorItemReader<Employee> jdbcCursorItemReader() {
    JdbcCursorItemReader<Employee> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    reader.setRowMapper(new EmployeeRowMapper());
    reader.setSql("SELECT id, first_name, last_name FROM employee ORDER BY first_name");
    return reader;
  }

  /**
   * Writer that renders each employee with {@link EmployeeJsonLineAggregator}
   * and writes the result to {@code /Users/Shared/result.json}.
   *
   * @return the initialised file writer
   * @throws Exception propagated from {@code afterPropertiesSet()}
   */
  @Bean
  public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
    FileSystemResource target = new FileSystemResource("/Users/Shared/result.json");

    FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
    writer.setResource(target);
    writer.setLineAggregator(new EmployeeJsonLineAggregator());
    writer.afterPropertiesSet();
    return writer;
  }

  /**
   * Processor that validates employees with {@link EmployeeValidator}. Filter
   * mode is switched on; per the Spring Batch documentation this makes
   * invalid items get filtered out instead of failing the step.
   *
   * @return the validating processor
   */
  @Bean
  public ValidatingItemProcessor<Employee> validatingItemProcessor() {
    ValidatingItemProcessor<Employee> validating = new ValidatingItemProcessor<>(new EmployeeValidator());
    validating.setFilter(true);
    return validating;
  }

  /**
   * Composite processor whose delegates are, in order, the validating
   * processor, the upper-casing processor and the name-filtering processor.
   *
   * @return the initialised composite processor
   * @throws Exception propagated from {@code afterPropertiesSet()}
   */
  @Bean
  public CompositeItemProcessor<Employee, Employee> compositeItemProcessor() throws Exception {
    // The list order is the order in which the delegates are registered.
    List<ItemProcessor<Employee, Employee>> delegates = Arrays.asList(
        validatingItemProcessor(),
        upperCaseProcessor,
        nameFilterProcessor);

    CompositeItemProcessor<Employee, Employee> composite = new CompositeItemProcessor<>();
    composite.setDelegates(delegates);
    composite.afterPropertiesSet();
    return composite;
  }

  /**
   * Chunk-oriented step (chunk size 3) wiring together the reader, the
   * composite processor and the JSON file writer.
   *
   * @return the step named {@code step1}
   * @throws Exception propagated from the processor and writer factory methods
   */
  @Bean
  public Step step1() throws Exception {
    return stepBuilderFactory.get("step1")
        .<Employee, Employee>chunk(3)
        .reader(jdbcCursorItemReader())
        .processor(compositeItemProcessor())
        .writer(jsonFileItemWriter())
        .build();
  }

  /**
   * Job named {@code My-First-Job} consisting of {@link #step1()} only.
   *
   * @param jobRepository              injected by Spring; not used in the method body
   * @param platformTransactionManager injected by Spring; not used in the method body
   * @return the job definition
   * @throws Exception propagated from {@link #step1()}
   */
  @Bean
  public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager)
      throws Exception {
    Step firstStep = step1();
    return jobBuilderFactory.get("My-First-Job").start(firstStep).build();
  }

}

Step 12: Define main Application class.

 

App.java

package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the demo; boots the Spring application with {@code App} as
 * its primary source.
 */
@EnableBatchProcessing
@SpringBootApplication
public class App {

  /**
   * Starts the Spring Boot application.
   *
   * @param args command-line arguments handed on to Spring Boot
   */
  public static void main(String[] args) {
    // Instance form of the static SpringApplication.run(App.class, args) helper.
    SpringApplication application = new SpringApplication(App.class);
    application.run(args);
  }
}

The complete project structure looks like below.



  Previous                                                    Next                                                    Home

Run App.java.

 

Open the file ‘/Users/Shared/result.json’ to see the written content.

$cat /Users/Shared/result.json 
{
  "id" : 3,
  "firstName" : "GOWTHAMI",
  "lastName" : "PRASAD"
}
{
  "id" : 2,
  "firstName" : "MRIDHU",
  "lastName" : "LATHA"
}
{
  "id" : 6,
  "firstName" : "RAMA",
  "lastName" : "LAKSHMAN"
}
{
  "id" : 4,
  "firstName" : "SAILU",
  "lastName" : "VENKAT"
}
{
  "id" : 7,
  "firstName" : "SARANYA",
  "lastName" : "NATARAJ"
}


You can download the complete working application from this link.

https://github.com/harikrishna553/springboot/tree/master/batch/compositeItemProcessor-demo










Previous                                                    Next                                                    Home

No comments:

Post a Comment