diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java index 9855123a3b..4d3d09da7e 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -5,7 +5,10 @@ import rx.schedulers.Schedulers; public class ColdObservableBackpressure { public static void main(String[] args) throws InterruptedException { - Observable.range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + Observable + .range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); Thread.sleep(10_000); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java index 6acda7eaad..9f7b8c107a 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBatching.java @@ -5,9 +5,12 @@ import rx.subjects.PublishSubject; public class HotObservableBackpressureBatching { public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); - source.window(500).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); + source + .window(500) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java index 50638f4c8a..720918878f 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureBuffering.java @@ -5,9 +5,12 @@ import rx.subjects.PublishSubject; public class HotObservableBackpressureBuffering { public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); - source.buffer(1024).observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); + source + .buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java index f6f8b9f563..5b0a7f3a68 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressureSkipping.java @@ -7,11 +7,12 @@ import java.util.concurrent.TimeUnit; public class HotObservableBackpressureSkipping { public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); + PublishSubject source = PublishSubject. create(); source.sample(100, TimeUnit.MILLISECONDS) - // .throttleFirst(100, TimeUnit.MILLISECONDS) - .observeOn(Schedulers.computation()).subscribe(ComputeFunction::compute, Throwable::printStackTrace); + //.throttleFirst(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java index afef8027bf..8ccd38beb6 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableOnBackpressure.java @@ -6,12 +6,21 @@ import rx.schedulers.Schedulers; public class HotObservableOnBackpressure { public static void main(String[] args) throws InterruptedException { - Observable.range(1, 1_000_000).onBackpressureBuffer(16, () -> { - }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(e -> { - }, Throwable::printStackTrace); + Observable + .range(1, 1_000_000) + .onBackpressureBuffer(16, () -> { + }, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) + .observeOn(Schedulers.computation()) + .subscribe(e -> { + }, Throwable::printStackTrace); - Observable.range(1, 1_000_000).onBackpressureDrop().observeOn(Schedulers.io()).doOnNext(ComputeFunction::compute).subscribe(v -> { - }, Throwable::printStackTrace); + Observable + .range(1, 1_000_000) + .onBackpressureDrop() + .observeOn(Schedulers.io()) + .doOnNext(ComputeFunction::compute) + .subscribe(v -> { + }, Throwable::printStackTrace); Thread.sleep(10_000); } diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java index 7745dbe5c4..439db51695 100644 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackpressure.java @@ -1,16 +1,15 @@ package com.baelding.rxjava; - import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; public class HotObservableWithoutBackpressure { public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source.observeOn(Schedulers.computation()) - .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + PublishSubject source = PublishSubject. create(); + source + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i);