1
0
mirror of synced 2026-05-22 21:33:16 +00:00

Merge branch '5.8.x' into 6.0.x

Closes gh-13882
This commit is contained in:
Marcus Da Coregio
2023-09-29 11:46:00 -03:00
1152 changed files with 10637 additions and 8548 deletions
@@ -84,8 +84,9 @@ public class AnonymousPayloadInterceptor implements PayloadInterceptor, Ordered
return ReactiveSecurityContextHolder.getContext().switchIfEmpty(Mono.defer(() -> {
AnonymousAuthenticationToken authentication = new AnonymousAuthenticationToken(this.key, this.principal,
this.authorities);
return chain.next(exchange).contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication))
.then(Mono.empty());
return chain.next(exchange)
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication))
.then(Mono.empty());
})).flatMap((securityContext) -> chain.next(exchange));
}
@@ -51,19 +51,19 @@ import org.springframework.util.MimeTypeUtils;
public class AuthenticationPayloadExchangeConverter implements PayloadExchangeAuthenticationConverter {
private static final MimeType COMPOSITE_METADATA_MIME_TYPE = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
private static final MimeType AUTHENTICATION_MIME_TYPE = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());
private final MetadataExtractor metadataExtractor = createDefaultExtractor();
@Override
public Mono<Authentication> convert(PayloadExchange exchange) {
return Mono
.fromCallable(() -> this.metadataExtractor.extract(exchange.getPayload(),
AuthenticationPayloadExchangeConverter.COMPOSITE_METADATA_MIME_TYPE))
.flatMap((metadata) -> Mono.justOrEmpty(authentication(metadata)));
.fromCallable(() -> this.metadataExtractor.extract(exchange.getPayload(),
AuthenticationPayloadExchangeConverter.COMPOSITE_METADATA_MIME_TYPE))
.flatMap((metadata) -> Mono.justOrEmpty(authentication(metadata)));
}
private Authentication authentication(Map<String, Object> metadata) {
@@ -72,9 +72,10 @@ public class AuthenticationPayloadInterceptor implements PayloadInterceptor, Ord
@Override
public Mono<Void> intercept(PayloadExchange exchange, PayloadInterceptorChain chain) {
return this.authenticationConverter.convert(exchange).switchIfEmpty(chain.next(exchange).then(Mono.empty()))
.flatMap((a) -> this.authenticationManager.authenticate(a))
.flatMap((a) -> onAuthenticationSuccess(chain.next(exchange), a));
return this.authenticationConverter.convert(exchange)
.switchIfEmpty(chain.next(exchange).then(Mono.empty()))
.flatMap((a) -> this.authenticationManager.authenticate(a))
.flatMap((a) -> onAuthenticationSuccess(chain.next(exchange), a));
}
private Mono<Void> onAuthenticationSuccess(Mono<Void> payload, Authentication authentication) {
@@ -40,17 +40,18 @@ import org.springframework.util.MimeTypeUtils;
public class BasicAuthenticationPayloadExchangeConverter implements PayloadExchangeAuthenticationConverter {
private MimeType metadataMimetype = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
private MetadataExtractor metadataExtractor = createDefaultExtractor();
@Override
public Mono<Authentication> convert(PayloadExchange exchange) {
return Mono.fromCallable(() -> this.metadataExtractor.extract(exchange.getPayload(), this.metadataMimetype))
.flatMap((metadata) -> Mono
.justOrEmpty(metadata.get(UsernamePasswordMetadata.BASIC_AUTHENTICATION_MIME_TYPE.toString())))
.cast(UsernamePasswordMetadata.class).map((credentials) -> UsernamePasswordAuthenticationToken
.unauthenticated(credentials.getUsername(), credentials.getPassword()));
.flatMap((metadata) -> Mono
.justOrEmpty(metadata.get(UsernamePasswordMetadata.BASIC_AUTHENTICATION_MIME_TYPE.toString())))
.cast(UsernamePasswordMetadata.class)
.map((credentials) -> UsernamePasswordAuthenticationToken.unauthenticated(credentials.getUsername(),
credentials.getPassword()));
}
private static MetadataExtractor createDefaultExtractor() {
@@ -56,12 +56,13 @@ public class AuthorizationPayloadInterceptor implements PayloadInterceptor, Orde
@Override
public Mono<Void> intercept(PayloadExchange exchange, PayloadInterceptorChain chain) {
return ReactiveSecurityContextHolder.getContext().filter((c) -> c.getAuthentication() != null)
.map(SecurityContext::getAuthentication)
.switchIfEmpty(Mono.error(() -> new AuthenticationCredentialsNotFoundException(
"An Authentication (possibly AnonymousAuthenticationToken) is required.")))
.as((authentication) -> this.authorizationManager.verify(authentication, exchange))
.then(chain.next(exchange));
return ReactiveSecurityContextHolder.getContext()
.filter((c) -> c.getAuthentication() != null)
.map(SecurityContext::getAuthentication)
.switchIfEmpty(Mono.error(() -> new AuthenticationCredentialsNotFoundException(
"An Authentication (possibly AnonymousAuthenticationToken) is required.")))
.as((authentication) -> this.authorizationManager.verify(authentication, exchange))
.then(chain.next(exchange));
}
}
@@ -53,11 +53,14 @@ public final class PayloadExchangeMatcherReactiveAuthorizationManager
@Override
public Mono<AuthorizationDecision> check(Mono<Authentication> authentication, PayloadExchange exchange) {
return Flux.fromIterable(this.mappings)
.concatMap((mapping) -> mapping.getMatcher().matches(exchange)
.filter(PayloadExchangeMatcher.MatchResult::isMatch).map(MatchResult::getVariables)
.flatMap((variables) -> mapping.getEntry().check(authentication,
new PayloadExchangeAuthorizationContext(exchange, variables))))
.next().switchIfEmpty(Mono.fromCallable(() -> new AuthorizationDecision(false)));
.concatMap((mapping) -> mapping.getMatcher()
.matches(exchange)
.filter(PayloadExchangeMatcher.MatchResult::isMatch)
.map(MatchResult::getVariables)
.flatMap((variables) -> mapping.getEntry()
.check(authentication, new PayloadExchangeAuthorizationContext(exchange, variables))))
.next()
.switchIfEmpty(Mono.fromCallable(() -> new AuthorizationDecision(false)));
}
public static PayloadExchangeMatcherReactiveAuthorizationManager.Builder builder() {
@@ -72,28 +72,30 @@ class PayloadInterceptorRSocket extends RSocketProxy {
@Override
public Mono<Void> fireAndForget(Payload payload) {
return intercept(PayloadExchangeType.FIRE_AND_FORGET, payload)
.flatMap((context) -> this.source.fireAndForget(payload).contextWrite(context));
.flatMap((context) -> this.source.fireAndForget(payload).contextWrite(context));
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
return intercept(PayloadExchangeType.REQUEST_RESPONSE, payload)
.flatMap((context) -> this.source.requestResponse(payload).contextWrite(context));
.flatMap((context) -> this.source.requestResponse(payload).contextWrite(context));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return intercept(PayloadExchangeType.REQUEST_STREAM, payload)
.flatMapMany((context) -> this.source.requestStream(payload).contextWrite(context));
.flatMapMany((context) -> this.source.requestStream(payload).contextWrite(context));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> {
Payload firstPayload = signal.get();
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany((context) -> innerFlux
.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads)).contextWrite(context));
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload)
.flatMapMany((context) -> innerFlux.index()
.concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads))
.contextWrite(context));
});
}
@@ -104,7 +106,7 @@ class PayloadInterceptorRSocket extends RSocketProxy {
@Override
public Mono<Void> metadataPush(Payload payload) {
return intercept(PayloadExchangeType.METADATA_PUSH, payload)
.flatMap((c) -> this.source.metadataPush(payload).contextWrite(c));
.flatMap((c) -> this.source.metadataPush(payload).contextWrite(c));
}
private Mono<Context> intercept(PayloadExchangeType type, Payload payload) {
@@ -112,8 +114,10 @@ class PayloadInterceptorRSocket extends RSocketProxy {
ContextPayloadInterceptorChain chain = new ContextPayloadInterceptorChain(this.interceptors);
DefaultPayloadExchange exchange = new DefaultPayloadExchange(type, payload, this.metadataMimeType,
this.dataMimeType);
return chain.next(exchange).then(Mono.fromCallable(() -> chain.getContext()))
.defaultIfEmpty(Context.empty()).contextWrite(this.context);
return chain.next(exchange)
.then(Mono.fromCallable(() -> chain.getContext()))
.defaultIfEmpty(Context.empty())
.contextWrite(this.context);
});
}
@@ -48,7 +48,7 @@ class PayloadSocketAcceptor implements SocketAcceptor {
private MimeType defaultDataMimeType;
private MimeType defaultMetadataMimeType = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
PayloadSocketAcceptor(SocketAcceptor delegate, List<PayloadInterceptor> interceptors) {
Assert.notNull(delegate, "delegate cannot be null");
@@ -69,13 +69,11 @@ class PayloadSocketAcceptor implements SocketAcceptor {
MimeType metadataMimeType = parseMimeType(setup.metadataMimeType(), this.defaultMetadataMimeType);
Assert.notNull(metadataMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
// FIXME do we want to make the sendingSocket available in the PayloadExchange
return intercept(setup, dataMimeType,
metadataMimeType)
.flatMap(
(ctx) -> this.delegate.accept(setup, sendingSocket)
.map((acceptingSocket) -> new PayloadInterceptorRSocket(acceptingSocket,
this.interceptors, metadataMimeType, dataMimeType, ctx))
.contextWrite(ctx));
return intercept(setup, dataMimeType, metadataMimeType)
.flatMap((ctx) -> this.delegate.accept(setup, sendingSocket)
.map((acceptingSocket) -> new PayloadInterceptorRSocket(acceptingSocket, this.interceptors,
metadataMimeType, dataMimeType, ctx))
.contextWrite(ctx));
}
private Mono<Context> intercept(Payload payload, MimeType dataMimeType, MimeType metadataMimeType) {
@@ -83,8 +81,9 @@ class PayloadSocketAcceptor implements SocketAcceptor {
ContextPayloadInterceptorChain chain = new ContextPayloadInterceptorChain(this.interceptors);
DefaultPayloadExchange exchange = new DefaultPayloadExchange(PayloadExchangeType.SETUP, payload,
metadataMimeType, dataMimeType);
return chain.next(exchange).then(Mono.fromCallable(() -> chain.getContext()))
.defaultIfEmpty(Context.empty());
return chain.next(exchange)
.then(Mono.fromCallable(() -> chain.getContext()))
.defaultIfEmpty(Context.empty());
});
}
@@ -42,7 +42,7 @@ public class PayloadSocketAcceptorInterceptor implements SocketAcceptorIntercept
private MimeType defaultDataMimeType;
private MimeType defaultMetadataMimeType = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
public PayloadSocketAcceptorInterceptor(List<PayloadInterceptor> interceptors) {
this.interceptors = interceptors;
@@ -49,7 +49,7 @@ public class BasicAuthenticationEncoder extends AbstractEncoder<UsernamePassword
public Flux<DataBuffer> encode(Publisher<? extends UsernamePasswordMetadata> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
}
@Override
@@ -43,7 +43,7 @@ import org.springframework.util.MimeTypeUtils;
public class BearerTokenAuthenticationEncoder extends AbstractEncoder<BearerTokenMetadata> {
private static final MimeType AUTHENTICATION_MIME_TYPE = MimeTypeUtils
.parseMimeType("message/x.rsocket.authentication.v0");
.parseMimeType("message/x.rsocket.authentication.v0");
private NettyDataBufferFactory defaultBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
@@ -55,7 +55,7 @@ public class BearerTokenAuthenticationEncoder extends AbstractEncoder<BearerToke
public Flux<DataBuffer> encode(Publisher<? extends BearerTokenMetadata> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
}
@Override
@@ -43,7 +43,7 @@ import org.springframework.util.MimeTypeUtils;
public class SimpleAuthenticationEncoder extends AbstractEncoder<UsernamePasswordMetadata> {
private static final MimeType AUTHENTICATION_MIME_TYPE = MimeTypeUtils
.parseMimeType("message/x.rsocket.authentication.v0");
.parseMimeType("message/x.rsocket.authentication.v0");
private NettyDataBufferFactory defaultBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
@@ -55,7 +55,7 @@ public class SimpleAuthenticationEncoder extends AbstractEncoder<UsernamePasswor
public Flux<DataBuffer> encode(Publisher<? extends UsernamePasswordMetadata> inputStream,
DataBufferFactory bufferFactory, ResolvableType elementType, MimeType mimeType, Map<String, Object> hints) {
return Flux.from(inputStream)
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
.map((credentials) -> encodeValue(credentials, bufferFactory, elementType, mimeType, hints));
}
@Override
@@ -52,9 +52,10 @@ public class RoutePayloadExchangeMatcher implements PayloadExchangeMatcher {
Map<String, Object> metadata = this.metadataExtractor.extract(exchange.getPayload(),
exchange.getMetadataMimeType());
return Optional.ofNullable((String) metadata.get(MetadataExtractor.ROUTE_KEY))
.map((routeValue) -> this.routeMatcher.parseRoute(routeValue))
.map((route) -> this.routeMatcher.matchAndExtract(this.pattern, route)).map((v) -> MatchResult.match(v))
.orElse(MatchResult.notMatch());
.map((routeValue) -> this.routeMatcher.parseRoute(routeValue))
.map((route) -> this.routeMatcher.matchAndExtract(this.pattern, route))
.map((v) -> MatchResult.match(v))
.orElse(MatchResult.notMatch());
}
}
@@ -75,7 +75,7 @@ public class AnonymousPayloadInterceptorTests {
public void constructorKeyPrincipalAuthoritiesWhenAuthoritiesNullThenException() {
List<GrantedAuthority> authorities = null;
assertThatIllegalArgumentException()
.isThrownBy(() -> new AnonymousPayloadInterceptor("key", "principal", authorities));
.isThrownBy(() -> new AnonymousPayloadInterceptor("key", "principal", authorities));
}
@Test
@@ -91,7 +91,8 @@ public class AnonymousPayloadInterceptorTests {
AuthenticationPayloadInterceptorChain chain = new AuthenticationPayloadInterceptorChain();
TestingAuthenticationToken expected = new TestingAuthenticationToken("test", "password");
this.interceptor.intercept(this.exchange, chain)
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(expected)).block();
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(expected))
.block();
Authentication authentication = chain.getAuthentication();
assertThat(authentication).isEqualTo(expected);
}
@@ -33,8 +33,10 @@ class AuthenticationPayloadInterceptorChain implements PayloadInterceptorChain {
@Override
public Mono<Void> next(PayloadExchange exchange) {
return ReactiveSecurityContextHolder.getContext().map(SecurityContext::getAuthentication)
.doOnNext((a) -> this.setAuthentication(a)).then();
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.doOnNext((a) -> this.setAuthentication(a))
.then();
}
Authentication getAuthentication() {
@@ -66,7 +66,7 @@ import static org.mockito.Mockito.verify;
public class AuthenticationPayloadInterceptorTests {
static final MimeType COMPOSITE_METADATA = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
@Mock
ReactiveAuthenticationManager authenticationManager;
@@ -89,8 +89,8 @@ public class AuthenticationPayloadInterceptorTests {
interceptor.intercept(exchange, authenticationPayloadChain).block();
Authentication authentication = authenticationPayloadChain.getAuthentication();
verify(this.authenticationManager).authenticate(this.authenticationArg.capture());
assertThat(this.authenticationArg.getValue()).isEqualToComparingFieldByField(
UsernamePasswordAuthenticationToken.unauthenticated("user", "password"));
assertThat(this.authenticationArg.getValue())
.isEqualToComparingFieldByField(UsernamePasswordAuthenticationToken.unauthenticated("user", "password"));
assertThat(authentication).isEqualTo(expectedAuthentication);
}
@@ -104,7 +104,8 @@ public class AuthenticationPayloadInterceptorTests {
PayloadInterceptorChain chain = mock(PayloadInterceptorChain.class);
given(chain.next(any())).willReturn(voidResult.mono());
StepVerifier.create(interceptor.intercept(exchange, chain))
.then(() -> assertThat(voidResult.subscribeCount()).isEqualTo(1)).verifyComplete();
.then(() -> assertThat(voidResult.subscribeCount()).isEqualTo(1))
.verifyComplete();
}
private Payload createRequestPayload() {
@@ -63,8 +63,8 @@ public class AuthorizationPayloadInterceptorTests {
AuthorizationPayloadInterceptor interceptor = new AuthorizationPayloadInterceptor(
AuthenticatedReactiveAuthorizationManager.authenticated());
StepVerifier.create(interceptor.intercept(this.exchange, this.chain))
.then(() -> this.chainResult.assertWasNotSubscribed())
.verifyError(AuthenticationCredentialsNotFoundException.class);
.then(() -> this.chainResult.assertWasNotSubscribed())
.verifyError(AuthenticationCredentialsNotFoundException.class);
}
@Test
@@ -73,7 +73,8 @@ public class AuthorizationPayloadInterceptorTests {
given(this.authorizationManager.verify(any(), any())).willReturn(this.managerResult.mono());
AuthorizationPayloadInterceptor interceptor = new AuthorizationPayloadInterceptor(this.authorizationManager);
StepVerifier.create(interceptor.intercept(this.exchange, this.chain))
.then(() -> this.chainResult.assertWasSubscribed()).verifyComplete();
.then(() -> this.chainResult.assertWasSubscribed())
.verifyComplete();
}
@Test
@@ -82,10 +83,11 @@ public class AuthorizationPayloadInterceptorTests {
AuthorizationPayloadInterceptor interceptor = new AuthorizationPayloadInterceptor(
AuthorityReactiveAuthorizationManager.hasRole("USER"));
Context userContext = ReactiveSecurityContextHolder
.withAuthentication(new TestingAuthenticationToken("user", "password"));
.withAuthentication(new TestingAuthenticationToken("user", "password"));
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).contextWrite(userContext);
StepVerifier.create(intercept).then(() -> this.chainResult.assertWasNotSubscribed())
.verifyError(AccessDeniedException.class);
StepVerifier.create(intercept)
.then(() -> this.chainResult.assertWasNotSubscribed())
.verifyError(AccessDeniedException.class);
}
@Test
@@ -94,7 +96,7 @@ public class AuthorizationPayloadInterceptorTests {
AuthorizationPayloadInterceptor interceptor = new AuthorizationPayloadInterceptor(
AuthenticatedReactiveAuthorizationManager.authenticated());
Context userContext = ReactiveSecurityContextHolder
.withAuthentication(new TestingAuthenticationToken("user", "password", "ROLE_USER"));
.withAuthentication(new TestingAuthenticationToken("user", "password", "ROLE_USER"));
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).contextWrite(userContext);
StepVerifier.create(intercept).then(() -> this.chainResult.assertWasSubscribed()).verifyComplete();
}
@@ -54,8 +54,9 @@ public class PayloadExchangeMatcherReactiveAuthorizationManagerTests {
AuthorizationDecision expected = new AuthorizationDecision(true);
given(this.authz.check(any(), any())).willReturn(Mono.just(expected));
PayloadExchangeMatcherReactiveAuthorizationManager manager = PayloadExchangeMatcherReactiveAuthorizationManager
.builder().add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.build();
.builder()
.add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.build();
assertThat(manager.check(Mono.empty(), this.exchange).block()).isEqualTo(expected);
}
@@ -64,8 +65,9 @@ public class PayloadExchangeMatcherReactiveAuthorizationManagerTests {
AuthorizationDecision expected = new AuthorizationDecision(false);
given(this.authz.check(any(), any())).willReturn(Mono.just(expected));
PayloadExchangeMatcherReactiveAuthorizationManager manager = PayloadExchangeMatcherReactiveAuthorizationManager
.builder().add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.build();
.builder()
.add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.build();
assertThat(manager.check(Mono.empty(), this.exchange).block()).isEqualTo(expected);
}
@@ -74,10 +76,10 @@ public class PayloadExchangeMatcherReactiveAuthorizationManagerTests {
AuthorizationDecision expected = new AuthorizationDecision(true);
given(this.authz.check(any(), any())).willReturn(Mono.just(expected));
PayloadExchangeMatcherReactiveAuthorizationManager manager = PayloadExchangeMatcherReactiveAuthorizationManager
.builder().add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.add(new PayloadExchangeMatcherEntry<>((e) -> PayloadExchangeMatcher.MatchResult.notMatch(),
this.authz2))
.build();
.builder()
.add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz))
.add(new PayloadExchangeMatcherEntry<>((e) -> PayloadExchangeMatcher.MatchResult.notMatch(), this.authz2))
.build();
assertThat(manager.check(Mono.empty(), this.exchange).block()).isEqualTo(expected);
}
@@ -86,10 +88,10 @@ public class PayloadExchangeMatcherReactiveAuthorizationManagerTests {
AuthorizationDecision expected = new AuthorizationDecision(true);
given(this.authz2.check(any(), any())).willReturn(Mono.just(expected));
PayloadExchangeMatcherReactiveAuthorizationManager manager = PayloadExchangeMatcherReactiveAuthorizationManager
.builder()
.add(new PayloadExchangeMatcherEntry<>((e) -> PayloadExchangeMatcher.MatchResult.notMatch(),
this.authz))
.add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz2)).build();
.builder()
.add(new PayloadExchangeMatcherEntry<>((e) -> PayloadExchangeMatcher.MatchResult.notMatch(), this.authz))
.add(new PayloadExchangeMatcherEntry<>(PayloadExchangeMatchers.anyExchange(), this.authz2))
.build();
assertThat(manager.check(Mono.empty(), this.exchange).block()).isEqualTo(expected);
}
@@ -43,7 +43,8 @@ class CaptureSecurityContextSocketAcceptor implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
return ReactiveSecurityContextHolder.getContext()
.doOnNext((securityContext) -> this.securityContext = securityContext).thenReturn(this.accept);
.doOnNext((securityContext) -> this.securityContext = securityContext)
.thenReturn(this.accept);
}
SecurityContext getSecurityContext() {
@@ -75,7 +75,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
public class PayloadInterceptorRSocketTests {
static final MimeType COMPOSITE_METADATA = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
@Mock
RSocket delegate;
@@ -129,8 +129,9 @@ public class PayloadInterceptorRSocketTests {
given(this.delegate.fireAndForget(any())).willReturn(this.voidResult.mono());
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.fireAndForget(this.payload)).then(() -> this.voidResult.assertWasSubscribed())
.verifyComplete();
StepVerifier.create(interceptor.fireAndForget(this.payload))
.then(() -> this.voidResult.assertWasSubscribed())
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -142,8 +143,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.fireAndForget(this.payload))
.then(() -> this.voidResult.assertWasNotSubscribed())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
.then(() -> this.voidResult.assertWasNotSubscribed())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -174,8 +175,10 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestResponse(this.payload))
.then(() -> this.payloadResult.assertSubscribers()).then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload).verifyComplete();
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.delegate).requestResponse(this.payload);
@@ -188,7 +191,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> interceptor.requestResponse(this.payload).block()).isEqualTo(expected);
.isThrownBy(() -> interceptor.requestResponse(this.payload).block())
.isEqualTo(expected);
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verifyNoMoreInteractions(this.delegate);
@@ -208,8 +212,10 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(assertAuthentication,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestResponse(this.payload))
.then(() -> this.payloadResult.assertSubscribers()).then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload).verifyComplete();
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.delegate).requestResponse(this.payload);
@@ -221,8 +227,11 @@ public class PayloadInterceptorRSocketTests {
given(this.delegate.requestStream(any())).willReturn(this.payloadResult.flux());
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestStream(this.payload)).then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload)).expectNext(this.payload).verifyComplete();
StepVerifier.create(interceptor.requestStream(this.payload))
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -234,8 +243,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestStream(this.payload))
.then(() -> this.payloadResult.assertNoSubscribers())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
.then(() -> this.payloadResult.assertNoSubscribers())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -253,8 +262,11 @@ public class PayloadInterceptorRSocketTests {
};
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(assertAuthentication,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestStream(this.payload)).then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload)).expectNext(this.payload).verifyComplete();
StepVerifier.create(interceptor.requestStream(this.payload))
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.delegate).requestStream(this.payload);
@@ -267,8 +279,10 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestChannel(Flux.just(this.payload)))
.then(() -> this.payloadResult.assertSubscribers()).then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload).verifyComplete();
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.delegate).requestChannel(any());
@@ -284,10 +298,11 @@ public class PayloadInterceptorRSocketTests {
Context ctx = Context.empty();
Flux<Payload> payloads = this.payloadResult.flux();
given(this.interceptor.intercept(any(), any())).willReturn(Mono.empty())
.willReturn(Mono.error(() -> new AccessDeniedException("Access Denied")));
.willReturn(Mono.error(() -> new AccessDeniedException("Access Denied")));
given(this.delegate.requestChannel(any())).willAnswer((invocation) -> {
Flux<Payload> input = invocation.getArgument(0);
return Flux.from(input).switchOnFirst((signal, innerFlux) -> innerFlux.map(Payload::getDataUtf8)
return Flux.from(input)
.switchOnFirst((signal, innerFlux) -> innerFlux.map(Payload::getDataUtf8)
.transform((data) -> Flux.<String>create((emitter) -> {
Runnable run = () -> data.subscribe(new CoreSubscriber<String>() {
@Override
@@ -311,15 +326,16 @@ public class PayloadInterceptorRSocketTests {
}
});
executors.execute(run);
})).map(DefaultPayload::create));
}))
.map(DefaultPayload::create));
});
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType, ctx);
StepVerifier.create(interceptor.requestChannel(payloads).doOnDiscard(Payload.class, Payload::release))
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(payload, payloadTwo, payloadThree))
.assertNext((next) -> assertThat(next.getDataUtf8()).isEqualTo(payload.getDataUtf8()))
.verifyError(AccessDeniedException.class);
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(payload, payloadTwo, payloadThree))
.assertNext((next) -> assertThat(next.getDataUtf8()).isEqualTo(payload.getDataUtf8()))
.verifyError(AccessDeniedException.class);
verify(this.interceptor, times(2)).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(payloadTwo);
verify(this.delegate).requestChannel(any());
@@ -332,8 +348,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestChannel(Flux.just(this.payload)))
.then(() -> this.payloadResult.assertNoSubscribers())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
.then(() -> this.payloadResult.assertNoSubscribers())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -352,8 +368,11 @@ public class PayloadInterceptorRSocketTests {
};
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(assertAuthentication,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.requestChannel(payload)).then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload)).expectNext(this.payload).verifyComplete();
StepVerifier.create(interceptor.requestChannel(payload))
.then(() -> this.payloadResult.assertSubscribers())
.then(() -> this.payloadResult.emit(this.payload))
.expectNext(this.payload)
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.delegate).requestChannel(any());
@@ -365,8 +384,9 @@ public class PayloadInterceptorRSocketTests {
given(this.delegate.metadataPush(any())).willReturn(this.voidResult.mono());
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.metadataPush(this.payload)).then(() -> this.voidResult.assertWasSubscribed())
.verifyComplete();
StepVerifier.create(interceptor.metadataPush(this.payload))
.then(() -> this.voidResult.assertWasSubscribed())
.verifyComplete();
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -377,8 +397,9 @@ public class PayloadInterceptorRSocketTests {
given(this.interceptor.intercept(any(), any())).willReturn(Mono.error(expected));
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor), this.metadataMimeType, this.dataMimeType);
StepVerifier.create(interceptor.metadataPush(this.payload)).then(() -> this.voidResult.assertWasNotSubscribed())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
StepVerifier.create(interceptor.metadataPush(this.payload))
.then(() -> this.voidResult.assertWasNotSubscribed())
.verifyErrorSatisfies((e) -> assertThat(e).isEqualTo(expected));
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
}
@@ -439,7 +460,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor, this.interceptor2), this.metadataMimeType, this.dataMimeType);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> interceptor.fireAndForget(this.payload).block()).isEqualTo(expected);
.isThrownBy(() -> interceptor.fireAndForget(this.payload).block())
.isEqualTo(expected);
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verifyNoMoreInteractions(this.interceptor2);
@@ -454,7 +476,8 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorRSocket interceptor = new PayloadInterceptorRSocket(this.delegate,
Arrays.asList(this.interceptor, this.interceptor2), this.metadataMimeType, this.dataMimeType);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> interceptor.fireAndForget(this.payload).block()).isEqualTo(expected);
.isThrownBy(() -> interceptor.fireAndForget(this.payload).block())
.isEqualTo(expected);
verify(this.interceptor).intercept(this.exchange.capture(), any());
assertThat(this.exchange.getValue().getPayload()).isEqualTo(this.payload);
verify(this.interceptor2).intercept(any(), any());
@@ -462,16 +485,18 @@ public class PayloadInterceptorRSocketTests {
}
private Mono<Authentication> assertAuthentication(Authentication authentication) {
return ReactiveSecurityContextHolder.getContext().map(SecurityContext::getAuthentication)
.doOnNext((a) -> assertThat(a).isEqualTo(authentication));
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.doOnNext((a) -> assertThat(a).isEqualTo(authentication));
}
private Answer<Object> withAuthenticated(Authentication authentication) {
return (invocation) -> {
PayloadInterceptorChain c = (PayloadInterceptorChain) invocation.getArguments()[1];
return c.next(new DefaultPayloadExchange(PayloadExchangeType.REQUEST_CHANNEL, this.payload,
this.metadataMimeType, this.dataMimeType))
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
return c
.next(new DefaultPayloadExchange(PayloadExchangeType.REQUEST_CHANNEL, this.payload,
this.metadataMimeType, this.dataMimeType))
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
};
}
@@ -78,7 +78,7 @@ public class PayloadSocketAcceptorInterceptorTests {
given(this.setupPayload.dataMimeType()).willReturn(MediaType.APPLICATION_JSON_VALUE);
PayloadExchange exchange = captureExchange();
assertThat(exchange.getMetadataMimeType().toString())
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
assertThat(exchange.getDataMimeType()).isEqualTo(MediaType.APPLICATION_JSON);
}
@@ -96,7 +96,7 @@ public class PayloadSocketAcceptorInterceptorTests {
this.acceptorInterceptor.setDefaultDataMimeType(MediaType.APPLICATION_JSON);
PayloadExchange exchange = captureExchange();
assertThat(exchange.getMetadataMimeType().toString())
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
assertThat(exchange.getDataMimeType()).isEqualTo(MediaType.APPLICATION_JSON);
}
@@ -84,27 +84,27 @@ public class PayloadSocketAcceptorTests {
public void constructorWhenNullDelegateThenException() {
this.delegate = null;
assertThatIllegalArgumentException()
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
}
@Test
public void constructorWhenNullInterceptorsThenException() {
this.interceptors = null;
assertThatIllegalArgumentException()
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
}
@Test
public void constructorWhenEmptyInterceptorsThenException() {
this.interceptors = Collections.emptyList();
assertThatIllegalArgumentException()
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
.isThrownBy(() -> new PayloadSocketAcceptor(this.delegate, this.interceptors));
}
@Test
public void acceptWhenDataMimeTypeNullThenException() {
assertThatIllegalArgumentException()
.isThrownBy(() -> this.acceptor.accept(this.setupPayload, this.rSocket).block());
.isThrownBy(() -> this.acceptor.accept(this.setupPayload, this.rSocket).block());
}
@Test
@@ -112,7 +112,7 @@ public class PayloadSocketAcceptorTests {
given(this.setupPayload.dataMimeType()).willReturn(MediaType.APPLICATION_JSON_VALUE);
PayloadExchange exchange = captureExchange();
assertThat(exchange.getMetadataMimeType().toString())
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
assertThat(exchange.getDataMimeType()).isEqualTo(MediaType.APPLICATION_JSON);
}
@@ -130,7 +130,7 @@ public class PayloadSocketAcceptorTests {
this.acceptor.setDefaultDataMimeType(MediaType.APPLICATION_JSON);
PayloadExchange exchange = captureExchange();
assertThat(exchange.getMetadataMimeType().toString())
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.isEqualTo(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
assertThat(exchange.getDataMimeType()).isEqualTo(MediaType.APPLICATION_JSON);
}
@@ -154,7 +154,7 @@ public class PayloadSocketAcceptorTests {
this.rSocket);
PayloadInterceptor authenticateInterceptor = (exchange, chain) -> {
Context withSecurityContext = ReactiveSecurityContextHolder
.withSecurityContext(Mono.just(expectedSecurityContext));
.withSecurityContext(Mono.just(expectedSecurityContext));
return chain.next(exchange).contextWrite(withSecurityContext);
};
List<PayloadInterceptor> interceptors = Arrays.asList(authenticateInterceptor);
@@ -44,7 +44,8 @@ public class BasicAuthenticationDecoderTests {
Map<String, Object> hints = null;
DataBuffer dataBuffer = encoder.encodeValue(expectedCredentials, factory, elementType, mimeType, hints);
UsernamePasswordMetadata actualCredentials = decoder
.decodeToMono(Mono.just(dataBuffer), elementType, mimeType, hints).block();
.decodeToMono(Mono.just(dataBuffer), elementType, mimeType, hints)
.block();
assertThat(actualCredentials).isEqualToComparingFieldByField(expectedCredentials);
}
@@ -47,7 +47,7 @@ import static org.mockito.BDDMockito.given;
public class RoutePayloadExchangeMatcherTests {
static final MimeType COMPOSITE_METADATA = MimeTypeUtils
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
@Mock
private MetadataExtractor metadataExtractor;
@@ -86,7 +86,7 @@ public class RoutePayloadExchangeMatcherTests {
public void matchesWhenNotMatchThenNotMatch() {
String route = "route";
given(this.metadataExtractor.extract(any(), any()))
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
PayloadExchangeMatcher.MatchResult result = this.matcher.matches(this.exchange).block();
assertThat(result.isMatch()).isFalse();
}
@@ -95,7 +95,7 @@ public class RoutePayloadExchangeMatcherTests {
public void matchesWhenMatchAndNoVariablesThenMatch() {
String route = "route";
given(this.metadataExtractor.extract(any(), any()))
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
given(this.routeMatcher.parseRoute(any())).willReturn(this.route);
given(this.routeMatcher.matchAndExtract(any(), any())).willReturn(Collections.emptyMap());
PayloadExchangeMatcher.MatchResult result = this.matcher.matches(this.exchange).block();
@@ -107,7 +107,7 @@ public class RoutePayloadExchangeMatcherTests {
String route = "route";
Map<String, String> variables = Collections.singletonMap("a", "b");
given(this.metadataExtractor.extract(any(), any()))
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
.willReturn(Collections.singletonMap(MetadataExtractor.ROUTE_KEY, route));
given(this.routeMatcher.parseRoute(any())).willReturn(this.route);
given(this.routeMatcher.matchAndExtract(any(), any())).willReturn(variables);
PayloadExchangeMatcher.MatchResult result = this.matcher.matches(this.exchange).block();