1
0
mirror of synced 2026-05-22 13:23:17 +00:00

Update to Reactor 2025.0.0-SNAPSHOT

To prepare for the release we should update to Reactor
2025.0.0-SNAPSHOT to fix any issues that are present.

Closes gh-18041
This commit is contained in:
Rob Winch
2025-10-13 10:48:10 -05:00
parent 4b6c9cca7e
commit b864be92d8
18 changed files with 43 additions and 20 deletions
@@ -43,6 +43,7 @@ public final class InMemoryReactiveOneTimeTokenService implements ReactiveOneTim
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<OneTimeToken> consume(OneTimeTokenAuthenticationToken authenticationToken) { public Mono<OneTimeToken> consume(OneTimeTokenAuthenticationToken authenticationToken) {
return Mono.just(authenticationToken).mapNotNull(this.oneTimeTokenService::consume); return Mono.just(authenticationToken).mapNotNull(this.oneTimeTokenService::consume);
} }
@@ -58,9 +58,10 @@ public final class ObservationReactiveAuthorizationManager<T>
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<AuthorizationResult> authorize(Mono<Authentication> authentication, T object) { public Mono<AuthorizationResult> authorize(Mono<Authentication> authentication, T object) {
AuthorizationObservationContext<T> context = new AuthorizationObservationContext<>(object); AuthorizationObservationContext<T> context = new AuthorizationObservationContext<>(object);
Mono<Authentication> wrapped = authentication.map((auth) -> { Mono<Authentication> wrapped = authentication.mapNotNull((auth) -> {
context.setAuthentication(auth); context.setAuthentication(auth);
return context.getAuthentication(); return context.getAuthentication();
}); });
@@ -588,12 +588,14 @@ public final class AuthorizationAdvisorProxyFactory implements AuthorizationProx
return null; return null;
} }
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
private Mono<?> proxyMono(AuthorizationProxyFactory proxyFactory, Mono<?> mono) { private Mono<?> proxyMono(AuthorizationProxyFactory proxyFactory, Mono<?> mono) {
return mono.map(proxyFactory::proxy); return mono.mapNotNull(proxyFactory::proxy);
} }
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
private Flux<?> proxyFlux(AuthorizationProxyFactory proxyFactory, Flux<?> flux) { private Flux<?> proxyFlux(AuthorizationProxyFactory proxyFactory, Flux<?> flux) {
return flux.map(proxyFactory::proxy); return flux.mapNotNull(proxyFactory::proxy);
} }
} }
@@ -120,6 +120,7 @@ public final class AuthorizationManagerAfterReactiveMethodInterceptor implements
+ "(for example, a Mono or Flux) or the function must be a Kotlin coroutine " + "(for example, a Mono or Flux) or the function must be a Kotlin coroutine "
+ "in order to support Reactor Context"); + "in order to support Reactor Context");
Mono<Authentication> authentication = ReactiveAuthenticationUtils.getAuthentication(); Mono<Authentication> authentication = ReactiveAuthenticationUtils.getAuthentication();
@SuppressWarnings("NullAway") // Dataflow analysis limitation
Function<Signal<?>, Mono<?>> postAuthorize = (signal) -> { Function<Signal<?>, Mono<?>> postAuthorize = (signal) -> {
if (signal.isOnComplete()) { if (signal.isOnComplete()) {
return Mono.empty(); return Mono.empty();
@@ -130,12 +131,16 @@ public final class AuthorizationManagerAfterReactiveMethodInterceptor implements
if (signal.getThrowable() instanceof AuthorizationDeniedException denied) { if (signal.getThrowable() instanceof AuthorizationDeniedException denied) {
return postProcess(denied, mi); return postProcess(denied, mi);
} }
// getThrowable must be non-null because hasError() is true
return Mono.error(signal.getThrowable()); return Mono.error(signal.getThrowable());
}; };
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(type); ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(type);
if (hasFlowReturnType) { if (hasFlowReturnType) {
if (isSuspendingFunction) { if (isSuspendingFunction) {
Publisher<?> publisher = ReactiveMethodInvocationUtils.proceed(mi); Publisher<?> publisher = ReactiveMethodInvocationUtils.proceed(mi);
if (publisher == null) {
return Flux.empty();
}
return Flux.from(publisher).materialize().flatMap(postAuthorize); return Flux.from(publisher).materialize().flatMap(postAuthorize);
} }
else { else {
@@ -148,6 +153,9 @@ public final class AuthorizationManagerAfterReactiveMethodInterceptor implements
} }
} }
Publisher<?> publisher = ReactiveMethodInvocationUtils.proceed(mi); Publisher<?> publisher = ReactiveMethodInvocationUtils.proceed(mi);
if (publisher == null) {
return Flux.empty();
}
if (isMultiValue(type, adapter)) { if (isMultiValue(type, adapter)) {
Flux<?> flux = Flux.from(publisher).materialize().flatMap(postAuthorize); Flux<?> flux = Flux.from(publisher).materialize().flatMap(postAuthorize);
return (adapter != null) ? adapter.fromPublisher(flux) : flux; return (adapter != null) ? adapter.fromPublisher(flux) : flux;
@@ -173,7 +181,7 @@ public final class AuthorizationManagerAfterReactiveMethodInterceptor implements
private Mono<Object> postProcess(AuthorizationResult decision, MethodInvocationResult methodInvocationResult) { private Mono<Object> postProcess(AuthorizationResult decision, MethodInvocationResult methodInvocationResult) {
if (decision.isGranted()) { if (decision.isGranted()) {
return Mono.just(methodInvocationResult.getResult()); return Mono.justOrEmpty(methodInvocationResult.getResult());
} }
return Mono.fromSupplier(() -> { return Mono.fromSupplier(() -> {
if (this.authorizationManager instanceof MethodAuthorizationDeniedHandler handler) { if (this.authorizationManager instanceof MethodAuthorizationDeniedHandler handler) {
@@ -35,9 +35,10 @@ final class ReactiveAuthenticationUtils {
private static final Authentication ANONYMOUS = new AnonymousAuthenticationToken("key", "anonymous", private static final Authentication ANONYMOUS = new AnonymousAuthenticationToken("key", "anonymous",
AuthorityUtils.createAuthorityList("ROLE_ANONYMOUS")); AuthorityUtils.createAuthorityList("ROLE_ANONYMOUS"));
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
static Mono<Authentication> getAuthentication() { static Mono<Authentication> getAuthentication() {
return ReactiveSecurityContextHolder.getContext() return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.defaultIfEmpty(ANONYMOUS); .defaultIfEmpty(ANONYMOUS);
} }
@@ -28,7 +28,7 @@ import reactor.core.Exceptions;
*/ */
final class ReactiveMethodInvocationUtils { final class ReactiveMethodInvocationUtils {
static <T> @Nullable T proceed(MethodInvocation mi) { static @Nullable <T> T proceed(MethodInvocation mi) {
try { try {
return (T) mi.proceed(); return (T) mi.proceed();
} }
@@ -50,9 +50,10 @@ public class InMemoryReactiveSessionRegistry implements ReactiveSessionRegistry
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Flux<ReactiveSessionInformation> getAllSessions(Object principal) { public Flux<ReactiveSessionInformation> getAllSessions(Object principal) {
return Flux.fromIterable(this.sessionIdsByPrincipal.getOrDefault(principal, Collections.emptySet())) return Flux.fromIterable(this.sessionIdsByPrincipal.getOrDefault(principal, Collections.emptySet()))
.map(this.sessionById::get); .mapNotNull(this.sessionById::get);
} }
@Override @Override
+1 -1
View File
@@ -30,7 +30,7 @@ commons-collections = "commons-collections:commons-collections:3.2.2"
io-micrometer-context-propagation = "io.micrometer:context-propagation:1.1.3" io-micrometer-context-propagation = "io.micrometer:context-propagation:1.1.3"
io-micrometer-micrometer-observation = "io.micrometer:micrometer-observation:1.14.11" io-micrometer-micrometer-observation = "io.micrometer:micrometer-observation:1.14.11"
io-mockk = "io.mockk:mockk:1.14.6" io-mockk = "io.mockk:mockk:1.14.6"
io-projectreactor-reactor-bom = "io.projectreactor:reactor-bom:2025.0.0-M7" io-projectreactor-reactor-bom = "io.projectreactor:reactor-bom:2025.0.0-SNAPSHOT"
io-rsocket-rsocket-bom = { module = "io.rsocket:rsocket-bom", version.ref = "io-rsocket" } io-rsocket-rsocket-bom = { module = "io.rsocket:rsocket-bom", version.ref = "io-rsocket" }
io-spring-javaformat-spring-javaformat-checkstyle = { module = "io.spring.javaformat:spring-javaformat-checkstyle", version.ref = "io-spring-javaformat" } io-spring-javaformat-spring-javaformat-checkstyle = { module = "io.spring.javaformat:spring-javaformat-checkstyle", version.ref = "io-spring-javaformat" }
io-spring-javaformat-spring-javaformat-gradle-plugin = { module = "io.spring.javaformat:spring-javaformat-gradle-plugin", version.ref = "io-spring-javaformat" } io-spring-javaformat-spring-javaformat-gradle-plugin = { module = "io.spring.javaformat:spring-javaformat-gradle-plugin", version.ref = "io-spring-javaformat" }
@@ -138,11 +138,12 @@ public class AuthenticationPrincipalArgumentResolver implements HandlerMethodArg
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) { public Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
ReactiveAdapter adapter = this.adapterRegistry.getAdapter(parameter.getParameterType()); ReactiveAdapter adapter = this.adapterRegistry.getAdapter(parameter.getParameterType());
// @formatter:off // @formatter:off
return ReactiveSecurityContextHolder.getContext() return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.flatMap((a) -> { .flatMap((a) -> {
Object p = resolvePrincipal(parameter, a.getPrincipal()); Object p = resolvePrincipal(parameter, a.getPrincipal());
Mono<Object> principal = Mono.justOrEmpty(p); Mono<Object> principal = Mono.justOrEmpty(p);
@@ -55,10 +55,10 @@ public class AuthorizationPayloadInterceptor implements PayloadInterceptor, Orde
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<Void> intercept(PayloadExchange exchange, PayloadInterceptorChain chain) { public Mono<Void> intercept(PayloadExchange exchange, PayloadInterceptorChain chain) {
return ReactiveSecurityContextHolder.getContext() return ReactiveSecurityContextHolder.getContext()
.filter((c) -> c.getAuthentication() != null) .mapNotNull(SecurityContext::getAuthentication)
.map(SecurityContext::getAuthentication)
.switchIfEmpty(Mono.error(() -> new AuthenticationCredentialsNotFoundException( .switchIfEmpty(Mono.error(() -> new AuthenticationCredentialsNotFoundException(
"An Authentication (possibly AnonymousAuthenticationToken) is required."))) "An Authentication (possibly AnonymousAuthenticationToken) is required.")))
.as((authentication) -> this.authorizationManager.verify(authentication, exchange)) .as((authentication) -> this.authorizationManager.verify(authentication, exchange))
@@ -52,12 +52,13 @@ public final class PayloadExchangeMatcherReactiveAuthorizationManager
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<AuthorizationResult> authorize(Mono<Authentication> authentication, PayloadExchange exchange) { public Mono<AuthorizationResult> authorize(Mono<Authentication> authentication, PayloadExchange exchange) {
return Flux.fromIterable(this.mappings) return Flux.fromIterable(this.mappings)
.concatMap((mapping) -> mapping.getMatcher() .concatMap((mapping) -> mapping.getMatcher()
.matches(exchange) .matches(exchange)
.filter(PayloadExchangeMatcher.MatchResult::isMatch) .filter(PayloadExchangeMatcher.MatchResult::isMatch)
.map(MatchResult::getVariables) .mapNotNull(MatchResult::getVariables)
.flatMap((variables) -> mapping.getEntry() .flatMap((variables) -> mapping.getEntry()
.authorize(authentication, new PayloadExchangeAuthorizationContext(exchange, variables)))) .authorize(authentication, new PayloadExchangeAuthorizationContext(exchange, variables))))
.next() .next()
@@ -83,11 +83,12 @@ public class AuthenticationPrincipalArgumentResolver extends HandlerMethodArgume
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<Object> resolveArgument(MethodParameter parameter, BindingContext bindingContext, public Mono<Object> resolveArgument(MethodParameter parameter, BindingContext bindingContext,
ServerWebExchange exchange) { ServerWebExchange exchange) {
ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType()); ReactiveAdapter adapter = getAdapterRegistry().getAdapter(parameter.getParameterType());
return ReactiveSecurityContextHolder.getContext() return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.flatMap((authentication) -> { .flatMap((authentication) -> {
Mono<Object> principal = Mono.justOrEmpty(resolvePrincipal(parameter, authentication.getPrincipal())); Mono<Object> principal = Mono.justOrEmpty(resolvePrincipal(parameter, authentication.getPrincipal()));
return (adapter != null) ? Mono.just(adapter.fromPublisher(principal)) : principal; return (adapter != null) ? Mono.just(adapter.fromPublisher(principal)) : principal;
@@ -176,11 +176,12 @@ public class SwitchUserWebFilter implements WebFilter {
* @throws AuthenticationCredentialsNotFoundException If the target user can not be * @throws AuthenticationCredentialsNotFoundException If the target user can not be
* found by username * found by username
*/ */
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
protected Mono<Authentication> switchUser(WebFilterExchange webFilterExchange) { protected Mono<Authentication> switchUser(WebFilterExchange webFilterExchange) {
return this.switchUserMatcher.matches(webFilterExchange.getExchange()) return this.switchUserMatcher.matches(webFilterExchange.getExchange())
.filter(ServerWebExchangeMatcher.MatchResult::isMatch) .filter(ServerWebExchangeMatcher.MatchResult::isMatch)
.flatMap((matchResult) -> ReactiveSecurityContextHolder.getContext()) .flatMap((matchResult) -> ReactiveSecurityContextHolder.getContext())
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.flatMap((currentAuthentication) -> { .flatMap((currentAuthentication) -> {
String username = getUsername(webFilterExchange.getExchange()); String username = getUsername(webFilterExchange.getExchange());
return attemptSwitchUser(currentAuthentication, username); return attemptSwitchUser(currentAuthentication, username);
@@ -197,11 +198,12 @@ public class SwitchUserWebFilter implements WebFilter {
* <code>Authentication</code> associated with this request or the user is not * <code>Authentication</code> associated with this request or the user is not
* switched. * switched.
*/ */
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
protected Mono<Authentication> exitSwitchUser(WebFilterExchange webFilterExchange) { protected Mono<Authentication> exitSwitchUser(WebFilterExchange webFilterExchange) {
return this.exitUserMatcher.matches(webFilterExchange.getExchange()) return this.exitUserMatcher.matches(webFilterExchange.getExchange())
.filter(ServerWebExchangeMatcher.MatchResult::isMatch) .filter(ServerWebExchangeMatcher.MatchResult::isMatch)
.flatMap((matchResult) -> ReactiveSecurityContextHolder.getContext() .flatMap((matchResult) -> ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.switchIfEmpty(Mono.error(this::noCurrentUserException))) .switchIfEmpty(Mono.error(this::noCurrentUserException)))
.map(this::attemptExitUser); .map(this::attemptExitUser);
} }
@@ -41,6 +41,7 @@ public final class DefaultServerGenerateOneTimeTokenRequestResolver
private Duration expiresIn = DEFAULT_EXPIRES_IN; private Duration expiresIn = DEFAULT_EXPIRES_IN;
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<GenerateOneTimeTokenRequest> resolve(ServerWebExchange exchange) { public Mono<GenerateOneTimeTokenRequest> resolve(ServerWebExchange exchange) {
// @formatter:off // @formatter:off
return exchange.getFormData() return exchange.getFormData()
@@ -45,10 +45,11 @@ public class AuthorizationWebFilter implements WebFilter {
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return ReactiveSecurityContextHolder.getContext() return ReactiveSecurityContextHolder.getContext()
.filter((c) -> c.getAuthentication() != null) .filter((c) -> c.getAuthentication() != null)
.map(SecurityContext::getAuthentication) .mapNotNull(SecurityContext::getAuthentication)
.as((authentication) -> this.authorizationManager.verify(authentication, exchange)) .as((authentication) -> this.authorizationManager.verify(authentication, exchange))
.doOnSuccess((it) -> logger.debug("Authorization successful")) .doOnSuccess((it) -> logger.debug("Authorization successful"))
.doOnError(AccessDeniedException.class, .doOnError(AccessDeniedException.class,
@@ -42,9 +42,9 @@ public class SecurityContextServerWebExchange extends ServerWebExchangeDecorator
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings({ "unchecked", "NullAway" }) // https://github.com/uber/NullAway/issues/1290
public <T extends Principal> Mono<T> getPrincipal() { public <T extends Principal> Mono<T> getPrincipal() {
return this.context.map((context) -> (T) context.getAuthentication()); return this.context.mapNotNull((context) -> (T) context.getAuthentication());
} }
} }
@@ -67,6 +67,7 @@ public class ServerCsrfTokenRequestAttributeHandler implements ServerCsrfTokenRe
this.isTokenFromMultipartDataEnabled = tokenFromMultipartDataEnabled; this.isTokenFromMultipartDataEnabled = tokenFromMultipartDataEnabled;
} }
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
private Mono<String> tokenFromMultipartData(ServerWebExchange exchange, CsrfToken expected) { private Mono<String> tokenFromMultipartData(ServerWebExchange exchange, CsrfToken expected) {
if (!this.isTokenFromMultipartDataEnabled) { if (!this.isTokenFromMultipartDataEnabled) {
return Mono.empty(); return Mono.empty();
@@ -78,7 +79,7 @@ public class ServerCsrfTokenRequestAttributeHandler implements ServerCsrfTokenRe
return Mono.empty(); return Mono.empty();
} }
return exchange.getMultipartData() return exchange.getMultipartData()
.map((d) -> d.getFirst(expected.getParameterName())) .mapNotNull((d) -> d.getFirst(expected.getParameterName()))
.cast(FormFieldPart.class) .cast(FormFieldPart.class)
.map(FormFieldPart::value); .map(FormFieldPart::value);
} }
@@ -71,10 +71,11 @@ public class WebSessionServerCsrfTokenRepository implements ServerCsrfTokenRepos
} }
@Override @Override
@SuppressWarnings("NullAway") // https://github.com/uber/NullAway/issues/1290
public Mono<CsrfToken> loadToken(ServerWebExchange exchange) { public Mono<CsrfToken> loadToken(ServerWebExchange exchange) {
return exchange.getSession() return exchange.getSession()
.filter((session) -> session.getAttributes().containsKey(this.sessionAttributeName)) .filter((session) -> session.getAttributes().containsKey(this.sessionAttributeName))
.map((session) -> session.getAttribute(this.sessionAttributeName)); .mapNotNull((session) -> session.getAttribute(this.sessionAttributeName));
} }
/** /**