Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6f8ad00dfa | |||
| e2fbef632c | |||
| d131ccafa2 | |||
| 458b05ed37 | |||
| 1c9f3d2f9a | |||
| 2b2bc3e575 | |||
| 5bd0f9cc2d |
@@ -5,12 +5,12 @@
|
||||
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-elasticsearch</artifactId>
|
||||
<version>4.1.7</version>
|
||||
<version>4.1.8</version>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.data.build</groupId>
|
||||
<artifactId>spring-data-parent</artifactId>
|
||||
<version>2.4.7</version>
|
||||
<version>2.4.8</version>
|
||||
</parent>
|
||||
|
||||
<name>Spring Data Elasticsearch</name>
|
||||
@@ -22,7 +22,7 @@
|
||||
<elasticsearch>7.9.3</elasticsearch>
|
||||
<log4j>2.13.3</log4j>
|
||||
<netty>4.1.52.Final</netty>
|
||||
<springdata.commons>2.4.7</springdata.commons>
|
||||
<springdata.commons>2.4.8</springdata.commons>
|
||||
<testcontainers>1.15.1</testcontainers>
|
||||
<java-module-name>spring.data.elasticsearch</java-module-name>
|
||||
</properties>
|
||||
|
||||
+28
-9
@@ -145,7 +145,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
|
||||
*/
|
||||
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
|
||||
|
||||
private final HostProvider hostProvider;
|
||||
private final HostProvider<?> hostProvider;
|
||||
private final RequestCreator requestCreator;
|
||||
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;
|
||||
|
||||
@@ -155,7 +155,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
*
|
||||
* @param hostProvider must not be {@literal null}.
|
||||
*/
|
||||
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
|
||||
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider) {
|
||||
this(hostProvider, new DefaultRequestCreator());
|
||||
}
|
||||
|
||||
@@ -166,7 +166,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
* @param hostProvider must not be {@literal null}.
|
||||
* @param requestCreator must not be {@literal null}.
|
||||
*/
|
||||
public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) {
|
||||
public DefaultReactiveElasticsearchClient(HostProvider<?> hostProvider, RequestCreator requestCreator) {
|
||||
|
||||
Assert.notNull(hostProvider, "HostProvider must not be null");
|
||||
Assert.notNull(requestCreator, "RequestCreator must not be null");
|
||||
@@ -535,8 +535,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
.flatMap(callback::doWithClient) //
|
||||
.onErrorResume(throwable -> {
|
||||
|
||||
if (throwable instanceof ConnectException) {
|
||||
|
||||
if (isCausedByConnectionException(throwable)) {
|
||||
return hostProvider.getActive(Verification.ACTIVE) //
|
||||
.flatMap(callback::doWithClient);
|
||||
}
|
||||
@@ -545,6 +544,27 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* checks if the given throwable is a {@link ConnectException} or has one in it's cause chain
|
||||
*
|
||||
* @param throwable the throwable to check
|
||||
* @return true if throwable is caused by a {@link ConnectException}
|
||||
*/
|
||||
private boolean isCausedByConnectionException(Throwable throwable) {
|
||||
|
||||
Throwable t = throwable;
|
||||
do {
|
||||
|
||||
if (t instanceof ConnectException) {
|
||||
return true;
|
||||
}
|
||||
|
||||
t = t.getCause();
|
||||
} while (t != null);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Status> status() {
|
||||
|
||||
@@ -823,10 +843,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
|
||||
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
|
||||
|
||||
return response.body(BodyExtractors.toMono(byte[].class)) //
|
||||
.switchIfEmpty(Mono
|
||||
.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
|
||||
request.getMethod(), request.getEndpoint(), statusCode), status))
|
||||
)
|
||||
.switchIfEmpty(Mono.error(
|
||||
new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
|
||||
request.getMethod(), request.getEndpoint(), statusCode), status)))
|
||||
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
|
||||
.flatMap(content -> contentOrError(content, mediaType, status))
|
||||
.flatMap(unused -> Mono
|
||||
|
||||
+4
-3
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -34,9 +34,10 @@ import org.springframework.web.reactive.function.client.WebClient;
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Peter-Josef Meisch
|
||||
* @since 3.2
|
||||
*/
|
||||
public interface HostProvider {
|
||||
public interface HostProvider<T extends HostProvider<T>> {
|
||||
|
||||
/**
|
||||
* Create a new {@link HostProvider} best suited for the given {@link WebClientProvider} and number of hosts.
|
||||
@@ -46,7 +47,7 @@ public interface HostProvider {
|
||||
* @param endpoints must not be {@literal null} nor empty.
|
||||
* @return new instance of {@link HostProvider}.
|
||||
*/
|
||||
static HostProvider provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
|
||||
static HostProvider<?> provider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
|
||||
InetSocketAddress... endpoints) {
|
||||
|
||||
Assert.notNull(clientProvider, "WebClientProvider must not be null");
|
||||
|
||||
+47
-14
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -20,6 +20,7 @@ import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@@ -29,6 +30,8 @@ import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
|
||||
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
|
||||
import org.springframework.data.elasticsearch.client.NoReachableHostException;
|
||||
@@ -42,15 +45,19 @@ import org.springframework.web.reactive.function.client.WebClient;
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Peter-Josef Meisch
|
||||
* @since 3.2
|
||||
*/
|
||||
class MultiNodeHostProvider implements HostProvider {
|
||||
class MultiNodeHostProvider implements HostProvider<MultiNodeHostProvider> {
|
||||
|
||||
private final static Logger LOG = LoggerFactory.getLogger(MultiNodeHostProvider.class);
|
||||
|
||||
private final WebClientProvider clientProvider;
|
||||
private final Supplier<HttpHeaders> headersSupplier;
|
||||
private final Map<InetSocketAddress, ElasticsearchHost> hosts;
|
||||
|
||||
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier, InetSocketAddress... endpoints) {
|
||||
MultiNodeHostProvider(WebClientProvider clientProvider, Supplier<HttpHeaders> headersSupplier,
|
||||
InetSocketAddress... endpoints) {
|
||||
|
||||
this.clientProvider = clientProvider;
|
||||
this.headersSupplier = headersSupplier;
|
||||
@@ -58,6 +65,8 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
for (InetSocketAddress endpoint : endpoints) {
|
||||
this.hosts.put(endpoint, new ElasticsearchHost(endpoint, State.UNKNOWN));
|
||||
}
|
||||
|
||||
LOG.debug("initialized with " + hosts);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -66,7 +75,7 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
*/
|
||||
@Override
|
||||
public Mono<ClusterInformation> clusterInfo() {
|
||||
return nodes(null).map(this::updateNodeState).buffer(hosts.size())
|
||||
return checkNodes(null).map(this::updateNodeState).buffer(hosts.size())
|
||||
.then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
|
||||
}
|
||||
|
||||
@@ -86,14 +95,19 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
@Override
|
||||
public Mono<InetSocketAddress> lookupActiveHost(Verification verification) {
|
||||
|
||||
LOG.trace("lookupActiveHost " + verification + " from " + hosts());
|
||||
|
||||
if (Verification.LAZY.equals(verification)) {
|
||||
for (ElasticsearchHost entry : hosts()) {
|
||||
if (entry.isOnline()) {
|
||||
LOG.trace("lookupActiveHost returning " + entry);
|
||||
return Mono.just(entry.getEndpoint());
|
||||
}
|
||||
}
|
||||
LOG.trace("no online host found with LAZY");
|
||||
}
|
||||
|
||||
LOG.trace("searching for active host");
|
||||
return findActiveHostInKnownActives() //
|
||||
.switchIfEmpty(findActiveHostInUnresolved()) //
|
||||
.switchIfEmpty(findActiveHostInDead()) //
|
||||
@@ -105,20 +119,30 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
}
|
||||
|
||||
private Mono<InetSocketAddress> findActiveHostInKnownActives() {
|
||||
return findActiveForSate(State.ONLINE);
|
||||
return findActiveForState(State.ONLINE);
|
||||
}
|
||||
|
||||
private Mono<InetSocketAddress> findActiveHostInUnresolved() {
|
||||
return findActiveForSate(State.UNKNOWN);
|
||||
return findActiveForState(State.UNKNOWN);
|
||||
}
|
||||
|
||||
private Mono<InetSocketAddress> findActiveHostInDead() {
|
||||
return findActiveForSate(State.OFFLINE);
|
||||
return findActiveForState(State.OFFLINE);
|
||||
}
|
||||
|
||||
private Mono<InetSocketAddress> findActiveForSate(State state) {
|
||||
return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline)
|
||||
.map(ElasticsearchHost::getEndpoint).next();
|
||||
private Mono<InetSocketAddress> findActiveForState(State state) {
|
||||
|
||||
LOG.trace("findActiveForState state " + state + ", current hosts: " + hosts);
|
||||
|
||||
return checkNodes(state) //
|
||||
.map(this::updateNodeState) //
|
||||
.filter(ElasticsearchHost::isOnline) //
|
||||
.map(elasticsearchHost -> {
|
||||
LOG.trace("findActiveForState returning host " + elasticsearchHost);
|
||||
return elasticsearchHost;
|
||||
}).map(ElasticsearchHost::getEndpoint) //
|
||||
.takeLast(1) //
|
||||
.next();
|
||||
}
|
||||
|
||||
private ElasticsearchHost updateNodeState(Tuple2<InetSocketAddress, State> tuple2) {
|
||||
@@ -129,17 +153,23 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
return elasticsearchHost;
|
||||
}
|
||||
|
||||
private Flux<Tuple2<InetSocketAddress, State>> nodes(@Nullable State state) {
|
||||
private Flux<Tuple2<InetSocketAddress, State>> checkNodes(@Nullable State state) {
|
||||
|
||||
LOG.trace("checkNodes() with state " + state);
|
||||
|
||||
return Flux.fromIterable(hosts()) //
|
||||
.filter(entry -> state == null || entry.getState().equals(state)) //
|
||||
.map(ElasticsearchHost::getEndpoint) //
|
||||
.flatMap(host -> {
|
||||
.concatMap(host -> {
|
||||
|
||||
LOG.trace("checking host " + host);
|
||||
|
||||
Mono<ClientResponse> exchange = createWebClient(host) //
|
||||
.head().uri("/") //
|
||||
.headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) //
|
||||
.exchange().doOnError(throwable -> {
|
||||
.exchange() //
|
||||
.timeout(Duration.ofSeconds(1)) //
|
||||
.doOnError(throwable -> {
|
||||
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
|
||||
clientProvider.getErrorListener().accept(throwable);
|
||||
});
|
||||
@@ -147,7 +177,10 @@ class MultiNodeHostProvider implements HostProvider {
|
||||
return Mono.just(host).zipWith(exchange
|
||||
.flatMap(it -> it.releaseBody().thenReturn(it.statusCode().isError() ? State.OFFLINE : State.ONLINE)));
|
||||
}) //
|
||||
.onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
|
||||
.map(tuple -> {
|
||||
LOG.trace("check result " + tuple);
|
||||
return tuple;
|
||||
}).onErrorContinue((throwable, o) -> clientProvider.getErrorListener().accept(throwable));
|
||||
}
|
||||
|
||||
private List<ElasticsearchHost> hosts() {
|
||||
|
||||
+3
-2
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -32,9 +32,10 @@ import org.springframework.web.reactive.function.client.WebClient;
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Peter-Josef Meisch
|
||||
* @since 3.2
|
||||
*/
|
||||
class SingleNodeHostProvider implements HostProvider {
|
||||
class SingleNodeHostProvider implements HostProvider<SingleNodeHostProvider> {
|
||||
|
||||
private final WebClientProvider clientProvider;
|
||||
private final Supplier<HttpHeaders> headersSupplier;
|
||||
|
||||
@@ -1,6 +1,16 @@
|
||||
Spring Data Elasticsearch Changelog
|
||||
===================================
|
||||
|
||||
Changes in version 4.1.8 (2021-04-14)
|
||||
-------------------------------------
|
||||
* #1759 - health check with DefaultReactiveElasticsearchClient.
|
||||
|
||||
|
||||
Changes in version 4.0.9.RELEASE (2021-04-14)
|
||||
---------------------------------------------
|
||||
* #1759 - health check with DefaultReactiveElasticsearchClient.
|
||||
|
||||
|
||||
Changes in version 4.1.7 (2021-03-31)
|
||||
-------------------------------------
|
||||
|
||||
@@ -1575,6 +1585,8 @@ Release Notes - Spring Data Elasticsearch - Version 1.0 M1 (2014-02-07)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
Spring Data Elasticsearch 4.1.7 (2020.0.7)
|
||||
Spring Data Elasticsearch 4.1.8 (2020.0.8)
|
||||
Copyright (c) [2013-2019] Pivotal Software, Inc.
|
||||
|
||||
This product is licensed to you under the Apache License, Version 2.0 (the "License").
|
||||
@@ -27,3 +27,4 @@ conditions of the subcomponent's license, as noted in the LICENSE file.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
+2
-2
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -62,7 +62,7 @@ public class ReactiveElasticsearchClientUnitTests {
|
||||
|
||||
static final String HOST = ":9200";
|
||||
|
||||
MockDelegatingElasticsearchHostProvider<HostProvider> hostProvider;
|
||||
MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> hostProvider;
|
||||
ReactiveElasticsearchClient client;
|
||||
|
||||
@BeforeEach
|
||||
|
||||
+1
-1
@@ -186,7 +186,7 @@ public class ReactiveMockClientTestsUtils {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) {
|
||||
public MockDelegatingElasticsearchHostProvider<? extends HostProvider<?>> withActiveDefaultHost(String host) {
|
||||
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
|
||||
host);
|
||||
}
|
||||
|
||||
+2
-1
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2020 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -30,6 +30,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClient
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
* @author Peter-Josef Meisch
|
||||
*/
|
||||
public class SingleNodeHostProviderUnitTests {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user