diff --git a/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Animal.java b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Animal.java new file mode 100644 index 0000000000..b57f68f053 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Animal.java @@ -0,0 +1,6 @@ +package com.baeldung.exceptions.classcastexception; + +public interface Animal { + + String getName(); +} diff --git a/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Box.java b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Box.java new file mode 100644 index 0000000000..bb67407218 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Box.java @@ -0,0 +1,14 @@ +package com.baeldung.exceptions.classcastexception; + +public class Box { + + private T content; + + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Frog.java b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Frog.java new file mode 100644 index 0000000000..0a0b2d1f63 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Frog.java @@ -0,0 +1,9 @@ +package com.baeldung.exceptions.classcastexception; + +public class Frog extends Reptile { + + @Override + public String getName() { + return super.getName() + ": Frog"; + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Mammal.java b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Mammal.java new file mode 100644 index 0000000000..2723cc5b7b --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Mammal.java @@ -0,0 +1,9 @@ +package com.baeldung.exceptions.classcastexception; + +public class Mammal implements Animal { + + @Override + public String getName() { + return "Mammal"; + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Reptile.java b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Reptile.java new file mode 100644 index 0000000000..ed4b0273e5 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/main/java/com/baeldung/exceptions/classcastexception/Reptile.java @@ -0,0 +1,9 @@ +package com.baeldung.exceptions.classcastexception; + +public class Reptile implements Animal { + + @Override + public String getName() { + return "Reptile"; + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/CheckedCastsUnitTest.java b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/CheckedCastsUnitTest.java new file mode 100644 index 0000000000..7fac000fa8 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/CheckedCastsUnitTest.java @@ -0,0 +1,40 @@ +package com.baeldung.exceptions.classcastexception; + +import org.junit.Test; + +import java.io.Serializable; + +public class CheckedCastsUnitTest { + + @Test(expected = ClassCastException.class) + public void givenBaseTypeVariableReferencingChildInstance_whenCastToIncompatibleSubtype_thenClassCastException() { + Animal animal = new Frog(); + + //A checked downcast to Mammal is incompatible from Frog because Frog is not a subtype of Mammal. + Mammal mammal = (Mammal) animal; + } + + @Test(expected = ClassCastException.class) + public void givenBaseTypeVariableReferencingChildInstance_whenCastToIncompatibleInterface_thenClassCastException() { + Animal animal = new Frog(); + + //A checked cast to Serializable is incompatible from Frog because Frog is not a subtype of Serializable. + Serializable serial = (Serializable) animal; + } + + @Test(expected = ClassCastException.class) + public void givenObjectVariableReferencingPrimitiveArray_whenCastToBoxedTypeArray_thenClassCastException() { + Object primitives = new int[1]; + + //A checked cast to Integer[] is incompatible from primitive arrays. Auto-boxing does not work for arrays. + Integer[] integers = (Integer[]) primitives; + } + + @Test(expected = ClassCastException.class) + public void givenObjectVariableReferencingPrimitiveArray_whenCastToPromotedTypeArray_thenClassCastException() { + Object primitives = new int[1]; + + //A checked cast to long[] is incompatible from int[]. Type promotion does not work for arrays. + long[] longs = (long[]) primitives; + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/GenericConversionUnitTest.java b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/GenericConversionUnitTest.java new file mode 100644 index 0000000000..027ece2db7 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/GenericConversionUnitTest.java @@ -0,0 +1,21 @@ +package com.baeldung.exceptions.classcastexception; + +import org.junit.Test; + +public class GenericConversionUnitTest { + + @Test(expected = ClassCastException.class) + public void givenIncompatibleType_whenConvertInstanceOfObject_thenClassCastException() { + // Should have been null, but due to type erasure, inside convertInstanceOfObject, + // it will attempt to cast to Object instead of String, so it casts to Object, which is always possible. + String shouldBeNull = convertInstanceOfObject(123); + } + + public static T convertInstanceOfObject(Object o) { + try { + return (T) o; // Casts to Object due to type erasure + } catch (ClassCastException e) { + return null; // Will never reach this + } + } +} diff --git a/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/UncheckedConversionUnitTest.java b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/UncheckedConversionUnitTest.java new file mode 100644 index 0000000000..60e7d5a147 --- /dev/null +++ b/core-java-modules/core-java-exceptions-3/src/test/java/com/baeldung/exceptions/classcastexception/UncheckedConversionUnitTest.java @@ -0,0 +1,17 @@ +package com.baeldung.exceptions.classcastexception; + +import org.junit.Test; + +public class UncheckedConversionUnitTest { + + @Test(expected = ClassCastException.class) + public void givenPollutedGenericType_whenGetProperty_thenClassCastException() { + Box originalBox = new Box<>(); + Box raw = originalBox; + raw.setContent(2.5); + Box bound = (Box) raw; + + //An incompatible element was found in the raw box. + Long content = bound.getContent(); + } +} diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml new file mode 100644 index 0000000000..5d9611ccdf --- /dev/null +++ b/netflix-modules/mantis/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + mantis + Mantis + jar + Sample project for Netflix Mantis + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../../parent-boot-2 + + + + + + org.springframework.boot + spring-boot-starter + 2.1.3.RELEASE + + + + io.mantisrx + mantis-runtime + 1.2.63 + + + org.slf4j + slf4j-log4j12 + + + + + + com.fasterxml.jackson.core + jackson-databind + 2.10.2 + + + + net.andreinc.mockneat + mockneat + 0.3.8 + + + + org.projectlombok + lombok + 1.18.12 + + + + org.springframework + spring-webflux + 5.0.9.RELEASE + test + + + + io.projectreactor.netty + reactor-netty + 0.9.12.RELEASE + test + + + + + + + SpringLibReleaseRepo + https://repo.spring.io/libs-release/ + + + + diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java new file mode 100644 index 0000000000..d5ffe977c3 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/MantisApplication.java @@ -0,0 +1,23 @@ +package com.baeldung.netflix.mantis; + +import com.baeldung.netflix.mantis.job.LogAggregationJob; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@Slf4j +@SpringBootApplication +public class MantisApplication implements CommandLineRunner { + + public static void main(String[] args) { + SpringApplication.run(MantisApplication.class, args); + } + + @Override + public void run(String... args) { + LocalJobExecutorNetworked.execute(new LogAggregationJob().getJobInstance()); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java new file mode 100644 index 0000000000..7fc514deef --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java @@ -0,0 +1,37 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.CountLogStage; +import com.baeldung.netflix.mantis.stage.GroupLogStage; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.sink.Sink; +import io.mantisrx.runtime.sink.Sinks; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class LogAggregationJob extends MantisJobProvider { + + private Sink sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)); + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), TransformLogStage.stageConfig()) + .stage(new GroupLogStage(), GroupLogStage.config()) + .stage(new CountLogStage(), CountLogStage.config()) + .sink(sink) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java new file mode 100644 index 0000000000..34ccf8355a --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java @@ -0,0 +1,34 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import com.baeldung.netflix.mantis.source.RandomLogSource; +import com.baeldung.netflix.mantis.stage.TransformLogStage; +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJob; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.Metadata; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.sink.Sink; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@AllArgsConstructor +public class LogCollectingJob extends MantisJobProvider { + + private Sink sink = new LogSink(); + + @Override + public Job getJobInstance() { + + return MantisJob + .source(new RandomLogSource()) + .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) + .sink(sink) + .metadata(new Metadata.Builder().build()) + .create(); + + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java new file mode 100644 index 0000000000..e0e3c4f9fa --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java @@ -0,0 +1,28 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@NoArgsConstructor +@AllArgsConstructor +public class LogAggregate implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private Integer count; + private String level; + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java new file mode 100644 index 0000000000..a48dfcd5dd --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogEvent.java @@ -0,0 +1,35 @@ +package com.baeldung.netflix.mantis.model; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mantisrx.runtime.codec.JsonType; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class LogEvent implements JsonType { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private Long index; + private String level; + private String message; + + public LogEvent(String[] parts) { + this.index = Long.valueOf(parts[0]); + this.level = parts[1]; + this.message = parts[2]; + } + + public String toJsonString() { + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + return null; + } + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java new file mode 100644 index 0000000000..ae2177bf87 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/sink/LogSink.java @@ -0,0 +1,37 @@ +package com.baeldung.netflix.mantis.sink; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.SelfDocumentingSink; +import io.mantisrx.runtime.sink.ServerSentEventsSink; +import io.mantisrx.runtime.sink.Sink; +import io.mantisrx.runtime.sink.predicate.Predicate; +import rx.Observable; + +public class LogSink implements Sink { + + @Override + public void call(Context context, PortRequest portRequest, Observable logEventObservable) { + + SelfDocumentingSink sink = new ServerSentEventsSink.Builder() + .withEncoder(LogEvent::toJsonString) + .withPredicate(filterByLogMessage()) + .build(); + + logEventObservable.subscribe(); + + sink.call(context, portRequest, logEventObservable); + } + + private Predicate filterByLogMessage() { + return new Predicate<>("filter by message", + parameters -> { + if (parameters != null && parameters.containsKey("filter")) { + return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); + } + return logEvent -> true; + }); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java new file mode 100644 index 0000000000..fe607d9866 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/source/RandomLogSource.java @@ -0,0 +1,46 @@ +package com.baeldung.netflix.mantis.source; + +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.source.Index; +import io.mantisrx.runtime.source.Source; +import lombok.extern.slf4j.Slf4j; +import net.andreinc.mockneat.MockNeat; +import rx.Observable; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class RandomLogSource implements Source { + + private MockNeat mockDataGenerator; + + @Override + public void init(Context context, Index index) { + mockDataGenerator = MockNeat.threadLocal(); + } + + @Override + public Observable> call(Context context, Index index) { + return Observable.just( + Observable + .interval(250, TimeUnit.MILLISECONDS) + .map(this::createRandomLogEvent)); + } + + private String createRandomLogEvent(Long tick) { + String level = mockDataGenerator.probabilites(String.class) + .add(0.5, "INFO") + .add(0.3, "WARN") + .add(0.2, "ERROR") + .get(); + + String message = mockDataGenerator.probabilites(String.class) + .add(0.5, "login attempt") + .add(0.5, "user created") + .get(); + + return tick + "#" + level + "#" + message; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java new file mode 100644 index 0000000000..5f02d783d0 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/CountLogStage.java @@ -0,0 +1,58 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.GroupToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.GroupToScalarComputation; +import io.mantisrx.runtime.parameter.ParameterDefinition; +import io.mantisrx.runtime.parameter.type.IntParameter; +import io.mantisrx.runtime.parameter.validator.Validators; +import rx.Observable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class CountLogStage implements GroupToScalarComputation { + + private int duration; + + @Override + public void init(Context context) { + duration = (int)context.getParameters().get("LogAggregationDuration", 1000); + } + + @Override + public Observable call(Context context, Observable> mantisGroup) { + return mantisGroup + .window(duration, TimeUnit.MILLISECONDS) + .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) + .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) + .map((count) -> new LogAggregate(count, group.getKey())) + )); + } + + public static GroupToScalar.Config config(){ + return new GroupToScalar.Config() + .description("sum events for a log level") + .codec(JacksonCodecs.pojo(LogAggregate.class)) + .withParameters(getParameters()); + } + + public static List> getParameters() { + List> params = new ArrayList<>(); + + params.add(new IntParameter() + .name("LogAggregationDuration") + .description("window size for aggregation in milliseconds") + .validator(Validators.range(100, 10000)) + .defaultValue(5000) + .build()) ; + + return params; + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java new file mode 100644 index 0000000000..f21c4aba7d --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/GroupLogStage.java @@ -0,0 +1,25 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.common.MantisGroup; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToGroup; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ToGroupComputation; +import rx.Observable; + +public class GroupLogStage implements ToGroupComputation { + + @Override + public Observable> call(Context context, Observable logEvent) { + return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log)); + } + + public static ScalarToGroup.Config config(){ + return new ScalarToGroup.Config() + .description("Group event data by level") + .codec(JacksonCodecs.pojo(LogEvent.class)) + .concurrentInput(); + } + +} diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java new file mode 100644 index 0000000000..33e6567d13 --- /dev/null +++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/stage/TransformLogStage.java @@ -0,0 +1,24 @@ +package com.baeldung.netflix.mantis.stage; + +import com.baeldung.netflix.mantis.model.LogEvent; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.ScalarToScalar; +import io.mantisrx.runtime.codec.JacksonCodecs; +import io.mantisrx.runtime.computation.ScalarComputation; +import rx.Observable; + +public class TransformLogStage implements ScalarComputation { + + @Override + public Observable call(Context context, Observable logEntry) { + return logEntry + .map(log -> log.split("#")) + .filter(parts -> parts.length == 3) + .map(LogEvent::new); + } + + public static ScalarToScalar.Config stageConfig() { + return new ScalarToScalar.Config() + .codec(JacksonCodecs.pojo(LogEvent.class)); + } +} diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java new file mode 100644 index 0000000000..b9b16e2146 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java @@ -0,0 +1,56 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogAggregate; +import io.mantisrx.runtime.PortRequest; +import io.mantisrx.runtime.sink.Sinks; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogAggregationJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7382; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> { + logAggregateObservable.subscribe(); + Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable); + })); + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogAggregate.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogAggregate() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() { + LogAggregate logAggregate = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel())); + assertTrue(logAggregate.getCount() > 0); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java new file mode 100644 index 0000000000..87e0c194b5 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java @@ -0,0 +1,73 @@ +package com.baeldung.netflix.mantis.job; + +import com.baeldung.netflix.mantis.model.LogEvent; +import com.baeldung.netflix.mantis.sink.LogSink; +import io.mantisrx.runtime.Context; +import io.mantisrx.runtime.PortRequest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import rx.Observable; + +import static java.util.Arrays.asList; +import static java.util.Optional.of; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class LogCollectingJobIntegrationTest extends MantisJobTestBase { + + private final static int PORT = 7381; + private final static String SINK_URL = "http://localhost:" + PORT; + + @BeforeAll + static void beforeAll() { + + start(new LogCollectingJob(new LogSink() { + + @Override + public void call(Context context, PortRequest portRequest, Observable observable) { + super.call(context, new PortRequest(PORT), observable); + } + + })); + + } + + @Override + public String getSinkUrl() { + return SINK_URL; + } + + @Override + public Class getEventType() { + return LogEvent.class; + } + + @Test + void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() { + assertEquals(of(5L), sinkStream.take(5).count().blockOptional()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveLogEvent() { + assertNotNull(sinkStream.take(1).blockFirst()); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveValidLogEvent() { + LogEvent logEvent = sinkStream.take(1).blockFirst(); + + assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel())); + assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage())); + } + + @Test + void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() { + getSinkStream(SINK_URL + "?filter=login") + .take(7) + .toStream().forEach( + logEvent -> assertEquals("login attempt", logEvent.getMessage()) + ); + } + +} \ No newline at end of file diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java new file mode 100644 index 0000000000..89425299a4 --- /dev/null +++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java @@ -0,0 +1,49 @@ +package com.baeldung.netflix.mantis.job; + +import io.mantisrx.runtime.Job; +import io.mantisrx.runtime.MantisJobProvider; +import io.mantisrx.runtime.executor.LocalJobExecutorNetworked; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.util.retry.Retry; + +import java.time.Duration; + +public abstract class MantisJobTestBase { + + private static Job jobInstance; + Flux sinkStream; + + public abstract String getSinkUrl(); + public abstract Class getEventType(); + + @BeforeEach + void setUp() { + sinkStream = getSinkStream(getSinkUrl()); + } + + @AfterAll + static void afterAll() { + stopJob(); + } + + protected Flux getSinkStream(String sinkUrl) { + return WebClient.builder().build().get() + .uri(sinkUrl) + .retrieve() + .bodyToFlux(getEventType()) + .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000))); + } + + static void start(MantisJobProvider job) { + jobInstance = job.getJobInstance(); + new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start(); + } + + static void stopJob() { + jobInstance.getLifecycle().shutdown(); + } + +} diff --git a/netflix-modules/pom.xml b/netflix-modules/pom.xml index 9ed22498d8..538126fb34 100644 --- a/netflix-modules/pom.xml +++ b/netflix-modules/pom.xml @@ -15,6 +15,7 @@ genie + mantis \ No newline at end of file