JAVA-2802 - Copied the separate draft code for Mongo DB Reactive Articles and configured it with persistence modules.
This commit is contained in:
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.couchbase;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
|
||||
|
||||
@SpringBootApplication(exclude = MongoAutoConfiguration.class)
|
||||
public class ReactiveCouchbaseApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ReactiveCouchbaseApplication.class, args);
|
||||
}
|
||||
}
|
||||
+43
@@ -0,0 +1,43 @@
|
||||
package com.baeldung.couchbase.configuration;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Configuration
|
||||
@PropertySource("classpath:couchbase.properties")
|
||||
public class CouchbaseProperties {
|
||||
|
||||
private final List<String> bootstrapHosts;
|
||||
private final String bucketName;
|
||||
private final String bucketPassword;
|
||||
private final int port;
|
||||
|
||||
public CouchbaseProperties(@Value("${spring.couchbase.bootstrap-hosts}") final List<String> bootstrapHosts, @Value("${spring.couchbase.bucket.name}") final String bucketName, @Value("${spring.couchbase.bucket.password}") final String bucketPassword,
|
||||
@Value("${spring.couchbase.port}") final int port) {
|
||||
this.bootstrapHosts = Collections.unmodifiableList(bootstrapHosts);
|
||||
this.bucketName = bucketName;
|
||||
this.bucketPassword = bucketPassword;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public List<String> getBootstrapHosts() {
|
||||
return bootstrapHosts;
|
||||
}
|
||||
|
||||
public String getBucketName() {
|
||||
return bucketName;
|
||||
}
|
||||
|
||||
public String getBucketPassword() {
|
||||
return bucketPassword;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.couchbase.configuration;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.data.couchbase.repository.config.EnableReactiveCouchbaseRepositories;
|
||||
|
||||
@Configuration
|
||||
@EnableReactiveCouchbaseRepositories("com.baeldung.couchbase.domain.repository.n1ql")
|
||||
@Primary
|
||||
public class N1QLReactiveCouchbaseConfiguration extends ReactiveCouchbaseConfiguration {
|
||||
|
||||
public N1QLReactiveCouchbaseConfiguration(CouchbaseProperties couchbaseProperties) {
|
||||
super(couchbaseProperties);
|
||||
}
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package com.baeldung.couchbase.configuration;
|
||||
|
||||
import com.couchbase.client.java.env.CouchbaseEnvironment;
|
||||
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.couchbase.config.AbstractReactiveCouchbaseConfiguration;
|
||||
import org.springframework.data.couchbase.config.BeanNames;
|
||||
import org.springframework.data.couchbase.repository.support.IndexManager;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public abstract class ReactiveCouchbaseConfiguration extends AbstractReactiveCouchbaseConfiguration {
|
||||
|
||||
private CouchbaseProperties couchbaseProperties;
|
||||
|
||||
public ReactiveCouchbaseConfiguration(final CouchbaseProperties couchbaseProperties) {
|
||||
this.couchbaseProperties = couchbaseProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<String> getBootstrapHosts() {
|
||||
return couchbaseProperties.getBootstrapHosts();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getBucketName() {
|
||||
return couchbaseProperties.getBucketName();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getBucketPassword() {
|
||||
return couchbaseProperties.getBucketPassword();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CouchbaseEnvironment couchbaseEnvironment() {
|
||||
return DefaultCouchbaseEnvironment
|
||||
.builder()
|
||||
.bootstrapHttpDirectPort(couchbaseProperties.getPort())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean(name = BeanNames.COUCHBASE_INDEX_MANAGER)
|
||||
public IndexManager couchbaseIndexManager() {
|
||||
return new IndexManager(true, true, false);
|
||||
}
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.couchbase.configuration;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.couchbase.repository.config.EnableReactiveCouchbaseRepositories;
|
||||
|
||||
@Configuration
|
||||
@EnableReactiveCouchbaseRepositories("com.baeldung.couchbase.domain.repository.view")
|
||||
public class ViewReactiveCouchbaseConfiguration extends ReactiveCouchbaseConfiguration {
|
||||
|
||||
public ViewReactiveCouchbaseConfiguration(CouchbaseProperties couchbaseProperties) {
|
||||
super(couchbaseProperties);
|
||||
}
|
||||
}
|
||||
+43
@@ -0,0 +1,43 @@
|
||||
package com.baeldung.couchbase.domain;
|
||||
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.couchbase.core.mapping.Document;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
@Document
|
||||
public class Person {
|
||||
|
||||
@Id private UUID id;
|
||||
private String firstName;
|
||||
|
||||
public Person(final UUID id, final String firstName) {
|
||||
this.id = id;
|
||||
this.firstName = firstName;
|
||||
}
|
||||
|
||||
private Person() {
|
||||
}
|
||||
|
||||
public UUID getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getFirstName() {
|
||||
return firstName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Person person = (Person) o;
|
||||
return Objects.equals(id, person.id) && Objects.equals(firstName, person.firstName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, firstName);
|
||||
}
|
||||
}
|
||||
+16
@@ -0,0 +1,16 @@
|
||||
package com.baeldung.couchbase.domain.repository.n1ql;
|
||||
|
||||
import com.baeldung.couchbase.domain.Person;
|
||||
import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
|
||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Repository
|
||||
@N1qlPrimaryIndexed
|
||||
public interface N1QLPersonRepository extends ReactiveCrudRepository<Person, UUID> {
|
||||
|
||||
Flux<Person> findAllByFirstName(final String firstName);
|
||||
}
|
||||
+13
@@ -0,0 +1,13 @@
|
||||
package com.baeldung.couchbase.domain.repository.n1ql;
|
||||
|
||||
import com.baeldung.couchbase.domain.Person;
|
||||
import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
|
||||
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Repository
|
||||
@N1qlPrimaryIndexed
|
||||
public interface N1QLSortingPersonRepository extends ReactiveSortingRepository<Person, UUID> {
|
||||
}
|
||||
+20
@@ -0,0 +1,20 @@
|
||||
package com.baeldung.couchbase.domain.repository.view;
|
||||
|
||||
import com.baeldung.couchbase.domain.Person;
|
||||
import org.springframework.data.couchbase.core.query.View;
|
||||
import org.springframework.data.couchbase.core.query.ViewIndexed;
|
||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Repository
|
||||
@ViewIndexed(designDoc = ViewPersonRepository.DESIGN_DOCUMENT)
|
||||
public interface ViewPersonRepository extends ReactiveCrudRepository<Person, UUID> {
|
||||
|
||||
String DESIGN_DOCUMENT = "person";
|
||||
|
||||
@View(designDocument = ViewPersonRepository.DESIGN_DOCUMENT)
|
||||
Flux<Person> findByFirstName(String firstName);
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.r2dbc;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
@SpringBootApplication
|
||||
@ComponentScan(basePackages = "com.baeldung.r2dbc")
|
||||
public class R2dbcApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(R2dbcApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.r2dbc.configuration;
|
||||
|
||||
import io.r2dbc.h2.H2ConnectionConfiguration;
|
||||
import io.r2dbc.h2.H2ConnectionFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
|
||||
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
|
||||
|
||||
@Configuration
|
||||
@EnableR2dbcRepositories(basePackages = "com.baeldung.r2dbc.repository")
|
||||
public class R2DBCConfiguration extends AbstractR2dbcConfiguration {
|
||||
@Bean
|
||||
public H2ConnectionFactory connectionFactory() {
|
||||
return new H2ConnectionFactory(
|
||||
H2ConnectionConfiguration.builder()
|
||||
.url("mem:testdb;DB_CLOSE_DELAY=-1;TRACE_LEVEL_FILE=4")
|
||||
.username("sa")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
package com.baeldung.r2dbc.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.relational.core.mapping.Table;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Table
|
||||
public class Player {
|
||||
@Id
|
||||
Integer id;
|
||||
String name;
|
||||
Integer age;
|
||||
}
|
||||
+17
@@ -0,0 +1,17 @@
|
||||
package com.baeldung.r2dbc.repository;
|
||||
|
||||
import org.springframework.data.r2dbc.repository.Query;
|
||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
||||
|
||||
import com.baeldung.r2dbc.model.Player;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public interface PlayerRepository extends ReactiveCrudRepository<Player, Integer> {
|
||||
|
||||
@Query("select id, name, age from player where name = $1")
|
||||
Flux<Player> findAllByName(String name);
|
||||
|
||||
@Query("select * from player where age = $1")
|
||||
Flux<Player> findByAge(int age);
|
||||
}
|
||||
+25
@@ -0,0 +1,25 @@
|
||||
package com.baeldung.reactive;
|
||||
|
||||
import com.mongodb.reactivestreams.client.MongoClient;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
||||
|
||||
@SpringBootApplication
|
||||
public class Spring5ReactiveApplication{
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Spring5ReactiveApplication.class, args);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
MongoClient mongoClient;
|
||||
|
||||
@Bean
|
||||
public ReactiveMongoTemplate reactiveMongoTemplate() {
|
||||
return new ReactiveMongoTemplate(mongoClient, "test");
|
||||
}
|
||||
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.reactive.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
|
||||
@Document
|
||||
@Data
|
||||
@ToString
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class Account {
|
||||
|
||||
@Id
|
||||
private String id;
|
||||
private String owner;
|
||||
private Double value;
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.reactive.repository;
|
||||
|
||||
import com.baeldung.reactive.model.Account;
|
||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Repository
|
||||
public interface AccountCrudRepository extends ReactiveCrudRepository<Account, String> {
|
||||
|
||||
public Flux<Account> findAllByValue(Double value);
|
||||
|
||||
public Mono<Account> findFirstByOwner(Mono<String> owner);
|
||||
}
|
||||
+7
@@ -0,0 +1,7 @@
|
||||
package com.baeldung.reactive.repository;
|
||||
|
||||
import com.baeldung.reactive.model.Account;
|
||||
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
|
||||
|
||||
public interface AccountMongoRepository extends ReactiveMongoRepository<Account, String> {
|
||||
}
|
||||
+15
@@ -0,0 +1,15 @@
|
||||
package com.baeldung.reactive.repository;
|
||||
|
||||
import com.baeldung.reactive.model.Account;
|
||||
import io.reactivex.Observable;
|
||||
import io.reactivex.Single;
|
||||
import org.springframework.data.repository.reactive.RxJava2CrudRepository;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface AccountRxJavaRepository extends RxJava2CrudRepository<Account, String>{
|
||||
|
||||
public Observable<Account> findAllByValue(Double value);
|
||||
|
||||
public Single<Account> findFirstByOwner(Single<String> owner);
|
||||
}
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package com.baeldung.reactive.template;
|
||||
|
||||
import com.baeldung.reactive.model.Account;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
||||
import org.springframework.data.mongodb.core.ReactiveRemoveOperation;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
public class AccountTemplateOperations {
|
||||
|
||||
@Autowired
|
||||
ReactiveMongoTemplate template;
|
||||
|
||||
public Mono<Account> findById(String id) {
|
||||
return template.findById(id, Account.class);
|
||||
}
|
||||
|
||||
public Flux<Account> findAll() {
|
||||
return template.findAll(Account.class);
|
||||
}
|
||||
|
||||
public Mono<Account> save(Mono<Account> account) {
|
||||
return template.save(account);
|
||||
}
|
||||
|
||||
public ReactiveRemoveOperation.ReactiveRemove<Account> deleteAll() {
|
||||
return template.remove(Account.class);
|
||||
}
|
||||
|
||||
}
|
||||
+11
@@ -0,0 +1,11 @@
|
||||
package com.baeldung.tailablecursor;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class LogsCounterApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(LogsCounterApplication.class, args);
|
||||
}
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package com.baeldung.tailablecursor.domain;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
|
||||
@Data
|
||||
@Document
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Log {
|
||||
@Id
|
||||
private String id;
|
||||
private String service;
|
||||
private LogLevel level;
|
||||
private String message;
|
||||
}
|
||||
+5
@@ -0,0 +1,5 @@
|
||||
package com.baeldung.tailablecursor.domain;
|
||||
|
||||
public enum LogLevel {
|
||||
ERROR, WARN, INFO
|
||||
}
|
||||
+12
@@ -0,0 +1,12 @@
|
||||
package com.baeldung.tailablecursor.repository;
|
||||
|
||||
import com.baeldung.tailablecursor.domain.Log;
|
||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
||||
import org.springframework.data.mongodb.repository.Tailable;
|
||||
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public interface LogsRepository extends ReactiveCrudRepository<Log, String> {
|
||||
@Tailable
|
||||
Flux<Log> findByLevel(LogLevel level);
|
||||
}
|
||||
+62
@@ -0,0 +1,62 @@
|
||||
package com.baeldung.tailablecursor.service;
|
||||
|
||||
import com.baeldung.tailablecursor.domain.Log;
|
||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.data.mongodb.core.mapping.Document;
|
||||
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
|
||||
import org.springframework.data.mongodb.core.messaging.MessageListener;
|
||||
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
|
||||
import org.springframework.data.mongodb.core.messaging.TailableCursorRequest;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.springframework.data.mongodb.core.query.Criteria.where;
|
||||
import static org.springframework.data.mongodb.core.query.Query.query;
|
||||
|
||||
@Slf4j
|
||||
public class ErrorLogsCounter implements LogsCounter {
|
||||
|
||||
private static final String LEVEL_FIELD_NAME = "level";
|
||||
|
||||
private final String collectionName;
|
||||
private final MessageListenerContainer container;
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
public ErrorLogsCounter(MongoTemplate mongoTemplate,
|
||||
String collectionName) {
|
||||
this.collectionName = collectionName;
|
||||
this.container = new DefaultMessageListenerContainer(mongoTemplate);
|
||||
|
||||
container.start();
|
||||
TailableCursorRequest<Log> request = getTailableCursorRequest();
|
||||
container.register(request, Log.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private TailableCursorRequest<Log> getTailableCursorRequest() {
|
||||
MessageListener<Document, Log> listener = message -> {
|
||||
log.info("ERROR log received: {}", message.getBody());
|
||||
counter.incrementAndGet();
|
||||
};
|
||||
|
||||
return TailableCursorRequest.builder()
|
||||
.collection(collectionName)
|
||||
.filter(query(where(LEVEL_FIELD_NAME).is(LogLevel.ERROR)))
|
||||
.publishTo(listener)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int count() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void close() {
|
||||
container.stop();
|
||||
}
|
||||
}
|
||||
+36
@@ -0,0 +1,36 @@
|
||||
package com.baeldung.tailablecursor.service;
|
||||
|
||||
import com.baeldung.tailablecursor.domain.Log;
|
||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
||||
import com.baeldung.tailablecursor.repository.LogsRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Slf4j
|
||||
public class InfoLogsCounter implements LogsCounter {
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
private final Disposable subscription;
|
||||
|
||||
public InfoLogsCounter(LogsRepository repository) {
|
||||
Flux<Log> stream = repository.findByLevel(LogLevel.INFO);
|
||||
this.subscription = stream.subscribe(logEntity -> {
|
||||
log.info("INFO log received: " + logEntity);
|
||||
counter.incrementAndGet();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int count() {
|
||||
return this.counter.get();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void close() {
|
||||
this.subscription.dispose();
|
||||
}
|
||||
}
|
||||
+5
@@ -0,0 +1,5 @@
|
||||
package com.baeldung.tailablecursor.service;
|
||||
|
||||
public interface LogsCounter {
|
||||
int count();
|
||||
}
|
||||
+41
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.tailablecursor.service;
|
||||
|
||||
import com.baeldung.tailablecursor.domain.Log;
|
||||
import com.baeldung.tailablecursor.domain.LogLevel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.springframework.data.mongodb.core.query.Criteria.where;
|
||||
import static org.springframework.data.mongodb.core.query.Query.query;
|
||||
|
||||
@Slf4j
|
||||
public class WarnLogsCounter implements LogsCounter {
|
||||
|
||||
private static final String LEVEL_FIELD_NAME = "level";
|
||||
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
private final Disposable subscription;
|
||||
|
||||
public WarnLogsCounter(ReactiveMongoOperations template) {
|
||||
Flux<Log> stream = template.tail(query(where(LEVEL_FIELD_NAME).is(LogLevel.WARN)), Log.class);
|
||||
subscription = stream.subscribe(logEntity -> {
|
||||
log.warn("WARN log received: " + logEntity);
|
||||
counter.incrementAndGet();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public int count() {
|
||||
return counter.get();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void close() {
|
||||
subscription.dispose();
|
||||
}
|
||||
}
|
||||
+4
@@ -0,0 +1,4 @@
|
||||
spring.couchbase.bucket.name=default
|
||||
spring.couchbase.bootstrap-hosts=localhost
|
||||
spring.couchbase.port=8091
|
||||
spring.couchbase.bucket.password=123456
|
||||
@@ -0,0 +1,13 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
|
||||
</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT" />
|
||||
</root>
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user