diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java index 557eb04612..bebccd0300 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -26,7 +26,7 @@ public class ComputeFunction { public static void compute(Observable v) { try { - System.out.println("compute integer v: " + v); + v.forEach(System.out::println); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java index 42c705774a..c13f94c4f7 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java @@ -7,7 +7,6 @@ public class HotObservableBackPressureBatching { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source.window(500) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java index 38451bab05..67922e2efb 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java @@ -8,7 +8,6 @@ public class HotObservableBackPressureBuffering { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source .buffer(1024) .observeOn(Schedulers.computation()) diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java index 691198c0fa..2e8244166f 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java @@ -9,12 +9,8 @@ public class HotObservableBackPressureSkipping { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer - source -// .debounce(1, TimeUnit.SECONDS) -// .sample(1, TimeUnit.SECONDS) -// .throttleFirst(100, TimeUnit.MILLISECONDS) - .throttleLast(100, TimeUnit.MILLISECONDS) + source.sample(100, TimeUnit.MILLISECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java index f61536e827..bf86312fff 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackPressure.java @@ -5,8 +5,6 @@ import rx.BackpressureOverflow; import rx.Observable; import rx.schedulers.Schedulers; -import java.util.concurrent.TimeUnit; - public class HotObservableOnBackPressure { public static void main(String[] args) throws InterruptedException { Observable.range(1, 1_000_000) @@ -18,9 +16,8 @@ public class HotObservableOnBackPressure { }, Throwable::printStackTrace); - Observable.interval(1, TimeUnit.MINUTES) + Observable.range(1, 1_000_000) .onBackpressureDrop() -// .onBackpressureLatest() .observeOn(Schedulers.io()) .doOnNext(ComputeFunction::compute) .subscribe(v -> { diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java index eea595b3b4..07ff5b42c0 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java @@ -8,9 +8,8 @@ public class HotObservableWithoutBackPressure { public static void main(String[] args) throws InterruptedException { PublishSubject source = PublishSubject.create(); - //buffer source.observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) {