Wednesday, 16 December 2020

Spring Batch: Write to Multiple Destinations

In this post, I am going to explain how to write data to multiple destinations.

 

For example, as you see below image, application reads data from a database and write the information to .txt file, .json file and .xml file.

 


 

Step 1: Create ItemWriters to write content to .txt, .json and .xml files.

@Bean
public FlatFileItemWriter<Employee> txtFileItemWriter() throws Exception {
	FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

	flatFileItemWriter.setLineAggregator(new PassThroughLineAggregator<Employee>());
	String outFilePath = "/Users/Shared/result.txt";

	flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

	flatFileItemWriter.afterPropertiesSet();

	return flatFileItemWriter;
}

@Bean
public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
	FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

	flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
	String outFilePath = "/Users/Shared/result.json";

	flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

	flatFileItemWriter.afterPropertiesSet();

	return flatFileItemWriter;
}

@Bean
public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
	StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();

	Map<String, Class> aliases = new HashMap<>();
	aliases.put("employee", Employee.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();
	marshaller.setAliases(aliases);

	staxEventItemWriter.setRootTagName("employees");
	staxEventItemWriter.setMarshaller(marshaller);
	String outFilePath = "/Users/Shared/result.xml";
	staxEventItemWriter.setResource(new FileSystemResource(outFilePath));

	staxEventItemWriter.afterPropertiesSet();

	return staxEventItemWriter;
}


Step 2: Create 'CompositeItemWriter' using ItemWriters created in step 1.

@Bean
public CompositeItemWriter<Employee> compositeItemWriter() throws Exception {
	CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();

	List<ItemWriter<? super Employee>> itemWriters = new ArrayList<>();

	itemWriters.add(txtFileItemWriter());
	itemWriters.add(jsonFileItemWriter());
	itemWriters.add(xmlFileItemWriter());

	compositeItemWriter.setDelegates(itemWriters);
	compositeItemWriter.afterPropertiesSet();

	return compositeItemWriter;

}


Now use this ‘CompositeItemWriter’ to write data.

 

Find the below working application.

 

Step 1: Create new maven project ‘write-to-multiple-destinations’.

 

Step 2: Update pom.xml with maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>write-to-multiple-destinations</artifactId>
	<version>1</version>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.6.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>

		<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>

		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
		</dependency>


		<dependency>
			<groupId>com.thoughtworks.xstream</groupId>
			<artifactId>xstream</artifactId>
			<version>1.4.11.1</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.springframework/spring-oxm -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
		</dependency>

	</dependencies>
</project>


Step 3: Create application.properties file under src/main/resources folder.

 

application.properties

logging.level.root=ERROR
logging.level.org.hibernate=ERROR

## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2

spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;

spring.datasource.username=krishna
spring.datasource.password=password123

spring.datasource.driverClassName=org.h2.Driver

## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop

spring.jpa.database-platform=org.hibernate.dialect.H2Dialect

spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false

## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000

# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3


Step 4: Create data.sql file under src/main/resources folder.

 

data.sql

INSERT INTO employee (id, first_name, last_name) VALUES
  (1, 'Sowmya', 'Krishna'),
  (2, 'Mridhu', 'Latha'),
  (3, 'Gowthami','Prasad'),
  (4, 'Sailu', 'Venkat'),
  (5, 'chamu', 'krishna'),
  (6, 'Rama', 'Lakshman'),
  (7, 'Saranya', 'Nataraj'),
  (8, 'Suneetha', 'Surendra');


Step 5: Create package ‘com.sample.app.entity’ and define Employee class.

 

Employee.java

package com.sample.app.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "employee")
public class Employee {

	@Id
	@GeneratedValue
	@Column(name = "id")
	private int id;

	@Column(name = "first_name")
	private String firstName;

	@Column(name = "last_name")
	private String lastName;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("Employee [id=").append(id).append(", firstName=").append(firstName).append(", lastName=")
				.append(lastName).append("]");
		return builder.toString();
	}

}


Step 6: Create package ‘com.sample.app.mappers’ and define EmployeeRowMapper.

 

EmployeeRowMapper.java

package com.sample.app.mappers;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.sample.app.entity.Employee;

public class EmployeeRowMapper implements RowMapper<Employee> {

	@Override
	public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {

		Employee emp = new Employee();

		emp.setId(rs.getInt("id"));
		emp.setFirstName(rs.getString("first_name"));
		emp.setLastName(rs.getString("last_name"));

		return emp;
	}

}


Step 7: Create package ‘com.sample.app.lineaggregators’ and define EmployeeJsonLineAggregator.

 

EmployeeJsonLineAggregator.java

package com.sample.app.lineaggregators;

import org.springframework.batch.item.file.transform.LineAggregator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sample.app.entity.Employee;


public class EmployeeJsonLineAggregator implements LineAggregator<Employee> {

	private ObjectMapper objectMapper = new ObjectMapper();

	{
		objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
	}

	@Override
	public String aggregate(Employee emp) {

		try {
			return objectMapper.writeValueAsString(emp);
		} catch (JsonProcessingException e) {
			throw new RuntimeException("Error Occured while serializing Employee instance : " + emp);
		}
	}

}


Step 8: Create package ‘com.sample.app.configuration’ and define JobConfiguration.

 

JobConfiguration.java

package com.sample.app.configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.transaction.PlatformTransactionManager;

import com.sample.app.entity.Employee;
import com.sample.app.lineaggregators.EmployeeJsonLineAggregator;
import com.sample.app.mappers.EmployeeRowMapper;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
	@Autowired
	private JobBuilderFactory jobBuilderFactory;

	@Autowired
	private StepBuilderFactory stepBuilderFactory;

	@Autowired
	private DataSource dataSource;

	@Bean
	public JdbcCursorItemReader<Employee> jdbcCursorItemReader() {
		JdbcCursorItemReader<Employee> cursorItemReader = new JdbcCursorItemReader<>();

		cursorItemReader.setSql("SELECT id, first_name, last_name FROM employee ORDER BY first_name");
		cursorItemReader.setDataSource(dataSource);
		cursorItemReader.setRowMapper(new EmployeeRowMapper());
		return cursorItemReader;
	}

	@Bean
	public FlatFileItemWriter<Employee> txtFileItemWriter() throws Exception {
		FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

		flatFileItemWriter.setLineAggregator(new PassThroughLineAggregator<Employee>());
		String outFilePath = "/Users/Shared/result.txt";

		flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

		flatFileItemWriter.afterPropertiesSet();

		return flatFileItemWriter;
	}

	@Bean
	public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
		FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();

		flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
		String outFilePath = "/Users/Shared/result.json";

		flatFileItemWriter.setResource(new FileSystemResource(outFilePath));

		flatFileItemWriter.afterPropertiesSet();

		return flatFileItemWriter;
	}

	@Bean
	public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
		StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();

		Map<String, Class> aliases = new HashMap<>();
		aliases.put("employee", Employee.class);

		XStreamMarshaller marshaller = new XStreamMarshaller();
		marshaller.setAliases(aliases);

		staxEventItemWriter.setRootTagName("employees");
		staxEventItemWriter.setMarshaller(marshaller);
		String outFilePath = "/Users/Shared/result.xml";
		staxEventItemWriter.setResource(new FileSystemResource(outFilePath));

		staxEventItemWriter.afterPropertiesSet();

		return staxEventItemWriter;
	}

	@Bean
	public CompositeItemWriter<Employee> compositeItemWriter() throws Exception {
		CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();

		List<ItemWriter<? super Employee>> itemWriters = new ArrayList<>();

		itemWriters.add(txtFileItemWriter());
		itemWriters.add(jsonFileItemWriter());
		itemWriters.add(xmlFileItemWriter());

		compositeItemWriter.setDelegates(itemWriters);
		compositeItemWriter.afterPropertiesSet();

		return compositeItemWriter;
	}

	@Bean
	public Step step1() throws Exception {
		return this.stepBuilderFactory.get("step1").<Employee, Employee>chunk(5).reader(jdbcCursorItemReader())
				.writer(compositeItemWriter()).build();
	}

	@Bean
	public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager)
			throws Exception {

		return jobBuilderFactory.get("My-First-Job").start(step1()).build();
	}

}


Step 9: Define App.java

App.java

package com.sample.app;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableBatchProcessing
@SpringBootApplication
public class App {

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}
}


Total project structure looks like below.





Run App.java.

 

You can see that result.txt, result.json and result.xml file are created /Users/Shared folder.


$ls /Users/Shared/result.*
/Users/Shared/result.json	/Users/Shared/result.txt	/Users/Shared/result.xml



You can download complete working application from this link.

https://github.com/harikrishna553/springboot/tree/master/batch/write-to-multiple-destinations

 









Previous                                                    Next                                                    Home

No comments:

Post a Comment