
Lightweight library for building asynchronous, composable data pipelines using Flow. Enables chaining of mapping, buffering, and parallel transformation steps for efficient data flow processing.
Fipe는 Kotlin Flow를 사용하여 비동기적이고 조립 가능한 데이터 파이프라인을 구축하기 위한 경량 Kotlin Multiplatform 라이브러리입니다.
작고 집중된 처리 단계를 연결하여(예: 매핑, 버퍼링, 병렬 변환) 복잡하고 효율적인 데이터 흐름을 구성할 수 있습니다.
then) 연결하여 데이터 스트림을 처리합니다. // 간단한 파이프라인 예제
fun main() = runBlocking {
val bufferSize = 30
val doublingStep = MapStep<Int, Int> { delay(30); it * 2 }
val toDoubleStep = MapStep<Int, Double> { delay(60); it.toDouble() }
val incrementStep = MapStep<Double, Double> { it + 1 }
fun getPipeline(useBuffer: Boolean): Fipe<Int, Double> = fipe<Int>()
.then(doublingStep)
.let { if (useBuffer) it.thenBuffer(bufferSize) else it }
.then(toDoubleStep)
.let { if (useBuffer) it.thenBuffer(bufferSize) else it }
.then(incrementStep)
suspend fun Fipe<Int, Double>.pipelineResult(): List<Double> {
return Dispatchers.Default {
toFlow(
flow {
repeat(bufferSize) {
delay(30)
emit(it)
}
}
).take(bufferSize).toList()
}
}
val n = bufferSize
val t1 = 30L
val t2 = 60L
val t3 = 0L
val expectedSequential = ((t1 + t2 + t3) * n).milliseconds
val expectedPipeline = (t1 + (n - 1) * maxOf(t1, t2, t3)).milliseconds
val pipelineWithoutBuffer = getPipeline(false)
val pipelineWithBuffer = getPipeline(true)
val timeWithoutBuffer = measureTime { pipelineWithoutBuffer.pipelineResult() }
val timeWithBuffer = measureTime { pipelineWithBuffer.pipelineResult() }
println("이론상 최소 시간 (순차 처리): $expectedSequential")
println("이론상 최소 시간 (파이프라인 처리): $expectedPipeline")
println("실제 순차 처리 시간: $timeWithoutBuffer")
println("실제 파이프라인 처리 시간: $timeWithBuffer")
}이론상 최소 시간 (순차 처리): 2.7s
이론상 최소 시간 (파이프라인 처리): 1.77s
실제 순차 처리 시간: 3.958783583s
실제 파이프라인 처리 시간: 2.050898667s
파이프라인은 .then(...)을 사용하여 단계를 연결하며 구성됩니다.
각 단계는 흐름 내에서 요소를 변환하거나, 버퍼링하거나, 병렬 처리하는 등 다양한 방식으로 데이터를 처리할 수 있습니다.
이처럼 작은 처리 단계를 변수화하고 조립할 수 있기 때문에 복잡한 데이터 처리 로직을 효율적이고 유연하게 구현할 수 있습니다.
이 프로젝트는 Apache License 2.0 하에 라이선스가 부여되었습니다.
Fipe는 Kotlin Flow를 사용하여 비동기적이고 조립 가능한 데이터 파이프라인을 구축하기 위한 경량 Kotlin Multiplatform 라이브러리입니다.
작고 집중된 처리 단계를 연결하여(예: 매핑, 버퍼링, 병렬 변환) 복잡하고 효율적인 데이터 흐름을 구성할 수 있습니다.
then) 연결하여 데이터 스트림을 처리합니다. // 간단한 파이프라인 예제
fun main() = runBlocking {
val bufferSize = 30
val doublingStep = MapStep<Int, Int> { delay(30); it * 2 }
val toDoubleStep = MapStep<Int, Double> { delay(60); it.toDouble() }
val incrementStep = MapStep<Double, Double> { it + 1 }
fun getPipeline(useBuffer: Boolean): Fipe<Int, Double> = fipe<Int>()
.then(doublingStep)
.let { if (useBuffer) it.thenBuffer(bufferSize) else it }
.then(toDoubleStep)
.let { if (useBuffer) it.thenBuffer(bufferSize) else it }
.then(incrementStep)
suspend fun Fipe<Int, Double>.pipelineResult(): List<Double> {
return Dispatchers.Default {
toFlow(
flow {
repeat(bufferSize) {
delay(30)
emit(it)
}
}
).take(bufferSize).toList()
}
}
val n = bufferSize
val t1 = 30L
val t2 = 60L
val t3 = 0L
val expectedSequential = ((t1 + t2 + t3) * n).milliseconds
val expectedPipeline = (t1 + (n - 1) * maxOf(t1, t2, t3)).milliseconds
val pipelineWithoutBuffer = getPipeline(false)
val pipelineWithBuffer = getPipeline(true)
val timeWithoutBuffer = measureTime { pipelineWithoutBuffer.pipelineResult() }
val timeWithBuffer = measureTime { pipelineWithBuffer.pipelineResult() }
println("이론상 최소 시간 (순차 처리): $expectedSequential")
println("이론상 최소 시간 (파이프라인 처리): $expectedPipeline")
println("실제 순차 처리 시간: $timeWithoutBuffer")
println("실제 파이프라인 처리 시간: $timeWithBuffer")
}이론상 최소 시간 (순차 처리): 2.7s
이론상 최소 시간 (파이프라인 처리): 1.77s
실제 순차 처리 시간: 3.958783583s
실제 파이프라인 처리 시간: 2.050898667s
파이프라인은 .then(...)을 사용하여 단계를 연결하며 구성됩니다.
각 단계는 흐름 내에서 요소를 변환하거나, 버퍼링하거나, 병렬 처리하는 등 다양한 방식으로 데이터를 처리할 수 있습니다.
이처럼 작은 처리 단계를 변수화하고 조립할 수 있기 때문에 복잡한 데이터 처리 로직을 효율적이고 유연하게 구현할 수 있습니다.
이 프로젝트는 Apache License 2.0 하에 라이선스가 부여되었습니다.