In this post, I am going to explain how to write data to multiple destinations using Spring Batch's CompositeItemWriter.
For example, the application in this post reads data from a database and writes it to a .txt file, a .json file and an .xml file.
Step 1: Create ItemWriters to write content to .txt, .json and .xml files.
@Bean
public FlatFileItemWriter<Employee> txtFileItemWriter() throws Exception {
FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setLineAggregator(new PassThroughLineAggregator<Employee>());
String outFilePath = "/Users/Shared/result.txt";
flatFileItemWriter.setResource(new FileSystemResource(outFilePath));
flatFileItemWriter.afterPropertiesSet();
return flatFileItemWriter;
}
@Bean
public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
String outFilePath = "/Users/Shared/result.json";
flatFileItemWriter.setResource(new FileSystemResource(outFilePath));
flatFileItemWriter.afterPropertiesSet();
return flatFileItemWriter;
}
@Bean
public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();
Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("employee", Employee.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
staxEventItemWriter.setRootTagName("employees");
staxEventItemWriter.setMarshaller(marshaller);
String outFilePath = "/Users/Shared/result.xml";
staxEventItemWriter.setResource(new FileSystemResource(outFilePath));
staxEventItemWriter.afterPropertiesSet();
return staxEventItemWriter;
}
Step 2: Create a ‘CompositeItemWriter’ using the ItemWriters created in step 1.
@Bean
public CompositeItemWriter<Employee> compositeItemWriter() throws Exception {
CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
List<ItemWriter<? super Employee>> itemWriters = new ArrayList<>();
itemWriters.add(txtFileItemWriter());
itemWriters.add(jsonFileItemWriter());
itemWriters.add(xmlFileItemWriter());
compositeItemWriter.setDelegates(itemWriters);
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
Now pass this ‘CompositeItemWriter’ to the writer() method of the step so that every chunk is written to all three files.
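To make the delegation idea concrete, here is a minimal plain-Java sketch of what a composite writer does: it hands the same chunk of items to every delegate, in the order the delegates were added. SketchWriter, SketchCompositeWriter and CompositeWriterSketch are hypothetical names invented for this illustration. They are simplified stand-ins, not Spring Batch classes, and the in-memory lists stand in for the three output files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Spring Batch's ItemWriter (not the real interface).
interface SketchWriter<T> {
    void write(List<? extends T> items);
}

// Hypothetical composite: hands the same chunk to every delegate, in list order.
class SketchCompositeWriter<T> implements SketchWriter<T> {
    private final List<SketchWriter<? super T>> delegates;

    SketchCompositeWriter(List<SketchWriter<? super T>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void write(List<? extends T> items) {
        for (SketchWriter<? super T> delegate : delegates) {
            delegate.write(items);
        }
    }
}

class CompositeWriterSketch {
    // Writes one chunk through a composite and returns what each destination received.
    static List<List<String>> run(List<String> chunk) {
        // In-memory lists play the role of result.txt, result.json and result.xml.
        List<String> txt = new ArrayList<>();
        List<String> json = new ArrayList<>();
        List<String> xml = new ArrayList<>();

        List<SketchWriter<? super String>> delegates = new ArrayList<>();
        delegates.add(items -> items.forEach(i -> txt.add("txt:" + i)));
        delegates.add(items -> items.forEach(i -> json.add("json:" + i)));
        delegates.add(items -> items.forEach(i -> xml.add("xml:" + i)));

        new SketchCompositeWriter<String>(delegates).write(chunk);
        return Arrays.asList(txt, json, xml);
    }

    public static void main(String[] args) {
        // prints [[txt:Rama, txt:Sailu], [json:Rama, json:Sailu], [xml:Rama, xml:Sailu]]
        System.out.println(run(Arrays.asList("Rama", "Sailu")));
    }
}
```

Because the delegates are called one after another, the order of the list passed to setDelegates is also the order in which the destinations receive each chunk.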
The complete working application is given below.
Step 1: Create a new Maven project ‘write-to-multiple-destinations’.
Step 2: Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>write-to-multiple-destinations</artifactId>
<version>1</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<version>1.4.11.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-oxm -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Create an application.properties file under the src/main/resources folder.
application.properties
logging.level.root=ERROR
logging.level.org.hibernate=ERROR
## H2 specific properties
spring.h2.console.enabled=true
spring.h2.console.path=/h2
spring.datasource.url=jdbc:h2:file:~/db/myOrg.db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
spring.datasource.username=krishna
spring.datasource.password=password123
spring.datasource.driverClassName=org.h2.Driver
## JPA specific properties
# Creates the schema, destroying previous data.
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.jpa.show-sql=false
spring.jpa.properties.hibernate.format_sql=false
## Database connection pooling properties
# Number of ms to wait before throwing an exception if no connection is available.
spring.datasource.max-wait=10000
# Maximum number of active connections that can be allocated from this pool at the same time.
spring.datasource.tomcat.max-active=10
spring.datasource.tomcat.max-idle=5
spring.datasource.tomcat.min-idle=3
Step 4: Create a data.sql file under the src/main/resources folder.
data.sql
INSERT INTO employee (id, first_name, last_name) VALUES
(1, 'Sowmya', 'Krishna'),
(2, 'Mridhu', 'Latha'),
(3, 'Gowthami','Prasad'),
(4, 'Sailu', 'Venkat'),
(5, 'chamu', 'krishna'),
(6, 'Rama', 'Lakshman'),
(7, 'Saranya', 'Nataraj'),
(8, 'Suneetha', 'Surendra');
Step 5: Create package ‘com.sample.app.entity’ and define Employee class.
Employee.java
package com.sample.app.entity;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table(name = "employee")
public class Employee {
@Id
@GeneratedValue
@Column(name = "id")
private int id;
@Column(name = "first_name")
private String firstName;
@Column(name = "last_name")
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Employee [id=").append(id).append(", firstName=").append(firstName).append(", lastName=")
.append(lastName).append("]");
return builder.toString();
}
}
Step 6: Create package ‘com.sample.app.mappers’ and define EmployeeRowMapper.
EmployeeRowMapper.java
package com.sample.app.mappers;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.sample.app.entity.Employee;
public class EmployeeRowMapper implements RowMapper<Employee> {
@Override
public Employee mapRow(ResultSet rs, int rowNum) throws SQLException {
Employee emp = new Employee();
emp.setId(rs.getInt("id"));
emp.setFirstName(rs.getString("first_name"));
emp.setLastName(rs.getString("last_name"));
return emp;
}
}
Step 7: Create package ‘com.sample.app.lineaggregators’ and define EmployeeJsonLineAggregator.
EmployeeJsonLineAggregator.java
package com.sample.app.lineaggregators;
import org.springframework.batch.item.file.transform.LineAggregator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.sample.app.entity.Employee;
public class EmployeeJsonLineAggregator implements LineAggregator<Employee> {
private ObjectMapper objectMapper = new ObjectMapper();
{
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
}
@Override
public String aggregate(Employee emp) {
try {
return objectMapper.writeValueAsString(emp);
} catch (JsonProcessingException e) {
// Keep the original exception as the cause so the stack trace is not lost.
throw new RuntimeException("Error occurred while serializing Employee instance : " + emp, e);
}
}
}
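A line aggregator has one job: turn one item into one String, which the flat file writer then writes followed by a line separator. The sketch below imitates that with plain Java, with no Spring Batch or Jackson on the classpath. SketchEmployee, LineAggregatorSketch, toJsonLine and render are hypothetical names for this illustration, and the hand-built JSON skips escaping and indentation, so treat it as a picture of the idea rather than a replacement for EmployeeJsonLineAggregator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, annotation-free stand-in for the Employee entity.
class SketchEmployee {
    final int id;
    final String firstName;
    final String lastName;

    SketchEmployee(int id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Employee [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
    }
}

class LineAggregatorSketch {
    // Simplified single-line JSON; quotes and special characters are NOT escaped.
    static String toJsonLine(SketchEmployee e) {
        return "{\"id\":" + e.id + ",\"firstName\":\"" + e.firstName
                + "\",\"lastName\":\"" + e.lastName + "\"}";
    }

    // Aggregates each item to a line and appends a separator, as a flat file writer would.
    static <T> String render(List<T> items, Function<T, String> aggregator) {
        StringBuilder out = new StringBuilder();
        for (T item : items) {
            out.append(aggregator.apply(item)).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<SketchEmployee> employees = Arrays.asList(
                new SketchEmployee(1, "Sowmya", "Krishna"),
                new SketchEmployee(2, "Mridhu", "Latha"));
        // Pass-through style: one toString() line per employee.
        System.out.print(render(employees, SketchEmployee::toString));
        // JSON style: one JSON object per employee.
        System.out.print(render(employees, LineAggregatorSketch::toJsonLine));
    }
}
```

Since each item is aggregated on its own, result.json should end up as a sequence of JSON objects, one per employee, rather than a single JSON array.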
Step 8: Create package ‘com.sample.app.configuration’ and define JobConfiguration.
JobConfiguration.java
package com.sample.app.configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import org.springframework.transaction.PlatformTransactionManager;
import com.sample.app.entity.Employee;
import com.sample.app.lineaggregators.EmployeeJsonLineAggregator;
import com.sample.app.mappers.EmployeeRowMapper;
@Configuration
@EnableBatchProcessing
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public JdbcCursorItemReader<Employee> jdbcCursorItemReader() {
JdbcCursorItemReader<Employee> cursorItemReader = new JdbcCursorItemReader<>();
cursorItemReader.setSql("SELECT id, first_name, last_name FROM employee ORDER BY first_name");
cursorItemReader.setDataSource(dataSource);
cursorItemReader.setRowMapper(new EmployeeRowMapper());
return cursorItemReader;
}
@Bean
public FlatFileItemWriter<Employee> txtFileItemWriter() throws Exception {
FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setLineAggregator(new PassThroughLineAggregator<Employee>());
String outFilePath = "/Users/Shared/result.txt";
flatFileItemWriter.setResource(new FileSystemResource(outFilePath));
flatFileItemWriter.afterPropertiesSet();
return flatFileItemWriter;
}
@Bean
public FlatFileItemWriter<Employee> jsonFileItemWriter() throws Exception {
FlatFileItemWriter<Employee> flatFileItemWriter = new FlatFileItemWriter<>();
flatFileItemWriter.setLineAggregator(new EmployeeJsonLineAggregator());
String outFilePath = "/Users/Shared/result.json";
flatFileItemWriter.setResource(new FileSystemResource(outFilePath));
flatFileItemWriter.afterPropertiesSet();
return flatFileItemWriter;
}
@Bean
public StaxEventItemWriter<Employee> xmlFileItemWriter() throws Exception {
StaxEventItemWriter<Employee> staxEventItemWriter = new StaxEventItemWriter<>();
Map<String, Class<?>> aliases = new HashMap<>();
aliases.put("employee", Employee.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
staxEventItemWriter.setRootTagName("employees");
staxEventItemWriter.setMarshaller(marshaller);
String outFilePath = "/Users/Shared/result.xml";
staxEventItemWriter.setResource(new FileSystemResource(outFilePath));
staxEventItemWriter.afterPropertiesSet();
return staxEventItemWriter;
}
@Bean
public CompositeItemWriter<Employee> compositeItemWriter() throws Exception {
CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
List<ItemWriter<? super Employee>> itemWriters = new ArrayList<>();
itemWriters.add(txtFileItemWriter());
itemWriters.add(jsonFileItemWriter());
itemWriters.add(xmlFileItemWriter());
compositeItemWriter.setDelegates(itemWriters);
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}
@Bean
public Step step1() throws Exception {
return this.stepBuilderFactory.get("step1").<Employee, Employee>chunk(5).reader(jdbcCursorItemReader())
.writer(compositeItemWriter()).build();
}
@Bean
public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager)
throws Exception {
return jobBuilderFactory.get("My-First-Job").start(step1()).build();
}
}
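The chunk(5) call means the step reads items one by one, collects up to five of them, and then passes that group to the writer in a single call. With the eight rows from data.sql, the composite writer would therefore be called twice: once with five employees and once with the remaining three. The plain-Java sketch below imitates only that buffering loop. ChunkLoopSketch and process are hypothetical names, and a real Spring Batch step also wraps each chunk in a transaction, which is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

class ChunkLoopSketch {
    // Reads items one at a time, buffers up to chunkSize of them,
    // then hands the buffered chunk to the writer. Returns the number of chunks written.
    static <T> int process(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        int chunks = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer));
                buffer.clear();
                chunks++;
            }
        }
        // Flush the last, possibly smaller, chunk.
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Eight ids, matching the eight rows inserted by data.sql.
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        List<Integer> sizes = new ArrayList<>();
        int chunks = process(ids.iterator(), 5, chunk -> sizes.add(chunk.size()));
        // prints "2 chunks, sizes [5, 3]"
        System.out.println(chunks + " chunks, sizes " + sizes);
    }
}
```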
Step 9: Define App.java
App.java
package com.sample.app;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableBatchProcessing
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
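The complete project contains the following files.
pom.xml
src/main/java/com/sample/app/App.java
src/main/java/com/sample/app/configuration/JobConfiguration.java
src/main/java/com/sample/app/entity/Employee.java
src/main/java/com/sample/app/lineaggregators/EmployeeJsonLineAggregator.java
src/main/java/com/sample/app/mappers/EmployeeRowMapper.java
src/main/resources/application.properties
src/main/resources/data.sql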
Run App.java.
You can see that the result.txt, result.json and result.xml files are created in the /Users/Shared folder.
$ ls /Users/Shared/result.*
/Users/Shared/result.json
/Users/Shared/result.txt
/Users/Shared/result.xml
You can download the complete working application from this link.
https://github.com/harikrishna553/springboot/tree/master/batch/write-to-multiple-destinations