
Enhances coroutines with additional operators and utilities for reactive programming, replicating features found in frameworks like RxJS, RxJava, and RxSwift. Includes operators for creating, transforming, and managing flows, such as `concat`, `defer`, `interval`, `bufferCount`, and `flatMapFirst`, among others. Ideal for complex asynchronous data streams and error handling.
Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions. Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow operators. Coroutines Flow operators.
Liked some of my work? Buy me a coffee (or more likely a beer)
Supported targets:
android.
jvm.
js (IR).
wasmJs.
iosArm64, iosX64, iosSimulatorArm64.
watchosArm32, watchosArm64, watchosX64, watchosSimulatorArm64, watchosDeviceArm64.
tvosX64, tvosSimulatorArm64, tvosArm64.
macosX64, macosArm64.
mingwX64.
linuxX64, linuxArm64.
androidNativeArm32, androidNativeArm64, androidNativeX86, androidNativeX64.

Note: I gladly accept PRs, ideas, opinions, or improvements. Thank you! :)
allprojects {
  repositories {
    ...
    mavenCentral()
  }
}

implementation("io.github.hoc081098:FlowExt:1.0.0")

allprojects {
  repositories {
    ...
    maven(url = "https://central.sonatype.com/repository/maven-snapshots/")
  }
}

dependencies {
  implementation("io.github.hoc081098:FlowExt:1.0.1-SNAPSHOT")
}

allprojects {
  repositories {
    ...
    maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
  }
}

dependencies {
  implementation("io.github.hoc081098:FlowExt:1.0.1-SNAPSHOT")
}

Create
Intermediate operators
bufferCount, combine, cast, castNotNull, castNullable, catchAndReturn, catchAndResume, chunked, safeCast, concatWith, startWith, flatMapFirst, exhaustMap, flattenFirst, flatMapConcatEager, mapEager, flattenEager, exhaustAll, groupBy, ignoreElements, mapIndexed, mapTo, mapToUnit, mapToResult, mapResultCatching, throwFailure, materialize, dematerialize, raceWith, ambWith, pairwise, repeat, retryWhenWithDelayStrategy, retryWhenWithExponentialBackoff, retryWithExponentialBackoff, scanWith, select, skipUntil, dropUntil, takeUntil, throttleTime, withLatestFrom, zipWithNext, plus

Buffers the source Flow values until the size hits the maximum bufferSize given.
Note, chunked is an alias to bufferCount.
range(start = 0, count = 10)
  .bufferCount(bufferSize = 3)
  .collect { println("bufferCount: $it") }

println("---")

range(start = 0, count = 10)
  .bufferCount(bufferSize = 3, startBufferEvery = 2)
  .collect { println("bufferCount: $it") }

Output:
bufferCount: [0, 1, 2]
bufferCount: [3, 4, 5]
bufferCount: [6, 7, 8]
bufferCount: [9]
---
bufferCount: [0, 1, 2]
bufferCount: [2, 3, 4]
bufferCount: [4, 5, 6]
bufferCount: [6, 7, 8]
bufferCount: [8, 9]
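As a rough mental model, the two outputs above match what the Kotlin standard library's `windowed` produces on a plain list when partial windows are kept. The sketch below is only a collection analogy, not how FlowExt implements the operator; `bufferLike` is a hypothetical helper name introduced for illustration.

```kotlin
// Collection analogy for bufferCount; `bufferLike` is a hypothetical helper, not part of FlowExt.
fun <T> bufferLike(
  values: List<T>,
  bufferSize: Int,
  startBufferEvery: Int = bufferSize,
): List<List<T>> =
  // A new window starts every `startBufferEvery` items; the trailing partial window is kept.
  values.windowed(size = bufferSize, step = startBufferEvery, partialWindows = true)

fun main() {
  val values = (0 until 10).toList()
  bufferLike(values, bufferSize = 3).forEach { println("bufferLike: $it") }
  println("---")
  bufferLike(values, bufferSize = 3, startBufferEvery = 2).forEach { println("bufferLike: $it") }
}
```

When `startBufferEvery` is smaller than `bufferSize`, consecutive buffers overlap, which is why 2, 4, 6 and 8 each appear twice in the second output.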
Creates an output Flow which sequentially emits all values from the first given Flow and then moves on to the next.
concat(
  flow1 = flowOf(1, 2, 3),
  flow2 = flowOf(4, 5, 6)
).collect { println("concat: $it") }

Output:
concat: 1
concat: 2
concat: 3
concat: 4
concat: 5
concat: 6
Creates a Flow that, on collection, calls a Flow factory to make a Flow for each new FlowCollector.
In some circumstances, waiting until the last minute (that is, until collection time)
to generate the Flow can ensure that collectors receive the freshest data.
var count = 0L
val flow = defer {
  delay(count)
  flowOf(count++)
}

flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }

Output:
defer: 0
---
defer: 1
---
defer: 2
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromSuspend for the suspend version.
var count = 0L
val flow = flowFromNonSuspend { count++ }

flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }

Output:
flowFromNonSuspend: 0
---
flowFromNonSuspend: 1
---
flowFromNonSuspend: 2
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromNonSuspend for the non-suspend version.
var count = 0L
val flow = flowFromSuspend {
  delay(count)
  count++
}

flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }

Output:
flowFromSuspend: 0
---
flowFromSuspend: 1
---
flowFromSuspend: 2
Returns a Flow that emits a 0L after the initialDelay and ever-increasing numbers
after each period of time thereafter.
interval(initialDelay = 100.milliseconds, period = 1.seconds)
  .take(5)
  .collect { println("interval: $it") }

Output:
interval: 0
interval: 1
interval: 2
interval: 3
interval: 4
Returns a NeverFlow that never emits any values to the FlowCollector and never completes.
neverFlow()
  .startWith(7)
  .collect { println("neverFlow: $it") }

println("Completed!")

Output:
neverFlow: 7
// Never prints "Completed!"
Mirrors the one Flow in an Iterable of several Flows that first either emits a value
or sends a termination event (error or complete event).
When you pass a number of source Flows to race, it will pass through the emissions
and events of exactly one of these Flows: the first one that sends an event to race,
either by emitting a value or sending an error or complete event.
race will cancel the emissions and events of all of the other source Flows.
race(
  flow {
    delay(100)
    emit(1)
    emit(2)
    emit(3)
  },
  flow {
    delay(200)
    emit(2)
    emit(3)
    emit(4)
  }
).collect { println("race: $it") }

Output:
race: 1
race: 2
race: 3
Creates a Flow that emits a sequence of numbers within a specified range.
range(start = 0, count = 5)
  .collect { println("range: $it") }

Output:
range: 0
range: 1
range: 2
range: 3
range: 4
Creates a Flow that will wait for a given duration, before emitting the value.
timer(value = Unit, duration = 1.seconds)
  .collect { println("timer: $it") }

Output:
// After 1 second
timer: kotlin.Unit
combine versions for 6 - 12 Flows.

Catches exceptions in the flow completion, and emits a single item or resumes with another flow.
Similar to
flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .catchAndReturn(3)
  .collect { v: Int -> println("catchAndReturn: $v") }

println("---")

flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .catchAndReturn { e: Throwable -> e.message?.length ?: 0 }
  .collect { v: Int -> println("catchAndReturn: $v") }

println("---")

flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .catchAndResume(flowOf(3, 4))
  .collect { v: Int -> println("catchAndResume: $v") }

println("---")

flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .catchAndResume { e: Throwable -> flowOf(e.message?.length ?: 0) }
  .collect { v: Int -> println("catchAndResume: $v") }

Output:
catchAndReturn: 1
catchAndReturn: 2
catchAndReturn: 3
---
catchAndReturn: 1
catchAndReturn: 2
catchAndReturn: 14
---
catchAndResume: 1
catchAndResume: 2
catchAndResume: 3
catchAndResume: 4
---
catchAndResume: 1
catchAndResume: 2
catchAndResume: 14
Adapt this Flow to be a Flow<R>.
This Flow is wrapped as a Flow<R> which checks at run-time that each value event emitted
by this Flow is also an instance of R.
At the collection time, if this Flow has any value that is not an instance of R,
a ClassCastException will be thrown.
flowOf<Any?>(1, 2, 3)
  .cast<Int>()
  .collect { v: Int -> println("cast: $v") }

Output:
cast: 1
cast: 2
cast: 3
Adapt this Flow<T?> to be a Flow<T>.
At the collection time, if this Flow has any null value,
a NullPointerException will be thrown.
flowOf<Int?>(1, 2, 3)
  .castNotNull()
  .collect { v: Int -> println("castNotNull: $v") }

Output:
castNotNull: 1
castNotNull: 2
castNotNull: 3
Adapt this Flow<*> to be a Flow<R?>.
At the collection time, if this Flow has any value that is not an instance of R, null will be emitted.
flowOf<Any?>(1, 2, 3, "Kotlin", null)
  .safeCast<Int?>()
  .collect { v: Int? -> println("safeCast: $v") }

Output:
safeCast: 1
safeCast: 2
safeCast: 3
safeCast: null
safeCast: null
Returns a Flow that emits the items emitted from the current Flow, then the next, one after the other, without interleaving them.
Note, plus is an alias to concatWith.
flowOf(1, 2, 3)
  .concatWith(flowOf(4, 5, 6))
  .collect { println("concatWith: $it") }

println("---")

val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)

(flow1 + flow2).collect { println("plus: $it") }

Output:
concatWith: 1
concatWith: 2
concatWith: 3
concatWith: 4
concatWith: 5
concatWith: 6
---
plus: 1
plus: 2
plus: 3
plus: 4
plus: 5
plus: 6
Returns a Flow that emits a specified item (or many items) before it begins to emit items emitted by the current Flow.
flowOf(1, 2, 3)
  .startWith(0)
  .collect { println("startWith: $it") }

Output:
startWith: 0
startWith: 1
startWith: 2
startWith: 3
Projects each source value to a Flow which is merged in the output Flow only if the previous projected Flow has completed.
If a value is received while a projected Flow is still being merged, it is simply ignored.
This method is a shortcut for map(transform).flattenFirst().
range(1, 5)
  .onEach { delay(100) }
  .flatMapFirst { timer(it, 130) }
  .collect { println("flatMapFirst: $it") }

Output:
flatMapFirst: 1
flatMapFirst: 3
flatMapFirst: 5
Converts a higher-order Flow into a first-order Flow by dropping inner Flows while the previous inner Flow has not yet completed.
range(1, 5)
  .onEach { delay(100) }
  .map { timer(it, 130) }
  .flattenFirst()
  .collect { println("flattenFirst: $it") }

Output:
flattenFirst: 1
flattenFirst: 3
flattenFirst: 5
Groups the items emitted by the current Flow according to a specified criterion,
and emits these grouped items as GroupedFlows.
range(1, 10)
  .groupBy { it % 2 }
  .flatMapMerge { groupedFlow ->
    groupedFlow
      .map { groupedFlow.key to it }
  }
  .collect { println("groupBy: $it") }

Output:
groupBy: (1, 1)
groupBy: (0, 2)
groupBy: (1, 3)
groupBy: (0, 4)
groupBy: (1, 5)
groupBy: (0, 6)
groupBy: (1, 7)
groupBy: (0, 8)
groupBy: (1, 9)
groupBy: (0, 10)
Ignores all elements emitted by the source Flow, only passes calls of complete or error.
flowOf("you", "talking", "to", "me")
  .ignoreElements()
  .materialize()
  .collect { println("ignoreElements: $it") }

Output:
ignoreElements: Event.Complete
Transforms elements emitted by the original Flow by applying transform, that returns another flow,
and then merging and flattening these flows.
This operator calls transform sequentially and then concatenates the resulting flows with a concurrency
limit on the number of concurrently collected flows.
It is a shortcut for map(transform).flattenConcatEager(concurrency).
range(1, 5)
  .onEach { delay(100) }
  .flatMapConcatEager(concurrency = 2) { v ->
    timer(v, 130)
      .onStart { println("flatMapConcatEager: onStart $v") }
      .onCompletion { println("flatMapConcatEager: onCompletion $v") }
  }
  .collect { println("flatMapConcatEager: $it") }

Output:
flatMapConcatEager: onStart 1
flatMapConcatEager: onStart 2
flatMapConcatEager: 1
flatMapConcatEager: onCompletion 1
flatMapConcatEager: onStart 3
flatMapConcatEager: 2
flatMapConcatEager: onCompletion 2
flatMapConcatEager: onStart 4
flatMapConcatEager: 3
flatMapConcatEager: onCompletion 3
flatMapConcatEager: onStart 5
flatMapConcatEager: 4
flatMapConcatEager: onCompletion 4
flatMapConcatEager: 5
flatMapConcatEager: onCompletion 5
Returns a flow containing the results of applying the given transform function
to each value and its index in the original flow.
range(1, 3)
  .mapIndexed { index, value -> index to value }
  .collect { println("mapIndexed: $it") }

Output:
mapIndexed: (0, 1)
mapIndexed: (1, 2)
mapIndexed: (2, 3)
Emits the given constant value on the output Flow every time the source Flow emits a value.
range(1, 3)
  .mapTo("Value")
  .collect { println("mapTo: $it") }

Output:
mapTo: Value
mapTo: Value
mapTo: Value
Emits kotlin.Unit value on the output Flow every time the source Flow emits a value.
range(1, 3)
  .mapToUnit()
  .collect { println("mapToUnit: $it") }

Output:
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
Maps values in the Flow to successful results (aka Result.success),
and catches and wraps any exception into a failure result (aka Result.failure).
flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("error") })
  .mapToResult()
  .collect { result: Result<Int> -> println("mapToResult: $result") }

Output:
mapToResult: Success(1)
mapToResult: Success(2)
mapToResult: Failure(java.lang.RuntimeException: error)
Maps a Flow of Results to a Flow of mapped Results.
Any exception thrown by the transform function is caught
and emitted as a failure result (aka Result.failure) to the resulting flow.
flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .mapToResult()
  .mapResultCatching {
    if (it == 1) throw RuntimeException("another error")
    else (it * 2).toString()
  }
  .collect { result: Result<String> -> println("mapResultCatching: $result") }

Output:
mapResultCatching: Failure(java.lang.RuntimeException: another error)
mapResultCatching: Success(4)
mapResultCatching: Failure(java.lang.RuntimeException: original error)
Maps a Flow of Results to a Flow of values from successful results.
Failure results are re-thrown as exceptions.
try {
  flowOf(1, 2)
    .concatWith(flow { throw RuntimeException("original error") })
    .mapToResult()
    .throwFailure()
    .collect { v: Int -> println("throwFailure: $v") }
} catch (e: Throwable) {
  println("throwFailure: caught $e")
}

Output:
throwFailure: 1
throwFailure: 2
throwFailure: caught java.lang.RuntimeException: original error
Represents all of the notifications from the source Flow as value emissions marked with their original types within Event objects.
flowOf(1, 2, 3)
  .materialize()
  .collect { println("materialize: $it") }

Output:
materialize: Event.Value(1)
materialize: Event.Value(2)
materialize: Event.Value(3)
materialize: Event.Complete
Converts a Flow of Event objects into the emissions that they represent.
flowOf(Event.Value(1), Event.Value(2), Event.Value(3))
  .dematerialize()
  .collect { println("dematerialize: $it") }

Output:
dematerialize: 1
dematerialize: 2
dematerialize: 3
Mirrors the current Flow or the other Flows provided of which the first either emits a value
or sends a termination event (error or complete event).
flow {
  delay(100)
  emit(1)
  emit(2)
  emit(3)
}.raceWith(
  flow {
    delay(200)
    emit(2)
    emit(3)
    emit(4)
  }
).collect { println("raceWith: $it") }

Output:
raceWith: 1
raceWith: 2
raceWith: 3
Groups pairs of consecutive emissions together and emits them as a pair.
Emits the (n)th and (n-1)th events as a pair.
The first value won't be emitted until the second one arrives.
Note, zipWithNext is an alias to pairwise.
range(0, 4)
  .pairwise()
  .collect { println("pairwise: $it") }

println("---")

range(0, 4)
  .zipWithNext { a, b -> "$a -> $b" }
  .collect { println("zipWithNext: $it") }

Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
---
zipWithNext: 0 -> 1
zipWithNext: 1 -> 2
zipWithNext: 2 -> 3
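The alias name mirrors `zipWithNext` from the Kotlin standard library, which applies the same pairing to ordinary collections. The snippet below shows only that collection counterpart, as a point of comparison rather than a description of FlowExt internals.

```kotlin
fun main() {
  val values = listOf(0, 1, 2, 3)

  // Each element is paired with its successor, so n items yield n - 1 pairs.
  println(values.zipWithNext()) // [(0, 1), (1, 2), (2, 3)]

  // The transform overload maps each adjacent pair instead of returning Pair objects.
  println(values.zipWithNext { a, b -> "$a -> $b" }) // [0 -> 1, 1 -> 2, 2 -> 3]
}
```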
Returns a Flow that will recollect to the source stream when the source stream completes.
flowFromSuspend {
  println("Start collecting...")

  Random
    .nextInt(0..3)
    .also { println("Emit: $it") }
}
  .repeat(
    delay = 1.seconds,
    count = 10
  )
  .filter { it == 2 }
  .take(1)
  .collect { println("repeat: $it") }

Output:
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 1
Start collecting...
Emit: 0
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 2
repeat: 2
Retries collection of the given flow when an exception occurs in the upstream flow and the
predicate returns true. The predicate also receives an attempt number as parameter,
starting from zero on the initial call. When predicate returns true, the next retries will be
delayed after a duration computed by DelayStrategy.nextDelay.
var count = -1

flowFromSuspend {
  ++count
  println("Call count=$count")

  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWhenWithDelayStrategy(
    strategy = DelayStrategy.FixedTimeDelayStrategy(duration = 200.milliseconds),
    predicate = { cause, attempt -> cause is MyException && attempt < 1 }
  )
  .collect { println("retryWhenWithDelayStrategy: $it") }

Output:
Call count=0
Call count=1
retryWhenWithDelayStrategy: Result: count=1
Retries collection of the given flow with exponential backoff delay strategy
when an exception occurs in the upstream flow and the predicate returns true. When predicate returns true,
the next retries will be delayed after a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
var count = -1

flowFromSuspend {
  ++count
  println("Call count=$count")

  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWhenWithExponentialBackoff(
    initialDelay = 500.milliseconds,
    factor = 2.0,
  ) { cause, attempt -> cause is MyException && attempt < 1 }
  .collect { println("retryWhenWithExponentialBackoff: $it") }

Output:
Call count=0
Call count=1
retryWhenWithExponentialBackoff: Result: count=1
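To make the backoff schedule concrete, here is a minimal sketch of how an exponential delay is commonly computed from `initialDelay` and `factor`. It assumes the usual formula `initialDelay * factor^attempt` with an optional cap; `backoffDelay` and its `maxDelay` parameter are hypothetical names for illustration, and the exact behavior of `DelayStrategy.ExponentialBackoffDelayStrategy` may differ.

```kotlin
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper (not a FlowExt API): delay before retry number `attempt`, counted from 0.
fun backoffDelay(
  attempt: Int,
  initialDelay: Duration,
  factor: Double,
  maxDelay: Duration = Duration.INFINITE,
): Duration {
  require(attempt >= 0) { "attempt must be non-negative" }
  // initialDelay * factor^attempt, capped at maxDelay (the cap is an assumption of this sketch).
  return (initialDelay * factor.pow(attempt)).coerceAtMost(maxDelay)
}

fun main() {
  for (attempt in 0..4) {
    val delay = backoffDelay(attempt, initialDelay = 500.milliseconds, factor = 2.0, maxDelay = 5.seconds)
    println("attempt=$attempt delay=$delay")
  }
}
```

With `initialDelay = 500.milliseconds` and `factor = 2.0`, the uncapped delays double on each attempt: 500 ms, 1 s, 2 s, 4 s, and so on.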
Retries collection of the given flow with exponential backoff delay strategy
when an exception occurs in the upstream flow and the predicate returns true. When predicate returns true,
the next retries will be delayed after a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
var count = -1

flowFromSuspend {
  ++count
  println("Call count=$count")

  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWithExponentialBackoff(
    maxAttempt = 2,
    initialDelay = 500.milliseconds,
    factor = 2.0,
  ) { it is MyException }
  .collect { println("retryWithExponentialBackoff: $it") }

Output:
Call count=0
Call count=1
retryWithExponentialBackoff: Result: count=1
Folds the given flow with operation, emitting every intermediate result, including the initial value supplied by initialSupplier at collection time.
This is a variant of scan in which the initial value is supplied lazily,
which is useful when the initial value is expensive to create
or depends on logic that should run at collection time (lazy semantics).
var count = 0
val mutex = Mutex()

suspend fun calculateInitialValue(): Int {
  println("calculateInitialValue")
  delay(1000)
  return mutex.withLock { count++ }
}

flowOf(1, 2, 3)
  .scanWith(::calculateInitialValue) { acc, e -> acc + e }
  .collect { println("scanWith[1]: $it") }

flowOf(1, 2, 3)
  .scanWith(::calculateInitialValue) { acc, e -> acc + e }
  .collect { println("scanWith[2]: $it") }

Output:
calculateInitialValue
scanWith[1]: 0
scanWith[1]: 1
scanWith[1]: 3
scanWith[1]: 6
calculateInitialValue
scanWith[2]: 1
scanWith[2]: 2
scanWith[2]: 4
scanWith[2]: 7
Inspired by the NgRx memoized selector.
Selectors are pure functions used for obtaining slices of a Flow of state.
FlowExt provides a few helper functions for optimizing this selection.
data class UiState(
  val items: List<String> = emptyList(),
  val term: String? = null,
  val isLoading: Boolean = false,
  val error: Throwable? = null
)

flow {
  println("select: emit 1")
  emit(UiState())

  println("select: emit 2")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "a",
      isLoading = true,
      error = Throwable("error")
    )
  )

  println("select: emit 3")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "a",
      isLoading = false,
      error = Throwable("error")
    )
  )

  println("select: emit 4")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "b",
      isLoading = false,
      error = Throwable("error")
    )
  )
}
  .select(
    selector1 = { it.items },
    selector2 = { it.term },
    projector = { items, term ->
      term?.let { v ->
        items.filter { it.contains(v, ignoreCase = true) }
      }
    }
  )
  .collect { println("select: $it") }

Output:
select: emit 1
select: null
select: emit 2
select: [a]
select: emit 3
select: emit 4
select: [b]
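The output above shows nothing after "emit 3" because neither selected slice changed. The memoization idea behind that can be sketched in plain Kotlin as follows. This is a simplified illustration under the assumption that slices are compared by equality; `MemoizedSelector` is a hypothetical class, not FlowExt's implementation, and unlike the Flow operator it returns the cached value again instead of suppressing the repeated emission.

```kotlin
// Simplified state; the `error` field from the example above is omitted.
data class UiState(
  val items: List<String> = emptyList(),
  val term: String? = null,
  val isLoading: Boolean = false,
)

// Hypothetical two-input memoized selector, for illustration only.
class MemoizedSelector<S, A, B, R>(
  private val selector1: (S) -> A,
  private val selector2: (S) -> B,
  private val projector: (A, B) -> R,
) {
  private var cache: Triple<A, B, R>? = null

  var projectorCalls = 0
    private set

  fun select(state: S): R {
    val a = selector1(state)
    val b = selector2(state)
    val cached = cache
    // Reuse the previous result while both selected slices are unchanged.
    if (cached != null && cached.first == a && cached.second == b) return cached.third
    projectorCalls++
    return projector(a, b).also { cache = Triple(a, b, it) }
  }
}

fun main() {
  val selector = MemoizedSelector<UiState, List<String>, String?, List<String>?>(
    selector1 = { it.items },
    selector2 = { it.term },
    projector = { items, term ->
      term?.let { v -> items.filter { it.contains(v, ignoreCase = true) } }
    },
  )
  val abc = listOf("a", "b", "c")
  val states = listOf(
    UiState(),
    UiState(items = abc, term = "a", isLoading = true),
    UiState(items = abc, term = "a", isLoading = false), // only isLoading changed
    UiState(items = abc, term = "b"),
  )
  states.forEach { println("select: ${selector.select(it)}") }
  println("projector calls: ${selector.projectorCalls}")
}
```

Four states are processed but the projector runs only three times, since the third state differs from the second only in `isLoading`, which neither selector reads.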
Returns a Flow that skips items emitted by the source Flow until a second Flow emits a value or completes.
flowOf(1, 2, 3)
  .onEach { delay(100) }
  .skipUntil(timer(Unit, 150))
  .collect { println("skipUntil: $it") }

Output:
skipUntil: 2
skipUntil: 3
Emits the values emitted by the source Flow until a notifier Flow emits a value or completes.
range(0, 5)
  .onEach { delay(100) }
  .takeUntil(timer(Unit, 270.milliseconds))
  .collect { println("takeUntil: $it") }

Output:
takeUntil: 0
takeUntil: 1
Returns a Flow that emits a value from the source Flow, then ignores subsequent source values
for a duration determined by durationSelector, then repeats this process for the next source value.
(1..10)
  .asFlow()
  .onEach { delay(200) }
  .throttleTime(500)
  .collect { println("throttleTime: $it") }

Output:
throttleTime: 1
throttleTime: 4
throttleTime: 7
throttleTime: 10
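The timing behind this output can be traced with a small simulation over timestamps. The sketch below assumes leading-edge throttling (emit, then ignore everything until the window closes) and replaces real delays with explicit millisecond timestamps; `throttleLeading` is a hypothetical helper, not a FlowExt function, and how the real operator treats a value arriving exactly at the window boundary is not specified here.

```kotlin
// Hypothetical helper: leading-edge throttling over (timestampMillis, value) pairs sorted by time.
fun <T> throttleLeading(events: List<Pair<Long, T>>, windowMillis: Long): List<T> {
  val emitted = mutableListOf<T>()
  var windowEnd = Long.MIN_VALUE // no window is open initially
  for ((time, value) in events) {
    if (time >= windowEnd) {
      // The window is closed: emit the value and open a new window.
      emitted += value
      windowEnd = time + windowMillis
    }
    // Otherwise the value falls inside the open window and is dropped.
  }
  return emitted
}

fun main() {
  // Values 1..10 arriving every 200 ms, as in the example above.
  val events = (1..10).map { it * 200L to it }
  println(throttleLeading(events, windowMillis = 500)) // [1, 4, 7, 10]
}
```

Value 1 arrives at 200 ms and opens a window until 700 ms, so 2 and 3 are dropped; 4 arrives at 800 ms and opens the next window, and the pattern repeats.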
Merges two Flows into one Flow by combining each value from self with the latest value from the second Flow, if any.
Values emitted by self before the second Flow has emitted any values will be omitted.
range(0, 5)
  .onEach { delay(100) }
  .withLatestFrom(
    range(0, 10)
      .onEach { delay(70) }
  )
  .collect { println("withLatestFrom: $it") }

Output:
withLatestFrom: (0, 0)
withLatestFrom: (1, 1)
withLatestFrom: (2, 3)
withLatestFrom: (3, 4)
withLatestFrom: (4, 6)
... and more, please check out Docs 1.x/Docs snapshot.
MIT License
Copyright (c) 2021-2024 Petrus Nguyễn Thái Học
Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions. Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow operators. Coroutines Flow operators.
Liked some of my work? Buy me a coffee (or more likely a beer)
android.jvm.js (IR).wasmJs.iosArm64, iosX64, iosSimulatorArm64.watchosArm32, watchosArm64, watchosX64, watchosSimulatorArm64, watchosDeviceArm64.tvosX64, tvosSimulatorArm64, tvosArm64.macosX64, macosArm64.mingwX64linuxX64, linuxArm64.androidNativeArm32, androidNativeArm64, androidNativeX86, androidNativeX64.Note: I gladly accept PRs, ideas, opinions, or improvements. Thank you! :)
allprojects {
repositories {
...
mavenCentral()
}
}implementation("io.github.hoc081098:FlowExt:1.0.0")allprojects {
repositories {
...
maven(url = "https://central.sonatype.com/repository/maven-snapshots/")
}
}
dependencies {
implementation("io.github.hoc081098:FlowExt:1.0.1-SNAPSHOT")
}allprojects {
repositories {
...
maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
}
}
dependencies {
implementation("io.github.hoc081098:FlowExt:1.0.1-SNAPSHOT")
}Create
Intermediate operators
bufferCountcombinecastcastNotNullcastNullablecatchAndReturn, catchAndResumechunkedsafeCastconcatWithstartWithflatMapFirstexhaustMapflattenFirstflatMapConcatEagermapEagerflattenEagerexhaustAllgroupByignoreElementsmapIndexedmapTomapToUnitmapToResultmapResultCatchingthrowFailurematerializedematerializeraceWithambWithpairwiserepeatretryWhenWithDelayStrategyretryWhenWithExponentialBackoffretryWithExponentialBackoffscanWithselectskipUntildropUntiltakeUntilthrottleTimewithLatestFromzipWithNextplusBuffers the source Flow values until the size hits the maximum bufferSize given.
Note, chunked is an alias to bufferCount.
range(start = 0, count = 10)
.bufferCount(bufferSize = 3)
.collect { println("bufferCount: $it") }
println("---")
range(start = 0, count = 10)
.bufferCount(bufferSize = 3, startBufferEvery = 2)
.collect { println("bufferCount: $it") }Output:
bufferCount: [0, 1, 2]
bufferCount: [3, 4, 5]
bufferCount: [6, 7, 8]
bufferCount: [9]
---
bufferCount: [0, 1, 2]
bufferCount: [2, 3, 4]
bufferCount: [4, 5, 6]
bufferCount: [6, 7, 8]
bufferCount: [8, 9]
Creates an output Flow which sequentially emits all values from the first given Flow and then moves on to the next.
concat(
flow1 = flowOf(1, 2, 3),
flow2 = flowOf(4, 5, 6)
).collect { println("concat: $it") }Output:
concat: 1
concat: 2
concat: 3
concat: 4
concat: 5
concat: 6
Creates a Flow that, on collection, calls a Flow factory to make a Flow for each new FlowCollector.
In some circumstances, waiting until the last minute (that is, until collection time)
to generate the Flow can ensure that collectors receive the freshest data.
var count = 0L
val flow = defer {
delay(count)
flowOf(count++)
}
flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }
println("---")
flow.collect { println("defer: $it") }Output:
defer: 0
---
defer: 1
---
defer: 2
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromSuspend for the suspend version.
var count = 0L
val flow = flowFromNonSuspend { count++ }
flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }
println("---")
flow.collect { println("flowFromNonSuspend: $it") }Output:
flowFromNonSuspend: 0
---
flowFromNonSuspend: 1
---
flowFromNonSuspend: 2
Creates a cold flow that produces a single value from the given function.
It calls the function for each new FlowCollector.
See also flowFromNonSuspend for the non-suspend version.
var count = 0L
val flow = flowFromSuspend {
delay(count)
count++
}
flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }
println("---")
flow.collect { println("flowFromSuspend: $it") }Output:
flowFromSuspend: 0
---
flowFromSuspend: 1
---
flowFromSuspend: 2
Returns a Flow that emits a 0L after the initialDelay and ever-increasing numbers
after each period of time thereafter.
interval(initialDelay = 100.milliseconds, period = 1.seconds)
.take(5)
.collect { println("interval: $it") }Output:
interval: 0
interval: 1
interval: 2
interval: 3
interval: 4
Returns a NeverFlow that never emits any values to the FlowCollector and never completes.
neverFlow()
.startWith(7)
.collect { println("neverFlow: $it") }
println("Completed!")Output:
neverFlow: 7
// Never prints "Completed!"
Mirrors the one Flow in an Iterable of several Flows that first either emits a value
or sends a termination event (error or complete event).
When you pass a number of source Flows to race, it will pass through the emissions
and events of exactly one of these Flows: the first one that sends an event to race,
either by emitting a value or sending an error or complete event.
race will cancel the emissions and events of all of the other source Flows.
race(
flow {
delay(100)
emit(1)
emit(2)
emit(3)
},
flow {
delay(200)
emit(2)
emit(3)
emit(4)
}
).collect { println("race: $it") }Output:
race: 1
race: 2
race: 3
Creates a Flow that emits a sequence of numbers within a specified range.
range(start = 0, count = 5)
.collect { println("range: $it") }Output:
range: 1
range: 2
range: 3
range: 4
Creates a Flow that will wait for a given duration, before emitting the value.
timer(value = Unit, duration = 1.seconds)
.collect { println("timer: $it") }Output:
// After 1 second
timer: kotlin.Unit
combine versions for 6 - 12 Flows.Catches exceptions in the flow completion, and emits a single item or resumes with another flow.
Similar to
flowOf(1, 2)
.concatWith(flow { throw RuntimeException("original error") })
.catchAndReturn(3)
.collect { v: Int -> println("catchAndReturn: $v") }
println("---")
flowOf(1, 2)
.concatWith(flow { throw RuntimeException("original error") })
.catchAndReturn { e: Throwable -> e.message?.length ?: 0 }
.collect { v: Int -> println("catchAndReturn: $v") }
println("---")
flowOf(1, 2)
.concatWith(flow { throw RuntimeException("original error") })
.catchAndResume(flowOf(3, 4))
.collect { v: Int -> println("catchAndResume: $v") }
println("---")
flowOf(1, 2)
.concatWith(flow { throw RuntimeException("original error") })
.catchAndResume { e: Throwable -> flowOf(e.message?.length ?: 0) }
.collect { v: Int -> println("catchAndResume: $v") }Output:
catchAndReturn: 1
catchAndReturn: 2
catchAndReturn: 3
---
catchAndReturn: 1
catchAndReturn: 2
catchAndReturn: 14
---
catchAndResume: 1
catchAndResume: 2
catchAndResume: 3
catchAndResume: 4
---
catchAndResume: 1
catchAndResume: 2
catchAndResume: 14
Adapt this Flow to be a Flow<R>.
This Flow is wrapped as a Flow<R> which checks at run-time that each value event emitted
by this Flow is also an instance of R.
At the collection time, if this Flow has any value that is not an instance of R,
a ClassCastException will be thrown.
flowOf<Any?>(1, 2, 3)
.cast<Int>()
.collect { v: Int -> println("cast: $v") }Output:
cast: 1
cast: 2
cast: 3
Adapt this Flow<T?> to be a Flow<T>.
At the collection time, if this Flow has any null value,
a NullPointerException will be thrown.
flowOf<Int?>(1, 2, 3)
.castNotNull()
.collect { v: Int -> println("castNotNull: $v") }Output:
castNotNull: 1
castNotNull: 2
castNotNull: 3
Adapt this Flow<*> to be a Flow<R?>.
At the collection time, if this Flow has any value that is not an instance of R, null will be emitted.
flowOf<Any?>(1, 2, 3, "Kotlin", null)
.safeCast<Int?>()
.collect { v: Int? -> println("safeCast: $v") }Output:
safeCast: 1
safeCast: 2
safeCast: 3
safeCast: null
safeCast: null
Returns a Flow that emits the items emitted from the current Flow, then the next, one after the other, without interleaving them.
Note, plus is an alias to concatWith.
flowOf(1, 2, 3)
.concatWith(flowOf(4, 5, 6))
.collect { println("concatWith: $it") }
println("---")
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf(4, 5, 6)
(flow1 + flow2).collect { println("plus: $it") }Output:
concatWith: 1
concatWith: 2
concatWith: 3
concatWith: 4
concatWith: 5
concatWith: 6
---
plus: 1
plus: 2
plus: 3
plus: 4
plus: 5
plus: 6
Returns a Flow that emits a specified item (or many items) before it begins to emit items emitted by the current Flow.
flowOf(1, 2, 3)
.startWith(0)
.collect { println("startWith: $i") }Output:
startWith: 0
startWith: 1
startWith: 2
startWith: 3
Projects each source value to a Flow which is merged in the output Flow only if the previous projected Flow has completed.
If value is received while there is some projected Flow sequence being merged, it will simply be ignored.
This method is a shortcut for map(transform).flattenFirst().
range(1, 5)
.onEach { delay(100) }
.flatMapFirst { timer(it, 130) }
.collect { println("flatMapFirst: $it") }Output:
flatMapFirst: 1
flatMapFirst: 3
flatMapFirst: 5
Converts a higher-order Flow into a first-order Flow by dropping inner Flow while the previous inner Flow has not yet completed.
range(1, 5)
.onEach { delay(100) }
.map { timer(it, 130) }
.flattenFirst()
.collect { println("flattenFirst: $it") }Output:
flattenFirst: 1
flattenFirst: 3
flattenFirst: 5
Groups the items emitted by the current Flow according to a specified criterion,
and emits these grouped items as GroupedFlows.
range(1, 10)
.groupBy { it % 2 }
.flatMapMerge { groupedFlow ->
groupedFlow
.map { groupedFlow.key to it }
}
.collect { println("groupBy: $it") }Output:
groupBy: (1, 1)
groupBy: (0, 2)
groupBy: (1, 3)
groupBy: (0, 4)
groupBy: (1, 5)
groupBy: (0, 6)
groupBy: (1, 7)
groupBy: (0, 8)
groupBy: (1, 9)
groupBy: (0, 10)
Ignores all elements emitted by the source Flow, only passes calls of complete or error.
flowOf("you", "talking", "to", "me")
.ignoreElements()
.materialize()
.collect { println("ignoreElements: $it") }Output:
ignoreElements: Event.Complete
Transforms elements emitted by the original Flow by applying transform, that returns another flow,
and then merging and flattening these flows.
This operator calls transform sequentially and then concatenates the resulting flows with a concurrency
limit on the number of concurrently collected flows.
It is a shortcut for map(transform).flattenConcatEager(concurrency).
range(1, 5)
  .onEach { delay(100) }
  .flatMapConcatEager(concurrency = 2) { v ->
    timer(v, 130)
      .onStart { println("flatMapConcatEager: onStart $v") }
      .onCompletion { println("flatMapConcatEager: onCompletion $v") }
  }
  .collect { println("flatMapConcatEager: $it") }
Output:
flatMapConcatEager: onStart 1
flatMapConcatEager: onStart 2
flatMapConcatEager: 1
flatMapConcatEager: onCompletion 1
flatMapConcatEager: onStart 3
flatMapConcatEager: 2
flatMapConcatEager: onCompletion 2
flatMapConcatEager: onStart 4
flatMapConcatEager: 3
flatMapConcatEager: onCompletion 3
flatMapConcatEager: onStart 5
flatMapConcatEager: 4
flatMapConcatEager: onCompletion 4
flatMapConcatEager: 5
flatMapConcatEager: onCompletion 5
Returns a flow containing the results of applying the given transform function
to each value and its index in the original flow.
range(1, 3)
  .mapIndexed { index, value -> index to value }
  .collect { println("mapIndexed: $it") }
Output:
mapIndexed: (0, 1)
mapIndexed: (1, 2)
mapIndexed: (2, 3)
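The same index-and-value mapping can be expressed with the standard `withIndex` operator from kotlinx.coroutines. The sketch below is only an illustration of the idea; `mapIndexedSketch` is a hypothetical name, not FlowExt's implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): pairs each value with its zero-based index.
fun <T, R> Flow<T>.mapIndexedSketch(transform: suspend (index: Int, value: T) -> R): Flow<R> =
  withIndex().map { (index, value) -> transform(index, value) }

fun main(): Unit = runBlocking {
  val result = (1..3).asFlow().mapIndexedSketch { index, value -> index to value }.toList()
  println(result) // [(0, 1), (1, 2), (2, 3)]
}
```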
Emits the given constant value on the output Flow every time the source Flow emits a value.
range(1, 3)
  .mapTo("Value")
  .collect { println("mapTo: $it") }
Output:
mapTo: Value
mapTo: Value
mapTo: Value
Emits kotlin.Unit value on the output Flow every time the source Flow emits a value.
range(1, 3)
  .mapToUnit()
  .collect { println("mapToUnit: $it") }
Output:
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
mapToUnit: kotlin.Unit
Maps values in the Flow to successful results (aka Result.success),
and catches and wraps any exception into a failure result (aka Result.failure).
flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("error") })
  .mapToResult()
  .collect { result: Result<Int> -> println("mapToResult: $result") }
Output:
mapToResult: Success(1)
mapToResult: Success(2)
mapToResult: Failure(java.lang.RuntimeException: error)
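Wrapping values and upstream errors into `Result` can be approximated with the standard `map` and `catch` operators. This is a minimal sketch under that assumption; `mapToResultSketch` is a hypothetical name and not necessarily how FlowExt implements the operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <T> Flow<T>.mapToResultSketch(): Flow<Result<T>> =
  map { Result.success(it) }               // wrap every value as a success
    .catch { emit(Result.failure(it)) }    // turn an upstream exception into a final failure item

fun main(): Unit = runBlocking {
  val results = flow<Int> {
    emit(1)
    emit(2)
    throw RuntimeException("error")
  }.mapToResultSketch().toList()

  println(results.map { it.isSuccess }) // [true, true, false]
}
```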
Maps a Flow of Results to a Flow of mapped Results.
Any exception thrown by the transform function is caught
and emitted as a failure result (aka Result.failure) to the resulting flow.
flowOf(1, 2)
  .concatWith(flow { throw RuntimeException("original error") })
  .mapToResult()
  .mapResultCatching {
    if (it == 1) throw RuntimeException("another error")
    else (it * 2).toString()
  }
  .collect { result: Result<String> -> println("mapResultCatching: $result") }
Output:
mapResultCatching: Failure(java.lang.RuntimeException: another error)
mapResultCatching: Success(4)
mapResultCatching: Failure(java.lang.RuntimeException: original error)
Maps a Flow of Results to a Flow of values from successful results.
Failure results are re-thrown as exceptions.
try {
  flowOf(1, 2)
    .concatWith(flow { throw RuntimeException("original error") })
    .mapToResult()
    .throwFailure()
    .collect { v: Int -> println("throwFailure: $v") }
} catch (e: Throwable) {
  println("throwFailure: caught $e")
}
Output:
throwFailure: 1
throwFailure: 2
throwFailure: caught java.lang.RuntimeException: original error
Represents all of the notifications from the source Flow as value emissions marked with their original types within Event objects.
flowOf(1, 2, 3)
  .materialize()
  .collect { println("materialize: $it") }
Output:
materialize: Event.Value(1)
materialize: Event.Value(2)
materialize: Event.Value(3)
materialize: Event.Complete
Converts a Flow of Event objects into the emissions that they represent.
flowOf(Event.Value(1), Event.Value(2), Event.Value(3))
  .dematerialize()
  .collect { println("dematerialize: $it") }
Output:
dematerialize: 1
dematerialize: 2
dematerialize: 3
Mirrors whichever Flow, the current one or one of the other Flows provided, is the first to emit a value
or to send a termination event (error or complete event).
flow {
  delay(100)
  emit(1)
  emit(2)
  emit(3)
}.raceWith(
  flow {
    delay(200)
    emit(2)
    emit(3)
    emit(4)
  }
).collect { println("raceWith: $it") }
Output:
raceWith: 1
raceWith: 2
raceWith: 3
Groups pairs of consecutive emissions together and emits them as a pair.
Emits the (n-1)th and (n)th values as a pair.
The first value won't be emitted until the second one arrives.
Note: zipWithNext is an alias of pairwise.
range(0, 4)
  .pairwise()
  .collect { println("pairwise: $it") }
println("---")
range(0, 4)
  .zipWithNext { a, b -> "$a -> $b" }
  .collect { println("zipWithNext: $it") }
Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
---
zipWithNext: 0 -> 1
zipWithNext: 1 -> 2
zipWithNext: 2 -> 3
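The pairing rule (hold back the first value, then emit each value together with its predecessor) can be sketched with the plain `flow` builder. This is an illustrative approximation; `pairwiseSketch` is a hypothetical name and not FlowExt's implementation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.pairwiseSketch(): Flow<Pair<T, T>> = flow {
  var hasPrevious = false // becomes true once the first value has been seen
  var previous: T? = null
  collect { current ->
    if (hasPrevious) {
      emit((previous as T) to current) // pair of (n-1)th and (n)th values
    }
    previous = current
    hasPrevious = true
  }
}

fun main(): Unit = runBlocking {
  val result = (0 until 4).asFlow().pairwiseSketch().toList()
  println(result) // [(0, 1), (1, 2), (2, 3)]
}
```

The separate `hasPrevious` flag, rather than a null check on `previous`, keeps the sketch correct for Flows whose values may themselves be null.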
Returns a Flow that re-collects the source stream each time the source stream completes.
flowFromSuspend {
  println("Start collecting...")
  Random
    .nextInt(0..3)
    .also { println("Emit: $it") }
}
  .repeat(
    delay = 1.seconds,
    count = 10
  )
  .filter { it == 2 }
  .take(1)
  .collect { println("repeat: $it") }
Output:
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 1
Start collecting...
Emit: 0
Start collecting...
Emit: 1
Start collecting...
Emit: 3
Start collecting...
Emit: 2
repeat: 2
Retries collection of the given flow when an exception occurs in the upstream flow and the
predicate returns true. The predicate also receives the attempt number as a parameter,
starting from zero on the initial call. When the predicate returns true, the next retry is
delayed by a duration computed by DelayStrategy.nextDelay.
var count = -1
flowFromSuspend {
  ++count
  println("Call count=$count")
  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWhenWithDelayStrategy(
    strategy = DelayStrategy.FixedTimeDelayStrategy(duration = 200.milliseconds),
    predicate = { cause, attempt -> cause is MyException && attempt < 1 }
  )
  .collect { println("retryWhenWithDelayStrategy: $it") }
Output:
Call count=0
Call count=1
retryWhenWithDelayStrategy: Result: count=1
Retries collection of the given flow with an exponential backoff delay strategy
when an exception occurs in the upstream flow and the predicate returns true. When the predicate returns true,
the next retry is delayed by a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
var count = -1
flowFromSuspend {
  ++count
  println("Call count=$count")
  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWhenWithExponentialBackoff(
    initialDelay = 500.milliseconds,
    factor = 2.0,
  ) { cause, attempt -> cause is MyException && attempt < 1 }
  .collect { println("retryWhenWithExponentialBackoff: $it") }
Output:
Call count=0
Call count=1
retryWhenWithExponentialBackoff: Result: count=1
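To make the backoff idea concrete, the sketch below builds a similar operator on top of the standard `retryWhen` from kotlinx.coroutines. The delay formula (`initialDelay * factor^attempt`, capped at `maxDelay`) is the conventional exponential backoff and is an assumption here, not a statement of FlowExt's exact computation; `backoffDelaySketch`, `retryWhenWithBackoffSketch`, and the `maxDelay` parameter are hypothetical names used only for illustration.

```kotlin
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.retryWhen

// Hypothetical helper: delay before retry number `attempt` (zero-based), capped at maxDelay.
// Assumes the conventional formula initialDelay * factor^attempt.
fun backoffDelaySketch(initialDelay: Duration, factor: Double, attempt: Long, maxDelay: Duration): Duration =
  (initialDelay * factor.pow(attempt.toDouble())).coerceAtMost(maxDelay)

// Hypothetical operator (not part of FlowExt) built on the standard retryWhen.
fun <T> Flow<T>.retryWhenWithBackoffSketch(
  initialDelay: Duration,
  factor: Double,
  maxDelay: Duration,
  predicate: suspend (cause: Throwable, attempt: Long) -> Boolean,
): Flow<T> = retryWhen { cause, attempt ->
  if (predicate(cause, attempt)) {
    delay(backoffDelaySketch(initialDelay, factor, attempt, maxDelay)) // wait before re-collecting
    true
  } else {
    false // give up and let the exception propagate downstream
  }
}

fun main() {
  val delays = (0L..4L).map {
    backoffDelaySketch(500.milliseconds, 2.0, attempt = it, maxDelay = 5.seconds).inWholeMilliseconds
  }
  println(delays) // [500, 1000, 2000, 4000, 5000]
}
```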
Retries collection of the given flow with an exponential backoff delay strategy
when an exception occurs in the upstream flow and the predicate returns true. When the predicate returns true,
the next retry is delayed by a duration computed by DelayStrategy.ExponentialBackoffDelayStrategy.
Unlike retryWhenWithExponentialBackoff, the predicate here receives only the cause, and the number of retries is bounded by maxAttempt.
var count = -1
flowFromSuspend {
  ++count
  println("Call count=$count")
  when (count) {
    0 -> throw MyException(message = "Will retry...", cause = null)
    1 -> "Result: count=$count"
    else -> error("Unexpected: count=$count")
  }
}
  .retryWithExponentialBackoff(
    maxAttempt = 2,
    initialDelay = 500.milliseconds,
    factor = 2.0,
  ) { it is MyException }
  .collect { println("retryWithExponentialBackoff: $it") }
Output:
Call count=0
Call count=1
retryWithExponentialBackoff: Result: count=1
Folds the given flow with operation, emitting every intermediate result, including the initial value supplied by initialSupplier at collection time.
This is a variant of scan in which the initial value is supplied lazily,
which is useful when the initial value is expensive to create
or depends on logic that should be executed at collection time (lazy semantics).
var count = 0
val mutex = Mutex()
suspend fun calculateInitialValue(): Int {
  println("calculateInitialValue")
  delay(1000)
  return mutex.withLock { count++ }
}
flowOf(1, 2, 3)
  .scanWith(::calculateInitialValue) { acc, e -> acc + e }
  .collect { println("scanWith[1]: $it") }
flowOf(1, 2, 3)
  .scanWith(::calculateInitialValue) { acc, e -> acc + e }
  .collect { println("scanWith[2]: $it") }
Output:
calculateInitialValue
scanWith[1]: 0
scanWith[1]: 1
scanWith[1]: 3
scanWith[1]: 6
calculateInitialValue
scanWith[2]: 1
scanWith[2]: 2
scanWith[2]: 4
scanWith[2]: 7
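The lazy-initial-value behaviour can be sketched by deferring the standard `scan` inside a `flow` builder, so the supplier runs once per collection. This is a minimal illustration; `scanWithSketch` is a hypothetical name and not necessarily how FlowExt implements the operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt): the supplier is invoked only when collection starts.
fun <T, R> Flow<T>.scanWithSketch(
  initialSupplier: suspend () -> R,
  operation: suspend (acc: R, value: T) -> R,
): Flow<R> = flow { emitAll(scan(initialSupplier(), operation)) }

fun main(): Unit = runBlocking {
  var calls = 0
  val sums = flowOf(1, 2, 3).scanWithSketch({ calls++ }) { acc, e -> acc + e }

  println(calls)         // 0 (nothing has been collected yet)
  println(sums.toList()) // [0, 1, 3, 6]
  println(sums.toList()) // [1, 2, 4, 7] (the supplier ran again for the second collection)
}
```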
Inspired by NgRx memoized selector.
Selectors are pure functions used for obtaining slices of a Flow of state.
FlowExt provides a few helper functions for optimizing this selection.
data class UiState(
  val items: List<String> = emptyList(),
  val term: String? = null,
  val isLoading: Boolean = false,
  val error: Throwable? = null
)
flow {
  println("select: emit 1")
  emit(UiState())
  println("select: emit 2")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "a",
      isLoading = true,
      error = Throwable("error")
    )
  )
  println("select: emit 3")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "a",
      isLoading = false,
      error = Throwable("error")
    )
  )
  println("select: emit 4")
  emit(
    UiState(
      items = listOf("a", "b", "c"),
      term = "b",
      isLoading = false,
      error = Throwable("error")
    )
  )
}
  .select(
    selector1 = { it.items },
    selector2 = { it.term },
    projector = { items, term ->
      term?.let { v ->
        items.filter { it.contains(v, ignoreCase = true) }
      }
    }
  )
  .collect { println("select: $it") }
Output:
select: emit 1
select: null
select: emit 2
select: [a]
select: emit 3
select: emit 4
select: [b]
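The effect seen above (nothing is emitted for the third state because neither selected slice changed) can be approximated with the standard `map` and `distinctUntilChanged` operators. This is a simplified sketch, not FlowExt's memoized implementation; `selectSketch` and `DemoState` are hypothetical names.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of FlowExt).
fun <S, A, B, R> Flow<S>.selectSketch(
  selector1: (S) -> A,
  selector2: (S) -> B,
  projector: (A, B) -> R,
): Flow<R> =
  map { state -> selector1(state) to selector2(state) }
    .distinctUntilChanged()              // skip the projector when both slices are unchanged
    .map { (a, b) -> projector(a, b) }
    .distinctUntilChanged()              // skip emissions when the projected result is unchanged

// Hypothetical state type for the demo.
data class DemoState(
  val items: List<String> = emptyList(),
  val term: String? = null,
  val isLoading: Boolean = false,
)

fun main(): Unit = runBlocking {
  val all = listOf("a", "b", "c")
  val result = flowOf(
    DemoState(),
    DemoState(items = all, term = "a", isLoading = true),
    DemoState(items = all, term = "a", isLoading = false), // only isLoading changed: no emission
    DemoState(items = all, term = "b", isLoading = false),
  ).selectSketch(
    selector1 = { it.items },
    selector2 = { it.term },
    projector = { items, term -> term?.let { t -> items.filter { it.contains(t, ignoreCase = true) } } },
  ).toList()

  println(result) // [null, [a], [b]]
}
```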
Returns a Flow that skips items emitted by the source Flow until a second Flow emits a value or completes.
flowOf(1, 2, 3)
  .onEach { delay(100) }
  .skipUntil(timer(Unit, 150))
  .collect { println("skipUntil: $it") }
Output:
skipUntil: 2
skipUntil: 3
Emits the values emitted by the source Flow until a notifier Flow emits a value or completes.
range(0, 5)
  .onEach { delay(100) }
  .takeUntil(timer(Unit, 270.milliseconds))
  .collect { println("takeUntil: $it") }
Output:
takeUntil: 0
takeUntil: 1
Returns a Flow that emits a value from the source Flow, then ignores subsequent source values
for a duration determined by durationSelector, then repeats this process for the next source value.
(1..10)
  .asFlow()
  .onEach { delay(200) }
  .throttleTime(500)
  .collect { println("throttleTime: $it") }
Output:
throttleTime: 1
throttleTime: 4
throttleTime: 7
throttleTime: 10
Merges two Flows into one Flow by combining each value from self with the latest value from the second Flow, if any.
Values emitted by self before the second Flow has emitted any values will be omitted.
range(0, 5)
  .onEach { delay(100) }
  .withLatestFrom(
    range(0, 10)
      .onEach { delay(70) }
  )
  .collect { println("withLatestFrom: $it") }
Output:
withLatestFrom: (0, 0)
withLatestFrom: (1, 1)
withLatestFrom: (2, 3)
withLatestFrom: (3, 4)
withLatestFrom: (4, 6)
... and more. Please check out the docs (1.x and snapshot).
MIT License
Copyright (c) 2021-2024 Petrus Nguyễn Thái Học