This commit is contained in:
Jonathan Cook
2019-10-23 15:01:44 +02:00
parent db85c8f275
commit 684ec0d2e3
20486 changed files with 1642483 additions and 0 deletions
@@ -0,0 +1,89 @@
package com.baeldung.crunch;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Calendar;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.junit.Ignore;
import org.junit.Test;
public class MemPipelineUnitTest {
private static final String INPUT_FILE_PATH = "src/test/resources/crunch/input.txt";
@Test
public void givenPipeLineAndSource_whenSourceRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
Source<String> source = From.textFile(INPUT_FILE_PATH);
PCollection<String> lines = pipeline.read(source);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
private String createOutputPath() throws IOException {
Path path = Files.createTempDirectory("test");
final String outputFilePath = path.toString() + File.separatorChar
+ "output.text";
return outputFilePath;
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenCollection_whenWriteCalled_fileWrittenSuccessfully()
throws IOException {
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
Target target = To.textFile(outputFilePath);
inputStrings.write(target);
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenPipeLine_whenWriteTextFileCalled_fileWrittenSuccessfully()
throws IOException {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
pipeline.writeTextFile(inputStrings, outputFilePath);
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
}
@@ -0,0 +1,41 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class StopWordFilterUnitTest {
@Test
public void givenFilter_whenStopWordPassed_thenFalseReturned() {
FilterFn<String> filter = new StopWordFilter();
assertFalse(filter.accept("the"));
}
@Test
public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
FilterFn<String> filter = new StopWordFilter();
assertTrue(filter.accept("Hello"));
}
@Test
public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
PCollection<String> words = MemPipeline.collectionOf("This", "is", "a",
"test", "sentence");
PCollection<String> noStopWords = words.filter(new StopWordFilter());
assertEquals(ImmutableList.of("This", "test", "sentence"),
Lists.newArrayList(noStopWords.materialize()));
}
}
@@ -0,0 +1,21 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
public class ToUpperCaseFnUnitTest {
@Test
public void givenString_whenToUpperCaseFnCalled_UpperCaseStringReturned() {
InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
new ToUpperCaseFn().process("input", emitter);
assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
}
}
@@ -0,0 +1,31 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class ToUpperCaseWithCounterFnUnitTest {
@Before
public void setUp() throws Exception {
MemPipeline.clearCounters();
}
@Test
public void whenFunctionCalled_counterIncementendForChangedValues() {
PCollection<String> inputStrings = MemPipeline.collectionOf("This", "is", "a", "TEST", "string");
PCollection<String> upperCaseStrings = inputStrings.parallelDo(new ToUpperCaseWithCounterFn(), Writables.strings());
assertEquals(ImmutableList.of("THIS", "IS", "A", "TEST", "STRING"), Lists.newArrayList(upperCaseStrings.materialize()));
assertEquals(4L, MemPipeline.getCounters()
.findCounter("UpperCase", "modified")
.getValue());
}
}
@@ -0,0 +1,27 @@
package com.baeldung.crunch;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.apache.crunch.Emitter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
@Mock
private Emitter<String> emitter;
@Test
public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
Tokenizer splitter = new Tokenizer();
splitter.process(" hello world ", emitter);
verify(emitter).emit("hello");
verify(emitter).emit("world");
verifyNoMoreInteractions(emitter);
}
}
@@ -0,0 +1,43 @@
package com.baeldung.jcache;
import static org.junit.Assert.assertEquals;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CacheLoaderIntegrationTest {
private static final String CACHE_NAME = "SimpleCache";
private Cache<Integer, String> cache;
@Before
public void setup() {
// Adding fully qualified class name because of multiple Cache Provider (Ignite and Hazelcast)
CachingProvider cachingProvider = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<Integer, String> config = new MutableConfiguration<Integer, String>().setReadThrough(true).setCacheLoaderFactory(new FactoryBuilder.SingletonFactory<>(new SimpleCacheLoader()));
this.cache = cacheManager.createCache("SimpleCache", config);
}
@After
public void tearDown() {
Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider").getCacheManager().destroyCache(CACHE_NAME);
}
@Test
public void whenReadingFromStorage_thenCorrect() {
for (int i = 1; i < 4; i++) {
String value = cache.get(i);
assertEquals("fromCache" + i, value);
}
}
}
@@ -0,0 +1,41 @@
package com.baeldung.jcache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import static org.junit.Assert.assertEquals;
public class EntryProcessorIntegrationTest {
private static final String CACHE_NAME = "MyCache";
private static final String CACHE_PROVIDER_NAME = "com.hazelcast.cache.HazelcastCachingProvider";
private Cache<String, String> cache;
@Before
public void instantiateCache() {
CachingProvider cachingProvider = Caching.getCachingProvider(CACHE_PROVIDER_NAME);
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
this.cache = cacheManager.createCache(CACHE_NAME, config);
this.cache.put("key", "value");
}
@After
public void tearDown() {
Caching.getCachingProvider(CACHE_PROVIDER_NAME).getCacheManager().destroyCache(CACHE_NAME);
}
@Test
public void whenModifyValue_thenCorrect() {
this.cache.invoke("key", new SimpleEntryProcessor());
assertEquals("value - modified", cache.get("key"));
}
}
@@ -0,0 +1,55 @@
package com.baeldung.jcache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import static org.junit.Assert.assertEquals;
public class EventListenerIntegrationTest {
private static final String CACHE_NAME = "MyCache";
private static final String CACHE_PROVIDER_NAME = "com.hazelcast.cache.HazelcastCachingProvider";
private Cache<String, String> cache;
private SimpleCacheEntryListener listener;
private MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration;
@Before
public void setup() {
CachingProvider cachingProvider = Caching.getCachingProvider(CACHE_PROVIDER_NAME);
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, String> config = new MutableConfiguration<String, String>();
this.cache = cacheManager.createCache("MyCache", config);
this.listener = new SimpleCacheEntryListener();
}
@After
public void tearDown() {
Caching.getCachingProvider(CACHE_PROVIDER_NAME).getCacheManager().destroyCache(CACHE_NAME);
}
@Test
public void whenRunEvent_thenCorrect() throws InterruptedException {
this.listenerConfiguration = new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(this.listener), null, false, true);
this.cache.registerCacheEntryListener(this.listenerConfiguration);
assertEquals(false, this.listener.getCreated());
this.cache.put("key", "value");
assertEquals(true, this.listener.getCreated());
assertEquals(false, this.listener.getUpdated());
this.cache.put("key", "newValue");
assertEquals(true, this.listener.getUpdated());
}
}
@@ -0,0 +1,27 @@
package com.baeldung.jcache;
import org.junit.Test;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;
import static org.junit.Assert.assertEquals;
public class JCacheIntegrationTest {
@Test
public void instantiateCache() {
CachingProvider cachingProvider = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
CacheManager cacheManager = cachingProvider.getCacheManager();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
Cache<String, String> cache = cacheManager.createCache("simpleCache", config);
cache.put("key1", "value1");
cache.put("key2", "value2");
assertEquals("value1", cache.get("key1"));
assertEquals("value2", cache.get("key2"));
cacheManager.close();
}
}
@@ -0,0 +1,62 @@
package com.baeldung.kafkastreams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaStreamsLiveTest {
private String bootstrapServers = "localhost:9092";
@Test
@Ignore("it needs to have kafka broker running on local")
public void shouldTestKafkaStreams() throws InterruptedException {
//given
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Use a temporary directory for storing state, which will be automatically removed after the test.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
//when
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
String outputTopic = "outputTopic";
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
//then
Thread.sleep(30000);
streams.close();
}
}