1
0
mirror of synced 2026-05-22 21:33:16 +00:00

Replace removed context-related operators

Closes gh-11194
This commit is contained in:
Marcus Da Coregio
2022-05-10 14:50:19 -03:00
parent b803e845e7
commit 806e05855c
45 changed files with 194 additions and 204 deletions
@@ -84,8 +84,7 @@ public class AnonymousPayloadInterceptor implements PayloadInterceptor, Ordered
return ReactiveSecurityContextHolder.getContext().switchIfEmpty(Mono.defer(() -> {
AnonymousAuthenticationToken authentication = new AnonymousAuthenticationToken(this.key, this.principal,
this.authorities);
return chain.next(exchange)
.subscriberContext(ReactiveSecurityContextHolder.withAuthentication(authentication))
return chain.next(exchange).contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication))
.then(Mono.empty());
})).flatMap((securityContext) -> chain.next(exchange));
}
@@ -78,7 +78,7 @@ public class AuthenticationPayloadInterceptor implements PayloadInterceptor, Ord
}
private Mono<Void> onAuthenticationSuccess(Mono<Void> payload, Authentication authentication) {
return payload.subscriberContext(ReactiveSecurityContextHolder.withAuthentication(authentication));
return payload.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
}
}
@@ -76,7 +76,7 @@ class ContextPayloadInterceptorChain implements PayloadInterceptorChain {
@Override
public Mono<Void> next(PayloadExchange exchange) {
return Mono.defer(() -> shouldIntercept() ? this.currentInterceptor.intercept(exchange, this.next)
: Mono.subscriberContext().doOnNext((c) -> this.context = c).then());
: Mono.deferContextual(Mono::just).cast(Context.class).doOnNext((c) -> this.context = c).then());
}
Context getContext() {
@@ -72,29 +72,28 @@ class PayloadInterceptorRSocket extends RSocketProxy {
@Override
public Mono<Void> fireAndForget(Payload payload) {
return intercept(PayloadExchangeType.FIRE_AND_FORGET, payload)
.flatMap((context) -> this.source.fireAndForget(payload).subscriberContext(context));
.flatMap((context) -> this.source.fireAndForget(payload).contextWrite(context));
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
return intercept(PayloadExchangeType.REQUEST_RESPONSE, payload)
.flatMap((context) -> this.source.requestResponse(payload).subscriberContext(context));
.flatMap((context) -> this.source.requestResponse(payload).contextWrite(context));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return intercept(PayloadExchangeType.REQUEST_STREAM, payload)
.flatMapMany((context) -> this.source.requestStream(payload).subscriberContext(context));
.flatMapMany((context) -> this.source.requestStream(payload).contextWrite(context));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).switchOnFirst((signal, innerFlux) -> {
Payload firstPayload = signal.get();
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany(
(context) -> innerFlux.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads))
.subscriberContext(context));
return intercept(PayloadExchangeType.REQUEST_CHANNEL, firstPayload).flatMapMany((context) -> innerFlux
.index().concatMap((tuple) -> justOrIntercept(tuple.getT1(), tuple.getT2()))
.transform((securedPayloads) -> this.source.requestChannel(securedPayloads)).contextWrite(context));
});
}
@@ -105,7 +104,7 @@ class PayloadInterceptorRSocket extends RSocketProxy {
@Override
public Mono<Void> metadataPush(Payload payload) {
return intercept(PayloadExchangeType.METADATA_PUSH, payload)
.flatMap((c) -> this.source.metadataPush(payload).subscriberContext(c));
.flatMap((c) -> this.source.metadataPush(payload).contextWrite(c));
}
private Mono<Context> intercept(PayloadExchangeType type, Payload payload) {
@@ -114,7 +113,7 @@ class PayloadInterceptorRSocket extends RSocketProxy {
DefaultPayloadExchange exchange = new DefaultPayloadExchange(type, payload, this.metadataMimeType,
this.dataMimeType);
return chain.next(exchange).then(Mono.fromCallable(() -> chain.getContext()))
.defaultIfEmpty(Context.empty()).subscriberContext(this.context);
.defaultIfEmpty(Context.empty()).contextWrite(this.context);
});
}
@@ -69,12 +69,13 @@ class PayloadSocketAcceptor implements SocketAcceptor {
MimeType metadataMimeType = parseMimeType(setup.metadataMimeType(), this.defaultMetadataMimeType);
Assert.notNull(metadataMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
// FIXME do we want to make the sendingSocket available in the PayloadExchange
return intercept(setup, dataMimeType, metadataMimeType)
.flatMap(
(ctx) -> this.delegate.accept(setup, sendingSocket)
.map((acceptingSocket) -> new PayloadInterceptorRSocket(acceptingSocket,
this.interceptors, metadataMimeType, dataMimeType, ctx))
.subscriberContext(ctx));
return intercept(setup, dataMimeType,
metadataMimeType)
.flatMap(
(ctx) -> this.delegate.accept(setup, sendingSocket)
.map((acceptingSocket) -> new PayloadInterceptorRSocket(acceptingSocket,
this.interceptors, metadataMimeType, dataMimeType, ctx))
.contextWrite(ctx));
}
private Mono<Context> intercept(Payload payload, MimeType dataMimeType, MimeType metadataMimeType) {
@@ -91,7 +91,7 @@ public class AnonymousPayloadInterceptorTests {
AuthenticationPayloadInterceptorChain chain = new AuthenticationPayloadInterceptorChain();
TestingAuthenticationToken expected = new TestingAuthenticationToken("test", "password");
this.interceptor.intercept(this.exchange, chain)
.subscriberContext(ReactiveSecurityContextHolder.withAuthentication(expected)).block();
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(expected)).block();
Authentication authentication = chain.getAuthentication();
assertThat(authentication).isEqualTo(expected);
}
@@ -83,7 +83,7 @@ public class AuthorizationPayloadInterceptorTests {
AuthorityReactiveAuthorizationManager.hasRole("USER"));
Context userContext = ReactiveSecurityContextHolder
.withAuthentication(new TestingAuthenticationToken("user", "password"));
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).subscriberContext(userContext);
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).contextWrite(userContext);
StepVerifier.create(intercept).then(() -> this.chainResult.assertWasNotSubscribed())
.verifyError(AccessDeniedException.class);
}
@@ -95,7 +95,7 @@ public class AuthorizationPayloadInterceptorTests {
AuthenticatedReactiveAuthorizationManager.authenticated());
Context userContext = ReactiveSecurityContextHolder
.withAuthentication(new TestingAuthenticationToken("user", "password", "ROLE_USER"));
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).subscriberContext(userContext);
Mono<Void> intercept = interceptor.intercept(this.exchange, this.chain).contextWrite(userContext);
StepVerifier.create(intercept).then(() -> this.chainResult.assertWasSubscribed()).verifyComplete();
}
@@ -471,7 +471,7 @@ public class PayloadInterceptorRSocketTests {
PayloadInterceptorChain c = (PayloadInterceptorChain) invocation.getArguments()[1];
return c.next(new DefaultPayloadExchange(PayloadExchangeType.REQUEST_CHANNEL, this.payload,
this.metadataMimeType, this.dataMimeType))
.subscriberContext(ReactiveSecurityContextHolder.withAuthentication(authentication));
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
};
}
@@ -155,7 +155,7 @@ public class PayloadSocketAcceptorTests {
PayloadInterceptor authenticateInterceptor = (exchange, chain) -> {
Context withSecurityContext = ReactiveSecurityContextHolder
.withSecurityContext(Mono.just(expectedSecurityContext));
return chain.next(exchange).subscriberContext(withSecurityContext);
return chain.next(exchange).contextWrite(withSecurityContext);
};
List<PayloadInterceptor> interceptors = Arrays.asList(authenticateInterceptor);
this.acceptor = new PayloadSocketAcceptor(captureSecurityContext, interceptors);