diff --git a/consumer_kafka_avro/README.md b/consumer_kafka_avro/README.md
new file mode 100644
index 00000000..f1731654
--- /dev/null
+++ b/consumer_kafka_avro/README.md
@@ -0,0 +1,90 @@
+# Kafka + Avro Consumer — Spring Cloud Contract samples
+
+This module contains a Spring Boot application that listens to the `book.returned` Kafka topic.
+When a message arrives, it is deserialized from Avro into a `Book` object
+and passed to `EmailService` to send a notification email.
+
+**Source:** [`BooksReturnedListener`](src/main/java/com/example/kafka/consumer/BooksReturnedListener.java)
+
+---
+
+## What is a collaboration test (consumer side)?
+
+A collaboration test verifies that the consumer can correctly handle the messages
+that the producer promised to publish.
+Instead of running a live producer,
+it uses the **stubs jar** produced by the producer's contract tests.
+Spring Cloud Contract's Stub Runner loads that jar,
+and the test triggers a specific message label to replay a realistic message on the Kafka topic.
+The consumer processes it just as it would in production.
+
+**Prerequisite:** build and install the producer stubs first: run `./mvnw install` in [`producer_kafka_avro`](../producer_kafka_avro).
+
+---
+
+## Background: how Avro works on the wire
+
+Confluent's serializer writes a compact 5-byte prefix before each message —
+one magic byte (`0x00`) plus a 4-byte schema ID —
+and registers the schema in a **Schema Registry**.
+The deserializer reads that ID, fetches the schema, and decodes the rest of the bytes.
+Tests in this module use a mock schema registry (`mock://test`)
+so no real registry server is needed.
+
+---
+
+## Two flavors of collaboration test
+
+The producer module defines two contracts, one per flavor.
+There is a matching collaboration test here for each.
+
+### Flavor 1 — JSON (human-readable)
+
+**Test:** [`AvroJsonCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java)
+**Triggered label:** `book_returned`
+
+**How the test works:**
+
+1. Stub Runner triggers the `book_returned` label from the stubs jar.
+2. The test's `MessageVerifierSender` receives a JSON string from the stub
+ (the contract body fields),
+ converts it into a `Book` object,
+ and sends it to Kafka using `KafkaAvroSerializer`.
+3. `BooksReturnedListener` receives the Avro-deserialized `Book`
+ and calls `EmailService.sendEmail()`.
+4. The test asserts that `EmailService` was called with the expected email content.
+
+**Trade-off:** Two extra JSON ↔ Avro conversions happen during the test,
+but failure messages are easy to read —
+you see exactly which field had the wrong value.
+
+---
+
+### Flavor 2 — Binary (exact wire format)
+
+**Test:** [`AvroBinaryCollaborationTest`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java)
+**Triggered label:** `book_returned_binary`
+
+**How the test works:**
+
+1. Stub Runner triggers the `book_returned_binary` label,
+ which delivers the raw bytes from the producer's pre-serialized
+ [`bookReturnedMessage.bin`](../producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin) fixture.
+2. The test's `MessageVerifierSender` puts those bytes directly on the Kafka topic
+ using `ByteArraySerializer` —
+ no Avro serialization happens in the test itself.
+3. `BooksReturnedListener` receives the bytes,
+ `KafkaAvroDeserializer` decodes them into a `Book`,
+ and `EmailService.sendEmail()` is called.
+4. The test asserts that `EmailService` was called with the expected email content.
+
+**Trade-off:** The exact bytes that the producer emits in production
+travel through the consumer's full deserialization stack,
+with no intermediary conversion.
+The downside is that failure messages show raw bytes and are harder to interpret.
+
+> **Note:** Before the raw bytes can be deserialized,
+> the `Book` Avro schema must be registered in the mock schema registry for this JVM.
+> The test's `TestConfig` handles this automatically via a `@PostConstruct` method —
+> see [`AvroBinaryCollaborationTest.TestConfig#registerBookSchema`](src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java)
+> for the explanation.
diff --git a/consumer_kafka_avro/pom.xml b/consumer_kafka_avro/pom.xml
new file mode 100644
index 00000000..b95dfe5d
--- /dev/null
+++ b/consumer_kafka_avro/pom.xml
@@ -0,0 +1,170 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 4.0.1
+
+
+
+ org.springframework.cloud
+ consumer_kafka_avro
+ 5.0.2-SNAPSHOT
+ jar
+
+
+ 8.1.1
+ 2025.1.0
+ 1.12.0
+
+
+
+
+ confluent
+ Confluent Maven Repository
+ https://packages.confluent.io/maven/
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-plugin-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.testcontainers
+ testcontainers-bom
+ 1.20.4
+ pom
+ import
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+ 4.0.1
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${kafka-avro-serializer.version}
+
+
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+
+
+ io.confluent
+ kafka-avro-serializer
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-starter-contract-stub-runner
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.testcontainers
+ kafka
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 4.0.1
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/resources/avro
+ ${project.build.directory}/generated-sources/avro
+ String
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java
new file mode 100644
index 00000000..d14896ed
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/BooksReturnedListener.java
@@ -0,0 +1,44 @@
+package com.example.kafka.consumer;
+
+import com.example.kafka.avro.Book;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listens to the {@code book.returned} Kafka topic and notifies the user by
+ * email when a borrowed book has been returned.
+ *
+ * <p>The {@link Book} payload arrives Avro-encoded and is deserialized by the
+ * consumer's configured {@code KafkaAvroDeserializer} before this listener is
+ * invoked.
+ */
+@Component
+class BooksReturnedListener {
+
+    private final EmailService emailService;
+
+    BooksReturnedListener(EmailService emailService) {
+        this.emailService = emailService;
+    }
+
+    /**
+     * Builds a confirmation email from the returned book's details and hands it
+     * to the {@link EmailService}.
+     *
+     * @param book the Avro-deserialized book that was returned
+     */
+    @KafkaListener(topics = "book.returned")
+    public void sendEmailOnBookReturned(Book book) {
+        String emailBody = """
+                Dear User,
+
+                The book you borrowed has been successfully returned:
+                Title: %s, Author: %s, ISBN: %s
+
+                """.formatted(book.getTitle(), book.getAuthor(), book.getIsbn());
+
+        emailService.sendEmail(emailBody);
+    }
+
+}
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java
new file mode 100644
index 00000000..5af1bd69
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/EmailService.java
@@ -0,0 +1,24 @@
+package com.example.kafka.consumer;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * Sends notification emails to users.
+ *
+ * <p>This sample implementation only prints the email body to stdout; the
+ * collaboration tests replace it with a mock and verify the call.
+ */
+@Service
+public class EmailService {
+
+    /**
+     * Sends an email with the given body.
+     *
+     * @param emailBody the full text of the email to send
+     */
+    public void sendEmail(String emailBody) {
+        // Simulate sending an email
+        System.out.println("Sending email:\n" + emailBody);
+    }
+
+}
diff --git a/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java
new file mode 100644
index 00000000..b75faa5e
--- /dev/null
+++ b/consumer_kafka_avro/src/main/java/com/example/kafka/consumer/KafkaAvroConsumerApplication.java
@@ -0,0 +1,16 @@
+package com.example.kafka.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the Kafka + Avro consumer sample application.
+ */
+@SpringBootApplication
+public class KafkaAvroConsumerApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaAvroConsumerApplication.class, args);
+    }
+
+}
diff --git a/consumer_kafka_avro/src/main/resources/application.yml b/consumer_kafka_avro/src/main/resources/application.yml
new file mode 100644
index 00000000..805e229d
--- /dev/null
+++ b/consumer_kafka_avro/src/main/resources/application.yml
@@ -0,0 +1,20 @@
+spring:
+  application:
+    name: kafka-avro-consumer
+ kafka:
+ bootstrap-servers: localhost:9092
+ # producer settings below are only used by the collaboration tests
+ # (MessageVerifierSender in AvroBinary/JsonCollaborationTest publishes
+ # the stub-triggered message onto Kafka)
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
+ consumer:
+ group-id: kafka-avro-consumer-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
+ auto-offset-reset: earliest
+ properties:
+ specific.avro.reader: true
+ properties:
+ schema.registry.url: mock://test
diff --git a/consumer_kafka_avro/src/main/resources/avro/Book.avsc b/consumer_kafka_avro/src/main/resources/avro/Book.avsc
new file mode 100644
index 00000000..c118b8bf
--- /dev/null
+++ b/consumer_kafka_avro/src/main/resources/avro/Book.avsc
@@ -0,0 +1,19 @@
+{
+ "type": "record",
+ "name": "Book",
+ "namespace": "com.example.kafka.avro",
+ "fields": [
+ {
+ "name": "isbn",
+ "type": "string"
+ },
+ {
+ "name": "title",
+ "type": "string"
+ },
+ {
+ "name": "author",
+ "type": "string"
+ }
+ ]
+}
diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java
new file mode 100644
index 00000000..7bf6ec8c
--- /dev/null
+++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroBinaryCollaborationTest.java
@@ -0,0 +1,146 @@
+package com.example.kafka.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import com.example.kafka.avro.Book;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.system.OutputCaptureExtension;
+import org.springframework.cloud.contract.stubrunner.StubTrigger;
+import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
+import jakarta.annotation.PostConstruct;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
+
+import static java.util.Collections.emptyMap;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.Mockito.verify;
+
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
+ AvroBinaryCollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
+@AutoConfigureStubRunner(ids = "org.springframework.cloud:producer_kafka_avro:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
+@ExtendWith(OutputCaptureExtension.class)
+class AvroBinaryCollaborationTest {
+
+ @Autowired
+ StubTrigger trigger;
+
+ @MockitoBean
+ EmailService emailService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ @Test
+ void shouldSendEmail_onBookReturned() {
+ trigger.trigger("book_returned_binary");
+
+ // @formatter:off
+ await().untilAsserted(() ->
+ verify(emailService).sendEmail(
+ contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
+ // @formatter:on
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ @Autowired
+ private KafkaProperties kafkaProperties;
+
+ /**
+ * Pre-registers the {@link Book} Avro schema in the in-memory mock schema registry.
+ *
+     * <p>
+     * Confluent's Avro wire format prefixes every message with a 5-byte header:
+ * a magic byte ({@code 0x00}) followed by a 4-byte schema ID. The consumer's
+ * {@code KafkaAvroDeserializer} reads that ID and fetches the matching schema from
+ * the registry to decode the rest of the bytes.
+ *
+     * <p>
+     * The {@code .bin} fixture was produced by
+ * {@code com.example.kafka.producer.AvroBinaryTestFixture} in a
+ * separate JVM run, so the schema it embedded (ID 1) no longer exists in this JVM's
+ * mock registry. Serializing a dummy {@link Book} here forces the serializer to
+ * register the schema, which assigns it ID 1 — the same ID the deserializer will
+ * look up when the fixture bytes arrive on the topic.
+ */
+ @PostConstruct
+ void registerBookSchema() {
+ try (var serializer = new KafkaAvroSerializer()) {
+ serializer.configure(kafkaProperties.buildProducerProperties(), false);
+ serializer.serialize("book.returned", Book.newBuilder()
+ .setIsbn("").setTitle("").setAuthor("").build());
+ }
+ }
+
+ @Bean
+ MessageVerifierSender> standaloneMessageVerifier() {
+ Map producerProps = new HashMap<>(kafkaProperties.buildProducerProperties());
+ producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ org.apache.kafka.common.serialization.ByteArraySerializer.class);
+ var factory = new DefaultKafkaProducerFactory(producerProps);
+ return new AvroBinaryMessageVerifierSender<>(new KafkaTemplate<>(factory));
+ }
+
+ }
+
+ static class AvroBinaryMessageVerifierSender implements MessageVerifierSender {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ @Override
+ public void send(M message, String destination, YamlContract contract) {
+ send(message, emptyMap(), destination, contract);
+ }
+
+ @Override
+ public void send(T payload, Map headers, String destination,
+ YamlContract contract) {
+ Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
+ newHeaders.put(KafkaHeaders.TOPIC, destination);
+ MessageHeaders msgHeaders = new MessageHeaders(newHeaders);
+
+ // payload is already Confluent wire-format bytes from the .bin fixture —
+ // send raw so the consumer's KafkaAvroDeserializer can decode them directly
+ var message = MessageBuilder.createMessage((byte[]) payload, msgHeaders);
+ kafkaTemplate.send(message);
+ }
+
+ AvroBinaryMessageVerifierSender(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ }
+
+}
diff --git a/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java
new file mode 100644
index 00000000..38498bec
--- /dev/null
+++ b/consumer_kafka_avro/src/test/java/com/example/kafka/consumer/AvroJsonCollaborationTest.java
@@ -0,0 +1,121 @@
+package com.example.kafka.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.example.kafka.avro.Book;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import wiremock.com.fasterxml.jackson.core.JsonProcessingException;
+import wiremock.com.fasterxml.jackson.databind.json.JsonMapper;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.system.OutputCaptureExtension;
+import org.springframework.cloud.contract.stubrunner.StubTrigger;
+import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierSender;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
+
+import static java.util.Collections.emptyMap;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.Mockito.verify;
+
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {
+ AvroJsonCollaborationTest.TestConfig.class, KafkaAvroConsumerApplication.class })
+@AutoConfigureStubRunner(ids = "org.springframework.cloud:producer_kafka_avro:+:stubs", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
+@ExtendWith(OutputCaptureExtension.class)
+class AvroJsonCollaborationTest {
+
+ @Autowired
+ StubTrigger trigger;
+
+ @MockitoBean
+ EmailService emailService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ @Test
+ void shouldSendEmail_onBookReturned() {
+ trigger.trigger("book_returned");
+
+ // @formatter:off
+ await().untilAsserted(() ->
+ verify(emailService).sendEmail(
+ contains("Title: Contract Testing for Dummies, Author: John Doe, ISBN: 978-1234567890")));
+ // @formatter:on
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ @Bean
+ MessageVerifierSender> standaloneMessageVerifier(KafkaTemplate kafkaTemplate) {
+ return new KafkaAvroMessageVerifierSender<>(kafkaTemplate);
+ }
+
+ }
+
+ static class KafkaAvroMessageVerifierSender implements MessageVerifierSender {
+
+ private final KafkaTemplate kafkaTemplate;
+
+ @Override
+ public void send(M message, String destination, YamlContract contract) {
+ send(message, emptyMap(), destination, contract);
+ }
+
+ @Override
+ public void send(T payload, Map headers, String destination,
+ YamlContract contract) {
+ Map newHeaders = headers != null ? new HashMap<>(headers) : new HashMap<>();
+ newHeaders.put(KafkaHeaders.TOPIC, destination);
+ MessageHeaders msgHeaders = new MessageHeaders(newHeaders);
+
+ try {
+ // TODO: remove this workaround after merging:
+ // https://github.com/spring-cloud/spring-cloud-contract/issues/2404
+ Book avroPayload = new JsonMapper().readValue(payload.toString(), Book.class);
+ var message = MessageBuilder.createMessage(avroPayload, msgHeaders);
+ kafkaTemplate.send(message);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ KafkaAvroMessageVerifierSender(KafkaTemplate kafkaTemplate) {
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index f893fb48..c7eb99d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
producer_with_latest_2_2_features
producer_java
+ producer_kafka_avro
producer_kafka_middleware
producer_rabbit_middleware
@@ -83,6 +84,7 @@
consumer_with_latest_2_2_features
consumer_java
+ consumer_kafka_avro
consumer_kafka_middleware
consumer_rabbit_middleware
@@ -135,6 +137,7 @@
producer_with_latest_2_2_features
producer_java
+ producer_kafka_avro
producer_kafka_middleware
producer_rabbit_middleware
@@ -150,6 +153,7 @@
consumer_with_latest_2_2_features
consumer_java
+ consumer_kafka_avro
consumer_kafka_middleware
consumer_rabbit_middleware
diff --git a/producer_kafka_avro/README.md b/producer_kafka_avro/README.md
new file mode 100644
index 00000000..e4439d92
--- /dev/null
+++ b/producer_kafka_avro/README.md
@@ -0,0 +1,104 @@
+# Kafka + Avro Producer — Spring Cloud Contract samples
+
+This module contains a Spring Boot application that publishes a `book.returned` event to a Kafka topic
+whenever a book is returned to a library.
+The message payload is an [Avro](https://avro.apache.org/)-encoded `Book` object,
+serialized using the Confluent `KafkaAvroSerializer`.
+
+**Source:** [`BookService`](src/main/java/com/example/kafka/producer/BookService.java)
+
+---
+
+## What is a contract test (producer side)?
+
+A contract is a description of what a producer promises to publish —
+which topic, which headers, and what the message body looks like.
+Spring Cloud Contract reads the contract, generates a JUnit test,
+and runs it against the real producer code.
+If the producer still publishes the right message, the test passes and a **stubs jar** is produced.
+The consumer module uses that stubs jar to replay realistic messages in its own tests
+without needing a live producer.
+
+---
+
+## Background: how Avro works on the wire
+
+Avro does not embed the schema inside every message.
+Instead, Confluent's serializer registers the schema in a **Schema Registry**
+and writes a compact 5-byte prefix —
+one magic byte (`0x00`) plus a 4-byte schema ID —
+before the actual binary payload.
+The deserializer on the other end reads that ID,
+fetches the schema from the registry,
+and decodes the rest of the bytes.
+
+This matters for testing because there are two natural ways to express the expected message in a contract:
+
+---
+
+## Two flavors of contract
+
+### Flavor 1 — JSON (human-readable)
+
+**Contract:** [`contracts/json/bookReturnedJson.groovy`](src/test/resources/contracts/json/bookReturnedJson.groovy)
+
+The contract body is written as plain JSON fields:
+
+```groovy
+body(
+ isbn: '978-1234567890',
+ title: 'Contract Testing for Dummies',
+ author: 'John Doe'
+)
+```
+
+**How the test works:**
+
+1. SCC calls `publishBookReturned()` on the test base class.
+2. The producer serializes the `Book` to Avro bytes and sends them to the Kafka topic.
+3. The test base class consumes the message via a `KafkaListener`,
+ which deserializes the Avro bytes back into a `Book` object.
+4. SCC serializes that object to JSON and compares it field-by-field against the contract body.
+
+**Trade-off:** Two extra JSON ↔ Avro conversions happen during the test,
+but failure messages are easy to read —
+you see exactly which field had the wrong value.
+
+**Test base class:** [`AvroJsonContractTestBase`](src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java)
+
+---
+
+### Flavor 2 — Binary (exact wire format)
+
+**Contract:** [`contracts/binary/bookReturnedBinary.groovy`](src/test/resources/contracts/binary/bookReturnedBinary.groovy)
+
+The contract body references a pre-serialized binary file:
+
+```groovy
+body(fileAsBytes("bookReturnedMessage.bin"))
+```
+
+**How the test works:**
+
+1. SCC calls `publishBookReturned()` on the test base class.
+2. The producer serializes the `Book` to Avro bytes and sends them to the Kafka topic.
+3. The test base class consumes the raw bytes
+ (using `ByteArrayDeserializer`, bypassing Avro decoding).
+4. SCC compares those raw bytes byte-for-byte against
+ [`bookReturnedMessage.bin`](src/test/resources/contracts/binary/bookReturnedMessage.bin).
+
+**Trade-off:** The assertion is an exact binary comparison —
+no JSON conversions, no room for serialization drift —
+but failure messages show raw bytes and are harder to interpret.
+
+**Test base class:** [`AvroBinaryContractTestBase`](src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java)
+
+#### Regenerating the `.bin` fixture
+
+The `.bin` file must be committed to source control.
+Run [`AvroBinaryTestFixture`](src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java)
+whenever the `Book` schema or the test data changes to produce a fresh file:
+
+```bash
+./mvnw exec:java -Dexec.mainClass=com.example.kafka.producer.AvroBinaryTestFixture -Dexec.classpathScope=test
+```
\ No newline at end of file
diff --git a/producer_kafka_avro/pom.xml b/producer_kafka_avro/pom.xml
new file mode 100644
index 00000000..67825810
--- /dev/null
+++ b/producer_kafka_avro/pom.xml
@@ -0,0 +1,188 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 4.0.1
+
+
+
+ org.springframework.cloud
+ producer_kafka_avro
+ 5.0.2-SNAPSHOT
+ jar
+
+
+ 8.1.1
+ 2025.1.0
+ 1.12.0
+
+
+
+
+ confluent
+ Confluent Maven Repository
+ https://packages.confluent.io/maven/
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+ spring-plugin-snapshots
+ Spring Snapshots
+ https://repo.spring.io/snapshot
+
+ true
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.testcontainers
+ testcontainers-bom
+ 1.20.4
+ pom
+ import
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+ 4.0.1
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${kafka-avro-serializer.version}
+
+
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ org.springframework.boot
+ spring-boot-starter-kafka
+
+
+ io.confluent
+ kafka-avro-serializer
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.cloud
+ spring-cloud-contract-verifier
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.testcontainers
+ kafka
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 4.0.1
+
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-sources
+
+ schema
+
+
+ ${project.basedir}/src/main/resources/avro
+ ${project.build.directory}/generated-sources/avro
+ String
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-contract-maven-plugin
+ true
+
+ JUNIT5
+
+
+ .*json.*
+ com.example.kafka.producer.AvroJsonContractTestBase
+
+
+ .*binary.*
+ com.example.kafka.producer.AvroBinaryContractTestBase
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java
new file mode 100644
index 00000000..15dd4188
--- /dev/null
+++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/BookService.java
@@ -0,0 +1,54 @@
+package com.example.kafka.producer;
+
+import java.util.Map;
+
+import com.example.kafka.avro.Book;
+
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * Publishes a {@code book.returned} event to Kafka whenever a book is returned.
+ *
+ * <p>The {@link Book} payload is Avro-encoded on the wire by the
+ * {@code KafkaAvroSerializer} configured in {@code application.yml}.
+ */
+@Service
+class BookService {
+
+    private final KafkaTemplate<String, Book> kafkaTemplate;
+
+    BookService(KafkaTemplate<String, Book> kafkaTemplate) {
+        this.kafkaTemplate = kafkaTemplate;
+    }
+
+    /**
+     * Builds a {@link Book} from the given details and publishes it to the
+     * {@code book.returned} topic with correlation and tracing headers.
+     *
+     * @param isbn   the book's ISBN
+     * @param title  the book's title
+     * @param author the book's author
+     */
+    void bookReturned(String isbn, String title, String author) {
+        Book payload = Book.newBuilder().setIsbn(isbn).setTitle(title).setAuthor(author)
+                .build();
+
+        // @formatter:off
+        // explicit <String, Object> witness: MessageHeaders requires Map<String, Object>
+        MessageHeaders headers = new MessageHeaders(Map.<String, Object>of(
+                KafkaHeaders.TOPIC, "book.returned",
+                "X-Correlation-Id", "abc-123-def",
+                "X-Source-System", "library-service",
+                "X-Event-Type", "BOOK_RETURNED"
+        ));
+        // @formatter:on
+
+        Message<Book> msg = MessageBuilder.createMessage(payload, headers);
+        kafkaTemplate.send(msg);
+    }
+
+}
diff --git a/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java
new file mode 100644
index 00000000..89e1d0a9
--- /dev/null
+++ b/producer_kafka_avro/src/main/java/com/example/kafka/producer/KafkaAvroProducerApplication.java
@@ -0,0 +1,16 @@
+package com.example.kafka.producer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Spring Boot entry point for the Kafka + Avro producer sample application.
+ */
+@SpringBootApplication
+public class KafkaAvroProducerApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaAvroProducerApplication.class, args);
+    }
+
+}
diff --git a/producer_kafka_avro/src/main/resources/application.yml b/producer_kafka_avro/src/main/resources/application.yml
new file mode 100644
index 00000000..c4dd6b86
--- /dev/null
+++ b/producer_kafka_avro/src/main/resources/application.yml
@@ -0,0 +1,20 @@
+spring:
+  application:
+    name: kafka-avro-producer
+ kafka:
+ bootstrap-servers: localhost:9092
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
+ # consumer settings below are only used by the contract tests
+ # (KafkaMessageVerifier in AvroBinary/JsonContractTestBase consumes
+ # the produced message to verify it against the contract)
+ consumer:
+ group-id: kafka-avro-consumer-group
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
+ auto-offset-reset: earliest
+ properties:
+ specific.avro.reader: true
+ properties:
+ schema.registry.url: mock://test
diff --git a/producer_kafka_avro/src/main/resources/avro/Book.avsc b/producer_kafka_avro/src/main/resources/avro/Book.avsc
new file mode 100644
index 00000000..c118b8bf
--- /dev/null
+++ b/producer_kafka_avro/src/main/resources/avro/Book.avsc
@@ -0,0 +1,19 @@
+{
+ "type": "record",
+ "name": "Book",
+ "namespace": "com.example.kafka.avro",
+ "fields": [
+ {
+ "name": "isbn",
+ "type": "string"
+ },
+ {
+ "name": "title",
+ "type": "string"
+ },
+ {
+ "name": "author",
+ "type": "string"
+ }
+ ]
+}
diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java
new file mode 100644
index 00000000..ee632dfb
--- /dev/null
+++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryContractTestBase.java
@@ -0,0 +1,106 @@
+package com.example.kafka.producer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Tag;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver;
+import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.JsonKafkaHeaderMapper;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.NONE,
+ classes = { KafkaAvroProducerApplication.class, AvroBinaryContractTestBase.TestConfig.class },
+ properties = "spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer")
+@AutoConfigureMessageVerifier
+@ActiveProfiles({ "contracts", "avro-binary" })
+public class AvroBinaryContractTestBase {
+
+ @Autowired
+ private BookService bookService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ public void publishBookReturned() {
+ bookService.bookReturned("978-1234567890", "Contract Testing for Dummies", "John Doe");
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ @Bean
+ @Profile("avro-binary")
+ KafkaMessageVerifier kafkaMessageVerifier() {
+ return new KafkaMessageVerifier();
+ }
+
+ }
+
+ static class KafkaMessageVerifier implements MessageVerifierReceiver<Message<?>> {
+
+ private final Map<String, BlockingQueue<Message<?>>> broker = new ConcurrentHashMap<>();
+
+ @Override
+ public Message<?> receive(String destination, long timeout, TimeUnit timeUnit,
+ YamlContract contract) {
+ try {
+ broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1));
+ return broker.get(destination).poll(timeout, timeUnit);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @KafkaListener(topics = { "book.returned" })
+ public void listen(ConsumerRecord<?, byte[]> payload,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
+ Map<String, Object> headers = new HashMap<>();
+ new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers);
+
+ broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1));
+ broker.get(topic).add(MessageBuilder.createMessage(payload.value(),
+ new MessageHeaders(headers)));
+ }
+
+ @Override
+ public Message<?> receive(String destination, YamlContract contract) {
+ return receive(destination, 15, TimeUnit.SECONDS, contract);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java
new file mode 100644
index 00000000..6907d52d
--- /dev/null
+++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroBinaryTestFixture.java
@@ -0,0 +1,64 @@
+package com.example.kafka.producer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+
+import com.example.kafka.avro.Book;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
+/**
+ * Generates {@code bookReturnedMessage.bin} — a pre-serialized Avro fixture used by the
+ * {@code bookReturnedBinary.groovy} contract.
+ *
+ * Run {@code main} once from the project root whenever the {@code Book} schema or the
+ * test data changes, then commit the updated {@code .bin} file. The contract verifier
+ * compares the raw bytes received on the Kafka topic against this file, so no Avro
+ * schema knowledge is required in the contract itself.
+ *
+ *
+ * <p>The output uses the Confluent wire format:
+ * {@code [0x00][4-byte schema ID][Avro binary payload]}.
+ */
+class AvroBinaryTestFixture {
+
+ public static void main(String[] args) throws Exception {
+ var payload = Book.newBuilder()
+ .setIsbn("978-1234567890")
+ .setTitle("Contract Testing for Dummies")
+ .setAuthor("John Doe")
+ .build();
+
+ var bytes = toAvroBytes("book.returned", payload, "mock://test");
+ var baseDir = System.getProperty("project.basedir", System.getProperty("user.dir"));
+ var path = Paths.get(baseDir, "src/test/resources/contracts/binary/bookReturnedMessage.bin");
+
+ saveFile(bytes, path);
+ }
+
+ private static byte[] toAvroBytes(String topic, Object payload,
+ String schemaRegistryUrl) {
+ try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
+ serializer.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);
+ return serializer.serialize(topic, payload);
+ }
+ }
+
+ private static void saveFile(byte[] bytes, Path output) throws IOException {
+ // Overwrite any existing fixture so it can be regenerated (as the class
+ // javadoc instructs) whenever the Book schema or the test data changes.
+ Files.createDirectories(output.getParent());
+
+ try (FileOutputStream fos = new FileOutputStream(output.toFile())) {
+ fos.write(bytes);
+ }
+ System.out.printf("Written %d bytes to %s%n", bytes.length, output.toAbsolutePath());
+ }
+
+}
diff --git a/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java
new file mode 100644
index 00000000..51a67dfd
--- /dev/null
+++ b/producer_kafka_avro/src/test/java/com/example/kafka/producer/AvroJsonContractTestBase.java
@@ -0,0 +1,129 @@
+package com.example.kafka.producer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Tag;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.contract.verifier.converter.YamlContract;
+import org.springframework.cloud.contract.verifier.messaging.MessageVerifierReceiver;
+import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
+import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.JsonKafkaHeaderMapper;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import tools.jackson.databind.json.JsonMapper;
+
+@Tag("kafka-avro")
+@Testcontainers
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = {
+ KafkaAvroProducerApplication.class, AvroJsonContractTestBase.TestConfig.class })
+@AutoConfigureMessageVerifier
+@ActiveProfiles({ "contracts", "avro-json" })
+public class AvroJsonContractTestBase {
+
+ @Autowired
+ private BookService bookService;
+
+ @Container
+ static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+ DockerImageName.parse("confluentinc/cp-kafka"));
+
+ @DynamicPropertySource
+ static void kafkaProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
+ }
+
+ public void publishBookReturned() {
+ bookService.bookReturned("978-1234567890", "Contract Testing for Dummies",
+ "John Doe");
+ }
+
+ @Configuration
+ static class TestConfig {
+
+ @Bean
+ @Profile("avro-json")
+ KafkaMessageVerifier kafkaTemplateMessageVerifier() {
+ return new KafkaMessageVerifier();
+ }
+
+ // TODO: remove this workaround after merging:
+ // https://github.com/spring-cloud/spring-cloud-contract/pull/2405
+ @Bean
+ @Primary
+ ContractVerifierObjectMapper contractVerifierObjectMapper() {
+ var json = JsonMapper.builder()
+ .addMixIn(SpecificRecordBase.class, AvroIgnoreMixin.class).build();
+ return new ContractVerifierObjectMapper(json);
+ }
+
+ @JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" })
+ interface AvroIgnoreMixin {
+
+ }
+
+ }
+
+ static class KafkaMessageVerifier implements MessageVerifierReceiver<Message<?>> {
+
+ private final Map<String, BlockingQueue<Message<?>>> broker = new ConcurrentHashMap<>();
+
+ @Override
+ public Message<?> receive(String destination, long timeout, TimeUnit timeUnit,
+ YamlContract contract) {
+ try {
+ broker.putIfAbsent(destination, new ArrayBlockingQueue<>(1));
+ var messageQueue = broker.get(destination);
+ return messageQueue.poll(timeout, timeUnit);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @KafkaListener(topics = { "book.returned" })
+ public void listen(ConsumerRecord<?, ?> payload,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
+ Map<String, Object> headers = new HashMap<>();
+ new JsonKafkaHeaderMapper().toHeaders(payload.headers(), headers);
+
+ broker.putIfAbsent(topic, new ArrayBlockingQueue<>(1));
+ var messageQueue = broker.get(topic);
+ messageQueue.add(MessageBuilder.createMessage(payload.value(),
+ new MessageHeaders(headers)));
+ }
+
+ @Override
+ public Message<?> receive(String destination, YamlContract contract) {
+ return receive(destination, 15, TimeUnit.SECONDS, contract);
+ }
+
+ }
+
+}
diff --git a/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy
new file mode 100644
index 00000000..6ba53ef2
--- /dev/null
+++ b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedBinary.groovy
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.springframework.cloud.contract.spec.Contract
+
+Contract.make {
+ description 'Should publish a book returned event to Kafka (verified via pre-serialized Avro binary)'
+ label 'book_returned_binary'
+ input {
+ triggeredBy('publishBookReturned()')
+ }
+ outputMessage {
+ sentTo('book.returned')
+ headers {
+ header('X-Correlation-Id', 'abc-123-def')
+ header('X-Source-System', 'library-service')
+ header('X-Event-Type', 'BOOK_RETURNED')
+ }
+ body(fileAsBytes("bookReturnedMessage.bin"))
+ }
+}
diff --git a/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin
new file mode 100644
index 00000000..2d276ac8
Binary files /dev/null and b/producer_kafka_avro/src/test/resources/contracts/binary/bookReturnedMessage.bin differ
diff --git a/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy b/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy
new file mode 100644
index 00000000..ac6aa458
--- /dev/null
+++ b/producer_kafka_avro/src/test/resources/contracts/json/bookReturnedJson.groovy
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.springframework.cloud.contract.spec.Contract
+
+Contract.make {
+ description 'Should publish a book returned event to Kafka'
+ label 'book_returned'
+ input {
+ triggeredBy('publishBookReturned()')
+ }
+ outputMessage {
+ sentTo('book.returned')
+ headers {
+ header('X-Correlation-Id', 'abc-123-def')
+ header('X-Source-System', 'library-service')
+ header('X-Event-Type', 'BOOK_RETURNED')
+ }
+ body(
+ isbn: '978-1234567890',
+ title: 'Contract Testing for Dummies',
+ author: 'John Doe'
+ )
+ }
+}