
Reliability-first background-task API offering scheduling, retries, SQL-persistence, offline queue with replay, task chaining, tag-based cancellation, state flows and test fakes.
A reliability-first Kotlin Multiplatform background task library for Android & iOS.
RetryEngine
Flow with 10 extension operatorsFakeKmpWorker, FakeNetworkMonitor, FakeTaskRepository
KmpWorkerTestRule for JUnit4// build.gradle.kts (KMP shared module)
kotlin {
sourceSets {
commonMain.dependencies {
implementation("com.neuralheads:kmpworker:0.3.0")
}
// Android platform worker (required for androidMain)
androidMain.dependencies {
implementation("com.neuralheads:kmpworker-android-android:0.3.0")
}
}
}// HTTP transfers (download/upload with resume & checksum)
implementation("com.neuralheads:kmpworker-transfer:0.3.0")
// Testing (FakeKmpWorker + KmpWorkerTestRule)
testImplementation("com.neuralheads:kmpworker-testing:0.3.0")All classes live under io.neuralheads.kmpworker.*:
| Class | Package |
|---|---|
KmpWorker, TaskRequest, TaskState, TaskType
|
io.neuralheads.kmpworker.core |
RetryPolicy, RetryEngine, Constraints
|
io.neuralheads.kmpworker.core |
TaskChain, TaskChainBuilder, ChainPolicy
|
io.neuralheads.kmpworker.core |
TaskGraph, TaskGraphBuilder, TaskGraphExecutor
|
io.neuralheads.kmpworker.core |
RateLimiter, TelemetryCollector, ExecutionRecord
|
io.neuralheads.kmpworker.core |
AndroidKmpWorker, ForegroundConfig
|
io.neuralheads.kmpworker.android |
IOSKmpWorker, FlowWrapper, TaskStateObserver
|
io.neuralheads.kmpworker.ios |
OfflineQueue, NetworkMonitor
|
io.neuralheads.kmpworker.queue |
TransferManager, DownloadRequest, UploadRequest
|
io.neuralheads.kmpworker.transfer |
FakeKmpWorker, KmpWorkerTestRule
|
io.neuralheads.kmpworker.testing |
// 1. Register handler
kmpWorker.register("sync-users") {
repository.syncUsers()
}
// 2. Schedule
kmpWorker.enqueue(
TaskRequest(
id = "sync-users",
type = TaskType.OneTime,
constraints = Constraints(requiresInternet = true),
retryPolicy = RetryPolicy.Exponential(initialDelayMillis = 5_000, maxRetries = 3)
)
)
// 3. Observe
kmpWorker.observe("sync-users")
.onRunning { showSpinner() }
.onProgress { progress, msg -> updateBar(progress) }
.onSuccess { hideSpinner() }
.onFailed { error -> showError(error.throwable.message) }
.collect()kmpWorker.oneTime(id = "upload", retryPolicy = exponentialRetry(5.seconds, maxRetries = 3)) {
uploader.upload()
}
kmpWorker.periodic(id = "sync", repeatInterval = 6.hours) {
repository.sync()
}class MyApp : Application() {
val kmpWorker: KmpWorker by lazy {
KmpWorkerBuilder(
AndroidKmpWorker(
context = this,
telemetry = SqlDelightTelemetryCollector(database), // optional
foregroundConfig = ForegroundConfig( // optional
notificationTitle = "Syncing..."
)
)
)
.configure {
logLevel = KmpWorkerLogger.Level.DEBUG
logger = KmpWorkerAndroidLogger
}
.task("sync") { repository.sync() }
.build()
}
}WorkManager is initialized automatically via Jetpack App Startup.
let kmpWorker = IOSKmpWorker()
func application(_ application: UIApplication,
didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
kmpWorker.register(taskId: "sync") { /* ... */ }
kmpWorker.initialize()
return true
}Info.plist:
<key>BGTaskSchedulerPermittedIdentifiers</key>
<array>
<string>sync</string>
</array>let observer = TaskStateObserver(flow: kmpWorker.observe(taskId: "sync"))
observer.onStateChange { state in
print("State: \(state)")
}
observer.stop()TaskType.OneTime // run once ASAP
TaskType.Periodic(repeatIntervalMillis = 900_000) // every 15 min
TaskType.ExactTime(runAtMillis = epochMs) // at specific time
TaskType.Windowed(earliestMillis = t1, latestMillis = t2) // within a windowConstraints(
requiresInternet = true,
requiresCharging = false,
batteryNotLow = true,
requiresDeviceIdle = false, // Android only
contentUris = listOf("content://media/external/images") // Android only
)val chain = TaskChain(
id = "onboarding",
steps = listOf(
TaskRequest(id = "fetch-profile", type = TaskType.OneTime),
TaskRequest(id = "upload-avatar", type = TaskType.OneTime),
TaskRequest(id = "notify-server", type = TaskType.OneTime)
)
)
kmpWorker.enqueueChain(chain, ChainPolicy.REPLACE)kmpWorker.chain("onboarding", policy = ChainPolicy.REPLACE) {
beginWith("fetch-profile")
then("upload-avatar") {
constraints = Constraints(requiresInternet = true)
}
then("notify-server") {
retryPolicy = RetryPolicy.Exponential(5_000, 3)
}
}| Policy | Behavior |
|---|---|
KEEP |
Skip if chain ID already running |
REPLACE |
Cancel existing, start new |
ALLOW_DUPLICATE |
Always enqueue (default) |
Execute tasks with complex dependencies — independent nodes run in parallel:
@OptIn(ExperimentalKmpWorkerApi::class)
kmpWorker.graph("pipeline") {
val fetch = task("fetch-data")
val process = task("process")
val validate = task("validate")
val upload = task("upload")
fetch then process // process depends on fetch
fetch then validate // validate runs PARALLEL with process
process then upload // upload waits for BOTH
validate then upload
}kmpWorker.registerWithContext("upload") {
for (i in 0..100 step 10) {
reportProgress(i / 100f, "Uploading chunk $i")
delay(500)
}
}
kmpWorker.observe("upload")
.onProgress { progress, message -> updateBar(progress) }
.collect()kmpWorker.enqueueBatch(listOf(
TaskRequest("task-1", TaskType.OneTime),
TaskRequest("task-2", TaskType.OneTime),
TaskRequest("task-3", TaskType.OneTime)
))
kmpWorker.cancelBatch(listOf("task-1", "task-2", "task-3"))@OptIn(ExperimentalKmpWorkerApi::class)
val limiter = RateLimiter(maxConcurrent = 3)
limiter.withPermit {
// At most 3 tasks execute concurrently
doHeavyWork()
}val worker = AndroidKmpWorker(
context = this,
telemetry = SqlDelightTelemetryCollector(database)
)
// Query history
val records = worker.getExecutionHistory(limit = 50)
records.forEach { println("${it.taskId}: ${it.state} (${it.durationMs}ms)") }
// Clear
worker.clearExecutionHistory()No Ktor required — uses HttpURLConnection (Android) and NSURLSession (iOS):
val manager = AndroidTransferManager() // or IOSTransferManager()
manager.download(DownloadRequest(
id = "large-file",
url = "https://example.com/file.zip",
savePath = "/downloads/file.zip",
expectedChecksum = "sha256:abc123...",
resumable = true
))
manager.observeProgress("large-file").collect { progress ->
println("${progress.percentComplete}%")
}
manager.upload(UploadRequest(
id = "backup",
url = "https://api.example.com/upload",
filePath = "/data/backup.zip"
))val queue = OfflineQueue(
worker = kmpWorker,
repository = SqlDelightTaskRepository(database),
networkMonitor = AndroidNetworkMonitor(context) // or IOSNetworkMonitor()
)
queue.start()
queue.enqueue(request)
// Online: executes immediately
// Offline: persists, replays on reconnect// Unit tests with FakeKmpWorker
val fake = FakeKmpWorker()
fake.register("sync") { repository.sync() }
fake.enqueue(TaskRequest("sync", TaskType.OneTime))
assertEquals(TaskState.Success, fake.lastStateFor("sync"))
// Simulate failures
fake.failureCount["upload"] = 2 // fails 2x, succeeds on 3rd
// Android instrumented tests
class MyWorkerTest {
@get:Rule
val rule = KmpWorkerTestRule()
@Test
fun testSync() = runTest {
rule.worker.register("sync") { /* ... */ }
rule.worker.enqueue(TaskRequest("sync", TaskType.OneTime))
assertEquals(TaskState.Success, rule.worker.lastStateFor("sync"))
}
}| Extension | Description |
|---|---|
.onRunning { } |
Task is executing |
.onSuccess { } |
Task completed |
.onFailed { error -> } |
Task failed |
.onCancelled { } |
Task was cancelled |
.onTimedOut { timeout -> } |
Task exceeded timeout |
.onTerminal { state -> } |
Any terminal state |
.onProgress { progress, msg -> } |
Progress reported |
.terminalStates() |
Filter to terminal only |
.failures() |
Filter to Failed only |
.successes() |
Filter to Success only |
.progressUpdates() |
Filter to Running with progress |
| Module | Description |
|---|---|
kmpworker |
Umbrella — includes everything |
kmpworker-core |
Core API, models, retry, chains, DAG |
kmpworker-android |
WorkManager integration |
kmpworker-persistence |
SQLDelight storage + telemetry |
kmpworker-queue |
Offline queue + network monitors |
kmpworker-transfer |
HTTP download/upload (no Ktor) |
kmpworker-testing |
FakeKmpWorker + test rule |
kmpworker-scheduler |
TaskScheduler interface |
| Tool | Version |
|---|---|
| Kotlin | 2.1.21+ |
| Android minSdk | 23 |
| Android compileSdk | 35 |
| iOS | iosX64, iosArm64, iosSimulatorArm64 |
| Gradle | 9.x |
Copyright 2026 NeuralHeads
Licensed under the Apache License, Version 2.0
A reliability-first Kotlin Multiplatform background task library for Android & iOS.
RetryEngine
Flow with 10 extension operatorsFakeKmpWorker, FakeNetworkMonitor, FakeTaskRepository
KmpWorkerTestRule for JUnit4// build.gradle.kts (KMP shared module)
kotlin {
sourceSets {
commonMain.dependencies {
implementation("com.neuralheads:kmpworker:0.3.0")
}
// Android platform worker (required for androidMain)
androidMain.dependencies {
implementation("com.neuralheads:kmpworker-android-android:0.3.0")
}
}
}// HTTP transfers (download/upload with resume & checksum)
implementation("com.neuralheads:kmpworker-transfer:0.3.0")
// Testing (FakeKmpWorker + KmpWorkerTestRule)
testImplementation("com.neuralheads:kmpworker-testing:0.3.0")All classes live under io.neuralheads.kmpworker.*:
| Class | Package |
|---|---|
KmpWorker, TaskRequest, TaskState, TaskType
|
io.neuralheads.kmpworker.core |
RetryPolicy, RetryEngine, Constraints
|
io.neuralheads.kmpworker.core |
TaskChain, TaskChainBuilder, ChainPolicy
|
io.neuralheads.kmpworker.core |
TaskGraph, TaskGraphBuilder, TaskGraphExecutor
|
io.neuralheads.kmpworker.core |
RateLimiter, TelemetryCollector, ExecutionRecord
|
io.neuralheads.kmpworker.core |
AndroidKmpWorker, ForegroundConfig
|
io.neuralheads.kmpworker.android |
IOSKmpWorker, FlowWrapper, TaskStateObserver
|
io.neuralheads.kmpworker.ios |
OfflineQueue, NetworkMonitor
|
io.neuralheads.kmpworker.queue |
TransferManager, DownloadRequest, UploadRequest
|
io.neuralheads.kmpworker.transfer |
FakeKmpWorker, KmpWorkerTestRule
|
io.neuralheads.kmpworker.testing |
// 1. Register handler
kmpWorker.register("sync-users") {
repository.syncUsers()
}
// 2. Schedule
kmpWorker.enqueue(
TaskRequest(
id = "sync-users",
type = TaskType.OneTime,
constraints = Constraints(requiresInternet = true),
retryPolicy = RetryPolicy.Exponential(initialDelayMillis = 5_000, maxRetries = 3)
)
)
// 3. Observe
kmpWorker.observe("sync-users")
.onRunning { showSpinner() }
.onProgress { progress, msg -> updateBar(progress) }
.onSuccess { hideSpinner() }
.onFailed { error -> showError(error.throwable.message) }
.collect()kmpWorker.oneTime(id = "upload", retryPolicy = exponentialRetry(5.seconds, maxRetries = 3)) {
uploader.upload()
}
kmpWorker.periodic(id = "sync", repeatInterval = 6.hours) {
repository.sync()
}class MyApp : Application() {
val kmpWorker: KmpWorker by lazy {
KmpWorkerBuilder(
AndroidKmpWorker(
context = this,
telemetry = SqlDelightTelemetryCollector(database), // optional
foregroundConfig = ForegroundConfig( // optional
notificationTitle = "Syncing..."
)
)
)
.configure {
logLevel = KmpWorkerLogger.Level.DEBUG
logger = KmpWorkerAndroidLogger
}
.task("sync") { repository.sync() }
.build()
}
}WorkManager is initialized automatically via Jetpack App Startup.
let kmpWorker = IOSKmpWorker()
func application(_ application: UIApplication,
didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?) -> Bool {
kmpWorker.register(taskId: "sync") { /* ... */ }
kmpWorker.initialize()
return true
}Info.plist:
<key>BGTaskSchedulerPermittedIdentifiers</key>
<array>
<string>sync</string>
</array>let observer = TaskStateObserver(flow: kmpWorker.observe(taskId: "sync"))
observer.onStateChange { state in
print("State: \(state)")
}
observer.stop()TaskType.OneTime // run once ASAP
TaskType.Periodic(repeatIntervalMillis = 900_000) // every 15 min
TaskType.ExactTime(runAtMillis = epochMs) // at specific time
TaskType.Windowed(earliestMillis = t1, latestMillis = t2) // within a windowConstraints(
requiresInternet = true,
requiresCharging = false,
batteryNotLow = true,
requiresDeviceIdle = false, // Android only
contentUris = listOf("content://media/external/images") // Android only
)val chain = TaskChain(
id = "onboarding",
steps = listOf(
TaskRequest(id = "fetch-profile", type = TaskType.OneTime),
TaskRequest(id = "upload-avatar", type = TaskType.OneTime),
TaskRequest(id = "notify-server", type = TaskType.OneTime)
)
)
kmpWorker.enqueueChain(chain, ChainPolicy.REPLACE)kmpWorker.chain("onboarding", policy = ChainPolicy.REPLACE) {
beginWith("fetch-profile")
then("upload-avatar") {
constraints = Constraints(requiresInternet = true)
}
then("notify-server") {
retryPolicy = RetryPolicy.Exponential(5_000, 3)
}
}| Policy | Behavior |
|---|---|
KEEP |
Skip if chain ID already running |
REPLACE |
Cancel existing, start new |
ALLOW_DUPLICATE |
Always enqueue (default) |
Execute tasks with complex dependencies — independent nodes run in parallel:
@OptIn(ExperimentalKmpWorkerApi::class)
kmpWorker.graph("pipeline") {
val fetch = task("fetch-data")
val process = task("process")
val validate = task("validate")
val upload = task("upload")
fetch then process // process depends on fetch
fetch then validate // validate runs PARALLEL with process
process then upload // upload waits for BOTH
validate then upload
}kmpWorker.registerWithContext("upload") {
for (i in 0..100 step 10) {
reportProgress(i / 100f, "Uploading chunk $i")
delay(500)
}
}
kmpWorker.observe("upload")
.onProgress { progress, message -> updateBar(progress) }
.collect()kmpWorker.enqueueBatch(listOf(
TaskRequest("task-1", TaskType.OneTime),
TaskRequest("task-2", TaskType.OneTime),
TaskRequest("task-3", TaskType.OneTime)
))
kmpWorker.cancelBatch(listOf("task-1", "task-2", "task-3"))@OptIn(ExperimentalKmpWorkerApi::class)
val limiter = RateLimiter(maxConcurrent = 3)
limiter.withPermit {
// At most 3 tasks execute concurrently
doHeavyWork()
}val worker = AndroidKmpWorker(
context = this,
telemetry = SqlDelightTelemetryCollector(database)
)
// Query history
val records = worker.getExecutionHistory(limit = 50)
records.forEach { println("${it.taskId}: ${it.state} (${it.durationMs}ms)") }
// Clear
worker.clearExecutionHistory()No Ktor required — uses HttpURLConnection (Android) and NSURLSession (iOS):
val manager = AndroidTransferManager() // or IOSTransferManager()
manager.download(DownloadRequest(
id = "large-file",
url = "https://example.com/file.zip",
savePath = "/downloads/file.zip",
expectedChecksum = "sha256:abc123...",
resumable = true
))
manager.observeProgress("large-file").collect { progress ->
println("${progress.percentComplete}%")
}
manager.upload(UploadRequest(
id = "backup",
url = "https://api.example.com/upload",
filePath = "/data/backup.zip"
))val queue = OfflineQueue(
worker = kmpWorker,
repository = SqlDelightTaskRepository(database),
networkMonitor = AndroidNetworkMonitor(context) // or IOSNetworkMonitor()
)
queue.start()
queue.enqueue(request)
// Online: executes immediately
// Offline: persists, replays on reconnect// Unit tests with FakeKmpWorker
val fake = FakeKmpWorker()
fake.register("sync") { repository.sync() }
fake.enqueue(TaskRequest("sync", TaskType.OneTime))
assertEquals(TaskState.Success, fake.lastStateFor("sync"))
// Simulate failures
fake.failureCount["upload"] = 2 // fails 2x, succeeds on 3rd
// Android instrumented tests
class MyWorkerTest {
@get:Rule
val rule = KmpWorkerTestRule()
@Test
fun testSync() = runTest {
rule.worker.register("sync") { /* ... */ }
rule.worker.enqueue(TaskRequest("sync", TaskType.OneTime))
assertEquals(TaskState.Success, rule.worker.lastStateFor("sync"))
}
}| Extension | Description |
|---|---|
.onRunning { } |
Task is executing |
.onSuccess { } |
Task completed |
.onFailed { error -> } |
Task failed |
.onCancelled { } |
Task was cancelled |
.onTimedOut { timeout -> } |
Task exceeded timeout |
.onTerminal { state -> } |
Any terminal state |
.onProgress { progress, msg -> } |
Progress reported |
.terminalStates() |
Filter to terminal only |
.failures() |
Filter to Failed only |
.successes() |
Filter to Success only |
.progressUpdates() |
Filter to Running with progress |
| Module | Description |
|---|---|
kmpworker |
Umbrella — includes everything |
kmpworker-core |
Core API, models, retry, chains, DAG |
kmpworker-android |
WorkManager integration |
kmpworker-persistence |
SQLDelight storage + telemetry |
kmpworker-queue |
Offline queue + network monitors |
kmpworker-transfer |
HTTP download/upload (no Ktor) |
kmpworker-testing |
FakeKmpWorker + test rule |
kmpworker-scheduler |
TaskScheduler interface |
| Tool | Version |
|---|---|
| Kotlin | 2.1.21+ |
| Android minSdk | 23 |
| Android compileSdk | 35 |
| iOS | iosX64, iosArm64, iosSimulatorArm64 |
| Gradle | 9.x |
Copyright 2026 NeuralHeads
Licensed under the Apache License, Version 2.0