From 1285fe280d6566a19584b3e4c19fb8870243aece Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Thu, 30 Apr 2026 20:37:13 +0000 Subject: [PATCH 1/2] chore(pubsub): Add sample for subscribe avro records with revisions The design is based on SubscribeWithAvroSchemaRevisionsExample.java: https://github.com/googleapis/java-pubsub/pull/1469 --- .../acceptance/data/us-states-plus.avsc | 24 +++++ .../samples/acceptance/schemas_test.rb | 49 +++++++++++ ...b_subscribe_avro_records_with_revisions.rb | 87 +++++++++++++++++++ 3 files changed, 160 insertions(+) create mode 100644 google-cloud-pubsub/samples/acceptance/data/us-states-plus.avsc create mode 100644 google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb diff --git a/google-cloud-pubsub/samples/acceptance/data/us-states-plus.avsc b/google-cloud-pubsub/samples/acceptance/data/us-states-plus.avsc new file mode 100644 index 000000000000..74225ae7e2e6 --- /dev/null +++ b/google-cloud-pubsub/samples/acceptance/data/us-states-plus.avsc @@ -0,0 +1,24 @@ +{ + "type":"record", + "name":"State", + "namespace":"utilities", + "doc":"A list of states in the United States of America.", + "fields":[ + { + "name":"name", + "type":"string", + "doc":"The common name of the state." + }, + { + "name":"post_abbr", + "type":"string", + "doc":"The postal code abbreviation of the state." + }, + { + "name":"population", + "type":"long", + "default":0, + "doc":"The population of the state." + } + ] +} diff --git a/google-cloud-pubsub/samples/acceptance/schemas_test.rb b/google-cloud-pubsub/samples/acceptance/schemas_test.rb index e7984ea35365..667e510ac8c5 100644 --- a/google-cloud-pubsub/samples/acceptance/schemas_test.rb +++ b/google-cloud-pubsub/samples/acceptance/schemas_test.rb @@ -28,6 +28,7 @@ require_relative "../pubsub_subscribe_avro_records" require_relative "../pubsub_publish_proto_messages" require_relative "../pubsub_subscribe_proto_messages" +require_relative "../pubsub_subscribe_avro_records_with_revisions" describe "schemas" do @@ -36,6 +37,7 @@ let(:topic_id) { random_topic_id } let(:subscription_id) { random_subscription_id } let(:avsc_file) { File.expand_path "data/us-states.avsc", __dir__ } + let(:avsc_revision_file) { File.expand_path "data/us-states-plus.avsc", __dir__ } let(:topic_admin) { pubsub.topic_admin } let(:subscription_admin) { pubsub.subscription_admin } let(:schemas) { pubsub.schemas } @@ -216,6 +218,53 @@ assert_includes out, schema1.revision_id end + + it "supports pubsub_subscribe_avro_records_with_revisions" do + # Commit Rev B first (Rev A is already created in before block) + schema_b = nil + out, _err = capture_io do + schema_b = commit_avro_schema schema_id: schema_id, avsc_file: avsc_revision_file + end + + rev_a_id = @schema.revision_id + rev_b_id = schema_b.revision_id + + # Create topic with schema range allowing both revisions + schema_settings = Google::Cloud::PubSub::V1::SchemaSettings.new schema: pubsub.schema_path(schema_id), + encoding: :BINARY, + first_revision_id: rev_a_id, + last_revision_id: rev_b_id + @topic = topic_admin.create_topic name: pubsub.topic_path(random_topic_id), + schema_settings: schema_settings + + @subscription = subscription_admin.create_subscription name: pubsub.subscription_path(random_subscription_id), + topic: @topic.name, + ack_deadline_seconds: 60 + + # Publish message 1 (Old format - valid for both) + writer = Avro::IO::DatumWriter.new avro_schema + buffer = StringIO.new + writer.write record, Avro::IO::BinaryEncoder.new(buffer) + publisher = pubsub.publisher @topic.name + publisher.publish buffer + + # Publish message 2 (New format - valid only for Rev B) + avsc_definition_plus = File.read avsc_revision_file + avro_schema_plus = Avro::Schema.parse avsc_definition_plus + record_plus = { "name" => "California", "post_abbr" => "CA", "population" => 39000000 } + + writer_plus = Avro::IO::DatumWriter.new avro_schema_plus + buffer_plus = StringIO.new + writer_plus.write record_plus, Avro::IO::BinaryEncoder.new(buffer_plus) + publisher.publish buffer_plus + + # Verify we can subscribe and decode both + expect_with_retry "pubsub_subscribe_avro_records_with_revisions" do + assert_output /Received a binary-encoded message:.*Alaska.*Received a binary-encoded message:.*California/m do + subscribe_avro_records_with_revisions subscription_id: @subscription.name + end + end + end end describe "PROTOCOL_BUFFER" do diff --git a/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb b/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb new file mode 100644 index 000000000000..ae541f8a216b --- /dev/null +++ b/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb @@ -0,0 +1,87 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "google/cloud/pubsub" + +def subscribe_avro_records_with_revisions subscription_id: + # [START pubsub_subscribe_avro_records_with_revisions] + # subscription_id = "your-subscription-id" + + pubsub = Google::Cloud::PubSub.new + subscriber = pubsub.subscriber subscription_id + + # Cache for the parsed Avro schemas mapped by revision ID. + schema_cache = {} + cache_mutex = Mutex.new + + listener = subscriber.listen do |received_message| + schema_name = received_message.attributes["googclient_schemaname"] + revision_id = received_message.attributes["googclient_schemarevisionid"] + encoding = received_message.attributes["googclient_schemaencoding"] + + # Prevent concurrent threads from racing to fetch and parse the same schema. + avro_schema = cache_mutex.synchronize { schema_cache[revision_id] } + + + if avro_schema.nil? + begin + require "avro" + # The resource name format is projects/{project}/schemas/{schema}@{revision} + schema_resource = pubsub.schemas.get_schema name: "#{schema_name}@#{revision_id}" + + avro_schema = Avro::Schema.parse schema_resource.definition + + cache_mutex.synchronize { schema_cache[revision_id] = avro_schema } + rescue StandardError => e + puts "Could not get schema for revision #{revision_id}: #{e.message}" + received_message.reject! + next + end + end + + begin + case encoding + when "BINARY" + require "avro" + buffer = StringIO.new received_message.data + decoder = Avro::IO::BinaryDecoder.new buffer + reader = Avro::IO::DatumReader.new avro_schema + message_data = reader.read decoder + puts "Received a binary-encoded message:\n#{message_data}" + when "JSON" + require "json" + message_data = JSON.parse received_message.data + puts "Received a JSON-encoded message:\n#{message_data}" + else + puts "Unknown message type; rejecting message." + received_message.reject! + next + end + + received_message.acknowledge! + rescue StandardError => e + puts "Failed to process message: #{e.message}" + received_message.reject! + end + end + + listener.start + + + # Let the main thread sleep for 60 seconds so the thread for listening + # messages does not quit + sleep 60 + listener.stop.wait! + # [END pubsub_subscribe_avro_records_with_revisions] +end From 3189119becc67149de3a85f765fb665f1d1904e3 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Fri, 1 May 2026 16:58:10 +0000 Subject: [PATCH 2/2] add encoding to puts statement --- google-cloud-pubsub/samples/acceptance/schemas_test.rb | 10 +++++----- .../pubsub_subscribe_avro_records_with_revisions.rb | 8 +++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/samples/acceptance/schemas_test.rb b/google-cloud-pubsub/samples/acceptance/schemas_test.rb index 667e510ac8c5..4befce7296ed 100644 --- a/google-cloud-pubsub/samples/acceptance/schemas_test.rb +++ b/google-cloud-pubsub/samples/acceptance/schemas_test.rb @@ -220,7 +220,7 @@ end it "supports pubsub_subscribe_avro_records_with_revisions" do - # Commit Rev B first (Rev A is already created in before block) + # Commit Rev B first (Rev A is already created in before block). schema_b = nil out, _err = capture_io do schema_b = commit_avro_schema schema_id: schema_id, avsc_file: avsc_revision_file @@ -229,7 +229,7 @@ rev_a_id = @schema.revision_id rev_b_id = schema_b.revision_id - # Create topic with schema range allowing both revisions + # Create topic with schema range allowing both revisions. schema_settings = Google::Cloud::PubSub::V1::SchemaSettings.new schema: pubsub.schema_path(schema_id), encoding: :BINARY, first_revision_id: rev_a_id, @@ -241,14 +241,14 @@ topic: @topic.name, ack_deadline_seconds: 60 - # Publish message 1 (Old format - valid for both) + # Publish message 1 (Old format - valid for both). writer = Avro::IO::DatumWriter.new avro_schema buffer = StringIO.new writer.write record, Avro::IO::BinaryEncoder.new(buffer) publisher = pubsub.publisher @topic.name publisher.publish buffer - # Publish message 2 (New format - valid only for Rev B) + # Publish message 2 (New format - valid only for Rev B). avsc_definition_plus = File.read avsc_revision_file avro_schema_plus = Avro::Schema.parse avsc_definition_plus record_plus = { "name" => "California", "post_abbr" => "CA", "population" => 39000000 } @@ -258,7 +258,7 @@ writer_plus.write record_plus, Avro::IO::BinaryEncoder.new(buffer_plus) publisher.publish buffer_plus - # Verify we can subscribe and decode both + # Verify we can subscribe and decode both. expect_with_retry "pubsub_subscribe_avro_records_with_revisions" do assert_output /Received a binary-encoded message:.*Alaska.*Received a binary-encoded message:.*California/m do subscribe_avro_records_with_revisions subscription_id: @subscription.name diff --git a/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb b/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb index ae541f8a216b..1da958578bff 100644 --- a/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb +++ b/google-cloud-pubsub/samples/pubsub_subscribe_avro_records_with_revisions.rb @@ -33,11 +33,10 @@ def subscribe_avro_records_with_revisions subscription_id: # Prevent concurrent threads from racing to fetch and parse the same schema. avro_schema = cache_mutex.synchronize { schema_cache[revision_id] } - if avro_schema.nil? begin require "avro" - # The resource name format is projects/{project}/schemas/{schema}@{revision} + # The resource name format is projects/{project}/schemas/{schema}@{revision}. schema_resource = pubsub.schemas.get_schema name: "#{schema_name}@#{revision_id}" avro_schema = Avro::Schema.parse schema_resource.definition @@ -64,7 +63,7 @@ def subscribe_avro_records_with_revisions subscription_id: message_data = JSON.parse received_message.data puts "Received a JSON-encoded message:\n#{message_data}" else - puts "Unknown message type; rejecting message." + puts "Unknown message encoding: #{encoding}. Rejecting message." received_message.reject! next end @@ -78,9 +77,8 @@ def subscribe_avro_records_with_revisions subscription_id: listener.start - # Let the main thread sleep for 60 seconds so the thread for listening - # messages does not quit + # messages does not quit. sleep 60 listener.stop.wait! # [END pubsub_subscribe_avro_records_with_revisions]