BAEL-5636 Move the files from patterns/ into patterns-modules/ (#12742)

This commit is contained in:
bipster
2022-09-20 00:49:57 -04:00
committed by GitHub
parent b695aa51b5
commit 2842e5cee7
7 changed files with 0 additions and 0 deletions
@@ -0,0 +1,48 @@
package com.baeldung.seda.apachecamel;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.camel.Exchange;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
public class WordCountRoute extends RouteBuilder {
public static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
public static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
public static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
public static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
public static final String returnResponse = "mock:result";
@Override
public void configure() throws Exception {
from(receiveTextUri).to(splitWordsUri);
from(splitWordsUri).transform(ExpressionBuilder.bodyExpression(s -> s.toString()
.split(" ")))
.to(toLowerCaseUri);
from(toLowerCaseUri).split(body(), new StringListAggregationStrategy())
.transform(ExpressionBuilder.bodyExpression(body -> body.toString()
.toLowerCase()))
.end()
.to(countWordsUri);
from(countWordsUri).transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
.to(returnResponse);
}
}
class StringListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
return exchange.getIn()
.getBody(String.class);
}
}
@@ -0,0 +1,56 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
@Configuration
public class ChannelConfiguration {
private final TaskExecutor receiveTextChannelThreadPool;
private final TaskExecutor splitWordsChannelThreadPool;
private final TaskExecutor toLowerCaseChannelThreadPool;
private final TaskExecutor countWordsChannelThreadPool;
private final TaskExecutor returnResponseChannelThreadPool;
public ChannelConfiguration(TaskExecutor receiveTextChannelThreadPool, TaskExecutor splitWordsChannelThreadPool, TaskExecutor toLowerCaseChannelThreadPool, TaskExecutor countWordsChannelThreadPool, TaskExecutor returnResponseChannelThreadPool) {
this.receiveTextChannelThreadPool = receiveTextChannelThreadPool;
this.splitWordsChannelThreadPool = splitWordsChannelThreadPool;
this.toLowerCaseChannelThreadPool = toLowerCaseChannelThreadPool;
this.countWordsChannelThreadPool = countWordsChannelThreadPool;
this.returnResponseChannelThreadPool = returnResponseChannelThreadPool;
}
@Bean(name = "receiveTextChannel")
public MessageChannel getReceiveTextChannel() {
return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
.get();
}
@Bean(name = "splitWordsChannel")
public MessageChannel getSplitWordsChannel() {
return MessageChannels.executor("split-words", splitWordsChannelThreadPool)
.get();
}
@Bean(name = "toLowerCaseChannel")
public MessageChannel getToLowerCaseChannel() {
return MessageChannels.executor("to-lower-case", toLowerCaseChannelThreadPool)
.get();
}
@Bean(name = "countWordsChannel")
public MessageChannel getCountWordsChannel() {
return MessageChannels.executor("count-words", countWordsChannelThreadPool)
.get();
}
@Bean(name = "returnResponseChannel")
public MessageChannel getReturnResponseChannel() {
return MessageChannels.executor("return-response", returnResponseChannelThreadPool)
.get();
}
}
@@ -0,0 +1,81 @@
package com.baeldung.seda.springintegration;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
private final MessageChannel receiveTextChannel;
private final MessageChannel splitWordsChannel;
private final MessageChannel toLowerCaseChannel;
private final MessageChannel countWordsChannel;
private final MessageChannel returnResponseChannel;
private final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
private final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
private final Function<String, String> toLowerCase = String::toLowerCase;
private final MessageGroupProcessor buildMessageWithListPayload = messageGroup -> MessageBuilder.withPayload(messageGroup.streamMessages()
.map(Message::getPayload)
.collect(Collectors.toList()))
.build();
private final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();
public IntegrationConfiguration(MessageChannel receiveTextChannel, MessageChannel splitWordsChannel, MessageChannel toLowerCaseChannel, MessageChannel countWordsChannel, MessageChannel returnResponseChannel) {
this.receiveTextChannel = receiveTextChannel;
this.splitWordsChannel = splitWordsChannel;
this.toLowerCaseChannel = toLowerCaseChannel;
this.countWordsChannel = countWordsChannel;
this.returnResponseChannel = returnResponseChannel;
}
@Bean
public IntegrationFlow receiveText() {
return IntegrationFlows.from(receiveTextChannel)
.channel(splitWordsChannel)
.get();
}
@Bean
public IntegrationFlow splitWords() {
return IntegrationFlows.from(splitWordsChannel)
.transform(splitWordsFunction)
.channel(toLowerCaseChannel)
.get();
}
@Bean
public IntegrationFlow toLowerCase() {
return IntegrationFlows.from(toLowerCaseChannel)
.split()
.transform(toLowerCase)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
.outputProcessor(buildMessageWithListPayload))
.channel(countWordsChannel)
.get();
}
@Bean
public IntegrationFlow countWords() {
return IntegrationFlows.from(countWordsChannel)
.transform(convertArrayListToCountMap)
.channel(returnResponseChannel)
.get();
}
}
@@ -0,0 +1,61 @@
package com.baeldung.seda.springintegration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class TaskExecutorConfiguration {
@Bean("receiveTextChannelThreadPool")
public TaskExecutor receiveTextChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("receive-text-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("splitWordsChannelThreadPool")
public TaskExecutor splitWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("split-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("toLowerCaseChannelThreadPool")
public TaskExecutor toLowerCaseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("tto-lower-case-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("countWordsChannelThreadPool")
public TaskExecutor countWordsChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("count-words-channel-thread-pool");
executor.initialize();
return executor;
}
@Bean("returnResponseChannelThreadPool")
public TaskExecutor returnResponseChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("return-response-channel-thread-pool");
executor.initialize();
return executor;
}
}
@@ -0,0 +1,12 @@
package com.baeldung.seda.springintegration;
import java.util.Map;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
public Map<String, Long> countWords(String test);
}