BAEL-3298 Spring batch retry job (#8690)
* BAEL-3298 Spring batch retry job * BAEL-3298 Adding tests and refactoring * BAEL-3298 Some more refactoring * Review comments * Some refactoring * Refactoring RetryItemProcessor * Minor refactoring in test class * BAEL-3298 Some more refactoring * BAEL-3298 Using @MockBean * BAEL-3298 minor update * BAEL-3298 Updating names of testcases * updating id to 9999 * Updating id to 9999 * Updating id to 9999
This commit is contained in:
committed by
GitHub
parent
b609d50214
commit
9da978ec65
@@ -18,6 +18,8 @@ public class App {
|
||||
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
|
||||
context.register(SpringConfig.class);
|
||||
context.register(SpringBatchConfig.class);
|
||||
context.register(SpringBatchRetryConfig.class);
|
||||
|
||||
context.refresh();
|
||||
|
||||
// Spring xml config
|
||||
@@ -26,6 +28,8 @@ public class App {
|
||||
runJob(context, "firstBatchJob");
|
||||
runJob(context, "skippingBatchJob");
|
||||
runJob(context, "skipPolicyBatchJob");
|
||||
runJob(context, "retryBatchJob");
|
||||
|
||||
}
|
||||
|
||||
private static void runJob(AnnotationConfigApplicationContext context, String batchJobName) {
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
package org.baeldung.batch;
|
||||
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.conn.ConnectTimeoutException;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.baeldung.batch.model.Transaction;
|
||||
import org.baeldung.batch.service.RecordFieldSetMapper;
|
||||
import org.baeldung.batch.service.RetryItemProcessor;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
import org.springframework.batch.item.file.FlatFileItemReader;
|
||||
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
|
||||
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
|
||||
import org.springframework.batch.item.xml.StaxEventItemWriter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.dao.DeadlockLoserDataAccessException;
|
||||
import org.springframework.oxm.Marshaller;
|
||||
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
|
||||
|
||||
import java.text.ParseException;
|
||||
|
||||
@Configuration
|
||||
@EnableBatchProcessing
|
||||
public class SpringBatchRetryConfig {
|
||||
|
||||
private static final String[] tokens = { "username", "userid", "transactiondate", "amount" };
|
||||
private static final int TWO_SECONDS = 2000;
|
||||
|
||||
@Autowired
|
||||
private JobBuilderFactory jobBuilderFactory;
|
||||
|
||||
@Autowired
|
||||
private StepBuilderFactory stepBuilderFactory;
|
||||
|
||||
@Value("input/recordRetry.csv")
|
||||
private Resource inputCsv;
|
||||
|
||||
@Value("file:xml/retryOutput.xml")
|
||||
private Resource outputXml;
|
||||
|
||||
public ItemReader<Transaction> itemReader(Resource inputData) throws ParseException {
|
||||
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
|
||||
tokenizer.setNames(tokens);
|
||||
DefaultLineMapper<Transaction> lineMapper = new DefaultLineMapper<>();
|
||||
lineMapper.setLineTokenizer(tokenizer);
|
||||
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
|
||||
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<>();
|
||||
reader.setResource(inputData);
|
||||
reader.setLinesToSkip(1);
|
||||
reader.setLineMapper(lineMapper);
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CloseableHttpClient closeableHttpClient() {
|
||||
final RequestConfig config = RequestConfig.custom()
|
||||
.setConnectTimeout(TWO_SECONDS)
|
||||
.build();
|
||||
return HttpClientBuilder.create().setDefaultRequestConfig(config).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ItemProcessor<Transaction, Transaction> retryItemProcessor() {
|
||||
return new RetryItemProcessor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ItemWriter<Transaction> itemWriter(Marshaller marshaller) {
|
||||
StaxEventItemWriter<Transaction> itemWriter = new StaxEventItemWriter<>();
|
||||
itemWriter.setMarshaller(marshaller);
|
||||
itemWriter.setRootTagName("transactionRecord");
|
||||
itemWriter.setResource(outputXml);
|
||||
return itemWriter;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Marshaller marshaller() {
|
||||
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
|
||||
marshaller.setClassesToBeBound(Transaction.class);
|
||||
return marshaller;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step retryStep(@Qualifier("retryItemProcessor") ItemProcessor<Transaction, Transaction> processor,
|
||||
ItemWriter<Transaction> writer) throws ParseException {
|
||||
return stepBuilderFactory.get("retryStep")
|
||||
.<Transaction, Transaction>chunk(10)
|
||||
.reader(itemReader(inputCsv))
|
||||
.processor(processor)
|
||||
.writer(writer)
|
||||
.faultTolerant()
|
||||
.retryLimit(3)
|
||||
.retry(ConnectTimeoutException.class)
|
||||
.retry(DeadlockLoserDataAccessException.class)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = "retryBatchJob")
|
||||
public Job retryJob(@Qualifier("retryStep") Step retryStep) {
|
||||
return jobBuilderFactory
|
||||
.get("retryBatchJob")
|
||||
.start(retryStep)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,8 @@ import javax.xml.bind.annotation.XmlRootElement;
|
||||
public class Transaction {
|
||||
private String username;
|
||||
private int userId;
|
||||
private int age;
|
||||
private String postCode;
|
||||
private Date transactionDate;
|
||||
private double amount;
|
||||
|
||||
@@ -46,9 +48,25 @@ public class Transaction {
|
||||
this.amount = amount;
|
||||
}
|
||||
|
||||
public int getAge() {
|
||||
return age;
|
||||
}
|
||||
|
||||
public void setAge(int age) {
|
||||
this.age = age;
|
||||
}
|
||||
|
||||
public String getPostCode() {
|
||||
return postCode;
|
||||
}
|
||||
|
||||
public void setPostCode(String postCode) {
|
||||
this.postCode = postCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Transaction [username=" + username + ", userId=" + userId + ", transactionDate=" + transactionDate + ", amount=" + amount + "]";
|
||||
return "Transaction [username=" + username + ", userId=" + userId + ", age=" + age + ", postCode=" + postCode + ", transactionDate=" + transactionDate + ", amount=" + amount + "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
package org.baeldung.batch.service;
|
||||
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.baeldung.batch.model.Transaction;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RetryItemProcessor.class);
|
||||
|
||||
@Autowired
|
||||
private CloseableHttpClient closeableHttpClient;
|
||||
|
||||
@Override
|
||||
public Transaction process(Transaction transaction) throws IOException, JSONException {
|
||||
LOGGER.info("Attempting to process user with id={}", transaction.getUserId());
|
||||
HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
|
||||
|
||||
//parse user's age and postCode from response and update transaction
|
||||
String result = EntityUtils.toString(response.getEntity());
|
||||
JSONObject userObject = new JSONObject(result);
|
||||
transaction.setAge(Integer.parseInt(userObject.getString("age")));
|
||||
transaction.setPostCode(userObject.getString("postCode"));
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
private HttpResponse fetchMoreUserDetails(int id) throws IOException {
|
||||
final HttpGet request = new HttpGet("http://www.baeldung.com:81/user/" + id);
|
||||
return closeableHttpClient.execute(request);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
username, user_id, transaction_date, transaction_amount
|
||||
sammy, 1234, 31/10/2015, 10000
|
||||
john, 9999, 3/12/2015, 12321
|
||||
|
@@ -54,4 +54,19 @@
|
||||
</batch:tasklet>
|
||||
</batch:step>
|
||||
</batch:job>
|
||||
|
||||
<batch:job id="retryBatchJob">
|
||||
<batch:step id="retryStep">
|
||||
<batch:tasklet>
|
||||
<batch:chunk reader="itemReader" writer="itemWriter"
|
||||
processor="retryItemProcessor" commit-interval="10"
|
||||
retry-limit="3">
|
||||
<batch:retryable-exception-classes>
|
||||
<batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
|
||||
<batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
|
||||
</batch:retryable-exception-classes>
|
||||
</batch:chunk>
|
||||
</batch:tasklet>
|
||||
</batch:step>
|
||||
</batch:job>
|
||||
</beans>
|
||||
|
||||
Reference in New Issue
Block a user