diff --git a/consumer_kafka_avro/README.md b/consumer_kafka_avro/README.md new file mode 100644 index 00000000..f1731654 --- /dev/null +++ b/consumer_kafka_avro/README.md @@ -0,0 +1,90 @@ +# Kafka + Avro Consumer — Spring Cloud Contract samples + +This module contains a Spring Boot application that listens to the `book.returned` Kafka topic. +When a message arrives, it is deserialized from Avro into a `Book` object +and passed to `EmailService` to send a notification email. + +**Source:** [`BooksReturnedListener`](src/main/java/com/example/kafka/consumer/BooksReturnedListener.java) + +--- + +## What is a collaboration test (consumer side)? + +A collaboration test verifies that the consumer can correctly handle the messages +that the producer promised to publish. +Instead of running a live producer, +it uses the **stubs jar** produced by the producer's contract tests. +Spring Cloud Contract's Stub Runner loads that jar, +and the test triggers a specific message label to replay a realistic message on the Kafka topic. +The consumer processes it just as it would in production. + +**Prerequisite:** build and install the producer stubs first: + +--- + +## Background: how Avro works on the wire + +Confluent's serializer writes a compact 5-byte prefix before each message — +one magic byte (`0x00`) plus a 4-byte schema ID — +and registers the schema in a **Schema Registry**. +The deserializer reads that ID, fetches the schema, and decodes the rest of the bytes. +Tests in this module use a mock schema registry (`mock://test`) +so no real registry server is needed. + +--- + +## Two flavors of collaboration test + +The producer module defines two contracts, one per flavor. +There is a matching collaboration test here for each. + +### Flavor 1 — JSON (human-readable) + +**Test:** [`AvroJsonCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java) +**Triggered label:** `book_returned` + +**How the test works:** + +1. Stub Runner triggers the `book_returned` label from the stubs jar. +2. The test's `MessageVerifierSender` receives a JSON string from the stub + (the contract body fields), + converts it into a `Book` object, + and sends it to Kafka using `KafkaAvroSerializer`. +3. `BooksReturnedListener` receives the Avro-deserialized `Book` + and calls `EmailService.sendEmail()`. +4. The test asserts that `EmailService` was called with the expected email content. + +**Trade-off:** Two extra JSON ↔ Avro conversions happen during the test, +but failure messages are easy to read — +you see exactly which field had the wrong value. + +--- + +### Flavor 2 — Binary (exact wire format) + +**Test:** [`AvroBinaryCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java) +**Triggered label:** `book_returned_binary` + +**How the test works:** + +1. Stub Runner triggers the `book_returned_binary` label, + which delivers the raw bytes from the producer's pre-serialized + [`bookReturnedMessage.bin`](../producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin) fixture. +2. The test's `MessageVerifierSender` puts those bytes directly on the Kafka topic + using `ByteArraySerializer` — + no Avro serialization happens in the test itself. +3. `BooksReturnedListener` receives the bytes, + `KafkaAvroDeserializer` decodes them into a `Book`, + and `EmailService.sendEmail()` is called. +4. The test asserts that `EmailService` was called with the expected email content. + +**Trade-off:** The exact bytes that the producer emits in production +travel through the consumer's full deserialization stack, +with no intermediary conversion. +The downside is that failure messages show raw bytes and are harder to interpret. + +> **Note:** Before the raw bytes can be deserialized, +> the `Book` Avro schema must be registered in the mock schema registry for this JVM. +> The test's `TestConfig` handles this automatically via a `@PostConstruct` method — +> see [`AvroBinaryCollaborationTest.TestConfig#registerBookSchema`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java) +> for the explanation. diff --git a/consumer_kafka_avro/pom.xml b/consumer_kafka_avro/pom.xml new file mode 100644 index 00000000..b95dfe5d --- /dev/null +++ b/consumer_kafka_avro/pom.xml @@ -0,0 +1,170 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 4.0.1 + + + + org.springframework.cloud + consumer_kafka_avro + 5.0.2-SNAPSHOT + jar + + + 8.1.1 + 2025.1.0 + 1.12.0 + + + + + confluent + Confluent Maven Repository + https://packages.confluent.io/maven/ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-plugin-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.testcontainers + testcontainers-bom + 1.20.4 + pom + import + + + org.springframework.boot + spring-boot-starter-kafka + 4.0.1 + + + io.confluent + kafka-avro-serializer + ${kafka-avro-serializer.version} + + + + + + + org.apache.avro + avro + ${avro.version} + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-kafka + + + io.confluent + kafka-avro-serializer + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-starter-contract-stub-runner + test + + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + kafka + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 4.0.1 + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.build.directory}/generated-sources/avro + String + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java new file mode 100644 index 00000000..d14896ed --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java @@ -0,0 +1,30 @@ +package com.example.kafka.consumer; + +import com.example.kafka.avro.Book; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +class BooksReturnedListener { + + private final EmailService emailService; + + BooksReturnedListener(EmailService emailService) { + this.emailService = emailService; + } + + @KafkaListener(topics = "book.returned") + public void sendEmailOnBookReturned(Book book) { + String emailBody = """ + Dear User, + + The book you borrowed has been successfully returned: + Title: %s, Author: %s, ISBN: %s + + """.formatted(book.getTitle(), book.getAuthor(), book.getIsbn()); + + emailService.sendEmail(emailBody); + } + +} diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java new file mode 100644 index 00000000..5af1bd69 --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java @@ -0,0 +1,13 @@ +package com.example.kafka.consumer; + +import org.springframework.stereotype.Service; + +@Service +public class EmailService { + + public void sendEmail(String emailBody) { + // Simulate sending an email + System.out.println("Sending email:\n" + emailBody); + } + +} diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java new file mode 100644 index 00000000..b75faa5e --- /dev/null +++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java @@ -0,0 +1,13 @@ +package com.example.kafka.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaAvroConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaAvroConsumerApplication.class, args); + } + +} diff --git a/consumer_kafka_avro/src/main/resources/application.yml b/consumer_kafka_avro/src/main/resources/application.yml new file mode 100644 index 00000000..805e229d --- /dev/null +++ b/consumer_kafka_avro/src/main/resources/application.yml @@ -0,0 +1,19 @@ +spring: + application-name: kafka-avro-consumer + kafka: + bootstrap-servers: localhost:9092 + # producer settings below are only used by the collaboration tests + # (MessageVerifierSender in AvroBinary/JsonCollaborationTest publishes + # the stub-triggered message onto Kafka) + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + consumer: + group-id: kafka-avro-consumer-group + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + auto-offset-reset: earliest + properties: + specific.avro.reader: true + properties: + schema.registry.url: mock://test diff --git a/consumer_kafka_avro/src/main/resources/avro/Book.avsc b/consumer_kafka_avro/src/main/resources/avro/Book.avsc new file mode 100644 index 00000000..c118b8bf --- /dev/null +++ b/consumer_kafka_avro/src/main/resources/avro/Book.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "Book", + "namespace": "com.example.kafka.avro", + "fields": [ + { + "name": "isbn", + "type": "string" + }, + { + "name": "title", + "type": "string" + }, + { + "name": "author", + "type": "string" + } + ] +} diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java new file mode 100644 index 00000000..7bf6ec8c --- /dev/null +++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java @@ -0,0 +1,146 @@ +package com.example.kafka.consumer; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import com.example.kafka.avro.Book; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.springframework.boot.kafka.autoconfigure.KafkaProperties; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.contract.stubrunner.StubTrigger; +import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner; +import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender; +import jakarta.annotation.PostConstruct; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import static java.util.Collections.emptyMap; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.Mockito.verify; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { + AvroBinaryCollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class }) +@AutoConfigureStubRunner(ids = "org.springframework.cloud:producer_kafka_avro:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL) +@ExtendWith(OutputCaptureExtension.class) +class AvroBinaryCollaborationTest { + + @Autowired + StubTrigger trigger; + + @MockitoBean + EmailService emailService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + @Test + void shouldSendEmail_onBookReturned() { + trigger.trigger("book_returned_binary"); + + // @formatter:off + await().untilAsserted(() -> + verify(emailService).sendEmail( + contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890"))); + // @formatter:on + } + + @Configuration + static class TestConfig { + + @Autowired + private KafkaProperties kafkaProperties; + + /** + * Pre-registers the {@link Book} Avro schema in the in-memory mock schema registry. + * + *

Confluent's Avro wire format prefixes every message with a 5-byte header: + * a magic byte ({@code 0x00}) followed by a 4-byte schema ID. The consumer's + * {@code KafkaAvroDeserializer} reads that ID and fetches the matching schema from + * the registry to decode the rest of the bytes. + * + *

The {@code .bin} fixture was produced by + * {@code com.example.kafka.producer.AvroBinaryTestFixture} in a + * separate JVM run, so the schema it embedded (ID 1) no longer exists in this JVM's + * mock registry. Serializing a dummy {@link Book} here forces the serializer to + * register the schema, which assigns it ID 1 — the same ID the deserializer will + * look up when the fixture bytes arrive on the topic. + */ + @PostConstruct + void registerBookSchema() { + try (var serializer = new KafkaAvroSerializer()) { + serializer.configure(kafkaProperties.buildProducerProperties(), false); + serializer.serialize("book.returned", Book.newBuilder() + .setIsbn("").setTitle("").setAuthor("").build()); + } + } + + @Bean + MessageVerifierSender> standaloneMessageVerifier() { + Map producerProps = new HashMap<>(kafkaProperties.buildProducerProperties()); + producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + org.apache.kafka.common.serialization.ByteArraySerializer.class); + var factory = new DefaultKafkaProducerFactory(producerProps); + return new AvroBinaryMessageVerifierSender<>(new KafkaTemplate<>(factory)); + } + + } + + static class AvroBinaryMessageVerifierSender implements MessageVerifierSender { + + private final KafkaTemplate kafkaTemplate; + + @Override + public void send(M message, String destination, YamlContract contract) { + send(message, emptyMap(), destination, contract); + } + + @Override + public void send(T payload, Map headers, String destination, + YamlContract contract) { + Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>(); + newHeaders.put(KafkaHeaders.TOPIC, destination); + MessageHeaders msgHeaders = new MessageHeaders(newHeaders); + + // payload is already Confluent wire-format bytes from the .bin fixture — + // send raw so the consumer's KafkaAvroDeserializer can decode them directly + var message = MessageBuilder.createMessage((byte[]) payload, msgHeaders); + kafkaTemplate.send(message); + } + + AvroBinaryMessageVerifierSender(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + } + +} diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java new file mode 100644 index 00000000..38498bec --- /dev/null +++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java @@ -0,0 +1,121 @@ +package com.example.kafka.consumer; + +import java.util.HashMap; +import java.util.Map; + +import com.example.kafka.avro.Book; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import wiremock.com.fasterxml.jackson.core.JsonProcessingException; +import wiremock.com.fasterxml.jackson.databind.json.JsonMapper; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.contract.stubrunner.StubTrigger; +import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner; +import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; + +import static java.util.Collections.emptyMap; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.contains; +import static org.mockito.Mockito.verify; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { + AvroJsonCollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class }) +@AutoConfigureStubRunner(ids = "org.springframework.cloud:producer_kafka_avro:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL) +@ExtendWith(OutputCaptureExtension.class) +class AvroJsonCollaborationTest { + + @Autowired + StubTrigger trigger; + + @MockitoBean + EmailService emailService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + @Test + void shouldSendEmail_onBookReturned() { + trigger.trigger("book_returned"); + + // @formatter:off + await().untilAsserted(() -> + verify(emailService).sendEmail( + contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890"))); + // @formatter:on + } + + @Configuration + static class TestConfig { + + @Bean + MessageVerifierSender> standaloneMessageVerifier(KafkaTemplate kafkaTemplate) { + return new KafkaAvroMessageVerifierSender<>(kafkaTemplate); + } + + } + + static class KafkaAvroMessageVerifierSender implements MessageVerifierSender { + + private final KafkaTemplate kafkaTemplate; + + @Override + public void send(M message, String destination, YamlContract contract) { + send(message, emptyMap(), destination, contract); + } + + @Override + public void send(T payload, Map headers, String destination, + YamlContract contract) { + Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>(); + newHeaders.put(KafkaHeaders.TOPIC, destination); + MessageHeaders msgHeaders = new MessageHeaders(newHeaders); + + try { + // TODO: remove this workaround after merging: + // https://github.com/spring-cloud/spring-cloud-contract/issues/2404 + Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class); + var message = MessageBuilder.createMessage(avroPayload, msgHeaders); + kafkaTemplate.send(message); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + KafkaAvroMessageVerifierSender(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + } + +} diff --git a/pom.xml b/pom.xml index f893fb48..c7eb99d1 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ producer_with_latest_2_2_features producer_java + producer_kafka_avro producer_kafka_middleware producer_rabbit_middleware @@ -83,6 +84,7 @@ consumer_with_latest_2_2_features consumer_java + consumer_kafka_avro consumer_kafka_middleware consumer_rabbit_middleware @@ -135,6 +137,7 @@ producer_with_latest_2_2_features producer_java + producer_kafka_avro producer_kafka_middleware producer_rabbit_middleware @@ -150,6 +153,7 @@ consumer_with_latest_2_2_features consumer_java + consumer_kafka_avro consumer_kafka_middleware consumer_rabbit_middleware diff --git a/producer_kafka_avro/README.md b/producer_kafka_avro/README.md new file mode 100644 index 00000000..e4439d92 --- /dev/null +++ b/producer_kafka_avro/README.md @@ -0,0 +1,104 @@ +# Kafka + Avro Producer — Spring Cloud Contract samples + +This module contains a Spring Boot application that publishes a `book.returned` event to a Kafka topic +whenever a book is returned to a library. +The message payload is an [Avro](https://avro.apache.org/)-encoded `Book` object, +serialized using the Confluent `KafkaAvroSerializer`. + +**Source:** [`BookService`](src/main/java/com/example/kafka/producer/BookService.java) + +--- + +## What is a contract test (producer side)? + +A contract is a description of what a producer promises to publish — +which topic, which headers, and what the message body looks like. +Spring Cloud Contract reads the contract, generates a JUnit test, +and runs it against the real producer code. +If the producer still publishes the right message, the test passes and a **stubs jar** is produced. +The consumer module uses that stubs jar to replay realistic messages in its own tests +without needing a live producer. + +--- + +## Background: how Avro works on the wire + +Avro does not embed the schema inside every message. +Instead, Confluent's serializer registers the schema in a **Schema Registry** +and writes a compact 5-byte prefix — +one magic byte (`0x00`) plus a 4-byte schema ID — +before the actual binary payload. +The deserializer on the other end reads that ID, +fetches the schema from the registry, +and decodes the rest of the bytes. + +This matters for testing because there are two natural ways to express the expected message in a contract: + +--- + +## Two flavors of contract + +### Flavor 1 — JSON (human-readable) + +**Contract:** [`contracts/json/bookReturnedJson.groovy`](src/test/resources/contracts/json/bookReturnedJson.groovy) + +The contract body is written as plain JSON fields: + +```groovy +body( + isbn: '978-1234567890', + title: 'Contract Testing for Dummies', + author: 'John Doe' +) +``` + +**How the test works:** + +1. SCC calls `publishBookReturned()` on the test base class. +2. The producer serializes the `Book` to Avro bytes and sends them to the Kafka topic. +3. The test base class consumes the message via a `KafkaListener`, + which deserializes the Avro bytes back into a `Book` object. +4. SCC serializes that object to JSON and compares it field-by-field against the contract body. + +**Trade-off:** Two extra JSON ↔ Avro conversions happen during the test, +but failure messages are easy to read — +you see exactly which field had the wrong value. + +**Test base class:** [`AvroJsonContractTestBase`](src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java) + +--- + +### Flavor 2 — Binary (exact wire format) + +**Contract:** [`contracts/binary/bookReturnedBinary.groovy`](src/test/resources/contracts/binary/bookReturnedBinary.groovy) + +The contract body references a pre-serialized binary file: + +```groovy +body(fileAsBytes("bookReturnedMessage.bin")) +``` + +**How the test works:** + +1. SCC calls `publishBookReturned()` on the test base class. +2. The producer serializes the `Book` to Avro bytes and sends them to the Kafka topic. +3. The test base class consumes the raw bytes + (using `ByteArrayDeserializer`, bypassing Avro decoding). +4. SCC compares those raw bytes byte-for-byte against + [`bookReturnedMessage.bin`](src/test/resources/contracts/binary/bookReturnedMessage.bin). + +**Trade-off:** The assertion is an exact binary comparison — +no JSON conversions, no room for serialization drift — +but failure messages show raw bytes and are harder to interpret. + +**Test base class:** [`AvroBinaryContractTestBase`](src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java) + +#### Regenerating the `.bin` fixture + +The `.bin` file must be committed to source control. +Run [`AvroBinaryTestFixture`](src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java) +whenever the `Book` schema or the test data changes to produce a fresh file: + +```bash +./mvnw exec:java -Dexec.mainClass=com.example.kafka.producer.AvroBinaryTestFixture +``` \ No newline at end of file diff --git a/producer_kafka_avro/pom.xml b/producer_kafka_avro/pom.xml new file mode 100644 index 00000000..67825810 --- /dev/null +++ b/producer_kafka_avro/pom.xml @@ -0,0 +1,188 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 4.0.1 + + + + org.springframework.cloud + producer_kafka_avro + 5.0.2-SNAPSHOT + jar + + + 8.1.1 + 2025.1.0 + 1.12.0 + + + + + confluent + Confluent Maven Repository + https://packages.confluent.io/maven/ + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + spring-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + spring-plugin-snapshots + Spring Snapshots + https://repo.spring.io/snapshot + + true + + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.testcontainers + testcontainers-bom + 1.20.4 + pom + import + + + org.springframework.boot + spring-boot-starter-kafka + 4.0.1 + + + io.confluent + kafka-avro-serializer + ${kafka-avro-serializer.version} + + + + + + + org.apache.avro + avro + ${avro.version} + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-kafka + + + io.confluent + kafka-avro-serializer + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-contract-verifier + test + + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + org.testcontainers + kafka + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 4.0.1 + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.build.directory}/generated-sources/avro + String + + + + + + org.springframework.cloud + spring-cloud-contract-maven-plugin + true + + JUNIT5 + + + .*json.* + com.example.kafka.producer.AvroJsonContractTestBase + + + .*binary.* + com.example.kafka.producer.AvroBinaryContractTestBase + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java new file mode 100644 index 00000000..15dd4188 --- /dev/null +++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java @@ -0,0 +1,40 @@ +package com.example.kafka.producer; + +import java.util.Map; + +import com.example.kafka.avro.Book; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +@Service +class BookService { + + private final KafkaTemplate kafkaTemplate; + + BookService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + void bookReturned(String isbn, String title, String author) { + Book payload = Book.newBuilder().setIsbn(isbn).setTitle(title).setAuthor(author) + .build(); + + // @formatter:off + MessageHeaders headers = new MessageHeaders(Map.of( + KafkaHeaders.TOPIC, "book.returned", + "X-Correlation-Id", "abc-123-def", + "X-Source-System", "library-service", + "X-Event-Type", "BOOK_RETURNED" + )); + // @formatter:on + + Message msg = MessageBuilder.createMessage(payload, headers); + kafkaTemplate.send(msg); + } + +} diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java new file mode 100644 index 00000000..89e1d0a9 --- /dev/null +++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java @@ -0,0 +1,13 @@ +package com.example.kafka.producer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaAvroProducerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaAvroProducerApplication.class, args); + } + +} diff --git a/producer_kafka_avro/src/main/resources/application.yml b/producer_kafka_avro/src/main/resources/application.yml new file mode 100644 index 00000000..c4dd6b86 --- /dev/null +++ b/producer_kafka_avro/src/main/resources/application.yml @@ -0,0 +1,19 @@ +spring: + application-name: kafka-avro-producer + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + # consumer settings below are only used by the contract tests + # (KafkaMessageVerifier in AvroBinary/JsonContractTestBase consumes + # the produced message to verify it against the contract) + consumer: + group-id: kafka-avro-consumer-group + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + auto-offset-reset: earliest + properties: + specific.avro.reader: true + properties: + schema.registry.url: mock://test diff --git a/producer_kafka_avro/src/main/resources/avro/Book.avsc b/producer_kafka_avro/src/main/resources/avro/Book.avsc new file mode 100644 index 00000000..c118b8bf --- /dev/null +++ b/producer_kafka_avro/src/main/resources/avro/Book.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "Book", + "namespace": "com.example.kafka.avro", + "fields": [ + { + "name": "isbn", + "type": "string" + }, + { + "name": "title", + "type": "string" + }, + { + "name": "author", + "type": "string" + } + ] +} diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java new file mode 100644 index 00000000..ee632dfb --- /dev/null +++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java @@ -0,0 +1,106 @@ +package com.example.kafka.producer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver; +import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.JsonKafkaHeaderMapper; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.NONE, + classes = { KafkaAvroProducerApplication.class, AvroBinaryContractTestBase.TestConfig.class }, + properties = "spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer") +@AutoConfigureMessageVerifier +@ActiveProfiles({ "contracts", "avro-binary" }) +public class AvroBinaryContractTestBase { + + @Autowired + private BookService bookService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + public void publishBookReturned() { + bookService.bookReturned("978-1234567890", "Contract Testing for Dummies", "John Doe"); + } + + @Configuration + static class TestConfig { + + @Bean + @Profile("avro-binary") + KafkaMessageVerifier kafkaMessageVerifier() { + return new KafkaMessageVerifier(); + } + + } + + static class KafkaMessageVerifier implements MessageVerifierReceiver> { + + private final Map>> broker = new ConcurrentHashMap<>(); + + @Override + public Message receive(String destination, long timeout, TimeUnit timeUnit, + YamlContract contract) { + try { + broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1)); + return broker.get(destination).poll(timeout, timeUnit); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @KafkaListener(topics = { "book.returned" }) + public void listen(ConsumerRecord payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + Map headers = new HashMap<>(); + new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers); + + broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1)); + broker.get(topic).add(MessageBuilder.createMessage(payload.value(), + new MessageHeaders(headers))); + } + + @Override + public Message receive(String destination, YamlContract contract) { + return receive(destination, 15, TimeUnit.SECONDS, contract); + } + + } + +} \ No newline at end of file diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java new file mode 100644 index 00000000..6907d52d --- /dev/null +++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java @@ -0,0 +1,64 @@ +package com.example.kafka.producer; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Map; + +import com.example.kafka.avro.Book; +import io.confluent.kafka.serializers.KafkaAvroSerializer; + +/** + * Generates {@code bookReturnedBinary.bin} — a pre-serialized Avro fixture used by the + * {@code bookReturnedBinary.groovy} contract. + * + *

Run {@code main} once from the project root whenever the {@code Book} schema or the + * test data changes, then commit the updated {@code .bin} file. The contract verifier + * compares the raw bytes received on the Kafka topic against this file, so no Avro + * schema knowledge is required in the contract itself. + * + *

The output uses the Confluent wire format: + * {@code [0x00][4-byte schema ID][Avro binary payload]}. + */ +class AvroBinaryTestFixture { + + public static void main(String[] args) throws Exception { + var payload = Book.newBuilder() + .setIsbn("978-1234567890") + .setTitle("Contract Testing for Dummies") + .setAuthor("John Doe") + .build(); + + var bytes = toAvroBytes("book.returned", payload, "mock://test"); + var baseDir = System.getProperty("project.basedir", System.getProperty("user.dir")); + var path = Paths.get(baseDir, "src/test/resources/contracts/binary/bookReturnedMessage.bin"); + + saveFile(bytes, path); + } + + private static byte[] toAvroBytes(String topic, Object payload, + String schemaRegistryUrl) { + try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) { + serializer.configure(Map.of("schema.registry.url", schemaRegistryUrl), false); + return serializer.serialize(topic, payload); + } + } + + private static void saveFile(byte[] bytes, Path output) throws IOException { + if (!Files.exists(output)) { + Files.createDirectories(output.getParent()); + Files.createFile(output); + } + else { + throw new IOException("Output file already exists: " + output.toAbsolutePath()); + } + + try (FileOutputStream fos = new FileOutputStream(output.toFile())) { + fos.write(bytes); + } + System.out.printf("Written %d bytes to %s%n", bytes.length, output.toAbsolutePath()); + } + +} diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java new file mode 100644 index 00000000..51a67dfd --- /dev/null +++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java @@ -0,0 +1,129 @@ +package com.example.kafka.producer; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.contract.verifier.converter.YamlContract; +import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver; +import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier; +import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.JsonKafkaHeaderMapper; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import tools.jackson.databind.json.JsonMapper; + +@Tag("kafka-avro") +@Testcontainers +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = { + KafkaAvroProducerApplication.class, AvroJsonContractTestBase.TestConfig.class }) +@AutoConfigureMessageVerifier +@ActiveProfiles({ "contracts", "avro-json" }) +public class AvroJsonContractTestBase { + + @Autowired + private BookService bookService; + + @Container + static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka")); + + @DynamicPropertySource + static void kafkaProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } + + public void publishBookReturned() { + bookService.bookReturned("978-1234567890", "Contract Testing for Dummies", + "John Doe"); + } + + @Configuration + static class TestConfig { + + @Bean + @Profile("avro-json") + KafkaMessageVerifier kafkaTemplateMessageVerifier() { + return new KafkaMessageVerifier(); + } + + // TODO: remove this workaround after merging: + // https://github.com/spring-cloud/spring-cloud-contract/pull/2405 + @Bean + @Primary + ContractVerifierObjectMapper contractVerifierObjectMapper() { + var json = JsonMapper.builder() + .addMixIn(SpecificRecordBase.class, AvroIgnoreMixin.class).build(); + return new ContractVerifierObjectMapper(json); + } + + @JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" }) + interface AvroIgnoreMixin { + + } + + } + + static class KafkaMessageVerifier implements MessageVerifierReceiver> { + + private final Map>> broker = new ConcurrentHashMap<>(); + + @Override + public Message receive(String destination, long timeout, TimeUnit timeUnit, + YamlContract contract) { + try { + broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1)); + var messageQueue = broker.get(destination); + return messageQueue.poll(timeout, timeUnit); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @KafkaListener(topics = { "book.returned" }) + public void listen(ConsumerRecord payload, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + Map headers = new HashMap<>(); + new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers); + + broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1)); + var messageQueue = broker.get(topic); + messageQueue.add(MessageBuilder.createMessage(payload.value(), + new MessageHeaders(headers))); + } + + @Override + public Message receive(String destination, YamlContract contract) { + return receive(destination, 15, TimeUnit.SECONDS, contract); + } + + } + +} diff --git a/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy new file mode 100644 index 00000000..6ba53ef2 --- /dev/null +++ b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy @@ -0,0 +1,34 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.cloud.contract.spec.Contract + +Contract.make { + description 'Should publish a book returned event to Kafka (verified via pre-serialized Avro binary)' + label 'book_returned_binary' + input { + triggeredBy('publishBookReturned()') + } + outputMessage { + sentTo('book.returned') + headers { + header('X-Correlation-Id', 'abc-123-def') + header('X-Source-System', 'library-service') + header('X-Event-Type', 'BOOK_RETURNED') + } + body(fileAsBytes("bookReturnedMessage.bin")) + } +} diff --git a/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin new file mode 100644 index 00000000..2d276ac8 Binary files /dev/null and b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin differ diff --git a/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy b/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy new file mode 100644 index 00000000..ac6aa458 --- /dev/null +++ b/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy @@ -0,0 +1,38 @@ +/* + * Copyright 2013-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.springframework.cloud.contract.spec.Contract + +Contract.make { + description 'Should publish a book returned event to Kafka' + label 'book_returned' + input { + triggeredBy('publishBookReturned()') + } + outputMessage { + sentTo('book.returned') + headers { + header('X-Correlation-Id', 'abc-123-def') + header('X-Source-System', 'library-service') + header('X-Event-Type', 'BOOK_RETURNED') + } + body( + isbn: '978-1234567890', + title: 'Contract Testing for Dummies', + author: 'John Doe' + ) + } +}