Update README.md

This commit is contained in:
johnA1331
2019-10-30 22:12:05 +08:00
committed by GitHub
parent db85c8f275
commit 33998bdac8
20533 changed files with 1642695 additions and 0 deletions
+13
View File
@@ -0,0 +1,13 @@
## Spring Integration
This module contains articles about Spring Integration
### Relevant Articles:
- [Introduction to Spring Integration](https://www.baeldung.com/spring-integration)
- [Security In Spring Integration](https://www.baeldung.com/spring-integration-security)
- [Spring Integration Java DSL](https://www.baeldung.com/spring-integration-java-dsl)
- [Using Subflows in Spring Integration](https://www.baeldung.com/spring-integration-subflows)
- [Transaction Support in Spring Integration](https://www.baeldung.com/spring-integration-transaction)
### Running the Sample
Executing the `mvn exec:java` maven command (either from the command line or from an IDE) will start up the application. Follow the command prompt for further instructions.
+128
View File
@@ -0,0 +1,128 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung.samples.spring.integration</groupId>
<artifactId>spring-integration</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<name>spring-integration</name>
<packaging>jar</packaging>
<url>http://www.springsource.org/spring-integration</url>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<!-- Testing -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>${javax-activation.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${javax-mail.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-twitter</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mail</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.social</groupId>
<artifactId>spring-social-core</artifactId>
<version>${spring-social.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-config</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-security</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.197</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-eclipse-plugin</artifactId>
<version>${maven-eclipse-plugin.version}</version>
<configuration>
<additionalProjectnatures>
<projectnature>org.springframework.ide.eclipse.core.springnature</projectnature>
</additionalProjectnatures>
<additionalBuildcommands>
<buildcommand>org.springframework.ide.eclipse.core.springbuilder</buildcommand>
</additionalBuildcommands>
<downloadSources>true</downloadSources>
<downloadJavadocs>true</downloadJavadocs>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${exec-maven-plugin.version}</version>
<configuration>
<mainClass>com.baeldung.samples.FileCopyConfig</mainClass>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.0.13.RELEASE</spring.version>
<spring-social.version>1.1.4.RELEASE</spring-social.version>
<javax-mail.version>1.4.7</javax-mail.version>
<javax-activation.version>1.1.1</javax-activation.version>
<maven-eclipse-plugin.version>2.10</maven-eclipse-plugin.version>
<exec-maven-plugin.version>1.5.0</exec-maven-plugin.version>
</properties>
</project>
@@ -0,0 +1,153 @@
package com.baeldung.dsl;
import java.io.File;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* JavaDSLFileCopyConfig contains various Integration Flows created from various spring integration components.
* Activate only one flow at a time by un-commenting @Bean annotation from IntegrationFlow beans.
* <p>
* Different flows are :<br>
* - {@link #fileMover()} - default app - activated<br>
* - {@link #fileMoverWithLambda()} - app with file writing expressions as lambda<br>
* - {@link #fileMoverWithPriorityChannel()} - app with priority channel<br>
* - {@link #fileReader()}, {@link #fileWriter()}, {@link #anotherFileWriter()} - app with bridge
*/
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class JavaDSLFileCopyConfig {
public static final String INPUT_DIR = "source";
public static final String OUTPUT_DIR = "target";
public static final String OUTPUT_DIR2 = "target2";
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(INPUT_DIR));
return messageSource;
}
@Bean
public GenericSelector<File> onlyJpgs() {
return new GenericSelector<File>() {
@Override
public boolean accept(File source) {
return source.getName()
.endsWith(".jpg");
}
};
}
@Bean
public MessageHandler targetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setExpectReply(false); // end of pipeline, reply not needed
return handler;
}
@Bean
public IntegrationFlow fileMover() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)))
.filter(onlyJpgs())
.handle(targetDirectory())
.get();
}
// @Bean
public IntegrationFlow fileMoverWithLambda() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)))
.filter(message -> ((File) message).getName()
.endsWith(".jpg"))
.handle(targetDirectory())
.get();
}
@Bean
public PriorityChannel alphabetically() {
return new PriorityChannel(1000, (left, right) -> ((File) left.getPayload()).getName()
.compareTo(((File) right.getPayload()).getName()));
}
// @Bean
public IntegrationFlow fileMoverWithPriorityChannel() {
return IntegrationFlows.from(sourceDirectory())
.filter(onlyJpgs())
.channel("alphabetically")
.handle(targetDirectory())
.get();
}
@Bean
public MessageHandler anotherTargetDirectory() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR2));
handler.setExpectReply(false); // end of pipeline, reply not needed
return handler;
}
@Bean
public MessageChannel holdingTank() {
return MessageChannels.queue().get();
}
// @Bean
public IntegrationFlow fileReader() {
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10)))
.filter(onlyJpgs())
.channel("holdingTank")
.get();
}
// @Bean
public IntegrationFlow fileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
.handle(targetDirectory())
.get();
}
// @Bean
public IntegrationFlow anotherFileWriter() {
return IntegrationFlows.from("holdingTank")
.bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
.handle(anotherTargetDirectory())
.get();
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(JavaDSLFileCopyConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Please enter a string and press <enter>: ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}
@@ -0,0 +1,73 @@
package com.baeldung.samples;
import java.io.File;
import java.util.Scanner;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
@EnableIntegration
public class FileCopyConfig {
public final String INPUT_DIR = "source";
public final String OUTPUT_DIR = "target";
public final String FILE_PATTERN = "*.jpg";
@Bean
public MessageChannel fileChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileChannel", poller = @Poller(fixedDelay = "10000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
@ServiceActivator(inputChannel = "fileChannel")
public MessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(FileCopyConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Please enter a string and press <enter>: ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}
@@ -0,0 +1,7 @@
package com.baeldung.samples.endpoints;
public interface Activator<T> {
public void handleMessage(T input);
}
@@ -0,0 +1,20 @@
package com.baeldung.samples.endpoints;
import java.io.File;
import java.util.logging.Logger;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.messaging.Message;
public class ActivatorImpl implements Activator<Message<File>> {
@Override
public void handleMessage(Message<File> input) {
File filePayload = input.getPayload();
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(input);
Logger.getAnonymousLogger().info("The file size "+filePayload.length());
Logger.getAnonymousLogger().info("The time of the message "+accessor.getTimestamp());
}
}
@@ -0,0 +1,45 @@
package com.baeldung.si.security;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
private String messageContent;
private Map<String, String> messagePSContent = new ConcurrentHashMap<>();
public String getMessageContent() {
return messageContent;
}
public void setMessageContent(String messageContent) {
this.messageContent = messageContent;
}
public Map<String, String> getMessagePSContent() {
return messagePSContent;
}
public void setMessagePSContent(Map<String, String> messagePSContent) {
this.messagePSContent = messagePSContent;
}
@ServiceActivator(inputChannel = "endDirectChannel")
public void endDirectFlow(Message<?> message) {
setMessageContent(message.getPayload().toString());
}
@ServiceActivator(inputChannel = "finalPSResult")
public void endPSFlow(Message<?> message) {
Logger.getAnonymousLogger().info(Thread.currentThread().getName() + " has completed ---------------------------");
messagePSContent.put(Thread.currentThread().getName(), (String) message.getPayload());
}
}
@@ -0,0 +1,50 @@
package com.baeldung.si.security;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
import org.springframework.integration.security.channel.SecuredChannel;
import org.springframework.messaging.Message;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.authentication.AuthenticationManager;
@Configuration
@EnableIntegration
public class SecuredDirectChannel {
@Bean(name = "startDirectChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = { "ROLE_VIEWER", "jane" })
public DirectChannel startDirectChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "startDirectChannel", outputChannel = "endDirectChannel")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> logMessage(Message<?> message) {
Logger.getAnonymousLogger().info(message.toString());
return message;
}
@Bean(name = "endDirectChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = { "ROLE_EDITOR" })
public DirectChannel endDirectChannel() {
return new DirectChannel();
}
@Autowired
@Bean
public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager, AccessDecisionManager customAccessDecisionManager) {
ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor();
channelSecurityInterceptor.setAuthenticationManager(authenticationManager);
channelSecurityInterceptor.setAccessDecisionManager(customAccessDecisionManager);
return channelSecurityInterceptor;
}
}
@@ -0,0 +1,46 @@
package com.baeldung.si.security;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.AccessDecisionVoter;
import org.springframework.security.access.vote.AffirmativeBased;
import org.springframework.security.access.vote.RoleVoter;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.security.config.annotation.method.configuration.GlobalMethodSecurityConfiguration;
@Configuration
@EnableGlobalMethodSecurity(prePostEnabled = true)
public class SecurityConfig extends GlobalMethodSecurityConfiguration {
@Override
@Bean
public AuthenticationManager authenticationManager() throws Exception {
return super.authenticationManager();
}
@Bean
public AccessDecisionManager customAccessDecisionManager() {
List<AccessDecisionVoter<? extends Object>> decisionVoters = new ArrayList<>();
decisionVoters.add(new RoleVoter());
decisionVoters.add(new UsernameAccessDecisionVoter());
AccessDecisionManager accessDecisionManager = new AffirmativeBased(decisionVoters);
return accessDecisionManager;
}
@Autowired
@Bean
public ChannelSecurityInterceptor channelSecurityInterceptor(AuthenticationManager authenticationManager, AccessDecisionManager customAccessDecisionManager) {
ChannelSecurityInterceptor channelSecurityInterceptor = new ChannelSecurityInterceptor();
channelSecurityInterceptor.setAuthenticationManager(authenticationManager);
channelSecurityInterceptor.setAccessDecisionManager(customAccessDecisionManager);
return channelSecurityInterceptor;
}
}
@@ -0,0 +1,82 @@
package com.baeldung.si.security;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.integration.security.channel.SecuredChannel;
import org.springframework.integration.security.channel.SecurityContextPropagationChannelInterceptor;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
@Configuration
@EnableIntegration
public class SecurityPubSubChannel {
@Bean(name = "startPSChannel")
@SecuredChannel(interceptor = "channelSecurityInterceptor", sendAccess = "ROLE_VIEWER")
public PublishSubscribeChannel startChannel() {
return new PublishSubscribeChannel(executor());
}
@ServiceActivator(inputChannel = "startPSChannel", outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_LOGGER')")
public Message<?> changeMessageToRole(Message<?> message) {
return buildNewMessage(getRoles(), message);
}
@ServiceActivator(inputChannel = "startPSChannel", outputChannel = "finalPSResult")
@PreAuthorize("hasRole('ROLE_VIEWER')")
public Message<?> changeMessageToUserName(Message<?> message) {
return buildNewMessage(getUsername(), message);
}
@Bean(name = "finalPSResult")
public DirectChannel finalPSResult() {
return new DirectChannel();
}
@Bean
@GlobalChannelInterceptor(patterns = { "startPSChannel", "endDirectChannel" })
public ChannelInterceptor securityContextPropagationInterceptor() {
return new SecurityContextPropagationChannelInterceptor();
}
@Bean
public ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setMaxPoolSize(10);
pool.setWaitForTasksToCompleteOnShutdown(true);
return pool;
}
public String getRoles() {
SecurityContext securityContext = SecurityContextHolder.getContext();
return securityContext.getAuthentication().getAuthorities().stream().map(auth -> auth.getAuthority()).collect(Collectors.joining(","));
}
public String getUsername() {
SecurityContext securityContext = SecurityContextHolder.getContext();
return securityContext.getAuthentication().getName();
}
public Message<String> buildNewMessage(String content, Message<?> message) {
DefaultMessageBuilderFactory builderFactory = new DefaultMessageBuilderFactory();
MessageBuilder<String> messageBuilder = builderFactory.withPayload(content);
messageBuilder.copyHeaders(message.getHeaders());
return messageBuilder.build();
}
}
@@ -0,0 +1,45 @@
package com.baeldung.si.security;
import java.util.Collection;
import org.springframework.security.access.AccessDecisionVoter;
import org.springframework.security.access.ConfigAttribute;
import org.springframework.security.core.Authentication;
public class UsernameAccessDecisionVoter implements AccessDecisionVoter<Object> {
private String rolePrefix = "ROLE_";
@Override
public boolean supports(ConfigAttribute attribute) {
if ((attribute.getAttribute() != null)
&& !attribute.getAttribute().startsWith(rolePrefix)) {
return true;
}else {
return false;
}
}
@Override
public boolean supports(Class<?> clazz) {
return true;
}
@Override
public int vote(Authentication authentication, Object object, Collection<ConfigAttribute> attributes) {
if (authentication == null) {
return ACCESS_DENIED;
}
String name = authentication.getName();
int result = ACCESS_ABSTAIN;
for (ConfigAttribute attribute : attributes) {
if (this.supports(attribute)) {
result = ACCESS_DENIED;
if (attribute.getAttribute().equals(name)) {
return ACCESS_GRANTED;
}
}
}
return result;
}
}
@@ -0,0 +1,57 @@
package com.baeldung.subflows.discardflow;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class FilterExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree, notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow .discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel")))
.channel("multipleofThreeChannel");
}
}
@@ -0,0 +1,60 @@
package com.baeldung.subflows.publishsubscribechannel;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class PublishSubscibeChannelExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription -> subscription.subscribe(subflow -> subflow.<Integer> filter(this::isMultipleOfThree)
.channel("multipleofThreeChannel"))
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}
}
@@ -0,0 +1,60 @@
package com.baeldung.subflows.routetorecipients;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class RouteToRecipientsExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleofThreeChannel"))
.<Integer> recipient("remainderIsOneChannel",this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",this::isRemainderTwo));
}
}
@@ -0,0 +1,77 @@
package com.baeldung.subflows.separateflows;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class SeparateFlowsExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleofThree(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}
@Bean
public IntegrationFlow remainderIsOneFlow() {
return flow -> flow.split()
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel");
}
@Bean
public IntegrationFlow remainderIsTwoFlow() {
return flow -> flow.split()
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel");
}
}
@@ -0,0 +1,62 @@
package com.baeldung.subflows.subflowmapping;
import java.util.Collection;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
@EnableIntegration
@IntegrationComponentScan
public class RouterExample {
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
}
@Bean
QueueChannel multipleofThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleofThreeChannel")
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");
}
}
@@ -0,0 +1,27 @@
package com.baeldung.tx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
public class ServiceActivator {
@Autowired
private JdbcTemplate jdbcTemplate;
private final Logger log = LoggerFactory.getLogger(this.getClass());
public void checkTestResults(String payload) {
this.jdbcTemplate.update("insert into STUDENT values(?)", payload);
if (payload.toLowerCase().startsWith("fail")) {
log.error("Service failure. Test result: {}", payload);
throw new RuntimeException("Service failure.");
}
log.info("Service success. Test result: {}", payload);
}
}
@@ -0,0 +1,155 @@
package com.baeldung.tx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.transformer.FileToStringTransformer;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.*;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import javax.sql.DataSource;
import java.io.File;
import java.util.Scanner;
@Configuration
@EnableIntegration
public class TxIntegrationConfig {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public final String INPUT_DIR = System.getProperty("java.io.tmpdir") + "/tx/";
public final String FILE_PATTERN = "*.txt";
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private DataSource dataSource;
@Autowired
private TransactionSynchronizationFactory transactionSynchronizationFactory;
@Autowired
DataSourceTransactionManager txManager;
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
@Bean
public PollerMetadata pollerMetadata() {
return Pollers.fixedDelay(5000)
.advice(transactionInterceptor())
.transactionSynchronizationFactory(transactionSynchronizationFactory)
.get();
}
private TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder().transactionManager(txManager).build();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor processor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
SpelExpressionParser spelParser = new SpelExpressionParser();
processor.setAfterCommitExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
processor.setAfterRollbackExpression(
spelParser.parseExpression(
"payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));
return new DefaultTransactionSynchronizationFactory(processor);
}
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {
jdbcTemplate.update("insert into STUDENT values(?)", payload);
if (payload.toLowerCase().startsWith("fail")) {
log.error("Service failure. Test result: {} ", payload);
throw new RuntimeException("Service failure.");
}
log.info("Service success. Test result: {}", payload);
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("classpath:table.sql")
.build();
}
@Bean
public DataSourceTransactionManager txManager() {
return new DataSourceTransactionManager(dataSource);
}
public static void main(final String... args) {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(TxIntegrationConfig.class);
context.registerShutdownHook();
final Scanner scanner = new Scanner(System.in);
System.out.print("Integration flow is running. Type q + <enter> to quit ");
while (true) {
final String input = scanner.nextLine();
if ("q".equals(input.trim())) {
context.close();
scanner.close();
break;
}
}
System.exit(0);
}
}
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:mail="http://www.springframework.org/schema/integration/mail"
xmlns:twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration/twitter
http://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd">
<int:publish-subscribe-channel id="videoChannel" />
<int:bridge input-channel="videoChannel" output-channel="ftpOutbound" />
<int:channel id="ftpOutbound" />
<int:bridge input-channel="videoChannel" output-channel="emailOutbound" />
<int:channel id="emailOutbound" />
<int:bridge output-channel="twitterOutbound" input-channel="videoChannel" />
<int:channel id="twitterOutbound" />
<twitter:outbound-channel-adapter
twitter-template="twitterTemplate" channel="twitterOutbound" />
<bean name="twitterTemplate"
class="org.springframework.social.twitter.api.impl.TwitterTemplate">
<constructor-arg index="0" value="${consumer-key}" />
<constructor-arg index="1" value="${consumer-secret}" />
<constructor-arg index="2" value="${access-token}" />
<constructor-arg index="3" value="${access-token-secret}" />
</bean>
<file:inbound-channel-adapter id="videoFolder"
directory="/studio/edits/video" channel="videoChannel"
prevent-duplicates="true">
<int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>
<mail:outbound-channel-adapter channel="emailOutbound"
mail-sender="mailSender" />
<bean name="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="smtp.gmail.com" />
<property name="port" value="587" />
<property name="username" value="yourUsername" />
<property name="password" value="yourPassword" />
</bean>
<ftp:outbound-channel-adapter channel="ftpOutbound"
remote-directory="/news/archive/video/" session-factory="sessionFactory" />
<bean name="sessionFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost" />
<property name="port" value="587" />
<property name="username" value="yourUsername" />
<property name="password" value="yourPassword" />
</bean>
<int:bridge input-channel="videoChannel" output-channel="activatorChannel"/>
<int:channel id="activatorChannel"/>
<int:service-activator input-channel="activatorChannel" method="handleMessage" ref="customActivator"/>
<bean name="customActivator" class="com.baeldung.samples.endpoints.ActivatorImpl"/>
</beans>
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:mail="http://www.springframework.org/schema/integration/mail"
xmlns:twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration/twitter
http://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd">
<int:publish-subscribe-channel id="videoChannel" />
<int:bridge input-channel="videoChannel" output-channel="ftpOutbound" />
<int:channel id="ftpOutbound" />
<int:bridge input-channel="videoChannel" output-channel="emailOutbound" />
<int:channel id="emailOutbound" />
<int:bridge output-channel="twitterOutbound" input-channel="videoChannel" />
<int:channel id="twitterOutbound" />
<twitter:outbound-channel-adapter
twitter-template="twitterTemplate" channel="twitterOutbound" />
<bean name="twitterTemplate"
class="org.springframework.social.twitter.api.impl.TwitterTemplate">
<constructor-arg index="0" value="${consumer-key}" />
<constructor-arg index="1" value="${consumer-secret}" />
<constructor-arg index="2" value="${access-token}" />
<constructor-arg index="3" value="${access-token-secret}" />
</bean>
<file:inbound-channel-adapter id="videoFolder"
directory="/studio/edits/video" channel="videoChannel"
prevent-duplicates="true">
<int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>
<mail:outbound-channel-adapter channel="emailOutbound"
mail-sender="mailSender" />
<bean name="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="smtp.gmail.com" />
<property name="port" value="587" />
<property name="username" value="yourUsername" />
<property name="password" value="yourPassword" />
</bean>
<ftp:outbound-channel-adapter channel="ftpOutbound"
remote-directory="/news/archive/video/" session-factory="sessionFactory" />
<bean name="sessionFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost" />
<property name="port" value="587" />
<property name="username" value="yourUsername" />
<property name="password" value="yourPassword" />
</bean>
<int:bridge input-channel="videoChannel" output-channel="activatorChannel"/>
<int:channel id="activatorChannel"/>
<int:service-activator input-channel="activatorChannel" method="handleMessage" ref="customActivator"/>
<bean name="customActivator" class="com.baeldung.samples.endpoints.ActivatorImpl"/>
</beans>
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:mail="http://www.springframework.org/schema/integration/mail"
xmlns:twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration/twitter
http://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd">
<file:inbound-channel-adapter id="videoFolder"
directory="C:\projects\baeldung\clean-start-tutorials\tutorials\spring-integration\src\main\resources\source"
channel="videoChannel"
prevent-duplicates="true">
<int:poller fixed-rate="1000"/>
</file:inbound-channel-adapter>
<file:outbound-channel-adapter id="destFileFolder"
directory="C:\projects\baeldung\clean-start-tutorials\tutorials\spring-integration\src\main\resources\destination"
channel="videoChannel"
>
</file:outbound-channel-adapter>
<int:channel id="videoChannel"/>
<!-- <int:service-activator input-channel="videoChannel" output-channel="destFileFolder" method="handleMessage"
ref="customActivator"/>
<bean name="customActivator" class="com.baeldung.samples.endpoints.ActivatorImpl"/>-->
</beans>
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:mail="http://www.springframework.org/schema/integration/mail"
xmlns:twitter="http://www.springframework.org/schema/integration/twitter"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
http://www.springframework.org/schema/integration/twitter
http://www.springframework.org/schema/integration/twitter/spring-integration-twitter.xsd">
<file:inbound-channel-adapter id="videoFolder"
directory="C:\projects\baeldung\clean-start-tutorials\tutorials\spring-integration\src\main\resources\source"
channel="videoChannel"
prevent-duplicates="true">
<int:poller fixed-rate="1000"/>
</file:inbound-channel-adapter>
<file:outbound-channel-adapter id="destFileFolder"
directory="C:\projects\baeldung\clean-start-tutorials\tutorials\spring-integration\src\main\resources\destination"
>
</file:outbound-channel-adapter>
<int:publish-subscribe-channel id="videoChannel" />
<int:channel id="emailOutbound" />
<mail:outbound-channel-adapter channel="emailOutbound"
mail-sender="mailSender" />
<file:file-to-bytes-transformer
input-channel="videoChannel"
output-channel="emailOutbound"
delete-files="false"/>
<mail:header-enricher input-channel="videoChannel" output-channel="emailOutbound">
<mail:attachment-filename value="test"/>
</mail:header-enricher>
<bean name="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
<property name="host" value="smtp.gmail.com" />
<property name="port" value="587" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
</bean>
<int:bridge input-channel="videoChannel" output-channel="destFileFolder" />
<int:bridge input-channel="videoChannel" output-channel="emailOutbound" />
</beans>
@@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/jdbc https://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file https://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jdbc https://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<context:property-placeholder/>
<int-file:inbound-channel-adapter
channel="inputChannel"
auto-create-directory="true"
filename-pattern="*.txt"
directory="${java.io.tmpdir}/tx/">
<int:poller fixed-delay="500">
<int:transactional transaction-manager="txManager" synchronization-factory="syncFactory"/>
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"
channel="infoLogger"/>
<int:after-rollback expression="payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"
channel="errorLogger"/>
</int:transaction-synchronization-factory>
<int:channel id="inputChannel"/>
<int-file:file-to-string-transformer input-channel="inputChannel" output-channel="toServiceChannel"/>
<int:service-activator input-channel="toServiceChannel"
ref="serviceActivator"
method="checkTestResults"/>
<int:logging-channel-adapter id="infoLogger" level="INFO"/>
<int:logging-channel-adapter id="errorLogger" level="ERROR"/>
<bean id="serviceActivator" class="com.baeldung.tx.ServiceActivator"/>
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<jdbc:embedded-database id="dataSource" type="H2">
<jdbc:script location="classpath*:table.sql"/>
</jdbc:embedded-database>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource"/>
</bean>
<int-jdbc:inbound-channel-adapter channel="infoLogger"
query="select TEST from STUDENT" data-source="dataSource">
<int:poller fixed-delay="5000"/>
</int-jdbc:inbound-channel-adapter>
</beans>
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Binary file not shown.

After

Width:  |  Height:  |  Size: 760 KiB

@@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS STUDENT (
TEST VARCHAR(256)
);
@@ -0,0 +1,41 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.baeldung.samples;
import org.junit.Test;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public final class FileCopyIntegrationTest {
//
@Test
public void whenFileCopyConfiguration_thanFileCopiedSuccessfully() throws InterruptedException {
final AbstractApplicationContext context = new AnnotationConfigApplicationContext(FileCopyConfig.class.getCanonicalName());
context.registerShutdownHook();
Thread.sleep(5000);
}
@Test
public void publish() throws InterruptedException {
final AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/spring-integration-file-publish-context.xml");
Thread.sleep(15000);
}
}
@@ -0,0 +1,68 @@
package com.baeldung.si;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.si.security.MessageConsumer;
import com.baeldung.si.security.SecurityConfig;
import com.baeldung.si.security.SecurityPubSubChannel;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SecurityPubSubChannel.class, MessageConsumer.class, SecurityConfig.class })
public class TestSpringIntegrationSecurityExecutorIntegrationTest {
@Autowired
SubscribableChannel startPSChannel;
@Autowired
MessageConsumer messageConsumer;
@Autowired
ThreadPoolTaskExecutor executor;
final String DIRECT_CHANNEL_MESSAGE = "Direct channel message";
@Before
public void clearData() {
messageConsumer.setMessagePSContent(new ConcurrentHashMap<>());
executor.setWaitForTasksToCompleteOnShutdown(true);
}
@Test
@WithMockUser(username = "user", roles = { "VIEWER" })
public void givenRoleUser_whenSendMessageToPSChannel_thenNoMessageArrived() throws IllegalStateException, InterruptedException {
startPSChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
executor.getThreadPoolExecutor().awaitTermination(2, TimeUnit.SECONDS);
assertEquals(1, messageConsumer.getMessagePSContent().size());
assertTrue(messageConsumer.getMessagePSContent().values().contains("user"));
}
@Test
@WithMockUser(username = "user", roles = { "LOGGER", "VIEWER" })
public void givenRoleUserAndLogger_whenSendMessageToPSChannel_then2GetMessages() throws IllegalStateException, InterruptedException {
startPSChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
executor.getThreadPoolExecutor().awaitTermination(2, TimeUnit.SECONDS);
assertEquals(2, messageConsumer.getMessagePSContent().size());
assertTrue(messageConsumer.getMessagePSContent().values().contains("user"));
assertTrue(messageConsumer.getMessagePSContent().values().contains("ROLE_LOGGER,ROLE_VIEWER"));
}
}
@@ -0,0 +1,81 @@
package com.baeldung.si;
import static org.junit.Assert.assertEquals;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.authentication.AuthenticationCredentialsNotFoundException;
import org.springframework.security.test.context.support.WithMockUser;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.si.security.MessageConsumer;
import com.baeldung.si.security.SecuredDirectChannel;
import com.baeldung.si.security.SecurityConfig;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SecurityConfig.class, SecuredDirectChannel.class, MessageConsumer.class })
public class TestSpringIntegrationSecurityIntegrationTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Autowired
SubscribableChannel startDirectChannel;
@Autowired
MessageConsumer messageConsumer;
final String DIRECT_CHANNEL_MESSAGE = "Direct channel message";
@Test(expected = AuthenticationCredentialsNotFoundException.class)
public void givenNoUser_whenSendToDirectChannel_thenCredentialNotFound() {
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}
@Test
@WithMockUser(username = "jane", roles = { "LOGGER" })
public void givenRoleLogger_whenSendMessageToDirectChannel_thenAccessDenied() {
expectedException.expectCause(IsInstanceOf.<Throwable> instanceOf(AccessDeniedException.class));
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}
@Test
@WithMockUser(username = "jane")
public void givenJane_whenSendMessageToDirectChannel_thenAccessDenied() {
expectedException.expectCause(IsInstanceOf.<Throwable> instanceOf(AccessDeniedException.class));
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}
@Test
@WithMockUser(roles = { "VIEWER" })
public void givenRoleViewer_whenSendToDirectChannel_thenAccessDenied() {
expectedException.expectCause(IsInstanceOf.<Throwable> instanceOf(AccessDeniedException.class));
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
}
@Test
@WithMockUser(roles = { "LOGGER", "VIEWER", "EDITOR" })
public void givenRoleLoggerAndUser_whenSendMessageToDirectChannel_thenFlowCompletedSuccessfully() {
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
assertEquals(DIRECT_CHANNEL_MESSAGE, messageConsumer.getMessageContent());
}
@Test
@WithMockUser(username = "jane", roles = { "LOGGER", "EDITOR" })
public void givenJaneLoggerEditor_whenSendToDirectChannel_thenFlowCompleted() {
startDirectChannel.send(new GenericMessage<String>(DIRECT_CHANNEL_MESSAGE));
assertEquals(DIRECT_CHANNEL_MESSAGE, messageConsumer.getMessageContent());
}
}
@@ -0,0 +1,62 @@
package com.baeldung.subflows.discardflow;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.subflows.discardflow.FilterExample.NumbersClassifier;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { FilterExample.class })
public class FilterUnitTest {
@Autowired
private QueueChannel multipleofThreeChannel;
@Autowired
private QueueChannel remainderIsOneChannel;
@Autowired
private QueueChannel remainderIsTwoChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 1);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 4);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 2);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 5);
}
}
@@ -0,0 +1,62 @@
package com.baeldung.subflows.publishsubscribechannel;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.subflows.publishsubscribechannel.PublishSubscibeChannelExample.NumbersClassifier;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { PublishSubscibeChannelExample.class })
public class PublishSubscribeChannelUnitTest {
@Autowired
private QueueChannel multipleofThreeChannel;
@Autowired
private QueueChannel remainderIsOneChannel;
@Autowired
private QueueChannel remainderIsTwoChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 1);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 4);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 2);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 5);
}
}
@@ -0,0 +1,63 @@
package com.baeldung.subflows.routetorecipients;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.subflows.routetorecipients.RouteToRecipientsExample;
import com.baeldung.subflows.routetorecipients.RouteToRecipientsExample.NumbersClassifier;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { RouteToRecipientsExample.class })
public class RouteToRecipientsUnitTest {
@Autowired
private QueueChannel multipleofThreeChannel;
@Autowired
private QueueChannel remainderIsOneChannel;
@Autowired
private QueueChannel remainderIsTwoChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 1);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 4);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 2);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 5);
}
}
@@ -0,0 +1,75 @@
package com.baeldung.subflows.separateflows;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import com.baeldung.subflows.separateflows.SeparateFlowsExample.NumbersClassifier;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsExample.class })
public class SeparateFlowsUnitTest {
@Autowired
private QueueChannel multipleOfThreeChannel;
@Autowired
private QueueChannel remainderIsOneChannel;
@Autowired
private QueueChannel remainderIsTwoChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
numbersClassifier.multipleofThree(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = multipleOfThreeChannel.receive(0);
assertNull(outMessage);
}
@Test
public void whenSendMessagesToRemainderIs1Flow_thenOutputRemainderIs1() {
numbersClassifier.remainderIsOne(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 1);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 4);
}
@Test
public void whenSendMessagesToRemainderIs2Flow_thenOutputRemainderIs2() {
numbersClassifier.remainderIsTwo(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 2);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 5);
}
}
@@ -0,0 +1,62 @@
package com.baeldung.subflows.subflowmapping;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.subflows.subflowmapping.RouterExample.NumbersClassifier;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { RouterExample.class })
public class RouterUnitTest {
@Autowired
private QueueChannel multipleofThreeChannel;
@Autowired
private QueueChannel remainderIsOneChannel;
@Autowired
private QueueChannel remainderIsTwoChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleofThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 1);
outMessage = remainderIsOneChannel.receive(0);
assertEquals(outMessage.getPayload(), 4);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 2);
outMessage = remainderIsTwoChannel.receive(0);
assertEquals(outMessage.getPayload(), 5);
}
}
@@ -0,0 +1,56 @@
package com.baeldung.tx;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
public final class TxIntegrationTest {
private static final String CONTEXT_CONFIG = "classpath:META-INF/spring/integration/spring-integration-tx-context.xml";
@Test
public void whenFileDoesntStartWithFail_thenTxSuccessful() throws InterruptedException, IOException {
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext(CONTEXT_CONFIG);
String fileName = System.getProperty("java.io.tmpdir") + "/tx/test1.txt";
FileWriter fw = new FileWriter(fileName);
fw.write("PASSED!");
fw.close();
context.registerShutdownHook();
Thread.sleep(5000);
File file = new File(fileName + ".PASSED");
Assert.assertTrue(file.exists());
}
@Test
public void whenFileStartsWithFail_thenTxFailed() {
String fileName = System.getProperty("java.io.tmpdir") + "/tx/test2.txt";
try {
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext(CONTEXT_CONFIG);
FileWriter fw = new FileWriter(fileName);
fw.write("FAILED!");
fw.close();
context.registerShutdownHook();
Thread.sleep(5000);
} catch (Exception e) {
// Exception is expected, do nothing
}
File file = new File(fileName + ".FAILED");
Assert.assertTrue(file.exists());
}
}
@@ -0,0 +1,21 @@
package org.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.si.security.MessageConsumer;
import com.baeldung.si.security.SecuredDirectChannel;
import com.baeldung.si.security.SecurityConfig;
import com.baeldung.si.security.SecurityPubSubChannel;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SecurityConfig.class, SecuredDirectChannel.class, SecurityPubSubChannel.class,
MessageConsumer.class })
public class SpringContextIntegrationTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}
@@ -0,0 +1,21 @@
package org.baeldung;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.baeldung.si.security.MessageConsumer;
import com.baeldung.si.security.SecuredDirectChannel;
import com.baeldung.si.security.SecurityConfig;
import com.baeldung.si.security.SecurityPubSubChannel;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SecurityConfig.class, SecuredDirectChannel.class, SecurityPubSubChannel.class,
MessageConsumer.class })
public class SpringContextTest {
@Test
public void whenSpringContextIsBootstrapped_thenNoExceptions() {
}
}
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
# Pattern of log message for console appender
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<logger name="org.springframework" level="ERROR"/>
<root level="INFO">
<appender-ref ref="stdout" />
</root>
</configuration>