Spark graphx moved to a new project due to a incompatibility of scala compiler

This commit is contained in:
Norberto Ritzmann Jr
2019-10-06 21:57:54 +02:00
parent db85c8f275
commit 45be61376e
20299 changed files with 1635696 additions and 0 deletions
+17
View File
@@ -0,0 +1,17 @@
### Relevant articles
- [Introduction to Reladomo](https://www.baeldung.com/reladomo)
- [Introduction to ORMLite](https://www.baeldung.com/ormlite)
- [Introduction To Kryo](https://www.baeldung.com/kryo)
- [Introduction to KafkaStreams in Java](https://www.baeldung.com/java-kafka-streams)
- [Guide to Java Data Objects](https://www.baeldung.com/jdo)
- [Intro to JDO Queries 2/2](https://www.baeldung.com/jdo-queries)
- [Introduction to HikariCP](https://www.baeldung.com/hikaricp)
- [Introduction to JCache](https://www.baeldung.com/jcache)
- [A Guide to Apache Ignite](https://www.baeldung.com/apache-ignite)
- [Apache Ignite with Spring Data](https://www.baeldung.com/apache-ignite-spring-data)
- [A Guide to Apache Crunch](https://www.baeldung.com/apache-crunch)
- [Intro to Apache Storm](https://www.baeldung.com/apache-storm)
- [Guide to Ebean ORM](https://www.baeldung.com/ebean-orm)
- [Introduction to Kafka Connectors](https://www.baeldung.com/kafka-connectors-guide)
- [Kafka Connect Example with MQTT and MongoDB](https://www.baeldung.com/kafka-connect-mqtt-mongodb)
- [Building a Data Pipeline with Flink and Kafka](https://www.baeldung.com/kafka-flink-data-pipeline)
Binary file not shown.
+1
View File
@@ -0,0 +1 @@
log4j.rootLogger=INFO, stdout
+26
View File
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?><root>
</root>
+465
View File
@@ -0,0 +1,465 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>libraries-data</artifactId>
<name>libraries-data</name>
<packaging>jar</packaging>
<parent>
<groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.goldmansachs.reladomo</groupId>
<artifactId>reladomo</artifactId>
<version>${reladomo.version}</version>
</dependency>
<dependency>
<groupId>com.goldmansachs.reladomo</groupId>
<artifactId>reladomo-test-util</artifactId>
<version>${reladomo.version}</version>
</dependency>
<dependency>
<groupId>com.j256.ormlite</groupId>
<artifactId>ormlite-jdbc</artifactId>
<version>${ormlite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring-data</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<!-- Hikari CP -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${HikariCP.version}</version>
<scope>compile</scope>
</dependency>
<!-- JDO -->
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>javax.jdo</artifactId>
<version>${javax.jdo.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
<version>${datanucleus.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-api-jdo</artifactId>
<version>${datanucleus.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-rdbms</artifactId>
<version>${datanucleus.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-maven-plugin</artifactId>
<version>${datanucleus-maven-plugin.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-xml</artifactId>
<version>${datanucleus-xml.version}</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-jdo-query</artifactId>
<version>${datanucleus-jdo-query.version}</version>
</dependency>
<!-- Jcache -->
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${cache.version}</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>${hazelcast.version}</version>
</dependency>
<!-- crunch project -->
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>${org.apache.crunch.crunch-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${org.apache.hadoop.hadoop-client}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.0.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-proxy</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.ebean</groupId>
<artifactId>ebean</artifactId>
<version>${ebean.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Reladomo -->
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<version>${maven-antrun-plugin.version}</version>
<executions>
<execution>
<id>generateMithra</id>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<property name="plugin_classpath" refid="maven.plugin.classpath" />
<taskdef name="gen-reladomo" classpath="plugin_classpath" classname="com.gs.fw.common.mithra.generator.MithraGenerator" />
<gen-reladomo xml="${project.basedir}/src/main/resources/reladomo/ReladomoClassList.xml" generateGscListMethod="true"
generatedDir="${project.build.directory}/generated-sources/reladomo" nonGeneratedDir="${project.basedir}/src/main/java" />
<taskdef name="gen-ddl" classname="com.gs.fw.common.mithra.generator.dbgenerator.MithraDbDefinitionGenerator" loaderRef="reladomoGenerator">
<classpath refid="maven.plugin.classpath" />
</taskdef>
<gen-ddl xml="${project.basedir}/src/main/resources/reladomo/ReladomoClassList.xml"
generatedDir="${project.build.directory}/generated-db/sql" databaseType="postgres" />
</tasks>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.goldmansachs.reladomo</groupId>
<artifactId>reladomogen</artifactId>
<version>${reladomo.version}</version>
</dependency>
<dependency>
<groupId>com.goldmansachs.reladomo</groupId>
<artifactId>reladomo-gen-util</artifactId>
<version>${reladomo.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>${build-helper-maven-plugin.version}</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/reladomo</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-resource</id>
<phase>generate-resources</phase>
<goals>
<goal>add-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>${project.build.directory}/generated-db/</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<!-- /Reladomo -->
<!-- JDO Plugin -->
<plugin>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-maven-plugin</artifactId>
<version>${datanucleus-maven-plugin.version}</version>
<configuration>
<api>JDO</api>
<props>${basedir}/datanucleus.properties</props>
<log4jConfiguration>${basedir}/log4j.properties</log4jConfiguration>
<verbose>true</verbose>
<fork>false</fork>
<!-- Solve windows line too long error -->
</configuration>
<executions>
<execution>
<phase>process-classes</phase>
<goals>
<goal>enhance</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/hadoop-job.xml</descriptor>
</descriptors>
<archive>
<manifest>
<mainClass>com.baeldung.crunch.WordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.ebean</groupId>
<artifactId>ebean-maven-plugin</artifactId>
<version>11.11.2</version>
<executions>
<!-- enhance main classes -->
<execution>
<id>main</id>
<phase>process-classes</phase>
<configuration>
<transformArgs>debug=1</transformArgs>
</configuration>
<goals>
<goal>enhance</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<storm.version>1.2.2</storm.version>
<kryo.version>4.0.1</kryo.version>
<reladomo.version>16.5.1</reladomo.version>
<ormlite.version>5.0</ormlite.version>
<kafka.version>1.0.0</kafka.version>
<ignite.version>2.4.0</ignite.version>
<gson.version>2.8.2</gson.version>
<cache.version>1.1.0</cache.version>
<flink.version>1.5.0</flink.version>
<awaitility.version>3.0.0</awaitility.version>
<assertj.version>3.6.2</assertj.version>
<hazelcast.version>3.8.4</hazelcast.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
<build-helper-maven-plugin.version>3.0.0</build-helper-maven-plugin.version>
<HikariCP.version>2.7.2</HikariCP.version>
<javax.jdo.version>3.2.0-m7</javax.jdo.version>
<datanucleus.version>5.1.1</datanucleus.version>
<datanucleus-maven-plugin.version>5.0.2</datanucleus-maven-plugin.version>
<datanucleus-xml.version>5.0.0-release</datanucleus-xml.version>
<datanucleus-jdo-query.version>5.0.4</datanucleus-jdo-query.version>
<org.apache.crunch.crunch-core.version>0.15.0</org.apache.crunch.crunch-core.version>
<org.apache.hadoop.hadoop-client>2.2.0</org.apache.hadoop.hadoop-client>
<ebean.version>11.22.4</ebean.version>
<slf4j.version>1.7.25</slf4j.version>
<logback.version>1.0.1</logback.version>
</properties>
</project>
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>job</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<unpack>false</unpack>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<excludes>
<exclude>${groupId}:${artifactId}</exclude>
</excludes>
</dependencySet>
<dependencySet>
<unpack>true</unpack>
<includes>
<include>${groupId}:${artifactId}</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>
@@ -0,0 +1,25 @@
package com.baeldung.crunch;
import java.util.Set;
import org.apache.crunch.FilterFn;
import com.google.common.collect.ImmutableSet;
/**
* A filter that removes known stop words.
*/
public class StopWordFilter extends FilterFn<String> {
// English stop words, borrowed from Lucene.
private static final Set<String> STOP_WORDS = ImmutableSet
.copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
"for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
"or", "s", "such", "t", "that", "the", "their", "then", "there",
"these", "they", "this", "to", "was", "will", "with" });
@Override
public boolean accept(String word) {
return !STOP_WORDS.contains(word);
}
}
@@ -0,0 +1,11 @@
package com.baeldung.crunch;
import org.apache.crunch.MapFn;
public class ToUpperCaseFn extends MapFn<String, String> {
@Override
public String map(String input) {
return input != null ? input.toUpperCase() : input;
}
}
@@ -0,0 +1,20 @@
package com.baeldung.crunch;
import org.apache.crunch.MapFn;
@SuppressWarnings("serial")
public class ToUpperCaseWithCounterFn extends MapFn<String, String> {
@Override
public String map(String input) {
if (input == null) {
return input;
} else {
String output = input.toUpperCase();
if (!input.equals(output)) {
increment("UpperCase", "modified");
}
return output;
}
}
}
@@ -0,0 +1,23 @@
package com.baeldung.crunch;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import com.google.common.base.Splitter;
/**
* Splits a line of text, filtering known stop words.
*/
public class Tokenizer extends DoFn<String, String> {
private static final Splitter SPLITTER = Splitter
.onPattern("\\s+")
.omitEmptyStrings();
@Override
public void process(String line,
Emitter<String> emitter) {
for (String word : SPLITTER.split(line)) {
emitter.emit(word);
}
}
}
@@ -0,0 +1,62 @@
package com.baeldung.crunch;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A word count example for Apache Crunch, based on Crunch's example projects.
*/
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordCount(), args);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: hadoop jar crunch-1.0.0-SNAPSHOT-job.jar" + " [generic options] input output");
System.err.println();
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
// Take the collection of words and remove known stop words.
PCollection<String> noStopWords = words.filter(new StopWordFilter());
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
}
}
@@ -0,0 +1,75 @@
package com.baeldung.ebean.app;
import java.util.Arrays;
import com.baeldung.ebean.model.Address;
import com.baeldung.ebean.model.Customer;
import io.ebean.Ebean;
import io.ebean.EbeanServer;
import io.ebean.annotation.Transactional;
public class App {
public static void main(String[] args) {
insertAndDeleteInsideTransaction();
crudOperations();
queryCustomers();
}
@Transactional
public static void insertAndDeleteInsideTransaction() {
Customer c1 = getCustomer();
EbeanServer server = Ebean.getDefaultServer();
server.save(c1);
Customer foundC1 = server.find(Customer.class, c1.getId());
server.delete(foundC1);
}
public static void crudOperations() {
Address a1 = new Address("5, Wide Street", null, "New York");
Customer c1 = new Customer("John Wide", a1);
EbeanServer server = Ebean.getDefaultServer();
server.save(c1);
c1.setName("Jane Wide");
c1.setAddress(null);
server.save(c1);
Customer foundC1 = Ebean.find(Customer.class, c1.getId());
Ebean.delete(foundC1);
}
public static void queryCustomers() {
Address a1 = new Address("1, Big Street", null, "New York");
Customer c1 = new Customer("Big John", a1);
Address a2 = new Address("2, Big Street", null, "New York");
Customer c2 = new Customer("Big John", a2);
Address a3 = new Address("3, Big Street", null, "San Jose");
Customer c3 = new Customer("Big Bob", a3);
Ebean.saveAll(Arrays.asList(c1, c2, c3));
Customer customer = Ebean.find(Customer.class)
.select("name")
.fetch("address", "city")
.where()
.eq("city", "San Jose")
.findOne();
Ebean.deleteAll(Arrays.asList(c1, c2, c3));
}
private static Customer getCustomer() {
Address a1 = new Address("1, Big Street", null, "New York");
Customer c1 = new Customer("Big John", a1);
return c1;
}
}
@@ -0,0 +1,26 @@
package com.baeldung.ebean.app;
import java.util.Properties;
import io.ebean.EbeanServer;
import io.ebean.EbeanServerFactory;
import io.ebean.config.ServerConfig;
public class App2 {
public static void main(String[] args) {
ServerConfig cfg = new ServerConfig();
cfg.setDefaultServer(true);
Properties properties = new Properties();
properties.put("ebean.db.ddl.generate", "true");
properties.put("ebean.db.ddl.run", "true");
properties.put("datasource.db.username", "sa");
properties.put("datasource.db.password", "");
properties.put("datasource.db.databaseUrl", "jdbc:h2:mem:app2");
properties.put("datasource.db.databaseDriver", "org.h2.Driver");
cfg.loadFromProperties(properties);
EbeanServer server = EbeanServerFactory.create(cfg);
}
}
@@ -0,0 +1,48 @@
package com.baeldung.ebean.model;
import javax.persistence.Entity;
@Entity
public class Address extends BaseModel {
public Address(String addressLine1, String addressLine2, String city) {
super();
this.addressLine1 = addressLine1;
this.addressLine2 = addressLine2;
this.city = city;
}
private String addressLine1;
private String addressLine2;
private String city;
public String getAddressLine1() {
return addressLine1;
}
public void setAddressLine1(String addressLine1) {
this.addressLine1 = addressLine1;
}
public String getAddressLine2() {
return addressLine2;
}
public void setAddressLine2(String addressLine2) {
this.addressLine2 = addressLine2;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
@Override
public String toString() {
return "Address [id=" + id + ", addressLine1=" + addressLine1 + ", addressLine2=" + addressLine2 + ", city=" + city + "]";
}
}
@@ -0,0 +1,59 @@
package com.baeldung.ebean.model;
import java.time.Instant;
import javax.persistence.Id;
import javax.persistence.MappedSuperclass;
import javax.persistence.Version;
import io.ebean.annotation.WhenCreated;
import io.ebean.annotation.WhenModified;
@MappedSuperclass
public abstract class BaseModel {
@Id
protected long id;
@Version
protected long version;
@WhenCreated
protected Instant createdOn;
@WhenModified
protected Instant modifiedOn;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Instant getCreatedOn() {
return createdOn;
}
public void setCreatedOn(Instant createdOn) {
this.createdOn = createdOn;
}
public Instant getModifiedOn() {
return modifiedOn;
}
public void setModifiedOn(Instant modifiedOn) {
this.modifiedOn = modifiedOn;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
}
@@ -0,0 +1,42 @@
package com.baeldung.ebean.model;
import javax.persistence.CascadeType;
import javax.persistence.Entity;
import javax.persistence.OneToOne;
@Entity
public class Customer extends BaseModel {
public Customer(String name, Address address) {
super();
this.name = name;
this.address = address;
}
private String name;
@OneToOne(cascade = CascadeType.ALL)
Address address;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Address getAddress() {
return address;
}
public void setAddress(Address address) {
this.address = address;
}
@Override
public String toString() {
return "Customer [id=" + id + ", name=" + name + ", address=" + address + "]";
}
}
@@ -0,0 +1,44 @@
package com.baeldung.hikaricp;
import java.sql.Connection;
import java.sql.SQLException;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
public class DataSource {
private static HikariConfig config = new HikariConfig();
private static HikariDataSource ds;
static {
// config = new HikariConfig("datasource.properties");
// Properties props = new Properties();
// props.setProperty("dataSourceClassName", "org.h2.Driver");
// props.setProperty("dataSource.user", "");
// props.setProperty("dataSource.password", "");
// props.put("dataSource.logWriter", new PrintWriter(System.out));
// config = new HikariConfig(props);
config.setJdbcUrl("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;INIT=runscript from 'classpath:/db.sql'");
config.setUsername("");
config.setPassword("");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
ds = new HikariDataSource(config);
// ds.setJdbcUrl("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;INIT=runscript from 'classpath:/db.sql'");
// ds.setUsername("");
// ds.setPassword("");
}
private DataSource() {
}
public static Connection getConnection() throws SQLException {
return ds.getConnection();
}
}
@@ -0,0 +1,85 @@
package com.baeldung.hikaricp;
import java.sql.Date;
public class Employee {
private int empNo;
private String ename;
private String job;
private int mgr;
private Date hiredate;
private int sal;
private int comm;
private int deptno;
public int getEmpNo() {
return empNo;
}
public void setEmpNo(int empNo) {
this.empNo = empNo;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public Date getHiredate() {
return hiredate;
}
public void setHiredate(Date hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
@Override
public String toString() {
return String.format("Employee [empNo=%d, ename=%s, job=%s, mgr=%d, hiredate=%s, sal=%d, comm=%d, deptno=%d]", empNo, ename, job, mgr, hiredate, sal, comm, deptno);
}
}
@@ -0,0 +1,40 @@
package com.baeldung.hikaricp;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class HikariCPDemo {
public static List<Employee> fetchData() {
final String SQL_QUERY = "select * from emp";
List<Employee> employees = null;
try (Connection con = DataSource.getConnection(); PreparedStatement pst = con.prepareStatement(SQL_QUERY); ResultSet rs = pst.executeQuery();) {
employees = new ArrayList<Employee>();
Employee employee;
while (rs.next()) {
employee = new Employee();
employee.setEmpNo(rs.getInt("empno"));
employee.setEname(rs.getString("ename"));
employee.setJob(rs.getString("job"));
employee.setMgr(rs.getInt("mgr"));
employee.setHiredate(rs.getDate("hiredate"));
employee.setSal(rs.getInt("sal"));
employee.setComm(rs.getInt("comm"));
employee.setDeptno(rs.getInt("deptno"));
employees.add(employee);
}
} catch (SQLException e) {
e.printStackTrace();
}
return employees;
}
public static void main(String[] args) {
fetchData();
}
}
@@ -0,0 +1,14 @@
package com.baeldung.ignite.cache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.lifecycle.LifecycleBean;
import org.apache.ignite.lifecycle.LifecycleEventType;
public class CustomLifecycleBean implements LifecycleBean {
@Override
public void onLifecycleEvent(LifecycleEventType lifecycleEventType) throws IgniteException {
if (lifecycleEventType == LifecycleEventType.AFTER_NODE_START) {
//do something right after the Ignite node starts
}
}
}
@@ -0,0 +1,58 @@
package com.baeldung.ignite.cache;
import com.baeldung.ignite.model.Employee;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.IgniteConfiguration;
import java.util.List;
public class IgniteCacheExample {
public static void main(String[] args) {
Ignite ignite = Ignition.ignite();
IgniteCache<Integer, String> cache = ignite.cache("baeldungCache");
cache.put(1, "baeldung cache value");
String message = cache.get(1);
}
private static void getObjectFromCache(Ignite ignite) {
IgniteCache<Integer, Employee> cache = ignite.getOrCreateCache("baeldungCache");
cache.put(1, new Employee(1, "John", true));
cache.put(2, new Employee(2, "Anna", false));
cache.put(3, new Employee(3, "George", true));
Employee employee = cache.get(1);
}
private static void getFromCacheWithSQl(Ignite ignite) {
IgniteCache<Integer, Employee> cache = ignite.cache("baeldungCache");
SqlFieldsQuery sql = new SqlFieldsQuery(
"select name from Employee where isEmployed = 'true'");
QueryCursor<List<?>> cursor = cache.query(sql);
for (List<?> row : cursor) {
System.out.println(row.get(0));
}
}
private static void customInitialization() {
IgniteConfiguration configuration = new IgniteConfiguration();
configuration.setLifecycleBeans(new CustomLifecycleBean());
Ignite ignite = Ignition.start(configuration);
}
}
@@ -0,0 +1,58 @@
package com.baeldung.ignite.jdbc;
import java.sql.*;
/**
* Created by Gebruiker on 3/14/2018.
*/
public class IgniteJDBC {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
createDatabaseTables(conn);
insertData(conn);
getData(conn);
}
private static void createDatabaseTables(Connection conn) throws SQLException {
Statement sql = conn.createStatement();
sql.executeUpdate("CREATE TABLE Employee (" +
" id INTEGER PRIMARY KEY, name VARCHAR, isEmployed timyint(1)) " +
" WITH \"template=replicated\"");
sql.executeUpdate("CREATE INDEX idx_employee_name ON Employee (name)");
}
private static void insertData(Connection conn) throws SQLException {
PreparedStatement sql =
conn.prepareStatement("INSERT INTO Employee (id, name, isEmployed) VALUES (?, ?, ?)");
sql.setLong(1, 1);
sql.setString(2, "James");
sql.setBoolean(3, true);
sql.executeUpdate();
sql.setLong(1, 2);
sql.setString(2, "Monica");
sql.setBoolean(3, false);
sql.executeUpdate();
}
private static void getData(Connection conn) throws SQLException {
Statement sql = conn.createStatement();
ResultSet rs = sql.executeQuery("SELECT e.name, e.isEmployed " +
" FROM Employee e " +
" WHERE e.isEmployed = TRUE ");
while (rs.next())
System.out.println(rs.getString(1) + ", " + rs.getString(2));
}
}
@@ -0,0 +1,48 @@
package com.baeldung.ignite.model;
public class Employee {
private Integer id;
private String name;
private boolean isEmployed;
public Employee(Integer id, String name, boolean isEmployed) {
this.id = id;
this.name = name;
this.isEmployed = isEmployed;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isEmployed() {
return isEmployed;
}
public void setEmployed(boolean employed) {
isEmployed = employed;
}
@Override
public String toString() {
return "Employee{" +
"id=" + id +
", name='" + name + '\'' +
", isEmployed=" + isEmployed +
'}';
}
}
@@ -0,0 +1,53 @@
package com.baeldung.ignite.spring;
import com.baeldung.ignite.spring.config.SpringDataConfig;
import com.baeldung.ignite.spring.dto.EmployeeDTO;
import com.baeldung.ignite.spring.repository.EmployeeRepository;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.List;
/**
* Created by Gebruiker on 4/12/2018.
*/
public class IgniteApp {
public static void main (String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringDataConfig.class);
context.refresh();
EmployeeRepository repository = context.getBean(EmployeeRepository.class);
EmployeeDTO employeeDTO = new EmployeeDTO();
employeeDTO.setId(1);
employeeDTO.setName("John");
employeeDTO.setEmployed(true);
repository.save(employeeDTO.getId(), employeeDTO);
EmployeeDTO employee = repository.getEmployeeDTOById(employeeDTO.getId());
System.out.println(employee);
}
private void getUsingTheCache(Integer employeeId) {
Ignite ignite = Ignition.ignite();
IgniteCache<Integer, EmployeeDTO> cache = ignite.cache("baeldungCache");
EmployeeDTO employeeDTO = cache.get(employeeId);
System.out.println(employeeDTO);
SqlFieldsQuery sql = new SqlFieldsQuery(
"select * from EmployeeDTO where isEmployed = 'true'");
QueryCursor<List<?>> cursor = cache.query(sql);
}
}
@@ -0,0 +1,29 @@
package com.baeldung.ignite.spring.config;
import com.baeldung.ignite.spring.dto.EmployeeDTO;
import com.baeldung.ignite.spring.repository.EmployeeRepository;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.springdata.repository.config.EnableIgniteRepositories;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableIgniteRepositories(basePackageClasses = EmployeeRepository.class)
@ComponentScan(basePackages = "com.baeldung.ignite.spring.repository")
public class SpringDataConfig {
@Bean
public Ignite igniteInstance() {
IgniteConfiguration config = new IgniteConfiguration();
CacheConfiguration cache = new CacheConfiguration("baeldungCache");
cache.setIndexedTypes(Integer.class, EmployeeDTO.class);
config.setCacheConfiguration(cache);
return Ignition.start(config);
}
}
@@ -0,0 +1,48 @@
package com.baeldung.ignite.spring.dto;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import java.io.Serializable;
public class EmployeeDTO implements Serializable {
@QuerySqlField(index = true)
private Integer id;
@QuerySqlField(index = true)
private String name;
@QuerySqlField(index = true)
private boolean isEmployed;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isEmployed() {
return isEmployed;
}
public void setEmployed(boolean employed) {
isEmployed = employed;
}
@Override
public String toString() {
return "EmployeeDTO{" +
"id=" + id +
", name='" + name + '\'' +
", isEmployed=" + isEmployed +
'}';
}
}
@@ -0,0 +1,13 @@
package com.baeldung.ignite.spring.repository;
import com.baeldung.ignite.spring.dto.EmployeeDTO;
import org.apache.ignite.springdata.repository.IgniteRepository;
import org.apache.ignite.springdata.repository.config.RepositoryConfig;
import org.springframework.stereotype.Repository;
@Repository
@RepositoryConfig(cacheName = "baeldungCache")
public interface EmployeeRepository extends IgniteRepository<EmployeeDTO, Integer> {
EmployeeDTO getEmployeeDTOById(Integer id);
}
@@ -0,0 +1,24 @@
package com.baeldung.ignite.stream;
import com.baeldung.ignite.model.Employee;
import org.apache.ignite.configuration.CacheConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import java.util.concurrent.TimeUnit;
public class CacheConfig {
public static CacheConfiguration<Integer, Employee> employeeCache() {
CacheConfiguration<Integer, Employee> config = new CacheConfiguration<>("baeldungEmployees");
config.setIndexedTypes(Integer.class, Employee.class);
config.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 5))));
return config;
}
}
@@ -0,0 +1,46 @@
package com.baeldung.ignite.stream;
import com.baeldung.ignite.model.Employee;
import com.google.gson.Gson;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.StreamTransformer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class IgniteStream {
private static final Gson GSON = new Gson();
public static void main(String[] args) throws Exception {
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
IgniteCache<Integer, Employee> cache = ignite.getOrCreateCache(CacheConfig.employeeCache());
IgniteDataStreamer<Integer, Employee> streamer = ignite.dataStreamer(cache.getName());
streamer.allowOverwrite(true);
streamer.receiver(StreamTransformer.from((e, arg) -> {
Employee employee = e.getValue();
employee.setEmployed(true);
e.setValue(employee);
return employee;
}));
Path path = Paths.get(IgniteStream.class.getResource("employees.txt").toURI());
Files.lines(path)
.forEach(line -> {
Employee employee = GSON.fromJson(line, Employee.class);
streamer.addData(employee.getId(), employee);
});
}
}
@@ -0,0 +1,34 @@
package com.baeldung.jcache;
import java.io.Serializable;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
public class SimpleCacheEntryListener implements CacheEntryCreatedListener<String, String>, CacheEntryUpdatedListener<String, String>, Serializable {
/**
*
*/
private static final long serialVersionUID = -712657810462878763L;
private boolean updated;
private boolean created;
public boolean getUpdated() {
return this.updated;
}
public boolean getCreated() {
return this.created;
}
public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> events) throws CacheEntryListenerException {
this.updated = true;
}
public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends String>> events) throws CacheEntryListenerException {
this.created = true;
}
}
@@ -0,0 +1,24 @@
package com.baeldung.jcache;
import java.util.HashMap;
import java.util.Map;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
public class SimpleCacheLoader implements CacheLoader<Integer, String> {
@Override
public String load(Integer key) throws CacheLoaderException {
return "fromCache" + key;
}
@Override
public Map<Integer, String> loadAll(Iterable<? extends Integer> keys) throws CacheLoaderException {
Map<Integer, String> data = new HashMap<>();
for (int key : keys) {
data.put(key, load(key));
}
return data;
}
}
@@ -0,0 +1,25 @@
package com.baeldung.jcache;
import java.io.Serializable;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
public class SimpleEntryProcessor implements EntryProcessor<String, String, String>, Serializable {
/**
*
*/
private static final long serialVersionUID = -5616476363722945132L;
public String process(MutableEntry<String, String> entry, Object... args) throws EntryProcessorException {
if (entry.exists()) {
String current = entry.getValue();
entry.setValue(current + " - modified");
return current;
}
return null;
}
}
@@ -0,0 +1,299 @@
package com.baeldung.jdo;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.jdo.Query;
import javax.jdo.Transaction;
import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
import org.datanucleus.metadata.PersistenceUnitMetaData;
public class GuideToJDO {
private static final Logger LOGGER = Logger.getLogger(GuideToJDO.class.getName());
private Random rnd = new Random();
private PersistenceUnitMetaData pumd;
private PersistenceUnitMetaData pumdXML;
public static void main(String[] args) {
new GuideToJDO();
}
public GuideToJDO() {
CreateH2Properties();
CreateXMLProperties();
CreateProducts();
ListProducts();
QueryJDOQL();
QuerySQL();
QueryJPQL();
UpdateProducts();
ListProducts();
DeleteProducts();
ListProducts();
persistXML();
listXMLProducts();
}
public void CreateH2Properties() {
pumd = new PersistenceUnitMetaData("dynamic-unit", "RESOURCE_LOCAL", null);
pumd.addClassName("com.baeldung.jdo.Product");
pumd.setExcludeUnlistedClasses();
pumd.addProperty("javax.jdo.option.ConnectionDriverName", "org.h2.Driver");
pumd.addProperty("javax.jdo.option.ConnectionURL", "jdbc:h2:mem:mypersistence");
pumd.addProperty("javax.jdo.option.ConnectionUserName", "sa");
pumd.addProperty("javax.jdo.option.ConnectionPassword", "");
pumd.addProperty("datanucleus.autoCreateSchema", "true");
}
public void CreateXMLProperties() {
pumdXML = new PersistenceUnitMetaData("dynamic-unit", "RESOURCE_LOCAL", null);
pumdXML.addClassName("com.baeldung.jdo.ProductXML");
pumdXML.setExcludeUnlistedClasses();
pumdXML.addProperty("javax.jdo.option.ConnectionURL", "xml:file:myPersistence.xml");
pumdXML.addProperty("datanucleus.autoCreateSchema", "true");
}
public void CreateProducts() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
Product product = new Product("Tablet", 80.0);
pm.makePersistent(product);
Product product2 = new Product("Phone", 20.0);
pm.makePersistent(product2);
Product product3 = new Product("Laptop", 200.0);
pm.makePersistent(product3);
for (int i = 0; i < 100; i++) {
String nam = "Product-" + i;
double price = rnd.nextDouble();
Product productx = new Product(nam, price);
pm.makePersistent(productx);
}
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings("rawtypes")
public void UpdateProducts() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
Query query = pm.newQuery(Product.class, "name == \"Phone\"");
Collection result = (Collection) query.execute();
Product product = (Product) result.iterator().next();
product.setName("Android Phone");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings("rawtypes")
public void DeleteProducts() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
Query query = pm.newQuery(Product.class, "name == \"Android Phone\"");
Collection result = (Collection) query.execute();
Product product = (Product) result.iterator().next();
pm.deletePersistent(product);
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void ListProducts() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
Query q = pm.newQuery("SELECT FROM " + Product.class.getName() + " WHERE price > 10");
List<Product> products = (List<Product>) q.execute();
Iterator<Product> iter = products.iterator();
while (iter.hasNext()) {
Product p = iter.next();
LOGGER.log(Level.WARNING, "Product name: {0} - Price: {1}", new Object[] { p.name, p.price });
}
LOGGER.log(Level.INFO, "--------------------------------------------------------------");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void QueryJDOQL() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
// Declarative JDOQL :
LOGGER.log(Level.INFO, "Declarative JDOQL --------------------------------------------------------------");
Query qDJDOQL = pm.newQuery(Product.class);
qDJDOQL.setFilter("name == 'Tablet' && price == price_value");
qDJDOQL.declareParameters("double price_value");
List<Product> resultsqDJDOQL = qDJDOQL.setParameters(80.0).executeList();
Iterator<Product> iterDJDOQL = resultsqDJDOQL.iterator();
while (iterDJDOQL.hasNext()) {
Product p = iterDJDOQL.next();
LOGGER.log(Level.WARNING, "Product name: {0} - Price: {1}", new Object[] { p.name, p.price });
}
LOGGER.log(Level.INFO, "--------------------------------------------------------------");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void QuerySQL() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
// SQL :
LOGGER.log(Level.INFO, "SQL --------------------------------------------------------------");
Query query = pm.newQuery("javax.jdo.query.SQL", "SELECT * FROM PRODUCT");
query.setClass(Product.class);
List<Product> results = query.executeList();
Iterator<Product> iter = results.iterator();
while (iter.hasNext()) {
Product p = iter.next();
LOGGER.log(Level.WARNING, "Product name: {0} - Price: {1}", new Object[] { p.name, p.price });
}
LOGGER.log(Level.INFO, "--------------------------------------------------------------");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void QueryJPQL() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumd, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
// JPQL :
LOGGER.log(Level.INFO, "JPQL --------------------------------------------------------------");
Query q = pm.newQuery("JPQL", "SELECT p FROM " + Product.class.getName() + " p WHERE p.name = 'Laptop'");
List results = (List) q.execute();
Iterator<Product> iter = results.iterator();
while (iter.hasNext()) {
Product p = iter.next();
LOGGER.log(Level.WARNING, "Product name: {0} - Price: {1}", new Object[] { p.name, p.price });
}
LOGGER.log(Level.INFO, "--------------------------------------------------------------");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
public void persistXML() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumdXML, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
ProductXML productXML = new ProductXML(0, "Tablet", 80.0);
pm.makePersistent(productXML);
ProductXML productXML2 = new ProductXML(1, "Phone", 20.0);
pm.makePersistent(productXML2);
ProductXML productXML3 = new ProductXML(2, "Laptop", 200.0);
pm.makePersistent(productXML3);
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public void listXMLProducts() {
PersistenceManagerFactory pmf = new JDOPersistenceManagerFactory(pumdXML, null);
PersistenceManager pm = pmf.getPersistenceManager();
Transaction tx = pm.currentTransaction();
try {
tx.begin();
Query q = pm.newQuery("SELECT FROM " + ProductXML.class.getName());
List<ProductXML> products = (List<ProductXML>) q.execute();
Iterator<ProductXML> iter = products.iterator();
while (iter.hasNext()) {
ProductXML p = iter.next();
LOGGER.log(Level.WARNING, "Product name: {0} - Price: {1}", new Object[] { p.getName(), p.getPrice() });
pm.deletePersistent(p);
}
LOGGER.log(Level.INFO, "--------------------------------------------------------------");
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
pm.close();
}
}
}
@@ -0,0 +1,43 @@
package com.baeldung.jdo;
import javax.jdo.annotations.IdGeneratorStrategy;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.Persistent;
import javax.jdo.annotations.PrimaryKey;
@PersistenceCapable
public class Product {
@PrimaryKey
@Persistent(valueStrategy = IdGeneratorStrategy.INCREMENT)
long id;
String name = null;
Double price = 0.0;
public Product() {
this.name = null;
this.price = 0.0;
}
public Product(String name, Double price) {
this.name = name;
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@@ -0,0 +1,44 @@
package com.baeldung.jdo;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.PrimaryKey;
import javax.xml.bind.annotation.XmlAttribute;
@PersistenceCapable()
public class ProductXML {
@XmlAttribute
private long productNumber = 0;
@PrimaryKey
private String name = null;
private Double price = 0.0;
public ProductXML() {
this.productNumber = 0;
this.name = null;
this.price = 0.0;
}
public ProductXML(long productNumber, String name, Double price) {
this.productNumber = productNumber;
this.name = name;
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@@ -0,0 +1,96 @@
package com.baeldung.jdo.query;
import java.util.List;
import javax.jdo.JDOQLTypedQuery;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.jdo.Query;
import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
import org.datanucleus.metadata.PersistenceUnitMetaData;
public class MyApp {
private static PersistenceManagerFactory pmf;
private static PersistenceManager pm;
public static void main(String[] args) {
defineDynamicPersistentUnit();
createTestData();
queryUsingJDOQL();
queryUsingTypedJDOQL();
queryUsingSQL();
queryUsingJPQL();
}
public static void createTestData() {
ProductItem item1 = new ProductItem("supportedItem", "price less than 10", "SoldOut", 5);
ProductItem item2 = new ProductItem("pro2", "price less than 10", "InStock", 8);
ProductItem item3 = new ProductItem("pro3", "price more than 10", "SoldOut", 15);
if (pm != null) {
pm.makePersistent(item1);
pm.makePersistent(item2);
pm.makePersistent(item3);
}
}
public static void defineDynamicPersistentUnit() {
PersistenceUnitMetaData pumd = new PersistenceUnitMetaData("dynamic-unit", "RESOURCE_LOCAL", null);
pumd.addProperty("javax.jdo.option.ConnectionURL", "jdbc:mysql://localhost:3306/jdo_db");
pumd.addProperty("javax.jdo.option.ConnectionUserName", "root");
pumd.addProperty("javax.jdo.option.ConnectionPassword", "admin");
pumd.addProperty("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver");
pumd.addProperty("datanucleus.schema.autoCreateAll", "true");
pmf = new JDOPersistenceManagerFactory(pumd, null);
pm = pmf.getPersistenceManager();
}
public static void queryUsingJDOQL() {
Query query = pm.newQuery("SELECT FROM com.baeldung.jdo.query.ProductItem " + "WHERE price < threshold PARAMETERS double threshold");
List<ProductItem> explicitParamResults = (List<ProductItem>) query.execute(10);
query = pm.newQuery("SELECT FROM " + "com.baeldung.jdo.query.ProductItem WHERE price < :threshold");
query.setParameters("double threshold");
List<ProductItem> explicitParamResults2 = (List<ProductItem>) query.execute(10);
query = pm.newQuery("SELECT FROM " + "com.baeldung.jdo.query.ProductItem WHERE price < :threshold");
List<ProductItem> implicitParamResults = (List<ProductItem>) query.execute(10);
}
public static void queryUsingTypedJDOQL() {
JDOQLTypedQuery<ProductItem> tq = pm.newJDOQLTypedQuery(ProductItem.class);
QProductItem cand = QProductItem.candidate();
tq = tq.filter(cand.price.lt(10).and(cand.name.startsWith("pro")));
List<ProductItem> results = tq.executeList();
}
public static void queryUsingSQL() {
Query query = pm.newQuery("javax.jdo.query.SQL", "select * from " + "product_item where price < ? and status = ?");
query.setClass(ProductItem.class);
query.setParameters(10, "InStock");
List<ProductItem> results = query.executeList();
}
public static void queryUsingJPQL() {
Query query = pm.newQuery("JPQL", "select i from " + "com.baeldung.jdo.query.ProductItem i where i.price < 10" + " and i.status = 'InStock'");
List<ProductItem> results = (List<ProductItem>) query.execute();
}
public static void namedQuery() {
Query<ProductItem> query = pm.newNamedQuery(ProductItem.class, "PriceBelow10");
List<ProductItem> results = query.executeList();
}
}
@@ -0,0 +1,70 @@
package com.baeldung.jdo.query;
import javax.jdo.annotations.IdGeneratorStrategy;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.Persistent;
import javax.jdo.annotations.PrimaryKey;
@PersistenceCapable(table = "product_item")
public class ProductItem {
@PrimaryKey
@Persistent(valueStrategy = IdGeneratorStrategy.INCREMENT)
int id;
String name;
String description;
String status;
double price;
public ProductItem() {
}
public ProductItem(String name, String description, String status, double price) {
this.name = name;
this.description = description;
this.status = status;
this.price = price;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
@@ -0,0 +1,66 @@
package com.baeldung.jdo.xml;
import java.util.ArrayList;
import java.util.List;
import javax.jdo.annotations.Element;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.PrimaryKey;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
@PersistenceCapable(schema = "/myproduct/people", table = "person")
public class AnnotadedPerson {
@XmlAttribute
private long personNum;
@PrimaryKey
private String firstName;
private String lastName;
@XmlElementWrapper(name = "phone-numbers")
@XmlElement(name = "phone-number")
@Element(types = String.class)
private List phoneNumbers = new ArrayList();
public AnnotadedPerson(long personNum, String firstName, String lastName) {
super();
this.personNum = personNum;
this.firstName = firstName;
this.lastName = lastName;
}
public long getPersonNum() {
return personNum;
}
public void setPersonNum(long personNum) {
this.personNum = personNum;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public List getPhoneNumbers() {
return phoneNumbers;
}
public void setPhoneNumbers(List phoneNumbers) {
this.phoneNumbers = phoneNumbers;
}
}
@@ -0,0 +1,105 @@
package com.baeldung.jdo.xml;
import java.util.List;
import javax.jdo.JDOHelper;
import javax.jdo.PersistenceManager;
import javax.jdo.PersistenceManagerFactory;
import javax.jdo.Query;
import javax.jdo.Transaction;
import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
import org.datanucleus.metadata.PersistenceUnitMetaData;
public class MyApp {
private static PersistenceUnitMetaData pumd;
private static PersistenceManagerFactory pmf;
private static PersistenceManager pm;
public static void main(String[] args) {
// persist product object using dynamic persistence unit
defineDynamicPersistentUnit();
Product product = new Product("id1", "Sony Discman", "A standard discman from Sony", 49.99);
persistObject(product);
closePersistenceManager();
// persist AnnotatedPerson object using named pmf
defineNamedPersistenceManagerFactory("XmlDatastore");
AnnotadedPerson annotatedPerson = new AnnotadedPerson(654320, "annotated", "person");
annotatedPerson.getPhoneNumbers().add("999999999");
annotatedPerson.getPhoneNumbers().add("000000000");
persistObject(annotatedPerson);
queryAnnotatedPersonsInXML();
closePersistenceManager();
// persist Person object using PMF created by properties file
definePersistenceManagerFactoryUsingPropertiesFile("META-INF\\datanucleus.properties");
Person person = new Person(654321, "bealdung", "author");
person.getPhoneNumbers().add("123456789");
person.getPhoneNumbers().add("987654321");
persistObject(person);
queryPersonsInXML();
closePersistenceManager();
}
public static void defineDynamicPersistentUnit() {
PersistenceUnitMetaData pumd = new PersistenceUnitMetaData("dynamic-unit", "RESOURCE_LOCAL", null);
pumd.addProperty("javax.jdo.option.ConnectionURL", "xml:file:myfile_dynamicPMF.xml");
pumd.addProperty("datanucleus.schema.autoCreateAll", "true");
pumd.addProperty("datanucleus.xml.indentSize", "4");
pmf = new JDOPersistenceManagerFactory(pumd, null);
pm = pmf.getPersistenceManager();
}
public static void defineNamedPersistenceManagerFactory(String pmfName) {
pmf = JDOHelper.getPersistenceManagerFactory("XmlDatastore");
pm = pmf.getPersistenceManager();
}
public static void definePersistenceManagerFactoryUsingPropertiesFile(String filePath) {
pmf = JDOHelper.getPersistenceManagerFactory(filePath);
pm = pmf.getPersistenceManager();
}
public static void closePersistenceManager() {
if (pm != null && !pm.isClosed()) {
pm.close();
}
}
public static void persistObject(Object obj) {
Transaction tx = pm.currentTransaction();
try {
tx.begin();
pm.makePersistent(obj);
tx.commit();
} finally {
if (tx.isActive()) {
tx.rollback();
}
}
}
public static void queryPersonsInXML() {
Query<Person> query = pm.newQuery(Person.class);
List<Person> result = query.executeList();
System.out.println("name: " + result.get(0).getFirstName());
}
public static void queryAnnotatedPersonsInXML() {
Query<AnnotadedPerson> query = pm.newQuery(AnnotadedPerson.class);
List<AnnotadedPerson> result = query.executeList();
System.out.println("name: " + result.get(0).getFirstName());
}
}
@@ -0,0 +1,58 @@
package com.baeldung.jdo.xml;
import java.util.ArrayList;
import java.util.List;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.PrimaryKey;
@PersistenceCapable
public class Person {
private long personNum;
@PrimaryKey
private String firstName;
private String lastName;
private List phoneNumbers = new ArrayList();
public Person(long personNum, String firstName, String lastName) {
super();
this.personNum = personNum;
this.firstName = firstName;
this.lastName = lastName;
}
public long getPersonNum() {
return personNum;
}
public void setPersonNum(long personNum) {
this.personNum = personNum;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public List getPhoneNumbers() {
return phoneNumbers;
}
public void setPhoneNumbers(List phoneNumbers) {
this.phoneNumbers = phoneNumbers;
}
}
@@ -0,0 +1,58 @@
package com.baeldung.jdo.xml;
import javax.jdo.annotations.PersistenceCapable;
import javax.jdo.annotations.PrimaryKey;
@PersistenceCapable
public class Product {
@PrimaryKey
String id;
String name;
String description;
double price;
public Product() {
}
public Product(String id, String name, String description, double price) {
this.id = id;
this.name = name;
this.description = description;
this.price = price;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
@@ -0,0 +1,16 @@
package com.baeldung.kryo;
import java.io.Serializable;
public class ComplexClass implements Serializable{
private static final long serialVersionUID = 123456L;
private String name = "Bael";
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
@@ -0,0 +1,54 @@
package com.baeldung.kryo;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.Date;
@DefaultSerializer(PersonSerializer.class)
public class Person implements KryoSerializable {
private String name = "John Doe";
private int age = 18;
private Date birthDate = new Date(933191282821L);
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Date getBirthDate() {
return birthDate;
}
public void setBirthDate(Date birthDate) {
this.birthDate = birthDate;
}
@Override
public void write(Kryo kryo, Output output) {
output.writeString(name);
output.writeLong(birthDate.getTime());
output.writeInt(age);
}
@Override
public void read(Kryo kryo, Input input) {
name = input.readString();
birthDate = new Date(input.readLong());
age = input.readInt();
}
}
@@ -0,0 +1,33 @@
package com.baeldung.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.Date;
public class PersonSerializer extends Serializer<Person> {
@Override
public void write(Kryo kryo, Output output, Person object) {
output.writeString(object.getName());
output.writeLong(object.getBirthDate()
.getTime());
}
@Override
public Person read(Kryo kryo, Input input, Class<Person> type) {
Person person = new Person();
person.setName(input.readString());
long birthDate = input.readLong();
person.setBirthDate(new Date(birthDate));
person.setAge(calculateAge(birthDate));
return person;
}
private int calculateAge(long birthDate) {
// Some custom logic
return 18;
}
}
@@ -0,0 +1,37 @@
package com.baeldung.ormlite;
import com.j256.ormlite.field.DatabaseField;
import com.j256.ormlite.table.DatabaseTable;
@DatabaseTable(tableName = "addresses")
public class Address {
@DatabaseField(generatedId = true)
private long addressId;
@DatabaseField(canBeNull = false)
private String addressLine;
public Address() {
}
public Address(String addressLine) {
this.addressLine = addressLine;
}
public long getAddressId() {
return addressId;
}
public void setAddressId(long addressId) {
this.addressId = addressId;
}
public String getAddressLine() {
return addressLine;
}
public void setAddressLine(String addressLine) {
this.addressLine = addressLine;
}
}
@@ -0,0 +1,49 @@
package com.baeldung.ormlite;
import com.j256.ormlite.field.DatabaseField;
import com.j256.ormlite.table.DatabaseTable;
@DatabaseTable
public class Book {
@DatabaseField(generatedId = true)
private long bookId;
@DatabaseField
private String title;
@DatabaseField(foreign = true, foreignAutoRefresh = true, foreignAutoCreate = true)
private Library library;
public Book() {
}
public Book(String title) {
this.title = title;
}
public long getBookId() {
return bookId;
}
public void setBookId(long bookId) {
this.bookId = bookId;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public Library getLibrary() {
return library;
}
public void setLibrary(Library library) {
this.library = library;
}
}
@@ -0,0 +1,58 @@
package com.baeldung.ormlite;
import com.j256.ormlite.dao.ForeignCollection;
import com.j256.ormlite.field.DatabaseField;
import com.j256.ormlite.field.ForeignCollectionField;
import com.j256.ormlite.table.DatabaseTable;
@DatabaseTable(tableName = "libraries", daoClass = LibraryDaoImpl.class)
public class Library {
@DatabaseField(generatedId = true)
private long libraryId;
@DatabaseField(canBeNull = false)
private String name;
@DatabaseField(foreign = true, foreignAutoCreate = true, foreignAutoRefresh = true)
private Address address;
@ForeignCollectionField(eager = false)
private ForeignCollection<Book> books;
public Library() {
}
public long getLibraryId() {
return libraryId;
}
public void setLibraryId(long libraryId) {
this.libraryId = libraryId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Address getAddress() {
return address;
}
public void setAddress(Address address) {
this.address = address;
}
public ForeignCollection<Book> getBooks() {
return books;
}
public void setBooks(ForeignCollection<Book> books) {
this.books = books;
}
}
@@ -0,0 +1,10 @@
package com.baeldung.ormlite;
import java.sql.SQLException;
import java.util.List;
import com.j256.ormlite.dao.Dao;
public interface LibraryDao extends Dao<Library, Long> {
public List<Library> findByName(String name) throws SQLException;
}
@@ -0,0 +1,21 @@
package com.baeldung.ormlite;
import java.sql.SQLException;
import java.util.List;
import com.j256.ormlite.dao.BaseDaoImpl;
import com.j256.ormlite.support.ConnectionSource;
public class LibraryDaoImpl extends BaseDaoImpl<Library, Long> implements LibraryDao {
public LibraryDaoImpl(ConnectionSource connectionSource) throws SQLException {
super(connectionSource, Library.class);
}
@Override
public List<Library> findByName(String name) throws SQLException {
return super.queryForEq("name", name);
}
}
@@ -0,0 +1,15 @@
package com.baeldung.reladomo;
public class Department extends DepartmentAbstract {
public Department() {
super();
// You must not modify this constructor. Mithra calls this internally.
// You can call this constructor. You can also add new constructors.
}
public Department(long id, String name) {
super();
this.setId(id);
this.setName(name);
}
}
@@ -0,0 +1,4 @@
package com.baeldung.reladomo;
public class DepartmentDatabaseObject extends DepartmentDatabaseObjectAbstract
{
}
@@ -0,0 +1,25 @@
package com.baeldung.reladomo;
import com.gs.fw.finder.Operation;
import java.util.*;
public class DepartmentList extends DepartmentListAbstract
{
public DepartmentList()
{
super();
}
public DepartmentList(int initialSize)
{
super(initialSize);
}
public DepartmentList(Collection c)
{
super(c);
}
public DepartmentList(Operation operation)
{
super(operation);
}
}
@@ -0,0 +1,16 @@
package com.baeldung.reladomo;
public class Employee extends EmployeeAbstract
{
public Employee()
{
super();
// You must not modify this constructor. Mithra calls this internally.
// You can call this constructor. You can also add new constructors.
}
public Employee(long id, String name){
super();
this.setId(id);
this.setName(name);
}
}
@@ -0,0 +1,4 @@
package com.baeldung.reladomo;
public class EmployeeDatabaseObject extends EmployeeDatabaseObjectAbstract
{
}
@@ -0,0 +1,25 @@
package com.baeldung.reladomo;
import com.gs.fw.finder.Operation;
import java.util.*;
public class EmployeeList extends EmployeeListAbstract
{
public EmployeeList()
{
super();
}
public EmployeeList(int initialSize)
{
super(initialSize);
}
public EmployeeList(Collection c)
{
super(c);
}
public EmployeeList(Operation operation)
{
super(operation);
}
}
@@ -0,0 +1,54 @@
package com.baeldung.reladomo;
import java.io.InputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gs.fw.common.mithra.MithraManager;
import com.gs.fw.common.mithra.MithraManagerProvider;
public class ReladomoApplication {
public static void main(String[] args) {
try {
ReladomoConnectionManager.getInstance().createTables();
} catch (Exception e1) {
e1.printStackTrace();
}
MithraManager mithraManager = MithraManagerProvider.getMithraManager();
mithraManager.setTransactionTimeout(120);
try (InputStream is = ReladomoApplication.class.getClassLoader().getResourceAsStream("ReladomoRuntimeConfig.xml")) {
MithraManagerProvider.getMithraManager().readConfiguration(is);
Department department = new Department(1, "IT");
Employee employee = new Employee(1, "John");
department.getEmployees().add(employee);
department.cascadeInsert();
Department depFound = DepartmentFinder.findByPrimaryKey(1);
System.out.println("Department Name:" + department.getName());
Employee empFound = EmployeeFinder.findOne(EmployeeFinder.name().eq("John"));
System.out.println("Employee Id:" + empFound.getId());
empFound.setName("Steven");
empFound.delete();
Department depDetached = DepartmentFinder.findByPrimaryKey(1).getDetachedCopy();
mithraManager.executeTransactionalCommand(tx -> {
Department dep = new Department(2, "HR");
Employee emp = new Employee(2, "Jim");
dep.getEmployees().add(emp);
dep.cascadeInsert();
return null;
});
} catch (java.io.IOException e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,92 @@
package com.baeldung.reladomo;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.TimeZone;
import java.util.stream.Stream;
import org.h2.tools.RunScript;
import com.gs.fw.common.mithra.bulkloader.BulkLoader;
import com.gs.fw.common.mithra.bulkloader.BulkLoaderException;
import com.gs.fw.common.mithra.connectionmanager.SourcelessConnectionManager;
import com.gs.fw.common.mithra.connectionmanager.XAConnectionManager;
import com.gs.fw.common.mithra.databasetype.DatabaseType;
import com.gs.fw.common.mithra.databasetype.H2DatabaseType;
public class ReladomoConnectionManager implements SourcelessConnectionManager {
private static ReladomoConnectionManager instance;
private XAConnectionManager xaConnectionManager;
private final String databaseName = "myDb";
public static synchronized ReladomoConnectionManager getInstance() {
if (instance == null) {
instance = new ReladomoConnectionManager();
}
return instance;
}
private ReladomoConnectionManager() {
this.createConnectionManager();
}
private XAConnectionManager createConnectionManager() {
xaConnectionManager = new XAConnectionManager();
xaConnectionManager.setDriverClassName("org.h2.Driver");
xaConnectionManager.setJdbcConnectionString("jdbc:h2:mem:" + databaseName);
xaConnectionManager.setJdbcUser("sa");
xaConnectionManager.setJdbcPassword("");
xaConnectionManager.setPoolName("My Connection Pool");
xaConnectionManager.setInitialSize(1);
xaConnectionManager.setPoolSize(10);
xaConnectionManager.initialisePool();
return xaConnectionManager;
}
@Override
public BulkLoader createBulkLoader() throws BulkLoaderException {
return null;
}
@Override
public Connection getConnection() {
return xaConnectionManager.getConnection();
}
@Override
public DatabaseType getDatabaseType() {
return H2DatabaseType.getInstance();
}
@Override
public TimeZone getDatabaseTimeZone() {
return TimeZone.getDefault();
}
@Override
public String getDatabaseIdentifier() {
return databaseName;
}
public void createTables() throws Exception {
Path ddlPath = Paths.get(ClassLoader.getSystemResource("sql").toURI());
try (Connection conn = xaConnectionManager.getConnection(); Stream<Path> list = Files.list(ddlPath);) {
list.forEach(path -> {
try {
RunScript.execute(conn, Files.newBufferedReader(path));
} catch (SQLException | IOException exc) {
exc.printStackTrace();
}
});
}
}
}
@@ -0,0 +1,34 @@
package com.baeldung.storm;
import com.baeldung.storm.bolt.AggregatingBolt;
import com.baeldung.storm.bolt.FileWritingBolt;
import com.baeldung.storm.bolt.FilteringBolt;
import com.baeldung.storm.bolt.PrintingBolt;
import com.baeldung.storm.spout.RandomNumberSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
public class TopologyRunner {
public static void main(String[] args) {
runTopology();
}
public static void runTopology() {
String filePath = "./src/main/resources/operations.txt";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("randomNumberSpout", new RandomNumberSpout());
builder.setBolt("filteringBolt", new FilteringBolt()).shuffleGrouping("randomNumberSpout");
builder.setBolt("aggregatingBolt", new AggregatingBolt()
.withTimestampField("timestamp")
.withLag(BaseWindowedBolt.Duration.seconds(1))
.withWindow(BaseWindowedBolt.Duration.seconds(5))).shuffleGrouping("filteringBolt");
builder.setBolt("fileBolt", new FileWritingBolt(filePath)).shuffleGrouping("aggregatingBolt");
Config config = new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Test", config, builder.createTopology());
}
}
@@ -0,0 +1,39 @@
package com.baeldung.storm.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class AggregatingBolt extends BaseWindowedBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.outputCollector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
}
@Override
public void execute(TupleWindow tupleWindow) {
List<Tuple> tuples = tupleWindow.get();
tuples.sort(Comparator.comparing(a -> a.getLongByField("timestamp")));
//This is safe since the window is calculated basing on Tuple's timestamp, thus it can't really be empty
Long beginningTimestamp = tuples.get(0).getLongByField("timestamp");
Long endTimestamp = tuples.get(tuples.size() - 1).getLongByField("timestamp");
int sumOfOperations = tuples.stream().mapToInt(tuple -> tuple.getIntegerByField("operation")).sum();
Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
outputCollector.emit(values);
}
}
@@ -0,0 +1,72 @@
package com.baeldung.storm.bolt;
import com.baeldung.storm.model.AggregatedWindow;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
public class FileWritingBolt extends BaseRichBolt {
public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
private BufferedWriter writer;
private String filePath;
private ObjectMapper objectMapper;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
try {
writer = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
logger.error("Failed to open a file for writing.", e);
}
}
@Override
public void execute(Tuple tuple) {
int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
long endTimestamp = tuple.getLongByField("endTimestamp");
if(sumOfOperations > 200) {
AggregatedWindow aggregatedWindow = new AggregatedWindow(sumOfOperations, beginningTimestamp, endTimestamp);
try {
writer.write(objectMapper.writeValueAsString(aggregatedWindow));
writer.write("\n");
writer.flush();
} catch (IOException e) {
logger.error("Failed to write data to file.", e);
}
}
}
public FileWritingBolt(String filePath) {
this.filePath = filePath;
}
@Override
public void cleanup() {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close the writer!");
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
@@ -0,0 +1,22 @@
package com.baeldung.storm.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
public class FilteringBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
int operation = tuple.getIntegerByField("operation");
if(operation > 0 ) {
basicOutputCollector.emit(tuple.getValues());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
@@ -0,0 +1,18 @@
package com.baeldung.storm.bolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class PrintingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
@@ -0,0 +1,16 @@
package com.baeldung.storm.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@JsonSerialize
public class AggregatedWindow {
int sumOfOperations;
long beginningTimestamp;
long endTimestamp;
public AggregatedWindow(int sumOfOperations, long beginningTimestamp, long endTimestamp) {
this.sumOfOperations = sumOfOperations;
this.beginningTimestamp = beginningTimestamp;
this.endTimestamp = endTimestamp;
}
}
@@ -0,0 +1,40 @@
package com.baeldung.storm.model;
public class User {
private String username;
private String password;
private String email;
private int age;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
@@ -0,0 +1,30 @@
package com.baeldung.storm.serialization;
import com.baeldung.storm.model.User;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class UserSerializer extends Serializer<User>{
@Override
public void write(Kryo kryo, Output output, User user) {
output.writeString(user.getEmail());
output.writeString(user.getUsername());
output.write(user.getAge());
}
@Override
public User read(Kryo kryo, Input input, Class<User> aClass) {
User user = new User();
String email = input.readString();
String name = input.readString();
int age = input.read();
user.setAge(age);
user.setEmail(email);
user.setUsername(name);
return user;
}
}
@@ -0,0 +1,35 @@
package com.baeldung.storm.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomIntSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector outputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
random = new Random();
outputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
}
}
@@ -0,0 +1,40 @@
package com.baeldung.storm.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
random = new Random();
collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
//This will select random int from the range (0, 100)
int operation = random.nextInt(101);
long timestamp = System.currentTimeMillis();
Values values = new Values(operation, timestamp);
collector.emit(values);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=C:\Software\confluent-5.0.0\share\java
#plugin.path=./share/java
@@ -0,0 +1,88 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=./share/java
@@ -0,0 +1,9 @@
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
@@ -0,0 +1,9 @@
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
@@ -0,0 +1,88 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=./share/java
@@ -0,0 +1,15 @@
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "transformation.txt",
"topic": "connect-transformation",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field": "line",
"transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field": "data_source",
"transforms.InsertSource.static.value": "test-file-source"
}
}
@@ -0,0 +1,14 @@
{
"name": "mongodb-sink",
"config": {
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"tasks.max": 1,
"topics": "connect-custom",
"mongodb.connection.uri": "mongodb://mongo-db/test?retryWrites=true",
"mongodb.collection": "MyCollection",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
@@ -0,0 +1,13 @@
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "baeldung",
"kafka.topic": "connect-custom",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}
@@ -0,0 +1,94 @@
version: '3.3'
services:
mosquitto:
image: eclipse-mosquitto:1.5.5
hostname: mosquitto
container_name: mosquitto
expose:
- "1883"
ports:
- "1883:1883"
zookeeper:
image: zookeeper:3.4.9
restart: unless-stopped
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
kafka:
image: confluentinc/cp-kafka:5.1.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./kafka/data:/var/lib/kafka/data
depends_on:
- zookeeper
kafka-connect:
image: confluentinc/cp-kafka-connect:5.1.0
hostname: kafka-connect
container_name: kafka-connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
volumes:
- /tmp/custom/jars:/etc/kafka-connect/jars
depends_on:
- zookeeper
- kafka
- mosquitto
mongo-db:
image: mongo:4.0.5
hostname: mongo-db
container_name: mongo-db
expose:
- "27017"
ports:
- "27017:27017"
command: --bind_ip_all --smallfiles
volumes:
- ./mongo-db:/data
mongoclient:
image: mongoclient/mongoclient:2.2.0
container_name: mongoclient
hostname: mongoclient
depends_on:
- mongo-db
ports:
- 3000:3000
environment:
MONGO_URL: "mongodb://mongo-db:27017"
PORT: 3000
expose:
- "3000"
@@ -0,0 +1,4 @@
javax.jdo.PersistenceManagerFactoryClass=org.datanucleus.api.jdo.JDOPersistenceManagerFactory
javax.jdo.option.ConnectionURL= xml:file:myfile-ds.xml
datanucleus.xml.indentSize=6
datanucleus.schema.autoCreateAll=true
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<jdoconfig xmlns="http://xmlns.jcp.org/xml/ns/jdo/jdoconfig"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/jdo/jdoconfig
http://xmlns.jcp.org/xml/ns/jdo/jdoconfig_3_2.xsd"
version="3.2">
<!-- Datastore Txn PMF -->
<persistence-manager-factory name="XmlDatastore">
<property name="javax.jdo.PersistenceManagerFactoryClass"
value="org.datanucleus.api.jdo.JDOPersistenceManagerFactory" />
<property name="javax.jdo.option.ConnectionURL" value="xml:file:namedPMF-ds.xml" />
<property name="datanucleus.xml.indentSize" value="6" />
<property name="datanucleus.schema.autoCreateAll"
value="true" />
</persistence-manager-factory>
</jdoconfig>
@@ -0,0 +1,29 @@
<jdo xmlns="http://xmlns.jcp.org/xml/ns/jdo/jdo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/jdo/jdo http://xmlns.jcp.org/xml/ns/jdo/jdo_3_1.xsd"
version="3.1">
<package name="com.baeldung.jdo.xml">
<class name="Person" detachable="true" schema="/myproduct/people"
table="person">
<field name="personNum">
<extension vendor-name="datanucleus" key="XmlAttribute"
value="true" />
</field>
<field name="firstName" primary-key="true" /> <!-- PK since JAXB requires String -->
<field name="lastName" />
<field name="phoneNumbers">
<collection element-type="java.lang.String" />
<element column="phoneNumber" />
</field>
</class>
</package>
<package name="com.baeldung.jdo.query">
<class name="ProductItem" detachable="true" table="product_item">
<query name="PriceBelow10" language="javax.jdo.query.SQL">
<![CDATA[SELECT * FROM PRODUCT_ITEM WHERE PRICE < 10
]]></query>
</class>
</package>
</jdo>
@@ -0,0 +1,6 @@
<MithraRuntime>
<ConnectionManager className="com.baeldung.reladomo.ReladomoConnectionManager ">
<MithraObjectConfiguration className="com.baeldung.reladomo.Department" cacheType="partial"/>
<MithraObjectConfiguration className="com.baeldung.reladomo.Employee " cacheType="partial"/>
</ConnectionManager>
</MithraRuntime>
+51
View File
@@ -0,0 +1,51 @@
drop table if exists emp;
drop table if exists dept;
create table dept(
deptno numeric,
dname varchar(14),
loc varchar(13),
constraint pk_dept primary key (deptno)
);
create table emp(
empno numeric,
ename varchar(10),
job varchar(9),
mgr numeric,
hiredate date,
sal numeric,
comm numeric,
deptno numeric,
constraint pk_emp primary key (empno),
constraint fk_deptno foreign key (deptno) references dept (deptno)
);
insert into dept values(10, 'ACCOUNTING', 'NEW YORK');
insert into dept values(20, 'RESEARCH', 'DALLAS');
insert into dept values(30, 'SALES', 'CHICAGO');
insert into dept values(40, 'OPERATIONS', 'BOSTON');
insert into emp values(
7839, 'KING', 'PRESIDENT', null,
to_date('17-11-1981','dd-mm-yyyy'),
7698, null, 10
);
insert into emp values(
7698, 'BLAKE', 'MANAGER', 7839,
to_date('1-5-1981','dd-mm-yyyy'),
7782, null, 20
);
insert into emp values(
7782, 'CLARK', 'MANAGER', 7839,
to_date('9-6-1981','dd-mm-yyyy'),
7566, null, 30
);
insert into emp values(
7566, 'JONES', 'MANAGER', 7839,
to_date('2-4-1981','dd-mm-yyyy'),
7839, null, 40
);
commit;
@@ -0,0 +1,3 @@
entity-packages: com.baeldung.ebean.model
transactional-packages: com.baeldung.ebean.app
querybean-packages: com.baeldung.ebean.app
@@ -0,0 +1,7 @@
ebean.db.ddl.generate=true
ebean.db.ddl.run=true
datasource.db.username=sa
datasource.db.password=
datasource.db.databaseUrl=jdbc:h2:mem:customer
datasource.db.databaseDriver=org.h2.Driver
@@ -0,0 +1,3 @@
{id:"1", name="John", isEmployed: "true"}
{id:"1", name="Anna", isEmployed: "false"}
{id:"1", name="George", isEmployed: "true"}
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<logger name="io.ebean.DDL" level="TRACE"/>
<logger name="io.ebean.SQL" level="TRACE"/>
<logger name="io.ebean.TXN" level="TRACE"/>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>
@@ -0,0 +1,11 @@
<MithraObject objectType="transactional">
<PackageName>com.baeldung.reladomo</PackageName>
<ClassName>Department</ClassName>
<DefaultTable>departments</DefaultTable>
<Attribute name="id" javaType="long" columnName="department_id" primaryKey="true"/>
<Attribute name="name" javaType="String" columnName="name" maxLength="50" truncate="true"/>
<Relationship name="employees" relatedObject="Employee" cardinality="one-to-many" reverseRelationshipName="department" relatedIsDependent="true">
Employee.departmentId = this.id
</Relationship>
</MithraObject>
@@ -0,0 +1,9 @@
<MithraObject objectType="transactional">
<PackageName>com.baeldung.reladomo</PackageName>
<ClassName>Employee</ClassName>
<DefaultTable>employees</DefaultTable>
<Attribute name="id" javaType="long" columnName="employee_id" primaryKey="true"/>
<Attribute name="name" javaType="String" columnName="name" maxLength="50" truncate="true"/>
<Attribute name="departmentId" javaType="long" columnName="department_id"/>
</MithraObject>
@@ -0,0 +1,4 @@
<Mithra>
<MithraObjectResource name="Department"/>
<MithraObjectResource name="Employee"/>
</Mithra>
@@ -0,0 +1,10 @@
<jmapper>
<class name="com.baeldung.jmapper.UserDto">
<attribute name="id">
<value name="id"/>
</attribute>
<attribute name="username">
<value name="email"/>
</attribute>
</class>
</jmapper>
@@ -0,0 +1,5 @@
<jmapper>
<class name="com.baeldung.jmapper.UserDto1">
<global/>
</class>
</jmapper>
@@ -0,0 +1,21 @@
<jmapper>
<class name="com.baeldung.jmapper.relational.User">
<attribute name="id">
<value name="id"/>
<classes>
<class name="com.baeldung.jmapper.relational.UserDto1"/>
<class name="com.baeldung.jmapper.relational.UserDto2"/>
</classes>
</attribute>
<attribute name="email">
<attributes>
<attribute name="username"/>
<attribute name="email"/>
</attributes>
<classes>
<class name="com.baeldung.jmapper.relational.UserDto1"/>
<class name="com.baeldung.jmapper.relational.UserDto2"/>
</classes>
</attribute>
</class>
</jmapper>
@@ -0,0 +1,89 @@
package com.baeldung.crunch;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Calendar;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Source;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.junit.Ignore;
import org.junit.Test;
public class MemPipelineUnitTest {
private static final String INPUT_FILE_PATH = "src/test/resources/crunch/input.txt";
@Test
public void givenPipeLineAndSource_whenSourceRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
Source<String> source = From.textFile(INPUT_FILE_PATH);
PCollection<String> lines = pipeline.read(source);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);
assertEquals(21, lines.asCollection()
.getValue()
.size());
}
private String createOutputPath() throws IOException {
Path path = Files.createTempDirectory("test");
final String outputFilePath = path.toString() + File.separatorChar
+ "output.text";
return outputFilePath;
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenCollection_whenWriteCalled_fileWrittenSuccessfully()
throws IOException {
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
Target target = To.textFile(outputFilePath);
inputStrings.write(target);
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
@Test
@Ignore("Requires Hadoop binaries")
public void givenPipeLine_whenWriteTextFileCalled_fileWrittenSuccessfully()
throws IOException {
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> inputStrings = MemPipeline.collectionOf("Hello",
"Apache", "Crunch", Calendar.getInstance()
.toString());
final String outputFilePath = createOutputPath();
pipeline.writeTextFile(inputStrings, outputFilePath);
PCollection<String> lines = pipeline.readTextFile(outputFilePath);
assertIterableEquals(inputStrings.materialize(), lines.materialize());
}
}
@@ -0,0 +1,41 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class StopWordFilterUnitTest {
@Test
public void givenFilter_whenStopWordPassed_thenFalseReturned() {
FilterFn<String> filter = new StopWordFilter();
assertFalse(filter.accept("the"));
}
@Test
public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
FilterFn<String> filter = new StopWordFilter();
assertTrue(filter.accept("Hello"));
}
@Test
public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
PCollection<String> words = MemPipeline.collectionOf("This", "is", "a",
"test", "sentence");
PCollection<String> noStopWords = words.filter(new StopWordFilter());
assertEquals(ImmutableList.of("This", "test", "sentence"),
Lists.newArrayList(noStopWords.materialize()));
}
}
@@ -0,0 +1,21 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
public class ToUpperCaseFnUnitTest {
@Test
public void givenString_whenToUpperCaseFnCalled_UpperCaseStringReturned() {
InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
new ToUpperCaseFn().process("input", emitter);
assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
}
}
@@ -0,0 +1,31 @@
package com.baeldung.crunch;
import static org.junit.Assert.assertEquals;
import org.apache.crunch.PCollection;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
public class ToUpperCaseWithCounterFnUnitTest {
@Before
public void setUp() throws Exception {
MemPipeline.clearCounters();
}
@Test
public void whenFunctionCalled_counterIncementendForChangedValues() {
PCollection<String> inputStrings = MemPipeline.collectionOf("This", "is", "a", "TEST", "string");
PCollection<String> upperCaseStrings = inputStrings.parallelDo(new ToUpperCaseWithCounterFn(), Writables.strings());
assertEquals(ImmutableList.of("THIS", "IS", "A", "TEST", "STRING"), Lists.newArrayList(upperCaseStrings.materialize()));
assertEquals(4L, MemPipeline.getCounters()
.findCounter("UpperCase", "modified")
.getValue());
}
}
@@ -0,0 +1,27 @@
package com.baeldung.crunch;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.apache.crunch.Emitter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
@Mock
private Emitter<String> emitter;
@Test
public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
Tokenizer splitter = new Tokenizer();
splitter.process(" hello world ", emitter);
verify(emitter).emit("hello");
verify(emitter).emit("world");
verifyNoMoreInteractions(emitter);
}
}
@@ -0,0 +1,17 @@
package com.baeldung.hikaricp;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class HikariCPIntegrationTest {
@Test
public void givenConnection_thenFetchDbData() {
List<Employee> employees = HikariCPDemo.fetchData();
assertEquals(4, employees.size());
}
}

Some files were not shown because too many files have changed in this diff Show More