Reference: Kafka: Rest Proxy

Installation

dependencies {
    
    implementation(platform("org.http4k:http4k-bom:6.36.0.0"))

    implementation("org.http4k:http4k-connect-kafka-rest")
    implementation("org.http4k:http4k-connect-kafka-rest-fake")
}

There are 2 distinct APIs in the Rest proxy:

v2

The main KafkaRest connector provides the following v2 Actions:

  • CreateConsumer
  • DeleteConsumer
  • GetOffsets
  • GetPartitions
  • SeekOffsets
  • CommitOffsets
  • ConsumeRecords
  • ProduceMessages
  • SubscribeToTopics

In addition, you can use a KafkaRestConsumer which provides the following Actions:

  • ConsumeRecords
  • Delete
  • GetOffsets
  • SeekOffsets
  • CommitOffsets
  • SubscribeToTopics

Record formats

The following formats of Kafka records are supported currently. Partition keys are optional and null by default:

JSON

All keys and messages will be auto-marshalled to JSON using the standard Moshi instance (which supports most common JDK types):

Kotlin records_json_v2.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.connect.kafka.rest.model.PartitionId
import org.http4k.connect.kafka.rest.v2.model.Record
import org.http4k.connect.kafka.rest.v2.model.Records

val jsonRecords = Records.Json(listOf(Record("123", "value", PartitionId.of(123))))

AVRO

Support for GenericContainer classes (auto-generated from schema). The Key and Value schemas will be extracted from the Key and Value and sent with the message automatically.

Kotlin records_avro.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.connect.kafka.rest.v2.model.Records

val a = Records.Avro(
    listOf(
//        Record(
//            RandomEvent(UUID.nameUUIDFromBytes(it.toByteArray())),
//            RandomEvent(UUID(0, 0), PartitionId.of(123))
//        )
    )
)

Binary

Record contents are specified using Base64 type for wire transport:

Kotlin records_binary_v2.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.connect.kafka.rest.model.PartitionId
import org.http4k.connect.kafka.rest.v2.model.Record
import org.http4k.connect.kafka.rest.v2.model.Records
import org.http4k.connect.model.Base64Blob

val binaryRecords =
    Records.Binary(listOf(Record(Base64Blob.encode("123"), Base64Blob.encode("456"), PartitionId.of(123))))

Notes on message production

Messages can be sent to the broker with or without PartitionIds. If you want to use a strategy for partitioning, the Partitioner interface can be implemented and used as below. RoundRobin and Sticky (key-hash % Partitions) strategies come out of the box.

Kotlin produce_v2.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.client.JavaHttpClient
import org.http4k.connect.kafka.rest.Http
import org.http4k.connect.kafka.rest.KafkaRest
import org.http4k.connect.kafka.rest.extensions.RoundRobinRecordPartitioner
import org.http4k.connect.kafka.rest.model.Topic
import org.http4k.connect.kafka.rest.v3.extensions.produceRecordsWithPartitions
import org.http4k.connect.kafka.rest.v3.model.ClusterId
import org.http4k.core.Credentials
import org.http4k.core.Uri

val kafkaRestV2 = KafkaRest.Http(
    Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)

val produceResult = kafkaRestV2.produceRecordsWithPartitions(
    Topic.of("topic"),
    ClusterId.of("clusterId"),
    listOf(),
    ::RoundRobinRecordPartitioner
)

To keep things simple with respect to partition allocation and rebalancing, the above code will fetch the available partitions on each send to the REST proxy using the /topics/$topic/partitions call. This is obviously not very efficient, but can be reimplemented as needed using any caching strategy which you might wish to implement.

v3 (Confluent API)

The main KafkaRest connector provides the following v2 Actions:

  • GetPartitions
  • GetTopic
  • GetTopics
  • ProduceRecords

Record formats

The following formats of Kafka records are supported currently. Partition keys are optional and null by default:

JSON

All keys and messages will be auto-marshalled to JSON using the standard Moshi instance (which supports most common JDK types):

Kotlin records_json_v3.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.connect.kafka.rest.v3.model.Record
import org.http4k.connect.kafka.rest.v3.model.RecordData

val jsonV3Record = Record(RecordData.Json(mapOf("key" to "value")))

Binary

Record contents are specified using Base64 type for wire transport:

Kotlin records_binary_v3.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.connect.kafka.rest.v3.model.RecordData
import org.http4k.connect.kafka.rest.v3.model.Record
import org.http4k.connect.model.Base64Blob

val binaryV3Record = Record(RecordData.Binary(Base64Blob.encode("foo1")))

Notes on message production

Messages can be sent to the broker with or without PartitionIds. If you want to use a strategy for partitioning, the Partitioner interface can be implemented and used as below. RoundRobin and Sticky (key-hash % Partitions) strategies come out of the box.

Kotlin produce_v3.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.client.JavaHttpClient
import org.http4k.connect.kafka.rest.Http
import org.http4k.connect.kafka.rest.KafkaRest
import org.http4k.connect.kafka.rest.extensions.RoundRobinRecordPartitioner
import org.http4k.connect.kafka.rest.model.Topic
import org.http4k.connect.kafka.rest.v3.extensions.produceRecordsWithPartitions
import org.http4k.connect.kafka.rest.v3.model.ClusterId
import org.http4k.connect.kafka.rest.v3.model.Record
import org.http4k.connect.kafka.rest.v3.model.RecordData.Binary
import org.http4k.connect.model.Base64Blob
import org.http4k.core.Credentials
import org.http4k.core.Uri

val kafkaRestV3 = KafkaRest.Http(
    Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)

val topic = Topic.of("topic")
val clusterId = ClusterId.of("cluster")

val produceV3Result = kafkaRestV3.produceRecordsWithPartitions(
    topic,
    clusterId,
    listOf(Record(Binary(Base64Blob.encode("foo1")))),
    ::RoundRobinRecordPartitioner
)

To keep things simple with respect to partition allocation and rebalancing, the above code will fetch the available partitions on each send to the REST proxy using the /kafka/v3/clusters/$id/topics/$topic/partitions call. This is obviously not very efficient, but can be reimplemented as needed using any caching strategy which you might wish to implement.

Fake

The Fake provides the all of the above endpoints listed for v2 and v3, which is enough for basic consumer lifecycle and production and consumption of records. Note that consumers by default will start at the start of the topic stream, although they can be committed to.

“auto.commit.enable” is enabled by default but can be set to “false” for manual committing of offsets.

Default Fake port: 30091

To start:

Kotlin fake.kt
package content.ecosystem.connect.reference.kafka.rest

import org.http4k.chaos.start
import org.http4k.connect.kafka.rest.FakeKafkaRest

val kafkaRest = FakeKafkaRest().start()
scarf