Reference: Ops: Resilience4J

Installation (Gradle)

dependencies {
    // Import the http4k BOM as a platform; the module below is declared without its own version.
    implementation(platform("org.http4k:http4k-bom:6.36.0.0"))

    // The Resilience4J integration module described on this page.
    implementation("org.http4k:http4k-ops-resilience4j")
}

About

This module provides configurable Filters for Circuit Breaking, Rate Limiting, Retrying and Bulkheading, by integrating with the awesome Resilience4J library.

Circuit Breaking

A Circuit Filter detects failures and then Opens for a set period to allow the underlying system to recover.

Kotlin example_circuit.kt
package content.ecosystem.http4k.reference.resilience4j

import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowSynchronizationStrategy.LOCK_FREE
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType.COUNT_BASED
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.INTERNAL_SERVER_ERROR
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration
import java.util.ArrayDeque

// Circuit state transition: CLOSED (ok) -> OPEN (dead) -> HALF_OPEN (test) -> CLOSED (ok)
/**
 * Sends four requests through a circuit-breaking filter, pausing before the third,
 * and prints each response status together with the breaker state after the call.
 */
fun main() {

    // Canned responses, handed out in this order by the wrapped handler; they exist
    // to drive the circuit through its state changes.
    val queued = ArrayDeque<Response>().apply {
        add(Response(INTERNAL_SERVER_ERROR))
        add(Response(OK))
        add(Response(OK))
    }

    // Breaker settings: count-based sliding window, two permitted calls while
    // half-open, and a one second wait in the open state.
    val breakerConfig = CircuitBreakerConfig.custom()
        .slidingWindow(2, 2, COUNT_BASED, LOCK_FREE)
        .permittedNumberOfCallsInHalfOpenState(2)
        .waitDurationInOpenState(Duration.ofSeconds(1))
        .build()
    val circuitBreaker = CircuitBreaker.of("circuit", breakerConfig)

    // Treat every response whose status is not successful as an error
    // (when isError is omitted the filter defaults to >= 500).
    val circuited = ResilienceFilters.CircuitBreak(
        circuitBreaker,
        isError = { r: Response -> !r.status.successful }
    ).then { queued.removeFirst() }

    // Issues one GET and prints the resulting status, then the breaker state read after the call.
    fun report() {
        val status = circuited(Request(GET, "/")).status
        println("Result: $status Circuit is: ${circuitBreaker.state}")
    }

    report()
    report()
    Thread.sleep(1100) // slightly longer than the configured open-state wait
    report()
    report()
}

Rate Limiting

A RateLimit Filter monitors the number of requests over a set window.

Kotlin example_ratelimiter.kt
package content.ecosystem.http4k.reference.resilience4j

import io.github.resilience4j.ratelimiter.RateLimiter
import io.github.resilience4j.ratelimiter.RateLimiterConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration

/**
 * Sends two back-to-back requests through a rate limiting filter and prints
 * the status of each response.
 */
fun main() {

    // Limiter settings: one call per one-second refresh period, with callers
    // waiting at most 10ms for permission.
    val limiterConfig = RateLimiterConfig.custom()
        .limitForPeriod(1)
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .timeoutDuration(Duration.ofMillis(10))
        .build()
    val limiter = RateLimiter.of("ratelimiter", limiterConfig)

    // Wrap a handler that always answers OK with the rate limiting filter.
    val rateLimits = ResilienceFilters.RateLimit(limiter).then { Response(OK) }

    // Two immediate calls against the same limiter.
    repeat(2) {
        println(rateLimits(Request(GET, "/")).status)
    }
}

Retrying

A Retrying Filter retries requests if a failure is generated.

Kotlin example_retrying.kt
package content.ecosystem.http4k.reference.resilience4j

import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.INTERNAL_SERVER_ERROR
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.util.ArrayDeque

/**
 * Sends one request through a retrying filter whose handler serves queued
 * responses, logging each attempt, and prints the response finally returned.
 */
fun main() {

    // Retry settings: up to 3 attempts, with an interval derived from the attempt number.
    val retryConfig = RetryConfig.custom<RetryConfig>()
        .maxAttempts(3)
        .intervalFunction { attempt: Int -> (attempt * 2).toLong() }
        .build()
    val retry = Retry.of("retrying", retryConfig)

    // Canned responses served in order: a failure first, then a success.
    val queued = ArrayDeque<Response>().apply {
        add(Response(INTERNAL_SERVER_ERROR))
        add(Response(OK))
    }

    // Any response whose status is not successful counts as a failure to retry.
    val retrying = ResilienceFilters.RetryFailures(
        retry,
        isError = { r: Response -> !r.status.successful }
    ).then {
        queued.removeFirst().also { next ->
            println("trying request, will return ${next.status}")
        }
    }

    println(retrying(Request(GET, "/")))
}

Bulkheading

A Bulkhead Filter limits the number of parallel calls that can be executed.

Kotlin example_bulkheading.kt
package content.ecosystem.http4k.reference.resilience4j

import io.github.resilience4j.bulkhead.Bulkhead
import io.github.resilience4j.bulkhead.BulkheadConfig
import org.http4k.core.Method.GET
import org.http4k.core.Request
import org.http4k.core.Response
import org.http4k.core.Status.Companion.OK
import org.http4k.core.then
import org.http4k.filter.ResilienceFilters
import java.time.Duration
import kotlin.concurrent.thread

/**
 * Fires ten requests from separate threads at a handler guarded by a bulkhead
 * filter and prints the status of each response.
 */
fun main() {

    // Bulkhead settings: at most 5 concurrent calls, waiting up to 1000ms to enter.
    val bulkheadConfig = BulkheadConfig.custom()
        .maxConcurrentCalls(5)
        .maxWaitDuration(Duration.ofMillis(1000))
        .build()
    val bulkhead = Bulkhead.of("bulkhead", bulkheadConfig)

    // The wrapped handler takes 100ms per call so that requests overlap.
    val bulkheading = ResilienceFilters.Bulkheading(bulkhead).then {
        Thread.sleep(100)
        Response(OK)
    }

    // Throw a bunch of requests at the filter, one thread per request.
    repeat(10) {
        thread {
            println(bulkheading(Request(GET, "/")).status)
        }
    }
}
scarf