public class Rx<T> extends Object implements Observer<T>, FutureCallback<T>
Unifies RxJava's Observable with Guava's ListenableFuture, and also adds tracing capabilities.
TL;DR
// subscribe to values, termination, or both
Rx.subscribe(listenableOrObservable, val -> doSomething(val)); // errors are passed to Errors.log()
Rx.subscribe(listenableOrObservable, Rx.onTerminate(optionalError -> maybeHandleError())); // values are ignored
Rx.subscribe(listenableOrObservable, Rx.onValueOnTerminate(val -> doSomething(val), optionalError -> maybeHandleError()));
// receive callbacks on a specific executor
Rx.on(someExecutor).subscribe(listenableOrObservable, val -> doSomething(val));
// call unsubscribe() on the subscription to cancel it
rx.Subscription subscription = Rx.subscribe(listenableOrObservable, val -> doSomething(val));

Long version:
Rx implements both the Observer and FutureCallback interfaces by mapping them to two Consumers:
Consumer<T> onValue
Consumer<Optional<Throwable>> onTerminate
Observer.onNext(T value) -> onValue.accept(value)
Observer.onCompleted() -> onTerminate.accept(Optional.empty())
Observer.onError(Throwable error) -> onTerminate.accept(Optional.of(error))
FutureCallback.onSuccess(T value) -> onValue.accept(value); onTerminate.accept(Optional.empty())
FutureCallback.onFailure(Throwable error) -> onTerminate.accept(Optional.of(error))
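
The mapping above can be sketched without any RxJava or Guava dependency. The class below is a hypothetical stand-in (`TwoConsumerListener` and `MappingDemo` are illustration names, not part of this library); it only shows how observer-style and future-style callbacks might both be routed to the same two consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical stand-in for Rx: routes both callback styles to two consumers. */
final class TwoConsumerListener<T> {
    private final Consumer<T> onValue;
    private final Consumer<Optional<Throwable>> onTerminate;

    TwoConsumerListener(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate) {
        this.onValue = onValue;
        this.onTerminate = onTerminate;
    }

    // Observer-style callbacks: many values, then exactly one termination.
    void onNext(T value) { onValue.accept(value); }
    void onCompleted() { onTerminate.accept(Optional.empty()); }
    void onError(Throwable error) { onTerminate.accept(Optional.of(error)); }

    // FutureCallback-style callbacks: a success is one value plus a normal termination.
    void onSuccess(T value) {
        onValue.accept(value);
        onTerminate.accept(Optional.empty());
    }
    void onFailure(Throwable error) { onTerminate.accept(Optional.of(error)); }
}

final class MappingDemo {
    /** Records the events produced by a successful future, then a stream that fails. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        TwoConsumerListener<String> listener = new TwoConsumerListener<>(
            value -> events.add("value:" + value),
            error -> events.add(error.map(e -> "failed:" + e.getMessage()).orElse("done")));
        listener.onSuccess("a");                             // future-style success
        listener.onNext("b");                                // stream-style value
        listener.onError(new IllegalStateException("boom")); // stream-style failure
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because both callback styles end up in the same pair of consumers, calling code does not need to know whether the source was a stream or a future.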
An instance of Rx is created by calling one of the static creator methods:
onValue(Consumer<T>)
onTerminate(Consumer<Optional<Throwable>>)
onFailure(Consumer<Throwable>)
onValueOnTerminate(Consumer<T>, Consumer<Optional<Throwable>>)
onValueOnFailure(Consumer<T>, Consumer<Throwable>)
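
Since the constructor takes a Consumer<Optional<Throwable>>, a failure-only creator such as onFailure presumably has to adapt a Consumer<Throwable> to that shape. A minimal sketch of one way to do this, assuming normal completion is simply ignored (`TerminateAdapters` and `failureOnly` are hypothetical names, not the library's own code):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class TerminateAdapters {
    /** Adapts a failure-only consumer to a terminate consumer; normal completion is ignored. */
    static Consumer<Optional<Throwable>> failureOnly(Consumer<Throwable> onFailure) {
        return optionalError -> optionalError.ifPresent(onFailure);
    }

    /** Counts how often the failure consumer fires for one normal and one failed termination. */
    static int demo() {
        AtomicInteger failures = new AtomicInteger();
        Consumer<Optional<Throwable>> onTerminate = failureOnly(error -> failures.incrementAndGet());
        onTerminate.accept(Optional.empty());                          // normal completion: ignored
        onTerminate.accept(Optional.of(new RuntimeException("boom"))); // failure: counted
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```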
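Once you have an instance of Rx, you can subscribe it using the normal RxJava or Guava calls:
rxObservable.subscribe(Rx.onValue(val -> doSomething(val)));
Futures.addCallback(listenableFuture, Rx.onValue(val -> doSomething(val)));
Or you can use the static Rx.subscribe methods, which accept either an observable or a future:
Rx.subscribe(listenableOrObservable, Rx.onValue(val -> doSomething(val)));
Rx.subscribe(listenableOrObservable, val -> doSomething(val)); // automatically uses Rx.onValue()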
The Rx.subscribe methods return Subscription instances, which allow you to unsubscribe from futures in the same manner as for observables:
subscription = Rx.subscribe( ... )
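
To illustrate what unsubscribing from a future could mean, here is a rough sketch built on java.util.concurrent.CompletableFuture. It assumes that unsubscribing only suppresses the callback and leaves the future itself running; whether Rx behaves this way is not stated here. `FutureSubscription` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical subscription handle for a future: unsubscribing suppresses the callback. */
final class FutureSubscription {
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    void unsubscribe() { unsubscribed.set(true); }
    boolean isUnsubscribed() { return unsubscribed.get(); }

    /** Delivers the future's value to onValue unless unsubscribe() was called first. */
    static <T> FutureSubscription subscribe(CompletableFuture<? extends T> future, Consumer<T> onValue) {
        FutureSubscription subscription = new FutureSubscription();
        future.whenComplete((value, error) -> {
            if (error == null && !subscription.isUnsubscribed()) {
                onValue.accept(value);
            }
        });
        return subscription;
    }

    /** One subscription is kept and one is cancelled before either future completes. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> kept = new CompletableFuture<>();
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        subscribe(kept, value -> seen.add("kept:" + value));
        FutureSubscription subscription = subscribe(cancelled, value -> seen.add("cancelled:" + value));
        subscription.unsubscribe();
        kept.complete("a");
        cancelled.complete("b"); // completes, but its callback is suppressed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```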
To receive callbacks on a specific executor:
Rx.on(someExecutor).subscribe( ... )
Because RxJava uses Schedulers rather than Executors, a Scheduler is automatically created from the Executor using Schedulers. If you'd like to specify the Scheduler manually, you can use on(Executor, Scheduler), or you can create an executor which implements Rx.HasRxExecutor.

Modifier and Type | Class and Description |
---|---|
static interface | Rx.HasRxExecutor: Marker interface which allows an Executor to specify its own Scheduler. |
static class | Rx.RxExecutor: This class holds an instance of Executor (for ListenableFuture) and Scheduler (for Observable). |
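
The idea of receiving callbacks on a specific executor can be shown with plain JDK types. This is only a sketch of the concept, not how Rx.RxExecutor is implemented: it uses CompletableFuture.whenCompleteAsync to run the callback on a named single-thread executor, and `ExecutorDeliveryDemo` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorDeliveryDemo {
    /** Returns the name of the thread on which the completion callback actually ran. */
    static String callbackThreadName() {
        // A single-thread executor whose only thread has a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "callback-thread"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> observedThread = new CompletableFuture<>();
            // The callback is submitted to the executor rather than run on the completing thread.
            source.whenCompleteAsync(
                (value, error) -> observedThread.complete(Thread.currentThread().getName()),
                executor);
            source.complete("value");
            return observedThread.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```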
Modifier | Constructor and Description |
---|---|
protected | Rx(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate) |
Modifier and Type | Method and Description |
---|---|
static Rx.RxExecutor | on(Executor executor): Mechanism for specifying a specific Executor. |
static Rx.RxExecutor | on(Executor executor, Scheduler scheduler): Mechanism for specifying a specific Executor (for ListenableFuture) and Scheduler (for Observable). |
void | onCompleted() |
void | onError(Throwable e) |
static <T> Rx<T> | onFailure(Consumer<Throwable> onFailure): Creates an Rx instance which will call the given consumer whenever the followed stream or future completes with an error. |
void | onFailure(Throwable e) |
void | onNext(T t) |
void | onSuccess(T result) |
static <T> Rx<T> | onTerminate(Consumer<Optional<Throwable>> onTerminate): Creates an Rx instance which will call the given consumer whenever the followed stream or future completes, whether with an error or not. |
static <T> Rx<T> | onTerminateLogError(Consumer<Optional<Throwable>> onTerminate): Creates an Rx instance which will call the given consumer whenever the followed stream or future completes, whether with an error or not, and the error (if present) will be logged. |
static <T> Rx<T> | onValue(Consumer<T> onValue): Creates an Rx instance which will call the given consumer whenever a value is received. |
static <T> Rx<T> | onValueOnFailure(Consumer<T> onValue, Consumer<Throwable> onFailure): Creates an Rx instance which will call onValue whenever a value is received, and onFailure if the stream or future completes with an error. |
static <T> Rx<T> | onValueOnTerminate(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate): Creates an Rx instance which will call onValue whenever a value is received, and onTerminate when the future or observable completes, whether with an error or not. |
static <T> Rx<T> | onValueOnTerminateLogError(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate): Creates an Rx instance which will call onValue whenever a value is received, and onTerminate when the followed stream or future completes, whether with an error or not; the error (if present) will automatically be logged. |
static <T> Subscription | subscribe(CompletionStage<? extends T> future, Consumer<T> listener) |
static <T> Subscription | subscribe(CompletionStage<? extends T> future, Rx<T> listener) |
static <T> Subscription | subscribe(IObservable<? extends T> observable, Consumer<T> listener) |
static <T> Subscription | subscribe(IObservable<? extends T> observable, Rx<T> listener) |
static <T> Subscription | subscribe(ListenableFuture<? extends T> future, Consumer<T> listener) |
static <T> Subscription | subscribe(ListenableFuture<? extends T> future, Rx<T> listener) |
static <T> Subscription | subscribe(Observable<? extends T> observable, Consumer<T> listener) |
static <T> Subscription | subscribe(Observable<? extends T> observable, Rx<T> listener) |
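
The LogError variants in the table above describe a terminate consumer whose error, if present, is also logged. One way such a wrapper could look is sketched below. It assumes the error is logged before the consumer is called, which is not confirmed here, and it passes the logger in as a plain Consumer<Throwable> rather than using the library's own logging. `LoggingTerminate` and `logThenDelegate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

final class LoggingTerminate {
    /** Wraps a terminate consumer so any error is handed to errorLog before delegating. */
    static Consumer<Optional<Throwable>> logThenDelegate(
            Consumer<Throwable> errorLog, Consumer<Optional<Throwable>> onTerminate) {
        return optionalError -> {
            optionalError.ifPresent(errorLog); // assumed order: log first
            onTerminate.accept(optionalError); // then notify the caller's consumer
        };
    }

    /** Records what happens for one normal and one failed termination. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Consumer<Optional<Throwable>> listener = logThenDelegate(
            error -> events.add("logged:" + error.getMessage()),
            optionalError -> events.add("terminated"));
        listener.accept(Optional.empty());
        listener.accept(Optional.of(new IllegalStateException("boom")));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```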
public static <T> Rx<T> onValue(Consumer<T> onValue)
public static <T> Rx<T> onTerminate(Consumer<Optional<Throwable>> onTerminate)
public static <T> Rx<T> onTerminateLogError(Consumer<Optional<Throwable>> onTerminate)
public static <T> Rx<T> onFailure(Consumer<Throwable> onFailure)
public static <T> Rx<T> onValueOnTerminate(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate)
public static <T> Rx<T> onValueOnTerminateLogError(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate)
public static <T> Rx<T> onValueOnFailure(Consumer<T> onValue, Consumer<Throwable> onFailure)
public final void onCompleted()
Specified by: onCompleted in interface Observer<T>
public static <T> Subscription subscribe(Observable<? extends T> observable, Rx<T> listener)
public static <T> Subscription subscribe(Observable<? extends T> observable, Consumer<T> listener)
public static <T> Subscription subscribe(IObservable<? extends T> observable, Rx<T> listener)
public static <T> Subscription subscribe(IObservable<? extends T> observable, Consumer<T> listener)
public final void onSuccess(T result)
Specified by: onSuccess in interface FutureCallback<T>
public final void onFailure(Throwable e)
Specified by: onFailure in interface FutureCallback<T>
public static <T> Subscription subscribe(ListenableFuture<? extends T> future, Rx<T> listener)
public static <T> Subscription subscribe(ListenableFuture<? extends T> future, Consumer<T> listener)
public static <T> Subscription subscribe(CompletionStage<? extends T> future, Rx<T> listener)
public static <T> Subscription subscribe(CompletionStage<? extends T> future, Consumer<T> listener)
public static Rx.RxExecutor on(Executor executor)
public static Rx.RxExecutor on(Executor executor, Scheduler scheduler)