Thursday 17 December 2020

Spring Batch: Classify data while writing to multiple destinations

In this post, I am going to explain how to classify data while writing to multiple destinations.

 


 

As shown in the image above, the application reads data from a database and writes the records with even ids to an xml file and the records with odd ids to a json file.

 

Step 1: Create writers to write data to json and xml files.

@Bean
public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
	FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

	flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
	String outFilePath = "/Users/Shared/result.json";

	flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

	flatFileItemWriter.afterPropertiesSet();

	return flatFileItemWriter;
}

@Bean
public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
	StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();

	Map<String, Class<?>> aliases = new HashMap<>();
	aliases.put("employee", Employee.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();
	marshaller.setAliases(aliases);

	staxEventItemWriter.setRootTagName("employees");
	staxEventItemWriter.setMarshaller(marshaller);
	String outFilePath = "/Users/Shared/result.xml";
	staxEventItemWriter.setResource(new FileSystemResource(outFilePath));

	staxEventItemWriter.afterPropertiesSet();

	return staxEventItemWriter;
}


Step 2: Create a classifier that classifies data based on the employee record id.

public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {

	private ItemWriter<Employee> evenItemWriter;
	private ItemWriter<Employee> oddItemWriter;

	public EmployeeClassifier(ItemWriter<Employee> evenItemWriter, ItemWriter<Employee> oddItemWriter) {
		this.evenItemWriter = evenItemWriter;
		this.oddItemWriter = oddItemWriter;
	}

	@Override
	public ItemWriter<? super Employee> classify(Employee employee) {

		int empId = employee.getId();

		if (empId % 2 == 0) {
			return evenItemWriter;
		}

		return oddItemWriter;

	}

}


Step 3: Define ClassifierCompositeItemWriter.


@Bean
public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
	ClassifierCompositeItemWriter<Employee> writer = new ClassifierCompositeItemWriter<>();

	EmployeeClassifier empClassifier = new EmployeeClassifier(xmlFileItemWriter(), jsonFileItemWriter());

	writer.setClassifier(empClassifier);

	return writer;
}


Step 4: Define a step using ClassifierCompositeItemWriter.

@Bean
public Step step1() throws Exception {
	return this.stepBuilderFactory.get("step1")
			.<Employee, Employee>chunk(5)
			.reader(jdbcCursorItemReader())
			.writer(classifierCompositeItemWriter())
			.stream(xmlFileItemWriter())
			.stream(jsonFileItemWriter())
			.build();
}


As you can see, after registering the composite writer I also registered xmlFileItemWriter() and jsonFileItemWriter() as streams. ClassifierCompositeItemWriter does not implement ItemStream itself, so the delegate writers must be registered explicitly; the step then invokes their open/update/close callbacks, which manage restart data.
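To illustrate what those callbacks do, the self-contained sketch below mimics the open/update/close checkpointing contract in plain Java. Note that RestartableStream and CountingWriter are hypothetical stand-ins invented for this illustration, not Spring Batch classes; in Spring Batch the equivalent contract is the ItemStream interface, and an ExecutionContext plays the role of the map here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ItemStream-style lifecycle (illustrative only).
interface RestartableStream {
	void open(Map<String, Object> executionContext);   // restore state on (re)start
	void update(Map<String, Object> executionContext); // checkpoint state at each chunk commit
	void close();                                      // release resources
}

// A writer that remembers how many items it has written across restarts.
class CountingWriter implements RestartableStream {
	private long written;

	@Override
	public void open(Map<String, Object> ctx) {
		// On restart, resume from the last committed count instead of 0.
		written = (long) ctx.getOrDefault("written.count", 0L);
	}

	@Override
	public void update(Map<String, Object> ctx) {
		ctx.put("written.count", written);
	}

	@Override
	public void close() {
	}

	public void write(String item) {
		written++;
	}

	public long getWritten() {
		return written;
	}
}

public class StreamCallbackDemo {
	public static void main(String[] args) {
		Map<String, Object> ctx = new HashMap<>(); // plays the role of ExecutionContext

		CountingWriter writer = new CountingWriter();
		writer.open(ctx);
		writer.write("a");
		writer.write("b");
		writer.update(ctx); // chunk commit: state checkpointed
		writer.close();     // simulate a crash/restart after this point

		CountingWriter restarted = new CountingWriter();
		restarted.open(ctx); // state restored from the checkpoint
		System.out.println(restarted.getWritten()); // prints 2
	}
}
```

Without the .stream(...) registration, a step has no way to discover these lifecycle methods on the delegates hidden behind the composite writer, so their restart state would never be saved.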

 

The complete working application is shown below.

 

Step 1: Create a new maven project ‘write-classified-data-to-destinations’.

 

Step 2: Update pom.xml with maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>write-classified-data-to-destinations</artifactId>
	<version>1</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.6.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>

		<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>

		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>


		<dependency>
			<groupId>com.thoughtworks.xstream</groupId>
			<artifactId>xstream</artifactId>
			<version>1.4.11.1</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-oxm -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
		</dependency>

	</dependencies>
</project>


Step 3: Create application.properties file under src/main/resources folder.

 

application.properties

logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2

spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;

spring.datasource.username=krishna
spring.datasource.password=password123

spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop

spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000

# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3


Step 4: Create data.sql file under src/main/resources folder.

 

data.sql 

INSERT INTO employee (id, first_name, last_name) VALUES
  (1, 'Sowmya', 'Krishna'),
  (2, 'Mridhu', 'Latha'),
  (3, 'Gowthami','Prasad'),
  (4, 'Sailu', 'Venkat'),
  (5, 'chamu', 'krishna'),
  (6, 'Rama', 'Lakshman'),
  (7, 'Saranya', 'Nataraj'),
  (8, 'Suneetha', 'Surendra');

 

Step 5: Create package ‘com.sample.app.entity’ and define Employee.

 

Employee.java

package com.sample.app.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "employee")
public class Employee {

	@Id
	@GeneratedValue
	@Column(name = "id")
	private int id;

	@Column(name = "first_name")
	private String firstName;

	@Column(name = "last_name")
	private String lastName;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("Employee [id=").append(id).append(", firstName=").append(firstName).append(", lastName=")
				.append(lastName).append("]");
		return builder.toString();
	}

}

 

Step 6: Create package ‘com.sample.app.mappers’ and define EmployeeRowMapper.

 

EmployeeRowMapper.java

package com.sample.app.mappers;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.sample.app.entity.Employee;

public class EmployeeRowMapper implements RowMapper<Employee> {

	@Override
	public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {

		Employee emp = new Employee();

		emp.setId(rs.getInt("id"));
		emp.setFirstName(rs.getString("first_name"));
		emp.setLastName(rs.getString("last_name"));

		return emp;
	}

}

 

Step 7: Create package ‘com.sample.app.lineaggregators’ and define EmployeeJsonLineAggregator.

 

EmployeeJsonLineAggregator.java

 

package com.sample.app.lineaggregators;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sample.app.entity.Employee;

public class EmployeeJsonLineAggregator implements LineAggregator<Employee> {

	private ObjectMapper objectMapper = new ObjectMapper();

	{
		objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
	}

	@Override
	public String aggregate(Employee emp) {

		try {
			return objectMapper.writeValueAsString(emp);
		} catch (JsonProcessingException e) {
			throw new RuntimeException("Error occurred while serializing Employee instance: " + emp, e);
		}
	}

}

 

Step 8: Create package ‘com.sample.app.classifier’ and define EmployeeClassifier.

 

EmployeeClassifier.java 

package com.sample.app.classifier;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import com.sample.app.entity.Employee;

public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {

	private ItemWriter<Employee> evenItemWriter;
	private ItemWriter<Employee> oddItemWriter;

	public EmployeeClassifier(ItemWriter<Employee> evenItemWriter, ItemWriter<Employee> oddItemWriter) {
		this.evenItemWriter = evenItemWriter;
		this.oddItemWriter = oddItemWriter;
	}

	@Override
	public ItemWriter<? super Employee> classify(Employee employee) {

		int empId = employee.getId();

		if (empId % 2 == 0) {
			return evenItemWriter;
		}

		return oddItemWriter;

	}

}

 

Step 9: Create package ‘com.sample.app.configuration’ and define JobConfiguration.

 

JobConfiguration.java

 

package com.sample.app.configuration;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.transaction.PlatformTransactionManager;

import com.sample.app.classifier.EmployeeClassifier;
import com.sample.app.entity.Employee;
import com.sample.app.lineaggregators.EmployeeJsonLineAggregator;
import com.sample.app.mappers.EmployeeRowMapper;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DataSource dataSource;

	@Bean
	public JdbcCursorItemReader<Employee> jdbcCursorItemReader() {
		JdbcCursorItemReader<Employee> cursorItemReader = new JdbcCursorItemReader<>();

		cursorItemReader.setSql("SELECT id, first_name, last_name FROM employee ORDER BY first_name");
		cursorItemReader.setDataSource(dataSource);
		cursorItemReader.setRowMapper(new EmployeeRowMapper());
		return cursorItemReader;
	}

	@Bean
	public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
		FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

		flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
		String outFilePath = "/Users/Shared/result.json";

		flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

		flatFileItemWriter.afterPropertiesSet();

		return flatFileItemWriter;
	}

	@Bean
	public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
		StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();

		Map<String, Class<?>> aliases = new HashMap<>();
		aliases.put("employee", Employee.class);

		XStreamMarshaller marshaller = new XStreamMarshaller();
		marshaller.setAliases(aliases);

		staxEventItemWriter.setRootTagName("employees");
		staxEventItemWriter.setMarshaller(marshaller);
		String outFilePath = "/Users/Shared/result.xml";
		staxEventItemWriter.setResource(new FileSystemResource(outFilePath));

		staxEventItemWriter.afterPropertiesSet();

		return staxEventItemWriter;
	}

	@Bean
	public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
		ClassifierCompositeItemWriter<Employee> writer = new ClassifierCompositeItemWriter<>();

		EmployeeClassifier empClassifier = new EmployeeClassifier(xmlFileItemWriter(), jsonFileItemWriter());

		writer.setClassifier(empClassifier);

		return writer;
	}

	@Bean
	public Step step1() throws Exception {
		return this.stepBuilderFactory.get("step1")
				.<Employee, Employee>chunk(5)
				.reader(jdbcCursorItemReader())
				.writer(classifierCompositeItemWriter())
				.stream(xmlFileItemWriter())
				.stream(jsonFileItemWriter())
				.build();
	}

	@Bean
	public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager)
			throws Exception {

		return jobBuilderFactory.get("My-First-Job").start(step1()).build();
	}

}

 

Step 10: Define main application class.

 

App.java

package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}
}

 

The overall project structure looks like below.

 


 

Run App.java

 

You can observe that the result.xml and result.json files are created in the /Users/Shared folder.

$ls /Users/Shared/result.*
/Users/Shared/result.json	/Users/Shared/result.xml
$
$
$cat /Users/Shared/result.json
{
  "id" : 3,
  "firstName" : "Gowthami",
  "lastName" : "Prasad"
}
{
  "id" : 7,
  "firstName" : "Saranya",
  "lastName" : "Nataraj"
}
{
  "id" : 1,
  "firstName" : "Sowmya",
  "lastName" : "Krishna"
}
{
  "id" : 5,
  "firstName" : "chamu",
  "lastName" : "krishna"
}
$
$
$
$cat /Users/Shared/result.xml 
<?xml version="1.0" encoding="UTF-8"?><employees><employee><id>2</id><firstName>Mridhu</firstName><lastName>Latha</lastName></employee><employee><id>6</id><firstName>Rama</firstName><lastName>Lakshman</lastName></employee><employee><id>4</id><firstName>Sailu</firstName><lastName>Venkat</lastName></employee><employee><id>8</id><firstName>Suneetha</firstName><lastName>Surendra</lastName></employee></employees>

 

You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/batch/write-classified-data-to-destinations

 

 

