Mock HTTP server for testing with true streaming/SSE support, call-level response control, delay/error simulation, fluent DSL and request verification plus request journal.
Mokksy - Mock HTTP Server, built with Kotlin and Ktor.
Check out the AI-Mocks project for advanced LLM and A2A protocol mocking capabilities.
[!NOTE] Mokksy server was a part of the AI-Mocks project and has now moved to a separate repository. No artefact relocation is required.
WireMock does not support true SSE or streaming responses.
Mokksy addresses these limitations. It is particularly useful for integration testing of LLM clients.
- ApplicationCall object
- Application.mokksy() and Route.mokksy() extension functions, including behind authentication middleware

Add dependencies:
Gradle build.gradle.kts:
dependencies {
// for multiplatform projects
implementation("dev.mokksy:mokksy:$latestVersion")
// for JVM projects
implementation("dev.mokksy:mokksy-jvm:$latestVersion")
}

Maven pom.xml:
<dependency>
<groupId>dev.mokksy</groupId>
<artifactId>mokksy-jvm</artifactId>
<version>[LATEST_VERSION]</version>
<scope>test</scope>
</dependency>

Create and start Mokksy server:
Kotlin — all platforms (coroutine-based):
import dev.mokksy.mokksy.Mokksy
val mokksy = Mokksy()
mokksy.startSuspend()
mokksy.awaitStarted() // port() and baseUrl() are safe after this point

Kotlin - JVM blocking:
import dev.mokksy.mokksy.Mokksy
val mokksy = Mokksy().start()

Java - see Java API below.

Configure the HTTP client in your application to use the Mokksy server's baseUrl:
val client = HttpClient {
install(DefaultRequest) {
url(mokksy.baseUrl())
}
}

Mokksy supports all HTTP verbs. Here are some examples.
GET request example:
// given
val expectedResponse =
// language=json
"""
{
"response": "Pong"
}
""".trimIndent()
mokksy.get {
path = beEqual("/ping")
containsHeader("Foo", "bar")
} respondsWith {
body = expectedResponse
}
// when
val result = client.get("/ping") {
headers.append("Foo", "bar")
}
// then
result.status shouldBe HttpStatusCode.OK
result.bodyAsText() shouldBe expectedResponse

When the request does not match, the Mokksy server returns 404 (Not Found):
val notFoundResult = client.get("/ping") {
headers.append("Foo", "baz")
}
notFoundResult.status shouldBe HttpStatusCode.NotFound

POST request example:
// given
val id = Random.nextInt()
val expectedResponse =
// language=json
"""
{
"id": "$id",
"name": "thing-$id"
}
""".trimIndent()
mokksy.post {
path = beEqual("/things")
bodyContains("\"$id\"")
} respondsWith {
body = expectedResponse
httpStatus = HttpStatusCode.Created
headers {
// type-safe builder style
append(HttpHeaders.Location, "/things/$id")
}
headers += "Foo" to "bar" // list style
}
// when
val result =
client.post("/things") {
headers.append("Content-Type", "application/json")
setBody(
// language=json
"""
{
"id": "$id"
}
""".trimIndent(),
)
}
// then
result.status shouldBe HttpStatusCode.Created
result.bodyAsText() shouldBe expectedResponse
result.headers["Location"] shouldBe "/things/$id"
result.headers["Foo"] shouldBe "bar"

Server-Sent Events (SSE) is a technology that allows a server to push updates to the client over a single, long-lived HTTP connection. This enables real-time updates without requiring the client to continuously poll the server for new data.
SSE streams events in a standardized text format, so clients can consume the data and handle events as they arrive. It is lightweight and well suited to applications that need real-time updates, such as live notifications or feed updates.
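To make the "standardized format" concrete, here is a minimal Java sketch that pulls the `data` field values out of a `text/event-stream` body. It is an illustration only: the class and method names are hypothetical, it uses nothing from Mokksy, and it deliberately skips event dispatch on blank lines and the joining of multi-line data.

```java
import java.util.ArrayList;
import java.util.List;

class SseDataFields {

    // Returns the value of every "data" field found in a text/event-stream body.
    // Assumption: only field extraction is shown, not full event dispatch.
    static List<String> dataFields(String body) {
        List<String> values = new ArrayList<>();
        // The SSE format accepts CRLF, LF, or CR as line terminators.
        for (String line : body.split("\r\n|\n|\r")) {
            if (line.isEmpty() || line.startsWith(":")) {
                continue; // blank separator line or comment line
            }
            int colon = line.indexOf(':');
            String name = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // a single leading space is stripped
            }
            if (name.equals("data")) {
                values.add(value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(dataFields("data: One\r\ndata: Two\r\n")); // prints [One, Two]
    }
}
```

A helper like this can keep test assertions readable when the exact line terminators of the stream are not the point of the test.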
Server-Sent Events (SSE) example:
mokksy.post {
path = beEqual("/sse")
} respondsWithSseStream {
flow =
flow {
delay(200.milliseconds)
emit(
ServerSentEvent(
data = "One",
),
)
delay(50.milliseconds)
emit(
ServerSentEvent(
data = "Two",
),
)
}
}
// when
val result = client.post("/sse")
// then
result.status shouldBe HttpStatusCode.OK
result.contentType() shouldBe ContentType.Text.EventStream.withCharsetIfNeeded(Charsets.UTF_8)
result.bodyAsText() shouldBe "data: One\r\ndata: Two\r\n"

Mokksy provides various matcher types to specify conditions for matching incoming HTTP requests:
- path("/things") or path = beEqual("/things")
- containsHeader("X-Request-ID", "abc") checks for a header with an exact value
- bodyContains("value") checks if the raw body string contains a substring; bodyString += contain("value") adds a Kotest matcher directly
- bodyMatchesPredicate { it?.name == "foo" } matches against the typed, deserialized request body
- successCallMatcher matches if a function called with the body does not throw
- priority = 10 on RequestSpecificationBuilder sets the RequestSpecification.priority of the stub; lower values indicate higher priority. Default is Int.MAX_VALUE.

Priority is a tiebreaker: it applies only when two stubs match with an equal number of conditions satisfied.
For most cases, specificity-based matching (see below) selects the right stub automatically.

When multiple stubs could match the same request, Mokksy scores each one by counting how many conditions it satisfies, then selects the highest-scoring stub. A stub with two matching conditions beats a stub with one, regardless of registration order.
// Generic: matches any POST to /users
mokksy.post {
path("/users")
} respondsWith {
body = "any user"
}
// Specific: matches only requests whose body contains "admin" — two conditions
mokksy.post {
path("/users")
bodyContains("admin")
} respondsWith {
body = "admin user"
}
// Admin request → specific stub wins (score 2 beats score 1)
val adminResult = client.post("/users") { setBody("admin") }
adminResult.bodyAsText() shouldBe "admin user"
// Other request → only the generic stub matches
val genericResult = client.post("/users") { setBody("regular") }
genericResult.bodyAsText() shouldBe "any user"

When no stub matches and verbose mode is enabled
(Mokksy(verbose = true) in Kotlin, Mokksy.create("127.0.0.1", 0, true) in Java),
Mokksy logs the closest partial match and its failed conditions to help you diagnose the mismatch.
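The selection rule described in this section (most satisfied conditions first, lower priority value as the tiebreaker) can be modelled in a few lines of plain Java. This is a sketch of the rule as the text states it, not Mokksy's implementation; `Request`, `Stub`, `select`, and `winner` are hypothetical names introduced for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

class StubSelectionSketch {

    // Hypothetical, simplified request: just a path and a body.
    record Request(String path, String body) {}

    // Hypothetical stub: a name, a priority (lower value wins ties), and its conditions.
    record Stub(String name, int priority, List<Predicate<Request>> conditions) {

        boolean matches(Request request) {
            return conditions.stream().allMatch(c -> c.test(request));
        }

        int score() {
            return conditions.size(); // all conditions hold once the stub matches
        }
    }

    // Picks the matching stub with the highest score; priority breaks ties.
    static Optional<Stub> select(List<Stub> stubs, Request request) {
        return stubs.stream()
            .filter(stub -> stub.matches(request))
            .max(Comparator.comparingInt(Stub::score)
                .thenComparing(Comparator.comparingInt(Stub::priority).reversed()));
    }

    static final List<Stub> STUBS = List.of(
        new Stub("any user", Integer.MAX_VALUE,
            List.of(r -> r.path().equals("/users"))),
        new Stub("admin user", Integer.MAX_VALUE,
            List.of(r -> r.path().equals("/users"), r -> r.body().contains("admin"))),
        new Stub("Generic Thing", 99,
            List.of(r -> r.path().contains("/things"))),
        new Stub("Special Thing", 1,
            List.of(r -> r.path().equals("/things/special"))));

    // Returns the name of the selected stub, or "404" when nothing matches.
    static String winner(String path, String body) {
        return select(STUBS, new Request(path, body)).map(Stub::name).orElse("404");
    }

    public static void main(String[] args) {
        System.out.println(winner("/users", "admin"));      // prints admin user
        System.out.println(winner("/users", "regular"));    // prints any user
        System.out.println(winner("/things/special", ""));  // prints Special Thing
        System.out.println(winner("/nope", ""));            // prints 404
    }
}
```

In this model the two `/things` stubs both match `/things/special` with one condition each, so only the priority value decides between them.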
If multiple stubs match with the same specificity score, the one with the lower priority value wins:
// Catch-all stub with low priority (high value)
mokksy.get {
path = contain("/things")
priority = 99
} respondsWith {
body = "Generic Thing"
}
// Specific stub with high priority (low value)
mokksy.get {
path = beEqual("/things/special")
priority = 1
} respondsWith {
body = "Special Thing"
}
// when
val generic = client.get("/things/123")
val special = client.get("/things/special")
// then
generic.bodyAsText() shouldBe "Generic Thing"
special.bodyAsText() shouldBe "Special Thing"

Mokksy provides two complementary verification methods that check opposite sides of the stub/request contract.
verifyNoUnmatchedStubs() fails if any registered stub was never matched by an incoming request.
Use this to catch stubs you set up but that were never actually called — a sign the code under test took
a different path than expected.
// Fails if any stub has never been matched
mokksy.verifyNoUnmatchedStubs()

Note: Be careful when running tests in parallel against a single
MokksyServer instance. Some stubs might be unmatched when one test completes. Avoid calling this in @AfterEach/@AfterTest unless each test owns its own server instance.
verifyNoUnexpectedRequests() fails if any HTTP request arrived at the server but no stub matched it.
These requests are recorded in the RequestJournal and reported together.
// Fails if any request arrived with no matching stub
mokksy.verifyNoUnexpectedRequests()

Always run verifyNoUnexpectedRequests() in @AfterEach to catch requests that arrived but
matched no stub. For verifyNoUnmatchedStubs(), the right placement depends on your fixture strategy:
- Per-test instance (@TestInstance(Lifecycle.PER_METHOD) or a fresh server per test): call
  both checks in @AfterEach, because every stub registered during that test should have been matched
  before the server is torn down.
- Shared instance (@TestInstance(Lifecycle.PER_CLASS) or a companion-object server): call
  verifyNoUnmatchedStubs() in @AfterAll, immediately before shutdown(). Calling it after
  each individual test would falsely report stubs registered for later tests as unmatched.

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest {
val mokksy = Mokksy.create()
lateinit var client: HttpClient
@BeforeAll
suspend fun setup() {
mokksy.startSuspend()
mokksy.awaitStarted() // port() and baseUrl() are safe after this point
client = HttpClient {
install(DefaultRequest) {
url(mokksy.baseUrl())
}
}
}
@Test
suspend fun testSomething() {
mokksy.get {
path("/hi")
} respondsWith {
delay = 100.milliseconds // wait 100ms, then reply
body = "Hello"
}
// when
val response = client.get("/hi")
// then
response.status shouldBe HttpStatusCode.OK
response.bodyAsText() shouldBe "Hello"
}
@AfterEach
fun afterEach() {
mokksy.verifyNoUnexpectedRequests()
}
@AfterAll
suspend fun afterAll() {
client.close()
mokksy.verifyNoUnmatchedStubs() // shared instance: check once, after all tests ran
mokksy.shutdownSuspend()
}
}

Use the find* variants to retrieve the unmatched items directly for custom assertions:
// List<RecordedRequest> — HTTP requests with no matching stub
val unmatchedRequests: List<RecordedRequest> = mokksy.findAllUnexpectedRequests()
// List<RequestSpecification<*>> — stubs that were never triggered
val unmatchedStubs: List<RequestSpecification<*>> = mokksy.findAllUnmatchedStubs()

RecordedRequest is an immutable snapshot that captures method, uri, and headers of the incoming request.
Mokksy records incoming requests in a RequestJournal. The recording mode is controlled by JournalMode in
ServerConfiguration:
| Mode | Behaviour |
|---|---|
| JournalMode.LEAN (default) | Records only requests with no matching stub. Lower overhead; sufficient for verifyNoUnexpectedRequests(). |
| JournalMode.FULL | Records all incoming requests, both matched and unmatched. |

val mokksy = MokksyServer(
configuration = ServerConfiguration(
journalMode = JournalMode.FULL,
),
)

Call resetMatchState() between scenarios to clear stub match state and the journal:
@AfterTest
fun afterEach() {
mokksy.resetMatchState()
}

Note: Stubs configured with
eventuallyRemove = true are permanently removed from the registry on first match and cannot be re-armed by resetMatchState(). Re-register them before the next scenario.
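The interaction between journal modes, resetMatchState(), and removed stubs can be pictured with a toy model. The Java sketch below is a simplification under stated assumptions: requests are reduced to a path string, a stub matches on exact path only, and every class and method name is hypothetical rather than taken from Mokksy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JournalSketch {

    enum Mode { LEAN, FULL }

    private final Mode mode;
    // Hypothetical registry: stub path -> whether the stub is removed after its first match.
    private final Map<String, Boolean> stubs = new LinkedHashMap<>();
    private final List<String> matched = new ArrayList<>();
    private final List<String> journal = new ArrayList<>();

    JournalSketch(Mode mode) {
        this.mode = mode;
    }

    void stub(String path, boolean removeAfterMatch) {
        stubs.put(path, removeAfterMatch);
    }

    // Returns true when a registered stub handled the request.
    boolean handle(String path) {
        Boolean removeAfterMatch = stubs.get(path);
        boolean isMatch = removeAfterMatch != null;
        if (isMatch) {
            matched.add(path);
            if (removeAfterMatch) {
                stubs.remove(path); // permanently gone from the registry
            }
        }
        // LEAN journals only unmatched requests; FULL journals everything.
        if (!isMatch || mode == Mode.FULL) {
            journal.add(path);
        }
        return isMatch;
    }

    // Clears match state and the journal; it does not restore removed stubs.
    void resetMatchState() {
        matched.clear();
        journal.clear();
    }

    List<String> journal() {
        return List.copyOf(journal);
    }

    List<String> matchedStubs() {
        return List.copyOf(matched);
    }

    public static void main(String[] args) {
        JournalSketch lean = new JournalSketch(Mode.LEAN);
        lean.stub("/once", true);
        lean.handle("/once");     // matched: not journaled in LEAN mode, stub removed
        lean.handle("/missing");  // unmatched: journaled
        System.out.println(lean.journal());        // prints [/missing]
        lean.resetMatchState();
        System.out.println(lean.journal());        // prints []
        System.out.println(lean.handle("/once"));  // prints false
    }
}
```

The last line of `main` shows why such stubs need to be registered again before the next scenario: resetting state does not bring them back.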
If you already own a Ktor Application — a test harness with authentication middleware, custom
plugins, or routes that must coexist with stubs — use the mokksy extension functions to mount
stub handling directly, without allocating a second embedded server.
Application.mokksy(server) installs SSE, DoubleReceive, and ContentNegotiation
automatically, then mounts a catch-all route that dispatches every incoming request through the
stub registry:
import dev.mokksy.mokksy.MokksyServer
import dev.mokksy.mokksy.mokksy
import io.ktor.server.engine.embeddedServer
import io.ktor.server.netty.Netty
val server = MokksyServer()
server.get { path("/ping") } respondsWith { body = "pong" }
embeddedServer(Netty, port = 8080) {
mokksy(server)
}.start(wait = true)

Use this overload when Mokksy owns the entire application and you want the simplest possible setup.
Route.mokksy(server) mounts the stub handler inside an existing route scope. Unlike the
application-level overload, it does not install plugins — you are responsible for installing
SSE, DoubleReceive, and ContentNegotiation on the surrounding application. This makes it
suitable when Mokksy stubs coexist with real routes:
routing {
get("/health") { call.respondText("OK") }
mokksy(server)
}

To place stubs behind an authentication check, install the required plugins and wrap mokksy in
an authenticate block:
install(SSE)
install(DoubleReceive)
install(ContentNegotiation) { json() }
install(Authentication) {
basic("auth-basic") {
validate { credentials ->
if (credentials.name == "user" && credentials.password == "pass") {
UserIdPrincipal(credentials.name)
} else null
}
}
}
routing {
authenticate("auth-basic") {
mokksy(server)
}
}

Both extension functions accept any path pattern as a second parameter (default: "{...}",
which matches all routes). Narrow the scope by passing a prefix:
mokksy(server, path = "/api/{...}")

Java callers use dev.mokksy.Mokksy, a JVM-only, AutoCloseable wrapper that exposes a
Consumer-based fluent API instead of Kotlin lambdas with receivers.
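For readers unfamiliar with the pattern, a Consumer-based API hands a freshly created builder to the caller's lambda and reads the configured state back afterwards. The Java sketch below shows the general idea with a hypothetical `Spec` class and `get` method; it is not Mokksy's code and only mirrors the shape of calls such as `spec -> spec.path("/ping")`.

```java
import java.util.function.Consumer;

class ConsumerDslSketch {

    // Hypothetical request spec used only for this illustration.
    static final class Spec {
        private String path = "/";
        private String header = "";

        Spec path(String value) {
            this.path = value;
            return this;
        }

        Spec containsHeader(String name, String value) {
            this.header = name + "=" + value;
            return this;
        }

        String describe() {
            return header.isEmpty() ? path : path + " [" + header + "]";
        }
    }

    // Creates the builder, lets the callback configure it, then reads the result.
    static String get(Consumer<Spec> configure) {
        Spec spec = new Spec();
        configure.accept(spec);
        return "GET " + spec.describe();
    }

    public static void main(String[] args) {
        // Expression lambda: the returned Spec is simply discarded by the Consumer.
        System.out.println(get(spec -> spec.path("/ping"))); // prints GET /ping
        // Block lambda: several configuration calls in sequence.
        System.out.println(get(spec -> {
            spec.path("/secured");
            spec.containsHeader("X-Api-Key", "secret");
        })); // prints GET /secured [X-Api-Key=secret]
    }
}
```

Because the builder methods return `this`, both the chained style and the block style work with the same `Consumer` parameter.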
Lifecycle:
import dev.mokksy.Mokksy;
Mokksy mokksy = Mokksy.create("127.0.0.1", 0, true).start();
mokksy.get(spec -> spec.path("/ping"))
.respondsWith(builder -> builder.body("Pong"));
mokksy.shutdown();

Mokksy implements AutoCloseable, so try-with-resources works for test fixtures that need a short-lived server:
try (Mokksy mokksy = Mokksy.create().start()) {
mokksy.post(spec -> spec.path("/items"))
.respondsWith(builder -> builder.body("{\"id\":\"42\"}").status(201).header("Location", "/items/42"));
}

JUnit 5 setup:
import dev.mokksy.Mokksy;
import java.net.http.HttpClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyTest {
private final Mokksy mokksy = Mokksy.create();
private HttpClient httpClient;
@BeforeAll
void setUp() {
mokksy.start();
httpClient = HttpClient.newHttpClient();
}
@Test
void test() {
// call server
}
@AfterAll
void tearDown() {
mokksy.shutdown();
}
}

Request matchers - the spec block mirrors the Kotlin DSL:
mokksy.post(spec -> {
spec.path("/secured");
spec.containsHeader("X-Api-Key", "secret");
spec.bodyContains("\"role\":\"admin\"");
}).respondsWith(builder -> builder.body("authorized").status(200));

All HTTP verbs are available as named methods (get, post, put, delete, patch,
head, options). Use method(String, spec) for dynamic method names in parameterised tests:
mokksy.method("PATCH", spec -> spec.path("/resource"))
.respondsWith(builder -> builder.body("patched"));

Testing an LLM client or any endpoint that streams data chunk-by-chunk? Use respondsWithStream
to stub a chunked HTTP response. The default Content-Type is text/event-stream; charset=UTF-8,
which matches what most streaming AI APIs and SSE endpoints produce.
Chunks from a list — the simplest case:
mokksy.get(spec -> spec.path("/stream"))
.respondsWithStream(builder -> builder
.chunks(List.of("Hello", " ", "World")));

Chunks from a Stream<T> - the stream is consumed lazily when the first matching request
arrives, not when the stub is registered. This is useful for live generators or mutable sources
that should reflect their state at request time:
mokksy.get(spec -> spec.path("/events"))
.respondsWithStream(builder -> builder
.chunks(Stream.of("data1", "data2")));

Delays - simulate network and processing latency at two granularities:
mokksy.get(spec -> spec.path("/slow-stream"))
.respondsWithStream(builder -> builder
.chunks(List.of("A", "B", "C"))
.delayMillis(200L) // pause before the first chunk
.delayBetweenChunksMillis(100L)); // pause between each subsequent chunk

Custom Content-Type - override the default when the stream carries a different format, such
as NDJSON:
mokksy.get(spec -> spec.path("/ndjson"))
.respondsWithStream(builder -> builder
.chunks(List.of("{\"value\":1}", "{\"value\":2}"))
.contentType("application/x-ndjson"));

For typed chunks, pass the class token as the first argument. Chunks are serialized to the
response body using each object's toString():
mokksy.get(spec -> spec.path("/typed"))
.respondsWithStream(MyEvent.class, builder -> builder
.chunk(new MyEvent("start"))
.chunk(new MyEvent("end")));

Use MokksyJackson.create() when your tests match typed Java objects deserialized from the
request body. The API mirrors Mokksy.create() exactly — same host, port, and verbose
parameters — with an optional ObjectMapper configuration callback.
Add the dependency alongside mokksy:
testImplementation("io.ktor:ktor-serialization-jackson:$ktorVersion")

Then create the server:
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.mokksy.Mokksy;
import dev.mokksy.MokksyJackson;
// Default Jackson configuration
Mokksy mokksy = MokksyJackson.create().start();
// Custom ObjectMapper — e.g. register Java time / records support
Mokksy customMokksy = MokksyJackson.create(ObjectMapper::findAndRegisterModules).start();

Typed body matchers work the same way as in the standard API: pass the Class token to
the stub-registration method and use bodyMatchesPredicate to assert on the deserialized object:
record CreateItemRequest(String name, int quantity) {}
mokksy.post(
CreateItemRequest.class,
spec -> spec.path("/items")
.bodyMatchesPredicate(req -> "widget".equals(req.name()))
).respondsWith(builder -> builder.body("{\"id\":\"1\"}").status(201));