public class Rx extends Object
RxJava's Observable
with Guava's ListenableFuture
, and also adds tracing capabilities.
TL;DR
// subscribe to values, termination, or both Rx.subscribe(listenableOrObservable, val -> doSomething(val)); // errors are passed to Errors.log() Rx.subscribe(listenableOrObservable, Rx.onTerminate(optionalError -> maybeHandleError()); // values are ignored Rx.subscribe(listenableOrObservable, Rx.onValueOrTerminate(val -> doSomething(val), optionalError -> maybeHandleError())); // receive callbacks on a specific executor Rx.on(someExecutor).subscribe(listenableOrObservable, val -> doSomething(val)); // call unsubscribe() on the subscription to cancel it rx.Subscription subscription = Rx.subscribe(listenableOrObservable, val -> doSomething);Long version: `Rx` implements both the
Observer
and FutureCallback
interfaces by mapping them to two `Consumer`s:
onValue(Consumer<T>)
onTerminate(Consumer<Optional<Throwable>>)
onFailure(Consumer<Throwable>)
onValueOrTerminate(Consumer<T>, Consumer<Optional<Throwable>>)
onValueOrFailure(Consumer<T>, Consumer<Throwable>)
Subscription
instances
which allow you to unsubscribe from futures in the same manner as for observables.
Scheduler
s rather than Executor
s,
a Scheduler is automatically created using Schedulers
. If you'd like to specify the Scheduler manually, you can use callbackOn(Executor, Scheduler)
or you can create an executor which implements RxExecutor.Has
.Modifier and Type | Method and Description |
---|---|
static RxExecutor |
callbackOn(Executor executor)
Mechanism for specifying a specific Executor.
|
static RxExecutor |
callbackOn(Executor executor,
Scheduler scheduler)
Mechanism for specifying a specific Executor (for ListenableFuture) and Scheduler (for Observable).
|
static <T> Observable<T> |
merge(IObservable<? extends T>... toMerge)
Merges a bunch of
IObservable s into a single Observable containing the most-recent value. |
static <T> RxListener<T> |
onFailure(Consumer<Throwable> onFailure)
Creates an Rx instance which will call the given consumer whenever the followed stream
or future completes with an error.
|
static <T> RxListener<T> |
onTerminate(Consumer<Optional<Throwable>> onTerminate)
Creates an Rx instance which will call the given consumer whenever the followed stream
or future completes, whether with an error or not.
|
static <T> RxListener<T> |
onTerminateLogError(Consumer<Optional<Throwable>> onTerminate)
Creates an Rx instance which will call the given consumer whenever the followed stream
or future completes, whether with an error or not, and the error (if present) will be logged.
|
static <T> RxListener<T> |
onValue(Consumer<T> onValue)
Creates an Rx instance which will call the given consumer whenever a value is received.
|
static <T> RxListener<T> |
onValueOnFailure(Consumer<T> onValue,
Consumer<Throwable> onFailure)
Creates an Rx instance which will call onValue whenever a value is received,
and onFailure if the stream or future completes with an error.
|
static <T> RxListener<T> |
onValueOnTerminate(Consumer<T> onValue,
Consumer<Optional<Throwable>> onTerminate)
Creates an Rx instance which will call onValue whenever a value is received,
is received, and onTerminate when the future or observable completes, whether with an error or not.
|
static <T> RxListener<T> |
onValueOnTerminateLogError(Consumer<T> onValue,
Consumer<Optional<Throwable>> onTerminate)
Creates an Rx instance which will call the given consumer whenever the followed stream
or future completes, whether with an error or not, and the error (if present) will automatically be logged.
|
static RxExecutor |
sameThreadExecutor() |
static <T> Subscription |
subscribe(CompletionStage<? extends T> future,
Consumer<T> listener) |
static <T> Subscription |
subscribe(CompletionStage<? extends T> future,
RxListener<T> listener) |
static <T> Subscription |
subscribe(IObservable<? extends T> observable,
Consumer<T> listener) |
static <T> Subscription |
subscribe(IObservable<? extends T> observable,
RxListener<T> listener) |
static <T> Subscription |
subscribe(ListenableFuture<? extends T> future,
Consumer<T> listener) |
static <T> Subscription |
subscribe(ListenableFuture<? extends T> future,
RxListener<T> listener) |
static <T> Subscription |
subscribe(Observable<? extends T> observable,
Consumer<T> listener) |
static <T> Subscription |
subscribe(Observable<? extends T> observable,
RxListener<T> listener) |
public static <T> RxListener<T> onValue(Consumer<T> onValue)
public static <T> RxListener<T> onTerminate(Consumer<Optional<Throwable>> onTerminate)
public static <T> RxListener<T> onTerminateLogError(Consumer<Optional<Throwable>> onTerminate)
public static <T> RxListener<T> onFailure(Consumer<Throwable> onFailure)
public static <T> RxListener<T> onValueOnTerminate(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate)
public static <T> RxListener<T> onValueOnTerminateLogError(Consumer<T> onValue, Consumer<Optional<Throwable>> onTerminate)
public static <T> RxListener<T> onValueOnFailure(Consumer<T> onValue, Consumer<Throwable> onFailure)
public static <T> Subscription subscribe(Observable<? extends T> observable, RxListener<T> listener)
public static <T> Subscription subscribe(Observable<? extends T> observable, Consumer<T> listener)
public static <T> Subscription subscribe(IObservable<? extends T> observable, RxListener<T> listener)
public static <T> Subscription subscribe(IObservable<? extends T> observable, Consumer<T> listener)
public static <T> Subscription subscribe(ListenableFuture<? extends T> future, RxListener<T> listener)
public static <T> Subscription subscribe(ListenableFuture<? extends T> future, Consumer<T> listener)
public static <T> Subscription subscribe(CompletionStage<? extends T> future, RxListener<T> listener)
public static <T> Subscription subscribe(CompletionStage<? extends T> future, Consumer<T> listener)
public static RxExecutor callbackOn(Executor executor)
public static RxExecutor callbackOn(Executor executor, Scheduler scheduler)
public static RxExecutor sameThreadExecutor()
@SafeVarargs public static <T> Observable<T> merge(IObservable<? extends T>... toMerge)
IObservable
s into a single Observable
containing the most-recent value.