diff --git a/jersey-client-rx/pom.xml b/jersey-client-rx/pom.xml
index cc5f28c938..fb7494fab1 100644
--- a/jersey-client-rx/pom.xml
+++ b/jersey-client-rx/pom.xml
@@ -26,6 +26,27 @@
jersey-rx-client-rxjava2
2.27
+
+ com.github.tomakehurst
+ wiremock
+ 1.58
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ 5.2.0
+
+
+ org.glassfish.jersey.media
+ jersey-media-json-jackson
+ 2.22
+
+
+ com.fasterxml.jackson.jaxrs
+ jackson-jaxrs-json-provider
+ 2.4.1
+
UTF-8
diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java
index 2b5c6bf965..5c145ca5d9 100644
--- a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java
+++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/ClientOrchestration.java
@@ -11,12 +11,12 @@ import java.util.logging.Level;
import org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider;
import org.glassfish.jersey.client.rx.rxjava.RxObservableInvoker;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvoker;
import org.glassfish.jersey.client.rx.rxjava2.RxFlowableInvokerProvider;
import rx.Observable;
@@ -28,7 +28,7 @@ import rx.Observable;
public class ClientOrchestration {
Client client = ClientBuilder.newClient();
- WebTarget userIdService = client.target("http://localhost:8080/serviceA/id?limit=10");
+ WebTarget userIdService = client.target("http://localhost:8080/serviceA/id");
WebTarget nameService = client.target("http://localhost:8080/serviceA/{empId}/name");
WebTarget hashService = client.target("http://localhost:8080/serviceA/{comboIDandName}/hash");
@@ -46,30 +46,33 @@ public class ClientOrchestration {
public void callBackOrchestrate() {
logger.info("Orchestrating with the pyramid of doom");
- userIdService.request()
+ userIdService.request().accept(MediaType.APPLICATION_JSON)
.async()
- .get(new InvocationCallback>() {
+ .get(new InvocationCallback() {
@Override
- public void completed(List empIds) {
+ public void completed(EmployeeDTO empIdList) {
+ logger.info("[InvocationCallback] Got all the IDs " + empIdList.getEmpIds());
+ List empIds = empIdList.getEmpIds();
CountDownLatch completionTracker = new CountDownLatch(empIds.size()); //used to keep track of the progress of the subsequent calls
empIds.forEach((id) -> {
//for each employee ID, get the name
nameService.resolveTemplate("empId", id).request()
.async()
.get(new InvocationCallback() {
+
@Override
public void completed(String response) {
completionTracker.countDown();
- hashService.request().async().get(new InvocationCallback() {
+ hashService.resolveTemplate("comboIDandName", response + id).request().async().get(new InvocationCallback() {
@Override
public void completed(String response) {
- logger.log(Level.INFO, "The hash output {0}", response);
+ logger.log(Level.INFO, "[InvocationCallback] The hash output {0}", response);
}
@Override
public void failed(Throwable throwable) {
completionTracker.countDown();
- logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage());
+ logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the hashing request step {0}", throwable.getMessage());
}
});
}
@@ -77,14 +80,14 @@ public class ClientOrchestration {
@Override
public void failed(Throwable throwable) {
completionTracker.countDown();
- logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage());
+ logger.log(Level.WARNING, "[InvocationCallback] An error has occurred in the username request step {0}", throwable.getMessage());
}
});
});
try {
if (!completionTracker.await(10, TimeUnit.SECONDS)) { //wait for inner requests to complete in 10 seconds
- logger.warning("Some requests didn't complete within the timeout");
+ logger.warning("[InvocationCallback] Some requests didn't complete within the timeout");
}
} catch (InterruptedException ex) {
Logger.getLogger(ClientOrchestration.class.getName()).log(Level.SEVERE, null, ex);
@@ -94,24 +97,25 @@ public class ClientOrchestration {
@Override
public void failed(Throwable throwable) {
- //implement callback
+ logger.warning("Couldn't get the list of IDs");
}
});
}
public void rxOrchestrate() {
logger.info("Orchestrating with a CompletionStage");
- CompletionStage> userIdStage = userIdService.request()
+ CompletionStage userIdStage = userIdService.request().accept(MediaType.APPLICATION_JSON)
.rx()
- .get(new GenericType>() {
+ .get(new GenericType() {
})
.exceptionally((Throwable throwable) -> {
- logger.warning("An error has occurred");
+ logger.warning("[CompletionStage] An error has occurred");
return null;
});
- userIdStage.thenAcceptAsync(listOfIds -> {
- listOfIds.stream().forEach((Long id) -> {
+ userIdStage.thenAcceptAsync(empIdDto -> {
+ logger.info("[CompletionStage] Got all the IDs " + empIdDto.getEmpIds());
+ empIdDto.getEmpIds().stream().forEach((Long id) -> {
CompletableFuture completable = nameService.resolveTemplate("empId", id)
.request()
.rx()
@@ -124,9 +128,9 @@ public class ClientOrchestration {
.rx()
.get(String.class)
.toCompletableFuture()
- .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue))
+ .thenAcceptAsync(hashValue -> logger.log(Level.INFO, "[CompletionFuture] The hash output {0}", hashValue))
.exceptionally((Throwable throwable) -> {
- logger.log(Level.WARNING, "Hash computation failed for {0}", id);
+ logger.log(Level.WARNING, "[CompletionStage] Hash computation failed for {0}", id);
return null;
});
@@ -140,53 +144,61 @@ public class ClientOrchestration {
public void observableJavaOrchestrate() {
logger.info("Orchestrating with Observables");
- Observable> userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request()
+ Observable userIdObservable = userIdService.register(RxObservableInvokerProvider.class).request()
+ .accept(MediaType.APPLICATION_JSON)
.rx(RxObservableInvoker.class)
- .get(new GenericType>() {
+ .get(new GenericType() {
});
- userIdObservable.subscribe((List listOfIds) -> {
- Observable.from(listOfIds).map(id
- -> nameService.resolveTemplate("empId", id)
+ userIdObservable.subscribe((EmployeeDTO empIdList) -> {
+ logger.info("[Observable] Got all the IDs " + empIdList.getEmpIds());
+ Observable.from(empIdList.getEmpIds()).subscribe(id
+ -> nameService.register(RxObservableInvokerProvider.class)
+ .resolveTemplate("empId", id)
.request()
.rx(RxObservableInvoker.class)
.get(String.class)
.asObservable() //gotten the name for the given empId
- .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage()))
- .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id)
+ .doOnError(throwable -> logger.log(Level.WARNING, " [Observable] An error has occurred in the username request step {0}", throwable.getMessage()))
+ .subscribe(userName -> hashService
+ .register(RxObservableInvokerProvider.class)
+ .resolveTemplate("comboIDandName", userName + id)
.request()
.rx(RxObservableInvoker.class)
.get(String.class)
.asObservable() //gotten the hash value for empId+username
- .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the hashing request step {0}", throwable.getMessage()))
- .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue))));
+ .doOnError(throwable -> logger.log(Level.WARNING, " [Observable]An error has occurred in the hashing request step {0}", throwable.getMessage()))
+ .subscribe(hashValue -> logger.log(Level.INFO, "[Observable] The hash output {0}", hashValue))));
});
}
+
public void flowableJavaOrchestrate() {
logger.info("Orchestrating with Flowable");
- Flowable> userIdObservable = userIdService.register(RxFlowableInvokerProvider.class).request()
+ Flowable userIdObservable = userIdService.register(RxFlowableInvokerProvider.class)
+ .request()
.rx(RxFlowableInvoker.class)
- .get(new GenericType>() {
+ .get(new GenericType() {
});
- Disposable subscribe = userIdObservable.subscribe((List listOfIds) -> {
+ Disposable subscribe = userIdObservable.subscribe((EmployeeDTO dto) -> {
+ List listOfIds = dto.getEmpIds();
Observable.from(listOfIds).map(id
- -> nameService.resolveTemplate("empId", id)
+ -> nameService.register(RxFlowableInvokerProvider.class)
+ .resolveTemplate("empId", id)
.request()
- .rx(RxObservableInvoker.class)
- .get(String.class)
- .asObservable() //gotten the name for the given empId
- .doOnError(throwable -> logger.log(Level.WARNING, "An error has occurred in the username request step {0}", throwable.getMessage()))
- .subscribe(userName -> hashService.resolveTemplate("comboIDandName", userName + id)
+ .rx(RxFlowableInvoker.class)
+ .get(String.class) //gotten the name for the given empId
+ .doOnError(throwable -> logger.log(Level.WARNING, "[Flowable] An error has occurred in the username request step {0}", throwable.getMessage()))
+ .subscribe(userName -> hashService.register(RxFlowableInvokerProvider.class)
+ .resolveTemplate("comboIDandName", userName + id)
.request()
- .rx(RxObservableInvoker.class)
- .get(String.class)
- .asObservable() //gotten the hash value for empId+username
- .doOnError(throwable -> logger.warning("An error has occurred in the hashing request step " + throwable.getMessage()))
- .subscribe(hashValue -> logger.log(Level.INFO, "The hash output {0}", hashValue))));
+ .rx(RxFlowableInvoker.class)
+ .get(String.class) //gotten the hash value for empId+username
+ .doOnError(throwable -> logger.warning(" [Flowable] An error has occurred in the hashing request step " + throwable.getMessage()))
+ .subscribe(hashValue -> logger.log(Level.INFO, "[Flowable] The hash output {0}", hashValue))));
});
}
diff --git a/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java
new file mode 100644
index 0000000000..ab3cfb54a2
--- /dev/null
+++ b/jersey-client-rx/src/main/java/com/baeldung/samples/jerseyrx/EmployeeDTO.java
@@ -0,0 +1,21 @@
+package com.baeldung.samples.jerseyrx;
+
+import java.util.List;
+
+/**
+ *
+ * @author SIGINT-X
+ */
+public class EmployeeDTO {
+
+ private List empIds;
+
+ public List getEmpIds() {
+ return empIds;
+ }
+
+ public void setEmpIds(List empIds) {
+ this.empIds = empIds;
+ }
+
+}
diff --git a/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java
new file mode 100644
index 0000000000..4286e192c0
--- /dev/null
+++ b/jersey-client-rx/src/test/java/com/baeldung/samples/jerseyrx/ClientOrchestrationTest.java
@@ -0,0 +1,67 @@
+package com.baeldung.samples.jerseyrx;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.configureFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * @author baeldung
+ */
+public class ClientOrchestrationTest {
+
+ ClientOrchestration orchestrator = new ClientOrchestration();
+
+ String jsonIdList = "{\"empIds\":[1,2,3,4,5,6]}";
+
+ String[] nameList = new String[]{"n/a", "Thor", "Hulk", "BlackWidow", "BlackPanther", "TheTick", "Hawkeye"};
+
+ String[] hashResultList = new String[]{"roht1", "kluh2", "WodiwKcalb3", "RehtnapKclab4", "kciteht5", "eyekwah6"};
+
+ WireMockServer wireMockServer = new WireMockServer();
+
+ @Before
+ public void setup() {
+ wireMockServer.start();
+ configureFor("localhost", 8080);
+ stubFor(get(urlEqualTo("/serviceA/id")).willReturn(aResponse().withBody(jsonIdList).withHeader("Content-Type", "application/json")));
+
+ stubFor(get(urlEqualTo("/serviceA/1/name")).willReturn(aResponse().withBody(nameList[1])));
+ stubFor(get(urlEqualTo("/serviceA/2/name")).willReturn(aResponse().withBody(nameList[2])));
+ stubFor(get(urlEqualTo("/serviceA/3/name")).willReturn(aResponse().withBody(nameList[3])));
+ stubFor(get(urlEqualTo("/serviceA/4/name")).willReturn(aResponse().withBody(nameList[4])));
+ stubFor(get(urlEqualTo("/serviceA/5/name")).willReturn(aResponse().withBody(nameList[5])));
+ stubFor(get(urlEqualTo("/serviceA/6/name")).willReturn(aResponse().withBody(nameList[6])));
+
+ stubFor(get(urlEqualTo("/serviceA/Thor1/hash")).willReturn(aResponse().withBody(hashResultList[0])));
+ stubFor(get(urlEqualTo("/serviceA/Hulk2/hash")).willReturn(aResponse().withBody(hashResultList[1])));
+ stubFor(get(urlEqualTo("/serviceA/BlackWidow3/hash")).willReturn(aResponse().withBody(hashResultList[2])));
+ stubFor(get(urlEqualTo("/serviceA/BlackPanther4/hash")).willReturn(aResponse().withBody(hashResultList[3])));
+ stubFor(get(urlEqualTo("/serviceA/TheTick5/hash")).willReturn(aResponse().withBody(hashResultList[4])));
+ stubFor(get(urlEqualTo("/serviceA/Hawkeye6/hash")).willReturn(aResponse().withBody(hashResultList[5])));
+
+ }
+
+ @Test
+ public void hits() {
+
+ orchestrator.callBackOrchestrate();
+ orchestrator.rxOrchestrate();
+ orchestrator.observableJavaOrchestrate();
+ orchestrator.flowableJavaOrchestrate();
+
+ }
+
+ @After
+ public void tearDown() {
+
+ }
+
+}