[BAEL-3646] Conditional Flow in Spring Batch
This commit is contained in:
+18
@@ -0,0 +1,18 @@
|
||||
package org.baeldung.conditionalflow;
|
||||
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class ConditionalFlowApplication implements CommandLineRunner {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ConditionalFlowApplication.class, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
System.out.println("Running and exiting");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package org.baeldung.conditionalflow;
|
||||
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
|
||||
import org.springframework.batch.core.job.flow.JobExecutionDecider;
|
||||
|
||||
public class NumberInfoDecider implements JobExecutionDecider {
|
||||
|
||||
@Override
|
||||
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
|
||||
if(jobExecution.getExitStatus().equals("UNKNOWN")) {
|
||||
return new FlowExecutionStatus("NOTIFY");
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
package org.baeldung.conditionalflow.config;
|
||||
|
||||
import org.baeldung.conditionalflow.NumberInfoDecider;
|
||||
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||
import org.baeldung.conditionalflow.step.*;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@EnableBatchProcessing
|
||||
public class NumberInfoConfig {
|
||||
|
||||
@Bean
|
||||
@Qualifier("NotificationStep")
|
||||
public Step notificationStep(StepBuilderFactory sbf) {
|
||||
return sbf.get("Billing step").tasklet(new NotifierTasklet()).build();
|
||||
}
|
||||
|
||||
public Step numberGeneratorStep(StepBuilderFactory sbf, int[] values, String prepend) {
|
||||
return sbf.get("Number generator")
|
||||
.<NumberInfo, Integer>chunk(1)
|
||||
.reader(new NumberInfoGenerator(values))
|
||||
.processor(new NumberInfoClassifier())
|
||||
.writer(new PrependingStdoutWriter<>(prepend))
|
||||
.build();
|
||||
}
|
||||
|
||||
public Step numberGeneratorStepDecider(StepBuilderFactory sbf, int[] values, String prepend) {
|
||||
return sbf.get("Number generator")
|
||||
.<NumberInfo, Integer>chunk(1)
|
||||
.reader(new NumberInfoGenerator(values))
|
||||
.processor(new NumberInfoClassifierWithDecider())
|
||||
.writer(new PrependingStdoutWriter<>(prepend))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Job numberGeneratorNonNotifierJob(JobBuilderFactory jobBuilderFactory,
|
||||
StepBuilderFactory stepBuilderFactory,
|
||||
@Qualifier("NotificationStep") Step notificationStep
|
||||
) {
|
||||
int[] nonNotifierData = {-1, -2, -3};
|
||||
Step step = numberGeneratorStep(stepBuilderFactory, nonNotifierData, "First Dataset Processor");
|
||||
return jobBuilderFactory.get("Number generator - first dataset")
|
||||
.start(step)
|
||||
.on("NOTIFY").to(notificationStep)
|
||||
.from(step).on("*").stop()
|
||||
.end()
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Job numberGeneratorNotifierJob(JobBuilderFactory jobBuilderFactory,
|
||||
StepBuilderFactory stepBuilderFactory,
|
||||
@Qualifier("NotificationStep") Step notificationStep
|
||||
) {
|
||||
int[] billableData = {11, -2, -3};
|
||||
Step dataProviderStep = numberGeneratorStep(stepBuilderFactory, billableData, "Second Dataset Processor");
|
||||
return jobBuilderFactory.get("Number generator - second dataset")
|
||||
.start(dataProviderStep)
|
||||
.on("NOTIFY").to(notificationStep)
|
||||
.end()
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Job numberGeneratorNotifierJobWithDecider(JobBuilderFactory jobBuilderFactory,
|
||||
StepBuilderFactory stepBuilderFactory,
|
||||
@Qualifier("NotificationStep") Step notificationStep
|
||||
) {
|
||||
int[] billableData = {11, -2, -3};
|
||||
Step dataProviderStep = numberGeneratorStepDecider(stepBuilderFactory, billableData, "Third Dataset Processor");
|
||||
return jobBuilderFactory.get("Number generator - third dataset")
|
||||
.start(dataProviderStep)
|
||||
.next(new NumberInfoDecider()).on("NOTIFY").to(notificationStep)
|
||||
.end()
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package org.baeldung.conditionalflow.model;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class NumberInfo {
|
||||
private int number;
|
||||
|
||||
public static NumberInfo from(int number){
|
||||
return new NumberInfo(number);
|
||||
}
|
||||
|
||||
public NumberInfo(int number) {
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
public boolean isPositive() {
|
||||
return number > 0;
|
||||
}
|
||||
|
||||
public boolean isEven() {
|
||||
return number % 2 == 0;
|
||||
}
|
||||
|
||||
public int getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
NumberInfo that = (NumberInfo) o;
|
||||
return number == that.number;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(number);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "NumberInfo{" +
|
||||
"number=" + number +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
|
||||
public class NotifierTasklet implements Tasklet {
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
|
||||
System.err.println("[" + chunkContext.getStepContext().getJobName() + "] contains interesting data!!");
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
+31
@@ -0,0 +1,31 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.annotation.BeforeStep;
|
||||
import org.springframework.batch.core.listener.ItemListenerSupport;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
|
||||
public class NumberInfoClassifier extends ItemListenerSupport<NumberInfo, Integer>
|
||||
implements ItemProcessor<NumberInfo, Integer> {
|
||||
private StepExecution stepExecution;
|
||||
|
||||
@BeforeStep
|
||||
public void beforeStep(StepExecution stepExecution) {
|
||||
this.stepExecution = stepExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterProcess(NumberInfo item, Integer result) {
|
||||
super.afterProcess(item, result);
|
||||
if (item.isPositive()) {
|
||||
stepExecution.setExitStatus(new ExitStatus("NOTIFY"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer process(NumberInfo numberInfo) throws Exception {
|
||||
return Integer.valueOf(numberInfo.getNumber());
|
||||
}
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
|
||||
public class NumberInfoClassifierWithDecider
|
||||
implements ItemProcessor<NumberInfo, Integer> {
|
||||
private StepExecution stepExecution;
|
||||
|
||||
@Override
|
||||
public Integer process(NumberInfo numberInfo) throws Exception {
|
||||
return Integer.valueOf(numberInfo.getNumber());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||
import org.springframework.batch.item.ItemReader;
|
||||
|
||||
public class NumberInfoGenerator implements ItemReader<NumberInfo> {
|
||||
private int[] values;
|
||||
private int counter;
|
||||
|
||||
public NumberInfoGenerator(int[] values){
|
||||
this.values = values;
|
||||
counter = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumberInfo read() {
|
||||
if(counter == values.length){
|
||||
return null;
|
||||
} else {
|
||||
return new NumberInfo(values[counter++]);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.baeldung.conditionalflow.model.NumberInfo;
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.annotation.BeforeStep;
|
||||
import org.springframework.batch.core.listener.ItemListenerSupport;
|
||||
import org.springframework.batch.item.ItemProcessor;
|
||||
|
||||
public class NumberInfoProcessor implements ItemProcessor<NumberInfo, Integer> {
|
||||
private StepExecution stepExecution;
|
||||
|
||||
@Override
|
||||
public Integer process(NumberInfo numberInfo) throws Exception {
|
||||
return Integer.valueOf(numberInfo.getNumber());
|
||||
}
|
||||
}
|
||||
+30
@@ -0,0 +1,30 @@
|
||||
package org.baeldung.conditionalflow.step;
|
||||
|
||||
import org.springframework.batch.item.ItemWriter;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
public class PrependingStdoutWriter<T> implements ItemWriter<T> {
|
||||
private String prependText;
|
||||
private OutputStream writeTo;
|
||||
|
||||
private PrependingStdoutWriter() {
|
||||
}
|
||||
|
||||
public PrependingStdoutWriter(String prependText, OutputStream os){
|
||||
this.prependText = prependText;
|
||||
this.writeTo = os;
|
||||
}
|
||||
|
||||
public PrependingStdoutWriter(String prependText) {
|
||||
this(prependText, System.out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(List<? extends T> list) throws Exception {
|
||||
for (T listItem : list) {
|
||||
System.out.println(prependText + " " + listItem.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user