diff --git a/persistence-modules/apache-bookkeeper/pom.xml b/persistence-modules/apache-bookkeeper/pom.xml
index 46a7982b12..0beea7f1fc 100644
--- a/persistence-modules/apache-bookkeeper/pom.xml
+++ b/persistence-modules/apache-bookkeeper/pom.xml
@@ -1,46 +1,46 @@
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- 4.0.0
- apache-bookkeeper
- 0.0.1-SNAPSHOT
- apache-bookkeeper
- jar
+ 4.0.0
+ apache-bookkeeper
+ 0.0.1-SNAPSHOT
+ apache-bookkeeper
+ jar
-
- com.baeldung
- parent-modules
- 1.0.0-SNAPSHOT
- ../../
-
+
+ com.baeldung
+ parent-modules
+ 1.0.0-SNAPSHOT
+ ../../
+
-
-
- org.apache.bookkeeper
- bookkeeper-server
- ${org.apache.bookkeeper.version}
-
-
- org.slf4j
- slf4j-log4j12
-
-
-
+
+
+ org.apache.bookkeeper
+ bookkeeper-server
+ ${org.apache.bookkeeper.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
-
- org.testcontainers
- testcontainers
- 1.14.3
- test
-
-
-
+
+ org.testcontainers
+ testcontainers
+ 1.14.3
+ test
+
-
- 4.10.0
-
+
+
+
+ 4.10.0
+
diff --git a/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java
index 7cba88af19..55f5d7b09f 100644
--- a/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java
+++ b/persistence-modules/apache-bookkeeper/src/main/java/com/baeldung/tutorials/bookkeeper/BkHelper.java
@@ -6,8 +6,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -23,7 +23,7 @@ import org.apache.zookeeper.AsyncCallback;
public class BkHelper {
- private static final Log log = LogFactory.getLog(BkHelper.class);
+ private static final Log LOG = LogFactory.getLog(BkHelper.class);
public static BookKeeper createBkClient(String zkConnectionString) {
try {
@@ -57,7 +57,6 @@ public class BkHelper {
* @throws Exception
*/
public static Optional findLedgerByName(BookKeeper bk, String name) throws Exception {
-
Map ledgers = new HashMap();
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
@@ -67,52 +66,51 @@ public class BkHelper {
// The second callback will be called when there are no more ledgers do process or if an
// error occurs.
bk.getLedgerManager()
- .asyncProcessLedgers((ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
- (rc, s, obj) -> {
- returnCode.set(rc);
- processDone.countDown();
- }, null, BKException.Code.OK, BKException.Code.ReadException);
-
+ .asyncProcessLedgers(
+ (ledgerId, cb) -> collectLedgers(bk, ledgerId, cb, ledgers),
+ (rc, s, obj) -> {
+ returnCode.set(rc);
+ processDone.countDown();
+ },
+ null,
+ BKException.Code.OK, BKException.Code.ReadException);
processDone.await(5, TimeUnit.MINUTES);
-
- log.info("Ledgers collected: total found=" + ledgers.size());
+ LOG.info("Ledgers collected: total found=" + ledgers.size());
byte[] nameBytes = name.getBytes();
-
Optional> entry = ledgers.entrySet()
- .stream()
- .filter((e) -> {
- Map meta = e.getValue().getCustomMetadata();
- if (meta != null) {
- log.info("ledger: " + e.getKey() + ", customMeta=" + meta);
- byte[] data = meta.get("name");
- if (data != null && Arrays.equals(data, nameBytes)) {
- return true;
- } else {
- return false;
- }
- } else {
- log.info("ledger: " + e.getKey() + ", no meta");
- return false;
- }
- })
- .findFirst();
-
+ .stream()
+ .filter((e) -> {
+ Map meta = e.getValue()
+ .getCustomMetadata();
+ if (meta != null) {
+ LOG.info("ledger: " + e.getKey() + ", customMeta=" + meta);
+ byte[] data = meta.get("name");
+ if (data != null && Arrays.equals(data, nameBytes)) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ LOG.info("ledger: " + e.getKey() + ", no meta");
+ return false;
+ }
+ })
+ .findFirst();
if (entry.isPresent()) {
- return Optional.of(entry.get().getKey());
+ return Optional.of(entry.get()
+ .getKey());
} else {
return Optional.empty();
}
}
public static void collectLedgers(BookKeeper bk, long ledgerId, AsyncCallback.VoidCallback cb, Map ledgers) {
- log.debug("ledgerId: " + ledgerId);
-
try {
bk.getLedgerManager()
.readLedgerMetadata(ledgerId)
.thenAccept((v) -> {
- log.debug("Got ledger metadata");
+ LOG.debug("Got ledger metadata");
ledgers.put(ledgerId, v.getValue());
})
.thenAccept((v) -> {
@@ -122,36 +120,30 @@ public class BkHelper {
throw new RuntimeException(ex);
}
}
-
+
/**
* Return a list with all available Ledgers
* @param bk
* @return
*/
public static List listAllLedgers(BookKeeper bk) {
-
final List ledgers = Collections.synchronizedList(new ArrayList<>());
- final CountDownLatch processDone = new CountDownLatch(1);
-
+ final CountDownLatch processDone = new CountDownLatch(1);
+
bk.getLedgerManager()
- .asyncProcessLedgers(
- (ledgerId,cb) -> {
- ledgers.add(ledgerId);
- cb.processResult(BKException.Code.OK, null, null);
- },
- (rc, s, obj) -> {
- processDone.countDown();
- },
- null, BKException.Code.OK, BKException.Code.ReadException);
-
- try {
- processDone.await(1,TimeUnit.MINUTES);
- return ledgers;
- }
- catch(InterruptedException ie) {
- throw new RuntimeException(ie);
- }
+ .asyncProcessLedgers((ledgerId, cb) -> {
+ ledgers.add(ledgerId);
+ cb.processResult(BKException.Code.OK, null, null);
+ },
+ (rc, s, obj) -> {
+ processDone.countDown();
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+ try {
+ processDone.await(1, TimeUnit.MINUTES);
+ return ledgers;
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
}
-
-
}
diff --git a/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java
index 2bbf54e2b7..84a8ce3db8 100644
--- a/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java
+++ b/persistence-modules/apache-bookkeeper/src/test/java/com/baeldung/tutorials/bookkeeper/BkHelperLiveTest.java
@@ -1,9 +1,10 @@
package com.baeldung.tutorials.bookkeeper;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
@@ -11,163 +12,128 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
-import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntries;
-import org.apache.bookkeeper.client.api.ReadHandle;
-import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteHandle;
-import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.AsyncCallback;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import com.google.common.collect.Iterables;
-
class BkHelperLiveTest extends BkHelper {
-
private static BookKeeper bk;
private byte[] ledgerPassword = "SuperS3cR37".getBytes();
-
- private static final Log log = LogFactory.getLog(BkHelperLiveTest.class);
-
+ private static final Log LOG = LogFactory.getLog(BkHelperLiveTest.class);
+
@BeforeAll
static void initBkClient() {
- bk = createBkClient("192.168.99.101:2181");
+ bk = createBkClient("192.168.99.101:2181");
}
-
+
@Test
void whenCreateLedger_thenSuccess() throws Exception {
-
LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
assertNotNull(lh);
assertNotNull(lh.getId());
-
- log.info("[I33] Ledge created: id=" + lh.getId());
+ LOG.info("[I33] Ledge created: id=" + lh.getId());
}
-
-
+
@Test
void whenCreateLedgerAsync_thenSuccess() throws Exception {
-
+
CompletableFuture cf = bk.newCreateLedgerOp()
.withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
.withPassword("password".getBytes())
.execute();
-
+
WriteHandle handle = cf.get(1, TimeUnit.MINUTES);
assertNotNull(handle);
handle.close();
}
-
@Test
void whenAsyncCreateLedger_thenSuccess() throws Exception {
-
CountDownLatch latch = new CountDownLatch(1);
- AtomicReference handleRef =new AtomicReference<>();
-
- bk.asyncCreateLedger(3, 2, 2,
- BookKeeper.DigestType.MAC,
- ledgerPassword,
+ AtomicReference handleRef = new AtomicReference<>();
+
+ bk.asyncCreateLedger(3, 2, 2, BookKeeper.DigestType.MAC, ledgerPassword,
(rc, lh, ctx) -> {
handleRef.set(lh);
latch.countDown();
- },
- null,
- Collections.emptyMap());
-
+ }, null, Collections.emptyMap());
latch.await(1, TimeUnit.MINUTES);
LedgerHandle lh = handleRef.get();
- assertNotNull(lh);
- assertFalse(lh.isClosed(),"Ledger should be writeable");
+ assertNotNull(lh);
+ assertFalse(lh.isClosed(), "Ledger should be writeable");
}
-
@Test
void whenListLedgers_thenSuccess() throws Exception {
-
List ledgers = listAllLedgers(bk);
assertNotNull(ledgers);
}
-
+
@Test
void whenWriteEntries_thenSuccess() throws Exception {
-
- LedgerHandle lh = createLedger(bk,"myledger",ledgerPassword);
-
+ LedgerHandle lh = createLedger(bk, "myledger", ledgerPassword);
long start = System.currentTimeMillis();
- for ( int i = 0 ; i < 1000 ; i++ ) {
+ for (int i = 0; i < 1000; i++) {
byte[] data = new String("message-" + i).getBytes();
lh.append(data);
- }
-
+ }
lh.close();
long elapsed = System.currentTimeMillis() - start;
- log.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
+ LOG.info("Entries added to ledgerId " + lh.getId() + ". count=1000, elapsed=" + elapsed);
}
-
- @Test
+
+ @Test
void whenWriteEntriesAsync_thenSuccess() throws Exception {
-
CompletableFuture