diff --git a/core-java/src/main/java/com/baeldung/concurrent/skiplist/Event.java b/core-java/src/main/java/com/baeldung/concurrent/skiplist/Event.java new file mode 100644 index 0000000000..ce1f57bb93 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/skiplist/Event.java @@ -0,0 +1,21 @@ +package com.baeldung.concurrent.skiplist; + +import java.time.ZonedDateTime; + +public class Event { + private final ZonedDateTime eventTime; + private final String content; + + public Event(ZonedDateTime eventTime, String content) { + this.eventTime = eventTime; + this.content = content; + } + + public ZonedDateTime getEventTime() { + return eventTime; + } + + public String getContent() { + return content; + } +} diff --git a/core-java/src/main/java/com/baeldung/concurrent/skiplist/EventWindowSort.java b/core-java/src/main/java/com/baeldung/concurrent/skiplist/EventWindowSort.java new file mode 100644 index 0000000000..9eef00bd3f --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/skiplist/EventWindowSort.java @@ -0,0 +1,36 @@ +package com.baeldung.concurrent.skiplist; + +import java.time.ZonedDateTime; +import java.util.Comparator; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.function.ToLongFunction; + +public class EventWindowSort { + private final ConcurrentSkipListMap events = new ConcurrentSkipListMap<>(Comparator.comparingLong(new ToLongFunction() { + @Override + public long applyAsLong(ZonedDateTime value) { + return value + .toInstant() + .toEpochMilli(); + } + })); + + public void acceptEvent(Event event) { + events.put(event.getEventTime(), event.getContent()); + } + + public ConcurrentNavigableMap getEventsFromLastMinute() { + return events.tailMap(ZonedDateTime + .now() + .minusMinutes(1)); + } + + public ConcurrentNavigableMap getEventsOlderThatOneMinute() { + return events.headMap(ZonedDateTime + .now() + .minusMinutes(1)); + } + +} + diff --git a/core-java/src/test/java/com/baeldung/concurrent/skiplist/ConcurrentSkipListSetTest.java b/core-java/src/test/java/com/baeldung/concurrent/skiplist/ConcurrentSkipListSetTest.java new file mode 100644 index 0000000000..a2dbbae520 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/skiplist/ConcurrentSkipListSetTest.java @@ -0,0 +1,120 @@ +package com.baeldung.concurrent.skiplist; + +import org.junit.Test; + +import java.time.ZonedDateTime; +import java.util.UUID; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ConcurrentSkipListSetTest { + + @Test + public void givenThreadsProducingEvents_whenGetForEventsFromLastMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException { + //given + ExecutorService executorService = Executors.newFixedThreadPool(3); + EventWindowSort eventWindowSort = new EventWindowSort(); + int numberOfThreads = 2; + //when + Runnable producer = () -> IntStream + .rangeClosed(0, 100) + .forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime + .now() + .minusSeconds(index), UUID + .randomUUID() + .toString()))); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.execute(producer); + } + + Thread.sleep(500); + + ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsFromLastMinute(); + + long eventsOlderThanOneMinute = eventsFromLastMinute + .entrySet() + .stream() + .filter(e -> e + .getKey() + .isBefore(ZonedDateTime + .now() + .minusMinutes(1))) + .count(); + assertEquals(eventsOlderThanOneMinute, 0); + + long eventYoungerThanOneMinute = eventsFromLastMinute + .entrySet() + .stream() + .filter(e -> e + .getKey() + .isAfter(ZonedDateTime + .now() + .minusMinutes(1))) + .count(); + + //then + assertTrue(eventYoungerThanOneMinute > 0); + + executorService.awaitTermination(1, TimeUnit.SECONDS); + executorService.shutdown(); + } + + @Test + public void givenThreadsProducingEvents_whenGetForEventsOlderThanOneMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException { + //given + ExecutorService executorService = Executors.newFixedThreadPool(3); + EventWindowSort eventWindowSort = new EventWindowSort(); + int numberOfThreads = 2; + //when + Runnable producer = () -> IntStream + .rangeClosed(0, 100) + .forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime + .now() + .minusSeconds(index), UUID + .randomUUID() + .toString()))); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.execute(producer); + } + + Thread.sleep(500); + + ConcurrentNavigableMap eventsFromLastMinute = eventWindowSort.getEventsOlderThatOneMinute(); + + long eventsOlderThanOneMinute = eventsFromLastMinute + .entrySet() + .stream() + .filter(e -> e + .getKey() + .isBefore(ZonedDateTime + .now() + .minusMinutes(1))) + .count(); + assertTrue(eventsOlderThanOneMinute > 0); + + long eventYoungerThanOneMinute = eventsFromLastMinute + .entrySet() + .stream() + .filter(e -> e + .getKey() + .isAfter(ZonedDateTime + .now() + .minusMinutes(1))) + .count(); + + //then + assertEquals(eventYoungerThanOneMinute, 0); + + executorService.awaitTermination(1, TimeUnit.SECONDS); + executorService.shutdown(); + } + +} \ No newline at end of file