BAEL-4706 - Spring Boot with Spring Batch (#10292)

Co-authored-by: Jonathan Cook <jcook@sciops.esa.int>
This commit is contained in:
Jonathan Cook
2020-12-01 08:18:26 +01:00
committed by GitHub
parent eed264e2e4
commit 06bebff4cb
12 changed files with 332 additions and 1 deletions
@@ -0,0 +1,81 @@
package com.baeldung.batch;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Value("${file.input}")
private String fileInput;
@Bean
public FlatFileItemReader<Coffee> reader() {
return new FlatFileItemReaderBuilder<Coffee>().name("coffeeItemReader")
.resource(new ClassPathResource(fileInput))
.delimited()
.names(new String[] { "brand", "origin", "characteristics" })
.fieldSetMapper(new BeanWrapperFieldSetMapper<Coffee>() {{
setTargetType(Coffee.class);
}})
.build();
}
@Bean
public CoffeeItemProcessor processor() {
return new CoffeeItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Coffee> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Coffee>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO coffee (brand, origin, characteristics) VALUES (:brand, :origin, :characteristics)")
.dataSource(dataSource)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Coffee> writer) {
return stepBuilderFactory.get("step1")
.<Coffee, Coffee> chunk(10)
.reader(reader())
.processor(processor())
.writer(writer)
.build();
}
}
@@ -0,0 +1,47 @@
package com.baeldung.batch;
public class Coffee {
private String brand;
private String origin;
private String characteristics;
public Coffee() {
}
public Coffee(String brand, String origin, String characteristics) {
this.brand = brand;
this.origin = origin;
this.characteristics = characteristics;
}
public String getBrand() {
return brand;
}
public void setBrand(String brand) {
this.brand = brand;
}
public String getOrigin() {
return origin;
}
public void setOrigin(String origin) {
this.origin = origin;
}
public String getCharacteristics() {
return characteristics;
}
public void setCharacteristics(String characteristics) {
this.characteristics = characteristics;
}
@Override
public String toString() {
return "Coffee [brand=" + getBrand() + ", origin=" + getOrigin() + ", characteristics=" + getCharacteristics() + "]";
}
}
@@ -0,0 +1,24 @@
package com.baeldung.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
public class CoffeeItemProcessor implements ItemProcessor<Coffee, Coffee> {
private static final Logger LOGGER = LoggerFactory.getLogger(CoffeeItemProcessor.class);
@Override
public Coffee process(final Coffee coffee) throws Exception {
String brand = coffee.getBrand().toUpperCase();
String origin = coffee.getOrigin().toUpperCase();
String chracteristics = coffee.getCharacteristics().toUpperCase();
Coffee transformedCoffee = new Coffee(brand, origin, chracteristics);
LOGGER.info("Converting ( {} ) into ( {} )", coffee, transformedCoffee);
return transformedCoffee;
}
}
@@ -0,0 +1,34 @@
package com.baeldung.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
private final JdbcTemplate jdbcTemplate;
@Autowired
public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
LOGGER.info("!!! JOB FINISHED! Time to verify the results");
String query = "SELECT brand, origin, characteristics FROM coffee";
jdbcTemplate.query(query, (rs, row) -> new Coffee(rs.getString(1), rs.getString(2), rs.getString(3)))
.forEach(coffee -> LOGGER.info("Found < {} > in the database.", coffee));
}
}
}
@@ -0,0 +1,13 @@
package com.baeldung.batch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootBatchProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootBatchProcessingApplication.class, args);
}
}