[Java 11503] (#12626)
* [JAVA-11503] Created new vertx-modules * [JAVA-11503] Moved vertx(sub-module) to vertx-modules(parent) * [JAVA-11503] Moved spring-vertx(sub-module) to vertx-modules(parent) * [JAVA-11503] Moved vertx-and-java(sub-module) to vertx-modules(parent) * [JAVA-11503] deleted modules that were moved Co-authored-by: panagiotiskakos <panagiotis.kakos@libra-is.com>
This commit is contained in:
+19
@@ -0,0 +1,19 @@
|
||||
package com.baeldung.weather;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
|
||||
class CityAndDayLength {
|
||||
|
||||
private final String city;
|
||||
private final double dayLengthInHours;
|
||||
|
||||
CityAndDayLength(String city, long dayLengthInSeconds) {
|
||||
this.city = city;
|
||||
this.dayLengthInHours = dayLengthInSeconds / (60.0 * 60.0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MessageFormat.format("In {0} there are {1,number,#0.0} hours of light.", city, dayLengthInHours);
|
||||
}
|
||||
}
|
||||
+45
@@ -0,0 +1,45 @@
|
||||
package com.baeldung.weather;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import io.vertx.core.http.RequestOptions;
|
||||
import io.vertx.reactivex.core.http.HttpClient;
|
||||
import io.vertx.reactivex.core.http.HttpClientRequest;
|
||||
import io.vertx.reactivex.core.http.HttpClientResponse;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
class MetaWeatherClient {
|
||||
|
||||
private static RequestOptions metawether = new RequestOptions()
|
||||
.setHost("www.metaweather.com")
|
||||
.setPort(443)
|
||||
.setSsl(true);
|
||||
|
||||
/**
|
||||
* @return A flowable backed by vertx that automatically sends an HTTP request at soon as the first subscription is received.
|
||||
*/
|
||||
private static Flowable<HttpClientResponse> autoPerformingReq(HttpClient httpClient, String uri) {
|
||||
HttpClientRequest req = httpClient.get(new RequestOptions(metawether).setURI(uri));
|
||||
return req.toFlowable()
|
||||
.doOnSubscribe(subscription -> req.end());
|
||||
}
|
||||
|
||||
static Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
|
||||
HttpClientRequest req = httpClient.get(
|
||||
new RequestOptions()
|
||||
.setHost("www.metaweather.com")
|
||||
.setPort(443)
|
||||
.setSsl(true)
|
||||
.setURI(format("/api/location/search/?query=%s", cityName)));
|
||||
return req
|
||||
.toFlowable()
|
||||
.doOnSubscribe(subscription -> req.end());
|
||||
}
|
||||
|
||||
static Flowable<HttpClientResponse> getDataByPlaceId(HttpClient httpClient, long placeId) {
|
||||
return autoPerformingReq(
|
||||
httpClient,
|
||||
format("/api/location/%s/", placeId));
|
||||
}
|
||||
|
||||
}
|
||||
+92
@@ -0,0 +1,92 @@
|
||||
package com.baeldung.weather;
|
||||
|
||||
import io.reactivex.Flowable;
|
||||
import io.reactivex.functions.Function;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.reactivex.core.Vertx;
|
||||
import io.vertx.reactivex.core.buffer.Buffer;
|
||||
import io.vertx.reactivex.core.file.FileSystem;
|
||||
import io.vertx.reactivex.core.http.HttpClient;
|
||||
import io.vertx.reactivex.core.http.HttpClientResponse;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
import static com.baeldung.weather.MetaWeatherClient.getDataByPlaceId;
|
||||
import static com.baeldung.weather.MetaWeatherClient.searchByCityName;
|
||||
|
||||
public class VertxWithRxJavaIntegrationTest {
|
||||
|
||||
private Vertx vertx;
|
||||
private HttpClient httpClient;
|
||||
private FileSystem fileSystem;
|
||||
private static Logger log = LoggerFactory.getLogger(VertxWithRxJavaIntegrationTest.class);
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
vertx = Vertx.vertx();
|
||||
httpClient = vertx.createHttpClient();
|
||||
fileSystem = vertx.fileSystem();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
vertx.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldDisplayLightLenghts() throws InterruptedException {
|
||||
|
||||
// read the file that contains one city name per line
|
||||
fileSystem
|
||||
.rxReadFile("cities.txt").toFlowable()
|
||||
.doOnNext(buffer -> log.info("File buffer ---\n{}\n---", buffer))
|
||||
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
|
||||
.doOnNext(city -> log.info("City from file: '{}'", city))
|
||||
.filter(city -> !city.startsWith("#"))
|
||||
.doOnNext(city -> log.info("City that survived filtering: '{}'", city))
|
||||
.flatMap(city -> searchByCityName(httpClient, city))
|
||||
.flatMap(HttpClientResponse::toFlowable)
|
||||
.doOnNext(buffer -> log.info("JSON of city detail: '{}'", buffer))
|
||||
.map(extractingWoeid())
|
||||
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
|
||||
.flatMap(toBufferFlowable())
|
||||
.doOnNext(buffer -> log.info("JSON of place detail: '{}'", buffer))
|
||||
.map(Buffer::toJsonObject)
|
||||
.map(toCityAndDayLength())
|
||||
.subscribe(System.out::println, Throwable::printStackTrace);
|
||||
|
||||
Thread.sleep(20000); // enough to give time to complete the execution
|
||||
}
|
||||
|
||||
private static Function<HttpClientResponse, Publisher<? extends Buffer>> toBufferFlowable() {
|
||||
return response -> response
|
||||
.toObservable()
|
||||
.reduce(
|
||||
Buffer.buffer(),
|
||||
Buffer::appendBuffer).toFlowable();
|
||||
}
|
||||
|
||||
private static Function<Buffer, Long> extractingWoeid() {
|
||||
return cityBuffer -> cityBuffer
|
||||
.toJsonArray()
|
||||
.getJsonObject(0)
|
||||
.getLong("woeid");
|
||||
}
|
||||
|
||||
private static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
|
||||
return json -> {
|
||||
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
|
||||
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
|
||||
String cityName = json.getString("title");
|
||||
return new CityAndDayLength(
|
||||
cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user