Merge pull request #8125 from eugenp/revert-8119-BAEL-3275-2

Revert "BAEL-3275: Using blocking queue for pub-sub"
This commit is contained in:
Eric Martin
2019-10-31 20:43:47 -05:00
committed by GitHub
parent db85c8f275
commit 3225470df5
20543 changed files with 1642750 additions and 0 deletions
@@ -0,0 +1,153 @@
package com.baeldung.dsl;
import java.io.File;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* JavaDSLFileCopyConfig contains various Integration Flows created from various spring integration components.
* Activate only one flow at a time by un-commenting @Bean annotation from IntegrationFlow beans.
* <p>
* Different flows are :<br>
* - {@link #fileMover()} - default app - activated<br>
* - {@link #fileMoverWithLambda()} - app with file writing expressions as lambda<br>
* - {@link #fileMoverWithPriorityChannel()} - app with priority channel<br>
* - {@link #fileReader()}, {@link #fileWriter()}, {@link #anotherFileWriter()} - app with bridge
*/
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class JavaDSLFileCopyConfig {
public static final String INPUT_DIR = "source";
public static final String OUTPUT_DIR = "target";
public static final String OUTPUT_DIR2 = "target2";
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName()
.endsWith(".jpg");
}
};
}
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setExpectReply(false); // end of pipeline, reply not needed
return handler;
}
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
// @Bean
public IntegrationFlow fileMoverWithLambda() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)))
.filter(message -> ((File) message).getName()
.endsWith(".jpg"))
.handle(targetDirectory())
.get();
}
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) -> ((File) left.getPayload()).getName()
.compareTo(((File) right.getPayload()).getName()));
}
// @Bean
public IntegrationFlow fileMoverWithPriorityChannel() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
@Bean
public MessageHandler anotherTargetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR2));
handler.setExpectReply(false); // end of pipeline, reply not needed
return handler;
}
@Bean
public MessageChannel holdingTank() {
return MessageChannels.queue().get();
}
// @Bean
public IntegrationFlow fileReader() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10)))
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
// @Bean
public IntegrationFlow fileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.handle(targetDirectory())
.get();
}
// @Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
.handle(anotherTargetDirectory())
.get();
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(JavaDSLFileCopyConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Please enter a string and press <enter>: ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}
@@ -0,0 +1,73 @@
package com.baeldung.samples;
import java.io.File;
import java.util.Scanner;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@EnableIntegration
public class FileCopyConfig {
public final String INPUT_DIR = "source";
public final String OUTPUT_DIR = "target";
public final String FILE_PATTERN = "*.jpg";
@Bean
public MessageChannel fileChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
@ServiceActivator(inputChannel = "fileChannel")
public MessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(FileCopyConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Please enter a string and press <enter>: ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}
@@ -0,0 +1,7 @@
package com.baeldung.samples.endpoints;
public interface Activator<T> {
public void handleMessage(T input);
}
@@ -0,0 +1,20 @@
package com.baeldung.samples.endpoints;
import java.io.File;
import java.util.logging.Logger;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.messaging.Message;
public class ActivatorImpl implements Activator<Message<File>> {
@Override
public void handleMessage(Message<File> input) {
File filePayload = input.getPayload();
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(input);
Logger.getAnonymousLogger().info("The file size "+filePayload.length());
Logger.getAnonymousLogger().info("The time of the message "+accessor.getTimestamp());
}
}
@@ -0,0 +1,45 @@
package com.baeldung.si.security;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
private String messageContent;
private Map<String, String> messagePSContent = new ConcurrentHashMap<>();
public String getMessageContent() {
return messageContent;
}
public void setMessageContent(String messageContent) {
this.messageContent = messageContent;
}
public Map<String, String> getMessagePSContent() {
return messagePSContent;
}
public void setMessagePSContent(Map<String, String> messagePSContent) {
this.messagePSContent = messagePSContent;
}
@ServiceActivator(inputChannel = "endDirectChannel")
public void endDirectFlow(Message<?> message) {
setMessageContent(message.getPayload().toString());
}
@ServiceActivator(inputChannel = "finalPSResult")
public void endPSFlow(Message<?> message) {
Logger.getAnonymousLogger().info(Thread.currentThread().getName() + " has completed ---------------------------");
messagePSContent.put(Thread.currentThread().getName(), (String) message.getPayload());
}
}
@@ -0,0 +1,50 @@
package com.baeldung.si.security;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
import org.springframework.integration.security.channel.SecuredChannel;
import org.springframework.messaging.Message;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.authentication.AuthenticationManager;
@Configuration
@EnableIntegration
public class SecuredDirectChannel {
@Bean(name = "startDirectChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = { "ROLE_VIEWER", "jane" })
public DirectChannel startDirectChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "startDirectChannel", outputChannel = "endDirectChannel")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> logMessage(Message<?> message) {
Logger.getAnonymousLogger().info(message.toString());
return message;
}
@Bean(name = "endDirectChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = { "ROLE_EDITOR" })
public DirectChannel endDirectChannel() {
return new DirectChannel();
}
@Autowired
@Bean
public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager, AccessDecisionManager customAccessDecisionManager) {
ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor();
channelSecurityInterceptor.setAuthenticationManager(authenticationManager);
channelSecurityInterceptor.setAccessDecisionManager(customAccessDecisionManager);
return channelSecurityInterceptor;
}
}
@@ -0,0 +1,46 @@
package com.baeldung.si.security;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.AccessDecisionVoter;
import org.springframework.security.access.vote.AffirmativeBased;
import org.springframework.security.access.vote.RoleVoter;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.security.config.annotation.method.configuration.GlobalMethodSecurityConfiguration;
@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig extends GlobalMethodSecurityConfiguration {
@Override
@Bean
public AuthenticationManager authenticationManager() throws Exception {
return super.authenticationManager();
}
@Bean
public AccessDecisionManager customAccessDecisionManager() {
List<AccessDecisionVoter<? extends Object>> decisionVoters = new ArrayList<>();
decisionVoters.add(new RoleVoter());
decisionVoters.add(new UsernameAccessDecisionVoter());
AccessDecisionManager accessDecisionManager = new AffirmativeBased(decisionVoters);
return accessDecisionManager;
}
@Autowired
@Bean
public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager, AccessDecisionManager customAccessDecisionManager) {
ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor();
channelSecurityInterceptor.setAuthenticationManager(authenticationManager);
channelSecurityInterceptor.setAccessDecisionManager(customAccessDecisionManager);
return channelSecurityInterceptor;
}
}
@@ -0,0 +1,82 @@
package com.baeldung.si.security;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.integration.security.channel.SecuredChannel;
import org.springframework.integration.security.channel.SecurityContextPropagationChannelInterceptor;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
@Configuration
@EnableIntegration
public class SecurityPubSubChannel {
@Bean(name = "startPSChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = "ROLE_VIEWER")
public PublishSubscribeChannel startChannel() {
return new PublishSubscribeChannel(executor());
}
@ServiceActivator(inputChannel = "startPSChannel", outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> changeMessageToRole(Message<?> message) {
return buildNewMessage(getRoles(), message);
}
@ServiceActivator(inputChannel = "startPSChannel", outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_VIEWER')")
public Message<?> changeMessageToUserName(Message<?> message) {
return buildNewMessage(getUsername(), message);
}
@Bean(name = "finalPSResult")
public DirectChannel finalPSResult() {
return new DirectChannel();
}
@Bean
@GlobalChannelInterceptor(patterns = { "startPSChannel", "endDirectChannel" })
public ChannelInterceptor securityContextPropagationInterceptor() {
return new SecurityContextPropagationChannelInterceptor();
}
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setMaxPoolSize(10);
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
public String getRoles() {
SecurityContext securityContext = SecurityContextHolder.getContext();
return securityContext.getAuthentication().getAuthorities().stream().map(auth -> auth.getAuthority()).collect(Collectors.joining(","));
}
public String getUsername() {
SecurityContext securityContext = SecurityContextHolder.getContext();
return securityContext.getAuthentication().getName();
}
public Message<String> buildNewMessage(String content, Message<?> message) {
DefaultMessageBuilderFactory builderFactory = new DefaultMessageBuilderFactory();
MessageBuilder<String> messageBuilder = builderFactory.withPayload(content);
messageBuilder.copyHeaders(message.getHeaders());
return messageBuilder.build();
}
}
@@ -0,0 +1,45 @@
package com.baeldung.si.security;
import java.util.Collection;
import org.springframework.security.access.AccessDecisionVoter;
import org.springframework.security.access.ConfigAttribute;
import org.springframework.security.core.Authentication;
public class UsernameAccessDecisionVoter implements AccessDecisionVoter<Object> {
private String rolePrefix = "ROLE_";
@Override
public boolean supports(ConfigAttribute attribute) {
if ((attribute.getAttribute() != null)
&& !attribute.getAttribute().startsWith(rolePrefix)) {
return true;
}else {
return false;
}
}
@Override
public boolean supports(Class<?> clazz) {
return true;
}
@Override
public int vote(Authentication authentication, Object object, Collection<ConfigAttribute> attributes) {
if (authentication == null) {
return ACCESS_DENIED;
}
String name = authentication.getName();
int result = ACCESS_ABSTAIN;
for (ConfigAttribute attribute : attributes) {
if (this.supports(attribute)) {
result = ACCESS_DENIED;
if (attribute.getAttribute().equals(name)) {
return ACCESS_GRANTED;
}
}
}
return result;
}
}
@@ -0,0 +1,57 @@
package com.baeldung.subflows.discardflow;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class FilterExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree, notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow .discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel")))
.channel("multipleofThreeChannel");
}
}
@@ -0,0 +1,60 @@
package com.baeldung.subflows.publishsubscribechannel;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class PublishSubscibeChannelExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription -> subscription.subscribe(subflow -> subflow.<Integer> filter(this::isMultipleOfThree)
.channel("multipleofThreeChannel"))
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}
}
@@ -0,0 +1,60 @@
package com.baeldung.subflows.routetorecipients;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class RouteToRecipientsExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleofThreeChannel"))
.<Integer> recipient("remainderIsOneChannel",this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",this::isRemainderTwo));
}
}
@@ -0,0 +1,77 @@
package com.baeldung.subflows.separateflows;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class SeparateFlowsExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleofThree(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}
@Bean
public IntegrationFlow remainderIsOneFlow() {
return flow -> flow.split()
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel");
}
@Bean
public IntegrationFlow remainderIsTwoFlow() {
return flow -> flow.split()
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel");
}
}
@@ -0,0 +1,62 @@
package com.baeldung.subflows.subflowmapping;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class RouterExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleofThreeChannel")
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");
}
}
@@ -0,0 +1,27 @@
package com.baeldung.tx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
public class ServiceActivator {
@Autowired
private JdbcTemplate jdbcTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public void checkTestResults(String payload) {
this.jdbcTemplate.update("insert into STUDENT values(?)", payload);
if (payload.toLowerCase().startsWith("fail")) {
log.error("Service failure. Test result: {}", payload);
throw new RuntimeException("Service failure.");
}
log.info("Service success. Test result: {}", payload);
}
}
@@ -0,0 +1,155 @@
package com.baeldung.tx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.transformer.FileToStringTransformer;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.*;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import javax.sql.DataSource;
import java.io.File;
import java.util.Scanner;
@Configuration
@EnableIntegration
public class TxIntegrationConfig {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public final String INPUT_DIR = System.getProperty("java.io.tmpdir") + "/tx/";
public final String FILE_PATTERN = "*.txt";
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private DataSource dataSource;
@Autowired
private TransactionSynchronizationFactory transactionSynchronizationFactory;
@Autowired
DataSourceTransactionManager txManager;
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
public PollerMetadata pollerMetadata() {
return Pollers.fixedDelay(5000)
.advice(transactionInterceptor())
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.get();
}
private TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder().transactionManager(txManager).build();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor processor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
SpelExpressionParser spelParser = new SpelExpressionParser();
processor.setAfterCommitExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
processor.setAfterRollbackExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));
return new DefaultTransactionSynchronizationFactory(processor);
}
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {
jdbcTemplate.update("insert into STUDENT values(?)", payload);
if (payload.toLowerCase().startsWith("fail")) {
log.error("Service failure. Test result: {} ", payload);
throw new RuntimeException("Service failure.");
}
log.info("Service success. Test result: {}", payload);
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("classpath:table.sql")
.build();
}
@Bean
public DataSourceTransactionManager txManager() {
return new DataSourceTransactionManager(dataSource);
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Integration flow is running. Type q + <enter> to quit ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}