BAEL-772 Reactive Streams API with Ratpack (#11328)

* [BAEL-4849] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Article code

* [BAEL-4968] Remove extra comments

* [BAEL-4020] Article code

* [BAEL-722] Article code
This commit is contained in:
psevestre
2021-10-14 01:04:19 -03:00
committed by GitHub
parent 2dfdb51592
commit 47abbf654b
7 changed files with 673 additions and 0 deletions
@@ -0,0 +1,63 @@
package com.baeldung.ratpack;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Non-thread safe !!!
class CompliantPublisher implements Publisher<Integer> {
String name;
private static final Logger log = LoggerFactory.getLogger(CompliantPublisher.class);
private long available;
public CompliantPublisher(long available) {
this.available = available;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new CompliantSubscription(subscriber));
}
private class CompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel;
private long requested;
private boolean cancelled;
public CompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
log.info("request: requested={}, available={}", n, available);
requested += n;
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (requested) && !cancelled && available > 0 ; i++, available-- ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
cancelled = true;
}
}
}
@@ -0,0 +1,67 @@
package com.baeldung.ratpack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggingSubscriber<T> implements Subscriber<T> {
private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class);
private Subscription subscription;
private long requested;
private long received;
private CountDownLatch finished = new CountDownLatch(1);
@Override
public void onComplete() {
log.info("onComplete: sub={}", subscription.hashCode());
finished.countDown();
}
@Override
public void onError(Throwable t) {
log.error("Error: sub={}, message={}", subscription.hashCode(), t.getMessage(),t);
finished.countDown();
}
@Override
public void onNext(T value) {
log.info("onNext: sub={}, value={}", subscription.hashCode(), value);
this.received++;
this.requested++;
subscription.request(1);
}
@Override
public void onSubscribe(Subscription sub) {
log.info("onSubscribe: sub={}", sub.hashCode());
this.subscription = sub;
this.received = 0;
this.requested = 1;
sub.request(1);
}
public long getRequested() {
return requested;
}
public long getReceived() {
return received;
}
public void block() {
try {
finished.await(10, TimeUnit.SECONDS);
}
catch(InterruptedException iex) {
throw new RuntimeException(iex);
}
}
}
@@ -0,0 +1,46 @@
package com.baeldung.ratpack;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonCompliantPublisher implements Publisher<Integer> {
private static final Logger log = LoggerFactory.getLogger(NonCompliantPublisher.class);
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
log.info("subscribe");
subscriber.onSubscribe(new NonCompliantSubscription(subscriber));
}
private class NonCompliantSubscription implements Subscription {
private Subscriber<? super Integer> subscriber;
private int recurseLevel = 0;
public NonCompliantSubscription(Subscriber<? super Integer> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
log.info("request: n={}", n);
if ( recurseLevel > 0 ) {
return;
}
recurseLevel++;
for (int i = 0 ; i < (n + 5) ; i ++ ) {
subscriber.onNext(i);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
}
}
@@ -0,0 +1,140 @@
package com.baeldung.ratpack;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ratpack.exec.ExecResult;
import ratpack.func.Action;
import ratpack.stream.StreamEvent;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.test.exec.ExecHarness;
public class RatpackStreamsUnitTest {
private static Logger log = LoggerFactory.getLogger(RatpackStreamsUnitTest.class);
@Test
public void whenPublish_thenSuccess() {
Publisher<String> pub = Streams.publish(Arrays.asList("hello", "hello again"));
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
}
@Test
public void whenYield_thenSuccess() {
Publisher<String> pub = Streams.yield((t) -> {
return t.getRequestNum() < 5 ? "hello" : null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
@Test
public void whenPeriodic_thenSuccess() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Publisher<String> pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> {
return t < 5 ? String.format("hello %d",t): null;
});
LoggingSubscriber<String> sub = new LoggingSubscriber<String>();
pub.subscribe(sub);
sub.block();
assertEquals(5, sub.getReceived());
}
@Test
public void whenMap_thenSuccess() throws Exception {
TransformablePublisher<String> pub = Streams.yield( t -> {
return t.getRequestNum() < 5 ? t.getRequestNum() : null;
})
.map(v -> String.format("item %d", v));
ExecResult<List<String>> result = ExecHarness.yieldSingle((c) -> pub.toList() );
assertTrue("should succeed", result.isSuccess());
assertEquals("should have 5 items",5,result.getValue().size());
}
@Test
public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction("before buffer"))
.buffer()
.wiretap(new LoggingAction("after buffer"))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
@Test
public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new NonCompliantPublisher())
.wiretap(new LoggingAction(""))
.take(1);
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
@Test
public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction(""));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
@Test
public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception {
TransformablePublisher<Integer> pub = Streams.transformable(new CompliantPublisher(10))
.wiretap(new LoggingAction("before batch"))
.batch(5, Action.noop())
.wiretap(new LoggingAction("after batch"));
LoggingSubscriber<Integer> sub = new LoggingSubscriber<>();
pub.subscribe(sub);
sub.block();
}
private static class LoggingAction implements Action<StreamEvent<Integer>>{
private final String label;
public LoggingAction(String label) {
this.label = label;
}
@Override
public void execute(StreamEvent<Integer> e) throws Exception {
log.info("{}: event={}", label,e);
}
}
}