public final class Flowables extends Object
Modifier and Type | Method and Description |
---|---|
static <T,S> Flowable<T> | generateAsync(Supplier<S> initialState, BiFunction<S,FlowableAsyncEmitter<T>,S> asyncGenerator, Consumer<? super S> stateCleanup): Generates items by invoking a callback, for each downstream request one by one, that sets up an asynchronous call to some API that eventually responds with an item, an error or termination, while making sure there is only one such outstanding API call in progress and honoring the backpressure of the downstream. |
static Flowable<Long> | intervalBackpressure(long initialDelay, long period, TimeUnit unit): Periodically tries to emit an ever increasing long value or buffers (efficiently) such emissions until the downstream requests. |
static Flowable<Long> | intervalBackpressure(long initialDelay, long period, TimeUnit unit, Scheduler scheduler): Periodically tries to emit an ever increasing long value or buffers (efficiently) such emissions until the downstream requests. |
static Flowable<Long> | intervalBackpressure(long period, TimeUnit unit): Periodically tries to emit an ever increasing long value or buffers (efficiently) such emissions until the downstream requests. |
static Flowable<Long> | intervalBackpressure(long period, TimeUnit unit, Scheduler scheduler): Periodically tries to emit an ever increasing long value or buffers (efficiently) such emissions until the downstream requests. |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(boolean delayErrors, int prefetch, Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order), allows delaying any error they may signal and sets the prefetch amount when requesting from these Publishers. |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(boolean delayErrors, Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order) and allows delaying any error they may signal. |
static <T> Flowable<T> | orderedMerge(Comparator<? super T> comparator, boolean delayErrors, int prefetch, Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator), allows delaying any error they may signal and sets the prefetch amount when requesting from these Publishers. |
static <T> Flowable<T> | orderedMerge(Comparator<? super T> comparator, boolean delayErrors, Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator) and allows delaying any error they may signal. |
static <T> Flowable<T> | orderedMerge(Comparator<? super T> comparator, Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator). |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order). |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources, boolean delayErrors): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order) and allows delaying any error they may signal. |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources, boolean delayErrors, int prefetch): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order), allows delaying any error they may signal and sets the prefetch amount when requesting from these Publishers. |
static <T> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator). |
static <T> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator, boolean delayErrors): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator) and allows delaying any error they may signal. |
static <T> Flowable<T> | orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator, boolean delayErrors, int prefetch): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by the Comparator), allows delaying any error they may signal and sets the prefetch amount when requesting from these Publishers. |
static <T extends Comparable<? super T>> Flowable<T> | orderedMerge(Publisher<T>... sources): Merges the source Publishers in an ordered fashion, picking the smallest of the available values from them (determined by their natural order). |
static <T> Flowable<T> | repeat(T item): Repeats a scalar value indefinitely. |
static <T> Flowable<T> | repeatSupplier(Supplier<T> supplier): Repeatedly calls the given Supplier to produce items indefinitely. |
static <T,R> Flowable<R> | zipLatest(Function<? super Object[],? extends R> combiner, Publisher<? extends T>... sources): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T,R> Flowable<R> | zipLatest(Function<? super Object[],? extends R> combiner, Scheduler scheduler, Publisher<? extends T>... sources): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T,R> Flowable<R> | zipLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T,R> Flowable<R> | zipLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner, Scheduler scheduler): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner, Scheduler scheduler): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,T3,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,T3,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner, Scheduler scheduler): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,T3,T4,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Publisher<T4> source4, Function4<? super T1,? super T2,? super T3,? super T4,? extends R> combiner): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
static <T1,T2,T3,T4,R> Flowable<R> | zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Publisher<T4> source4, Function4<? super T1,? super T2,? super T3,? super T4,? extends R> combiner, Scheduler scheduler): Zips the latest available values of the source Publishers via a combiner function where the emission rate is determined by the slowest Publisher and the downstream consumption rate. |
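The ordered-merge behavior summarized in the table above (always pick the smallest of the values currently available from the sources) can be illustrated with a small synchronous model. This is only a sketch under stated assumptions: `OrderedMergeSketch`, `Head` and `merge` are hypothetical names, plain sorted lists stand in for the Publishers, and prefetching, backpressure and error delaying are not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-alone model of an ordered merge over already-sorted inputs. */
final class OrderedMergeSketch {

    /** Pairs the current head item of one source with the iterator it came from. */
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    static <T> List<T> merge(Comparator<? super T> comparator, List<List<T>> sources) {
        // The queue holds at most one "available value" per source.
        PriorityQueue<Head<T>> heads =
                new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
        for (List<T> source : sources) {
            Iterator<T> it = source.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            // Pick the smallest available value, then refill from the same source.
            Head<T> smallest = heads.poll();
            merged.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

The output is globally sorted only if each input is itself sorted according to the same comparator, which is the usual precondition for this kind of merge.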
@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(boolean delayErrors, Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources
delayErrors - if true, source errors are delayed until all sources terminate in some way

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(boolean delayErrors, int prefetch, Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources
delayErrors - if true, source errors are delayed until all sources terminate in some way
prefetch - the number of items to prefetch from the sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T> Flowable<T> orderedMerge(Comparator<? super T> comparator, Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T> Flowable<T> orderedMerge(Comparator<? super T> comparator, boolean delayErrors, Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument
delayErrors - if true, source errors are delayed until all sources terminate in some way

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") @SafeVarargs public static <T> Flowable<T> orderedMerge(Comparator<? super T> comparator, boolean delayErrors, int prefetch, Publisher<T>... sources)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument
delayErrors - if true, source errors are delayed until all sources terminate in some way
prefetch - the number of items to prefetch from the sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator, boolean delayErrors)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument
delayErrors - if true, source errors are delayed until all sources terminate in some way

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources, Comparator<? super T> comparator, boolean delayErrors, int prefetch)
T - the value type of all sources
sources - the iterable sequence of sources
comparator - the comparator to use for comparing items; it is called with the last known smallest in its first argument
delayErrors - if true, source errors are delayed until all sources terminate in some way
prefetch - the number of items to prefetch from the sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources)
T - the value type of all sources
sources - the iterable sequence of sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources, boolean delayErrors)
T - the value type of all sources
sources - the iterable sequence of sources
delayErrors - if true, source errors are delayed until all sources terminate in some way

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T extends Comparable<? super T>> Flowable<T> orderedMerge(Iterable<? extends Publisher<T>> sources, boolean delayErrors, int prefetch)
T - the value type of all sources
sources - the iterable sequence of sources
delayErrors - if true, source errors are delayed until all sources terminate in some way
prefetch - the number of items to prefetch from the sources

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T> Flowable<T> repeat(T item)
T - the value type
item - the value to repeat

@BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T> Flowable<T> repeatSupplier(Supplier<T> supplier)
T - the value type
supplier - the Supplier to call

@BackpressureSupport(value=FULL) @SchedulerSupport(value="io.reactivex:computation") public static Flowable<Long> intervalBackpressure(long period, TimeUnit unit)
Uses the computation Scheduler to time the emission and likely deliver the value (unless backpressured).
period - the emission period (including the delay for the first emission)
unit - the emission time unit

@BackpressureSupport(value=FULL) @SchedulerSupport(value="custom") public static Flowable<Long> intervalBackpressure(long period, TimeUnit unit, Scheduler scheduler)
Uses the Scheduler provided to time the emission and likely deliver the value (unless backpressured).
period - the emission period (including the delay for the first emission)
unit - the emission time unit
scheduler - the scheduler to use for timing and likely emitting items

@BackpressureSupport(value=FULL) @SchedulerSupport(value="io.reactivex:computation") public static Flowable<Long> intervalBackpressure(long initialDelay, long period, TimeUnit unit)
Uses the computation Scheduler to time the emission and likely deliver the value (unless backpressured).
initialDelay - the initial delay before emitting the first 0L
period - the emission period after the first emission
unit - the emission time unit

@BackpressureSupport(value=FULL) @SchedulerSupport(value="custom") public static Flowable<Long> intervalBackpressure(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Uses the Scheduler provided to time the emission and likely deliver the value (unless backpressured).
initialDelay - the initial delay before emitting the first 0L
period - the emission period (including the delay for the first emission)
unit - the emission time unit
scheduler - the scheduler to use for timing and likely emitting items

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") @SafeVarargs public static <T,R> Flowable<R> zipLatest(Function<? super Object[],? extends R> combiner, Publisher<? extends T>... sources)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
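The diagram above can be read as "one slot per source, overwritten by newer values, and cleared once a combination is emitted". A minimal single-threaded sketch of that reading follows. `ZipLatestSketch` and its methods are hypothetical names, not part of the library, and the model ignores completion, cancellation, backpressure and the thread-safety concerns of the real operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical single-threaded model of the "latest slot per source" idea. */
final class ZipLatestSketch<R> {
    private final Object[] latest;                 // one slot per source, null = empty
    private final Function<Object[], R> combiner;
    private final List<R> emitted = new ArrayList<>();

    ZipLatestSketch(int sourceCount, Function<Object[], R> combiner) {
        this.latest = new Object[sourceCount];
        this.combiner = combiner;
    }

    /** Records a value from one source; a newer value overwrites an unconsumed one. */
    void onNext(int sourceIndex, Object value) {
        latest[sourceIndex] = value;
        for (Object slot : latest) {
            if (slot == null) {
                return;                            // some source has no fresh value yet
            }
        }
        emitted.add(combiner.apply(latest.clone()));
        Arrays.fill(latest, null);                 // consumed values are not reused
    }

    List<R> emitted() {
        return emitted;
    }
}
```

Feeding the model the events of the diagram (three values from A, then one from B, and so on) yields four combinations, one per `M`. A combineLatest-style model would instead keep the slots filled after each emission and emit again on every subsequent value.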
zipLatest does not operate by default on a particular Scheduler and the combined item emission happens on the thread that won the internal emission-right race.
T - the common source value type
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
sources - the array of source Publishers to zip/combine

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="custom") @SafeVarargs public static <T,R> Flowable<R> zipLatest(Function<? super Object[],? extends R> combiner, Scheduler scheduler, Publisher<? extends T>... sources)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
The combined items are emitted on the Scheduler provided.
T - the common source value type
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
sources - the array of source Publishers to zip/combine
scheduler - the Scheduler to use for emitting items and/or terminal signals

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public static <T,R> Flowable<R> zipLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
zipLatest does not operate by default on a particular Scheduler and the combined item emission happens on the thread that won the internal emission-right race.
T - the common source value type
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
sources - the array of source Publishers to zip/combine

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="custom") public static <T,R> Flowable<R> zipLatest(Iterable<? extends Publisher<? extends T>> sources, Function<? super Object[],? extends R> combiner, Scheduler scheduler)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
The combined items are emitted on the Scheduler provided.
T - the common source value type
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
sources - the array of source Publishers to zip/combine
scheduler - the Scheduler to use for emitting items and/or terminal signals

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public static <T1,T2,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
zipLatest does not operate by default on a particular Scheduler and the combined item emission happens on the thread that won the internal emission-right race.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="custom") public static <T1,T2,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, BiFunction<? super T1,? super T2,? extends R> combiner, Scheduler scheduler)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
The combined items are emitted on the Scheduler provided.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance
scheduler - the Scheduler to use for emitting items and/or terminal signals

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public static <T1,T2,T3,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
zipLatest does not operate by default on a particular Scheduler and the combined item emission happens on the thread that won the internal emission-right race.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
T3 - the value type of the third source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance
source3 - the third source Publisher instance

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="custom") public static <T1,T2,T3,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Function3<? super T1,? super T2,? super T3,? extends R> combiner, Scheduler scheduler)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
The combined items are emitted on the Scheduler provided.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
T3 - the value type of the third source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance
source3 - the third source Publisher instance
scheduler - the Scheduler to use for emitting items and/or terminal signals

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="none") public static <T1,T2,T3,T4,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Publisher<T4> source4, Function4<? super T1,? super T2,? super T3,? super T4,? extends R> combiner)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
zipLatest does not operate by default on a particular Scheduler and the combined item emission happens on the thread that won the internal emission-right race.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
T3 - the value type of the third source Publisher
T4 - the value type of the fourth source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance
source3 - the third source Publisher instance
source4 - the fourth source Publisher instance

@BackpressureSupport(value=UNBOUNDED_IN) @SchedulerSupport(value="custom") public static <T1,T2,T3,T4,R> Flowable<R> zipLatest(Publisher<T1> source1, Publisher<T2> source2, Publisher<T3> source3, Publisher<T4> source4, Function4<? super T1,? super T2,? super T3,? super T4,? extends R> combiner, Scheduler scheduler)
Non-consumed source values are overwritten by newer values. Unlike combineLatest, source values are not reused to form new combinations.
If any of the sources runs out of items, the other sources are cancelled and the sequence completes.
A: ---o-o-o------o-o----o---o-|-------
B: ---------x-x--x-------x-----x--x---
======= zipLatest (o, x -> M) ========
R: ---------M----M-------M-----M|-----
The combined items are emitted on the Scheduler provided.
T1 - the value type of the first source Publisher
T2 - the value type of the second source Publisher
T3 - the value type of the third source Publisher
T4 - the value type of the fourth source Publisher
R - the result type
combiner - the function that receives the latest values of the sources and returns a value to be emitted to the downstream.
source1 - the first source Publisher instance
source2 - the second source Publisher instance
source3 - the third source Publisher instance
source4 - the fourth source Publisher instance
scheduler - the Scheduler to use for emitting items and/or terminal signals

@CheckReturnValue @BackpressureSupport(value=FULL) @SchedulerSupport(value="none") public static <T,S> Flowable<T> generateAsync(Supplier<S> initialState, BiFunction<S,FlowableAsyncEmitter<T>,S> asyncGenerator, Consumer<? super S> stateCleanup)
This operator allows the bridging of the asynchronous and backpressurable world with the reactive world, where backpressure is the emergent effect of making sure there is only one outstanding API call at a time which responds with at most one item per invocation.
Note that the implementation may have one outstanding API call even if the downstream hasn't requested more; as a result, the resulting item may be cached until the downstream requests more.
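The interplay of "one outstanding call" and "cached until requested" can be pictured with a small bookkeeping model. The sketch below is an assumption-laden illustration, not the operator's implementation: `SingleCallGate` and all of its members are hypothetical names, everything runs on one thread, and terminal events and cancellation are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded model: one API call in flight, at most one cached item. */
final class SingleCallGate<T> {
    private long requested;          // outstanding downstream demand
    private T cached;                // response waiting for demand, or null
    private boolean callInFlight;
    private int callsIssued;
    private final List<T> delivered = new ArrayList<>();

    /** Starts the first API call; it may be issued before any downstream request. */
    void start() {
        issueCall();
    }

    /** Downstream asks for n more items. */
    void request(long n) {
        requested += n;
        drain();
    }

    /** The in-flight API call responded with an item. */
    void onResponse(T item) {
        callInFlight = false;
        cached = item;
        drain();
    }

    private void drain() {
        if (cached != null && requested > 0) {
            delivered.add(cached);
            cached = null;
            requested--;
            issueCall();             // the next call starts only after delivery
        }
    }

    private void issueCall() {
        callInFlight = true;
        callsIssued++;
    }

    int callsIssued() { return callsIssued; }

    boolean callInFlight() { return callInFlight; }

    List<T> delivered() { return delivered; }
}
```

In this model a response that arrives without demand simply waits in `cached`, and no further call is issued until that item has been handed to the downstream.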
During the async response, the invocation protocol of the FlowableAsyncEmitter should be as follows:

    (onNext | onNothing)? (onError | onComplete)?

In words, an onNext or onNothing (which indicates this particular API call resulted in no items and the next API call can proceed) may be followed by a terminal event.
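The grammar above is small enough to check mechanically. As an illustration only, the following sketch validates the sequence of signals produced during one API call; `EmitterProtocolSketch` and `isValid` are hypothetical names, and signals are represented as plain strings rather than real emitter calls.

```java
import java.util.List;

/** Hypothetical checker for the per-call signal grammar. */
final class EmitterProtocolSketch {

    /** Returns true if the signals match (onNext | onNothing)? (onError | onComplete)? */
    static boolean isValid(List<String> signals) {
        boolean itemSeen = false;      // an onNext or onNothing was already signalled
        boolean terminated = false;    // an onError or onComplete was already signalled
        for (String signal : signals) {
            if (terminated) {
                return false;          // nothing may follow a terminal event
            }
            switch (signal) {
                case "onNext":
                case "onNothing":
                    if (itemSeen) {
                        return false;  // at most one item-or-nothing per API call
                    }
                    itemSeen = true;
                    break;
                case "onError":
                case "onComplete":
                    terminated = true;
                    break;
                default:
                    return false;      // unknown signal name
            }
        }
        return true;
    }
}
```

For instance, `onNothing` followed by `onComplete` is accepted, while two `onNext` signals within the same call, or any signal after a terminal event, are rejected.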
The methods Emitter.onNext(Object), Emitter.onError(Throwable), Emitter.onComplete() and FlowableAsyncEmitter.onNothing() should not be called concurrently with each other or outside the context of the generator. The rest of the FlowableAsyncEmitter methods are thread-safe.
Example:
Let's assume there is an async API with the following interface definition:

    interface AsyncAPI<T> extends AutoCloseable {
        CompletableFuture<Void> nextValue(Consumer<? super T> onValue);
    }

When the call succeeds, onValue is invoked with the value. If there are no more items, the CompletableFuture returned by the last nextValue is completed (with null). If there is an error, the same CompletableFuture is completed exceptionally. Each nextValue invocation creates a fresh CompletableFuture which can be cancelled if necessary. nextValue should not be invoked again until the onValue callback has been notified.
The state of the operator is the AsyncAPI instance supplied for each individual Subscriber. The API can be transformed into a Flowable as follows:

    Flowable<Integer> source = Flowables.<Integer, AsyncAPI<Integer>>generateAsync(
        // create a fresh API instance for each individual Subscriber
        () -> new AsyncAPIImpl<Integer>(),
        // this BiFunction will be called once the operator is ready to receive the next item
        // and will invoke it again only when that item is delivered via emitter.onNext()
        (state, emitter) -> {
            // issue the async API call
            CompletableFuture<Void> f = state.nextValue(
                // handle the value received
                value -> {
                    // we have the option to signal that item
                    if (value % 2 == 0) {
                        emitter.onNext(value);
                    } else if (value == 101) {
                        // or stop altogether, which will also trigger a cleanup
                        emitter.onComplete();
                    } else {
                        // or drop it and have the operator start a new call
                        emitter.onNothing();
                    }
                }
            );
            // This API call may not produce further items or fail
            f.whenComplete((done, error) -> {
                // As per the CompletableFuture API, error != null is the error outcome,
                // done is always null due to the Void type
                if (error != null) {
                    emitter.onError(error);
                } else {
                    emitter.onComplete();
                }
            });
            // In case the downstream cancels, the current API call
            // should be cancelled as well
            emitter.replaceCancellable(() -> f.cancel(true));
            // some sources may want to create a fresh state object
            // after each invocation of this generator
            return state;
        },
        // cleanup the state object
        state -> { state.close(); }
    );

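The example refers to an `AsyncAPIImpl` class that this page does not define. Purely as an assumption about what such a class could look like, here is a hypothetical stand-in that follows the contract described for `nextValue`. `CountingAsyncAPI` is an invented name, it only produces the integers 1 to `limit`, and it invokes the callback synchronously, which a genuinely asynchronous API would not do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Same shape as the AsyncAPI interface shown in the example. */
interface AsyncAPI<T> extends AutoCloseable {
    CompletableFuture<Void> nextValue(Consumer<? super T> onValue);
}

/** Hypothetical implementation that yields 1..limit; not part of the library. */
final class CountingAsyncAPI implements AsyncAPI<Integer> {
    private final int limit;
    private int next = 1;
    private boolean closed;

    CountingAsyncAPI(int limit) {
        this.limit = limit;
    }

    @Override
    public CompletableFuture<Void> nextValue(Consumer<? super Integer> onValue) {
        // A fresh future per invocation, as the contract requires.
        CompletableFuture<Void> call = new CompletableFuture<>();
        if (closed) {
            call.completeExceptionally(new IllegalStateException("closed"));
        } else if (next > limit) {
            call.complete(null);            // no more items
        } else {
            onValue.accept(next++);         // delivered synchronously for simplicity
        }
        return call;
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Note that the future returned for a successful item stays incomplete. In the example above, completing it normally would be translated into `emitter.onComplete()` and end the flow after the first item.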
generateAsync does not operate by default on a particular Scheduler; however, the signals emitted through the FlowableAsyncEmitter may happen on any thread, depending on the asynchronous API.
T - the generated item type
S - the state associated with an individual subscription.
initialState - the Supplier that returns a state object for each individual Subscriber to the returned Flowable.
asyncGenerator - the BiFunction that is called with the current state value and the FlowableAsyncEmitter object. It should return a new state value, and prepare and issue the async API call in such a way that the call's outcome is (eventually) converted into onNext, onError or onComplete calls. The operator ensures the BiFunction is only invoked when the previous async call produced an onNext item and this item has been delivered to the downstream.
stateCleanup - called at most once with the current state object to allow cleaning it up after the flow is cancelled or terminates via Emitter.onError(Throwable) or Emitter.onComplete().
See also: Flowable.generate(Supplier, BiFunction, Consumer), FlowableAsyncEmitter