diff --git a/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java new file mode 100644 index 0000000000..cebc2d35f6 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ColdObservableBackpressure.java @@ -0,0 +1,44 @@ +package com.baelding.rxjava; + + +import rx.Observable; +import rx.Subscriber; +import rx.schedulers.Schedulers; + +public class ColdObservableBackPressure { + public static void main(String[] args) throws InterruptedException { + Observable.range(1, 1_000_000) + .observeOn(Schedulers.computation()) + .subscribe(v -> ComputeFunction.compute(v), Throwable::printStackTrace); + + Thread.sleep(10_000); + + +// Observable.range(1, 1_000_000) //implementation of reactive pull backpressure on cold observable +// .subscribe(new Subscriber() { +// @Override +// public void onStart() { +// request(1); +// } +// +// public void onNext(Integer v) { +// compute(v); +// +// request(1); +// } +// +// @Override +// public void onError(Throwable ex) { +// ex.printStackTrace(); +// } +// +// @Override +// public void onCompleted() { +// System.out.println("Done!"); +// } +// }); + + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java new file mode 100644 index 0000000000..f83f34c3ee --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/ComputeFunction.java @@ -0,0 +1,37 @@ +package com.baelding.rxjava; + + +import rx.Observable; + +import java.util.List; + +public class ComputeFunction { + public static void compute(Integer v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(List v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void compute(Observable v) { + try { + System.out.println("compute integer v: " + v); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java new file mode 100644 index 0000000000..42c705774a --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBatching.java @@ -0,0 +1,22 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableBackPressureBatching { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.window(500) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } + +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java new file mode 100644 index 0000000000..38451bab05 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureBuffering.java @@ -0,0 +1,23 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + + +public class HotObservableBackPressureBuffering { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source + .buffer(1024) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java new file mode 100644 index 0000000000..691198c0fa --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackPressureSkipping.java @@ -0,0 +1,27 @@ +package com.baelding.rxjava; + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +import java.util.concurrent.TimeUnit; + +public class HotObservableBackPressureSkipping { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source +// .debounce(1, TimeUnit.SECONDS) +// .sample(1, TimeUnit.SECONDS) +// .throttleFirst(100, TimeUnit.MILLISECONDS) + .throttleLast(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java deleted file mode 100644 index bc4f6710e5..0000000000 --- a/rxjava/src/main/java/com/baelding/rxjava/HotObservableBackpressure.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.baelding.rxjava; - - -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; - -public class HotObservableBackPressure { - public static void main(String[] args) throws InterruptedException { - PublishSubject source = PublishSubject.create(); - - source - .observeOn(Schedulers.computation()) - .subscribe(v -> compute(v), Throwable::printStackTrace); - - for (int i = 0; i < 1_000_000; i++) { - source.onNext(i); - } - - Thread.sleep(10_000); - } - - private static void compute(Integer v) { - try { - System.out.println("compute integer v: "+ v); - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java new file mode 100644 index 0000000000..eea595b3b4 --- /dev/null +++ b/rxjava/src/main/java/com/baelding/rxjava/HotObservableWithoutBackPressure.java @@ -0,0 +1,21 @@ +package com.baelding.rxjava; + + +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; + +public class HotObservableWithoutBackPressure { + public static void main(String[] args) throws InterruptedException { + PublishSubject source = PublishSubject.create(); + + //buffer + source.observeOn(Schedulers.computation()) + .subscribe(ComputeFunction::compute, Throwable::printStackTrace); + + + for (int i = 0; i < 1_000_000; i++) { + source.onNext(i); + } + Thread.sleep(10_000); + } +}