BAEL-3298 Adding tests and refactoring

This commit is contained in:
Shubhra
2020-02-09 18:16:20 +05:30
parent 49f97ae586
commit 7e05fcbe3f
7 changed files with 230 additions and 40 deletions
@@ -18,6 +18,8 @@ public class App {
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringConfig.class);
context.register(SpringBatchConfig.class);
context.register(SpringBatchRetryConfig.class);
context.refresh();
// Spring xml config
@@ -1,17 +1,14 @@
package org.baeldung.batch;
import org.apache.http.conn.ConnectTimeoutException;
import org.baeldung.batch.model.Transaction;
import org.baeldung.batch.service.CustomItemProcessor;
import org.baeldung.batch.service.CustomSkipPolicy;
import org.baeldung.batch.service.MissingUsernameException;
import org.baeldung.batch.service.NegativeAmountException;
import org.baeldung.batch.service.RecordFieldSetMapper;
import org.baeldung.batch.service.RetryItemProcessor;
import org.baeldung.batch.service.SkippingItemProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
@@ -26,15 +23,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import java.text.ParseException;
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@@ -75,11 +69,6 @@ public class SpringBatchConfig {
return new SkippingItemProcessor();
}
@Bean
public ItemProcessor<Transaction, Transaction> retryItemProcessor() {
return new RetryItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
@@ -128,22 +117,6 @@ public class SpringBatchConfig {
.build();
}
@Bean
public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) throws ParseException {
return stepBuilderFactory
.get("retryStep")
.<Transaction, Transaction>chunk(10)
.reader(itemReader(inputCsv))
.processor(processor)
.writer(writer)
.faultTolerant()
.retryLimit(3)
.retry(ConnectTimeoutException.class)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
@Bean(name = "skippingBatchJob")
public Job skippingJob(@Qualifier("skippingStep") Step skippingStep) {
return jobBuilderFactory
@@ -152,14 +125,6 @@ public class SpringBatchConfig {
.build();
}
@Bean(name = "retryBatchJob")
public Job retryJob(@Qualifier("retryStep") Step retryStep) {
return jobBuilderFactory
.get("retryBatchJob")
.start(retryStep)
.build();
}
@Bean
public Step skipPolicyStep(@Qualifier("skippingItemProcessor") ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) throws ParseException {
@@ -0,0 +1,102 @@
package org.baeldung.batch;
import org.apache.http.conn.ConnectTimeoutException;
import org.baeldung.batch.model.Transaction;
import org.baeldung.batch.service.RecordFieldSetMapper;
import org.baeldung.batch.service.RetryItemProcessor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.oxm.Marshaller;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import java.text.ParseException;
@Configuration
@EnableBatchProcessing
public class SpringBatchRetryConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Value("input/record.csv")
private Resource inputCsv;
@Value("file:xml/retryOutput.xml")
private Resource outputXml;
public ItemReader<Transaction> itemReader(Resource inputData) throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" };
tokenizer.setNames(tokens);
reader.setResource(inputData);
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ItemProcessor<Transaction, Transaction> retryItemProcessor() {
return new RetryItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Transaction.class);
return marshaller;
}
@Bean
public Step retryStep(
@Qualifier("retryItemProcessor")
ItemProcessor<Transaction, Transaction> processor, ItemWriter<Transaction> writer) throws ParseException {
return stepBuilderFactory.get("retryStep").<Transaction, Transaction>chunk(10).reader(itemReader(inputCsv))
.processor(processor)
.writer(writer)
.faultTolerant()
.retryLimit(3)
.retry(ConnectTimeoutException.class)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
@Bean(name = "retryBatchJob")
public Job retryJob(
@Qualifier("retryStep")
Step retryStep) {
return jobBuilderFactory.get("retryBatchJob").start(retryStep).build();
}
}
@@ -5,7 +5,10 @@ import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.baeldung.batch.model.Transaction;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
@@ -16,18 +19,29 @@ public class RetryItemProcessor implements ItemProcessor<Transaction, Transactio
private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class);
private CloseableHttpClient client;
public RetryItemProcessor() {
final RequestConfig config = RequestConfig.custom().setConnectTimeout(2 * 1000).build();
client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
}
@Override
public Transaction process(Transaction transaction) throws IOException {
public Transaction process(Transaction transaction) throws IOException, JSONException {
LOGGER.info("Attempting to process user with id={}", transaction.getUserId());
System.out.println("Attempting to process user with id=" + transaction.getUserId());
HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
//parse user's age and postCode from response and update transaction
String result = EntityUtils.toString(response.getEntity());
JSONObject userObject = new JSONObject(result);
transaction.setAge(Integer.parseInt(userObject.getString("age")));
transaction.setPostCode(userObject.getString("postCode"));
return transaction;
}
private HttpResponse fetchMoreUserDetails(int id) throws IOException {
final RequestConfig config = RequestConfig.custom().setConnectTimeout(2 * 1000).build();
final CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
final HttpGet request = new HttpGet("http://www.baeldung.com:81/user/" + id);
return client.execute(request);
}