From 802ee4211322e9a8a42a2c37f13a2a601b2237fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Fri, 17 Apr 2026 13:43:45 +0200 Subject: [PATCH 1/6] feat: replace loggregator V1 UDP emitter with gRPC V2 client feat: replace loggregator V1 UDP emitter with gRPC V2 client Remove the loggregator_emitter gem (V1, UDP/beefcake) and replace it with a new LoggregatorEmitter::Client backed by gRPC V2. The client uses lazy stub initialization to avoid gRPC background threads conflicting with Puma's thread pool at startup. Config keys renamed from `router`/`internal_url` to `endpoint`, with optional mTLS cert fields added to all three schemas (api, clock, worker). Integration and unit tests updated to use the V2 gRPC fake server. --- Gemfile | 1 - Gemfile.lock | 4 - config/cloud_controller.yml | 3 +- .../config_schemas/api_schema.rb | 6 +- .../config_schemas/clock_schema.rb | 5 +- .../config_schemas/worker_schema.rb | 5 +- lib/cloud_controller/runner.rb | 20 +-- lib/delayed_job/delayed_worker.rb | 20 ++- lib/loggregator-api/v2/envelope_pb.rb | 2 +- lib/loggregator-api/v2/ingress_pb.rb | 21 +++ lib/loggregator-api/v2/ingress_services_pb.rb | 26 ++++ lib/loggregator_emitter/client.rb | 56 ++++++++ spec/integration/app_log_emitter_spec.rb | 18 +-- spec/support/fake_loggregator_server.rb | 48 ++++--- spec/unit/lib/cloud_controller/runner_spec.rb | 4 +- .../lib/loggregator_emitter/client_spec.rb | 120 ++++++++++++++++++ 16 files changed, 302 insertions(+), 57 deletions(-) create mode 100644 lib/loggregator-api/v2/ingress_pb.rb create mode 100644 lib/loggregator-api/v2/ingress_services_pb.rb create mode 100644 lib/loggregator_emitter/client.rb create mode 100644 spec/unit/lib/loggregator_emitter/client_spec.rb diff --git a/Gemfile b/Gemfile index b8148ea8e21..f8a1a1d2a83 100644 --- a/Gemfile +++ b/Gemfile @@ -12,7 +12,6 @@ gem 'hashdiff' gem 'httpclient' gem 'json-diff' gem 'json-schema' -gem 'loggregator_emitter', '~> 5.0' gem 'mime-types', '~> 3.7' gem 'multipart-parser' gem 'netaddr', '>= 2.0.4' diff --git a/Gemfile.lock b/Gemfile.lock index 701a7e21682..ff07d0d4e45 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -94,7 +94,6 @@ GEM ms_rest_azure (~> 0.7.0) backport (1.2.0) base64 (0.3.0) - beefcake (1.0.0) benchmark (0.5.0) bigdecimal (4.1.1) bit-struct (0.17) @@ -284,8 +283,6 @@ GEM rb-fsevent (~> 0.10, >= 0.10.3) rb-inotify (~> 0.9, >= 0.9.10) logger (1.7.0) - loggregator_emitter (5.2.0) - beefcake (~> 1.0.0) loofah (2.25.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) @@ -636,7 +633,6 @@ DEPENDENCIES json-diff json-schema listen - loggregator_emitter (~> 5.0) machinist (~> 1.0.6) mime-types (~> 3.7) mock_redis diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml index 1395d928f3d..43f6f384344 100644 --- a/config/cloud_controller.yml +++ b/config/cloud_controller.yml @@ -122,8 +122,7 @@ log_audit_events: false telemetry_log_path: spec/artifacts/cloud_controller_telemetry.log loggregator: - router: "127.0.0.1:3456" - internal_url: 'http://loggregator-trafficcontroller.service.cf.internal:8081' + endpoint: "127.0.0.1:3456" logcache: host: 'http://doppler.service.cf.internal' diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb index e6ab9e260b6..cd67c616787 100644 --- a/lib/cloud_controller/config_schemas/api_schema.rb +++ b/lib/cloud_controller/config_schemas/api_schema.rb @@ -316,8 +316,10 @@ class ApiSchema < VCAP::Config }, optional(:loggregator) => { - router: String, - internal_url: String + endpoint: String, + optional(:ca_file) => String, + optional(:cert_file) => String, + optional(:key_file) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index 17a681c4823..e87886b87aa 100644 --- a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -180,7 +180,10 @@ class ClockSchema < VCAP::Config optional(:uaa_client_scope) => String, optional(:loggregator) => { - router: String + endpoint: String, + optional(:ca_file) => String, + optional(:cert_file) => String, + optional(:key_file) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/config_schemas/worker_schema.rb b/lib/cloud_controller/config_schemas/worker_schema.rb index 92f1ebad887..bd9286300da 100644 --- a/lib/cloud_controller/config_schemas/worker_schema.rb +++ b/lib/cloud_controller/config_schemas/worker_schema.rb @@ -195,7 +195,10 @@ class WorkerSchema < VCAP::Config }, optional(:loggregator) => { - router: String + endpoint: String, + optional(:ca_file) => String, + optional(:cert_file) => String, + optional(:key_file) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/runner.rb b/lib/cloud_controller/runner.rb index 8132025bd17..b597bcd624d 100644 --- a/lib/cloud_controller/runner.rb +++ b/lib/cloud_controller/runner.rb @@ -3,7 +3,7 @@ require 'cloud_controller/uaa/uaa_token_decoder' require 'cloud_controller/uaa/uaa_verification_keys' require 'app_log_emitter' -require 'loggregator_emitter' +require 'loggregator_emitter/client' require 'fluent_emitter' require 'cloud_controller/rack_app_builder' require 'cloud_controller/metrics/periodic_updater' @@ -174,16 +174,20 @@ def setup_blobstore end def setup_app_log_emitter + VCAP::AppLogEmitter.logger = logger VCAP::AppLogEmitter.fluent_emitter = fluent_emitter if @config.get(:fluent) - if @config.get(:loggregator) && @config.get( - :loggregator, :router - ) - VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(@config.get(:loggregator, :router), 'cloud_controller', 'API', - @config.get(:index)) - end + return unless @config.get(:loggregator) && @config.get(:loggregator, :endpoint) - VCAP::AppLogEmitter.logger = logger + VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new( + endpoint: @config.get(:loggregator, :endpoint), + origin: 'cloud_controller', + source_type: 'API', + instance_id: @config.get(:index), + ca_cert_file: @config.get(:loggregator, :ca_file), + client_cert_file: @config.get(:loggregator, :cert_file), + client_key_file: @config.get(:loggregator, :key_file) + ) end def fluent_emitter diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb index 7fd04df2d59..cf89ecfb2ba 100644 --- a/lib/delayed_job/delayed_worker.rb +++ b/lib/delayed_job/delayed_worker.rb @@ -3,6 +3,7 @@ require 'puma' require 'prometheus/middleware/exporter' require 'cloud_controller/standalone_metrics_webserver' +require 'loggregator_emitter/client' class CloudController::DelayedWorker DEFAULT_READ_AHEAD_POSTGRES = 0 @@ -96,15 +97,20 @@ def get_initialized_delayed_worker(config, logger) end def setup_app_log_emitter(config, logger) + VCAP::AppLogEmitter.logger = logger VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent) - if config.get(:loggregator) && config.get( - :loggregator, :router - ) - VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API', - config.get(:index)) - end - VCAP::AppLogEmitter.logger = logger + return unless config.get(:loggregator) && config.get(:loggregator, :endpoint) + + VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new( + endpoint: config.get(:loggregator, :endpoint), + origin: 'cloud_controller', + source_type: 'API', + instance_id: config.get(:index), + ca_cert_file: config.get(:loggregator, :ca_file), + client_cert_file: config.get(:loggregator, :cert_file), + client_key_file: config.get(:loggregator, :key_file) + ) end def fluent_emitter(config) diff --git a/lib/loggregator-api/v2/envelope_pb.rb b/lib/loggregator-api/v2/envelope_pb.rb index e510bb2125e..2c3418d27a6 100644 --- a/lib/loggregator-api/v2/envelope_pb.rb +++ b/lib/loggregator-api/v2/envelope_pb.rb @@ -7,7 +7,7 @@ descriptor_data = "\n!loggregator-api/v2/envelope.proto\x12\x0eloggregator.v2\"\xb5\x04\n\x08\x45nvelope\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x1c\n\tsource_id\x18\x02 \x01(\tR\tsource_id\x12 \n\x0binstance_id\x18\x08 \x01(\tR\x0binstance_id\x12V\n\x0f\x64\x65precated_tags\x18\x03 \x03(\x0b\x32,.loggregator.v2.Envelope.DeprecatedTagsEntryR\x0f\x64\x65precated_tags\x12\x30\n\x04tags\x18\t \x03(\x0b\x32\".loggregator.v2.Envelope.TagsEntry\x12\"\n\x03log\x18\x04 \x01(\x0b\x32\x13.loggregator.v2.LogH\x00\x12*\n\x07\x63ounter\x18\x05 \x01(\x0b\x32\x17.loggregator.v2.CounterH\x00\x12&\n\x05gauge\x18\x06 \x01(\x0b\x32\x15.loggregator.v2.GaugeH\x00\x12&\n\x05timer\x18\x07 \x01(\x0b\x32\x15.loggregator.v2.TimerH\x00\x12&\n\x05\x65vent\x18\n \x01(\x0b\x32\x15.loggregator.v2.EventH\x00\x1aL\n\x13\x44\x65precatedTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.loggregator.v2.Value:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07message\"8\n\rEnvelopeBatch\x12\'\n\x05\x62\x61tch\x18\x01 \x03(\x0b\x32\x18.loggregator.v2.Envelope\"E\n\x05Value\x12\x0e\n\x04text\x18\x01 \x01(\tH\x00\x12\x11\n\x07integer\x18\x02 \x01(\x03H\x00\x12\x11\n\x07\x64\x65\x63imal\x18\x03 \x01(\x01H\x00\x42\x06\n\x04\x64\x61ta\"X\n\x03Log\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12&\n\x04type\x18\x02 \x01(\x0e\x32\x18.loggregator.v2.Log.Type\"\x18\n\x04Type\x12\x07\n\x03OUT\x10\x00\x12\x07\n\x03\x45RR\x10\x01\"5\n\x07\x43ounter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65lta\x18\x02 \x01(\x04\x12\r\n\x05total\x18\x03 \x01(\x04\"\x88\x01\n\x05Gauge\x12\x33\n\x07metrics\x18\x01 \x03(\x0b\x32\".loggregator.v2.Gauge.MetricsEntry\x1aJ\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.loggregator.v2.GaugeValue:\x02\x38\x01\")\n\nGaugeValue\x12\x0c\n\x04unit\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01\"2\n\x05Timer\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05start\x18\x02 \x01(\x03\x12\x0c\n\x04stop\x18\x03 \x01(\x03\"$\n\x05\x45vent\x12\r\n\x05title\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\tBs\n\x1forg.cloudfoundry.loggregator.v2B\x13LoggregatorEnvelopeZ;code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2b\x06proto3" -pool = Google::Protobuf::DescriptorPool.generated_pool +pool = ::Google::Protobuf::DescriptorPool.generated_pool pool.add_serialized_file(descriptor_data) module Loggregator diff --git a/lib/loggregator-api/v2/ingress_pb.rb b/lib/loggregator-api/v2/ingress_pb.rb new file mode 100644 index 00000000000..be0cbf97c2c --- /dev/null +++ b/lib/loggregator-api/v2/ingress_pb.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: loggregator-api/v2/ingress.proto + +require 'google/protobuf' + +require 'loggregator-api/v2/envelope_pb' + + +descriptor_data = "\n loggregator-api/v2/ingress.proto\x12\x0eloggregator.v2\x1a!loggregator-api/v2/envelope.proto\"\x11\n\x0fIngressResponse\"\x15\n\x13\x42\x61tchSenderResponse\"\x0e\n\x0cSendResponse2\xf0\x01\n\x07Ingress\x12G\n\x06Sender\x12\x18.loggregator.v2.Envelope\x1a\x1f.loggregator.v2.IngressResponse\"\x00(\x01\x12U\n\x0b\x42\x61tchSender\x12\x1d.loggregator.v2.EnvelopeBatch\x1a#.loggregator.v2.BatchSenderResponse\"\x00(\x01\x12\x45\n\x04Send\x12\x1d.loggregator.v2.EnvelopeBatch\x1a\x1c.loggregator.v2.SendResponse\"\x00\x42r\n\x1forg.cloudfoundry.loggregator.v2B\x12LoggregatorIngressZ;code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2b\x06proto3" + +pool = ::Google::Protobuf::DescriptorPool.generated_pool +pool.add_serialized_file(descriptor_data) + +module Loggregator + module V2 + IngressResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.IngressResponse").msgclass + BatchSenderResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.BatchSenderResponse").msgclass + SendResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.SendResponse").msgclass + end +end diff --git a/lib/loggregator-api/v2/ingress_services_pb.rb b/lib/loggregator-api/v2/ingress_services_pb.rb new file mode 100644 index 00000000000..c42d8d048dc --- /dev/null +++ b/lib/loggregator-api/v2/ingress_services_pb.rb @@ -0,0 +1,26 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: loggregator-api/v2/ingress.proto for package 'loggregator.v2' + +require 'grpc' +require 'loggregator-api/v2/ingress_pb' + +module Loggregator + module V2 + module Ingress + class Service + + include ::GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'loggregator.v2.Ingress' + + rpc :Sender, stream(::Loggregator::V2::Envelope), ::Loggregator::V2::IngressResponse + rpc :BatchSender, stream(::Loggregator::V2::EnvelopeBatch), ::Loggregator::V2::BatchSenderResponse + rpc :Send, ::Loggregator::V2::EnvelopeBatch, ::Loggregator::V2::SendResponse + end + + Stub = Service.rpc_stub_class + end + end +end diff --git a/lib/loggregator_emitter/client.rb b/lib/loggregator_emitter/client.rb new file mode 100644 index 00000000000..ab7757a6bce --- /dev/null +++ b/lib/loggregator_emitter/client.rb @@ -0,0 +1,56 @@ +require 'grpc' +require 'loggregator-api/v2/ingress_services_pb' + +module LoggregatorEmitter + class Client + def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil) + raise ArgumentError.new('Must provide a valid endpoint') if endpoint.nil? || endpoint.empty? + raise ArgumentError.new('Must provide a valid origin') unless origin + raise ArgumentError.new('Must provide a valid source_type') unless source_type + + @endpoint = endpoint + @credentials = build_credentials(ca_cert_file, client_cert_file, client_key_file) + @default_tags = { 'origin' => origin, 'source_type' => source_type } + @instance_id = instance_id && instance_id.to_s + end + + def emit(app_id, message, tags={}) + envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::OUT, tags) + stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope])) + end + + def emit_error(app_id, message, tags={}) + envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::ERR, tags) + stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope])) + end + + private + + def stub + @stub ||= Loggregator::V2::Ingress::Stub.new(@endpoint, @credentials) + end + + def create_envelope(app_id, message, type, tags) + Loggregator::V2::Envelope.new( + source_id: app_id, + instance_id: @instance_id, + timestamp: Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond), + log: Loggregator::V2::Log.new( + payload: message.encode('UTF-8'), + type: type + ), + tags: @default_tags.merge(tags.transform_keys(&:to_s).transform_values(&:to_s)) + ) + end + + def build_credentials(ca_cert_file, client_cert_file, client_key_file) + return :this_channel_is_insecure unless ca_cert_file && client_cert_file && client_key_file + + GRPC::Core::ChannelCredentials.new( + File.read(ca_cert_file), + File.read(client_key_file), + File.read(client_cert_file) + ) + end + end +end diff --git a/spec/integration/app_log_emitter_spec.rb b/spec/integration/app_log_emitter_spec.rb index a45e81344dd..c396e819704 100644 --- a/spec/integration/app_log_emitter_spec.rb +++ b/spec/integration/app_log_emitter_spec.rb @@ -1,12 +1,8 @@ require 'spec_helper' -require 'securerandom' require 'tempfile' RSpec.describe 'Cloud controller Loggregator Integration', type: :integration do before(:all) do - @loggregator_server = FakeLoggregatorServer.new(12_345) - @loggregator_server.start - @authed_headers = { 'Authorization' => "bearer #{admin_token}", 'Accept' => 'application/json', @@ -18,6 +14,7 @@ config = VCAP::CloudController::YAMLConfig.safe_load_file(base_cc_config_file).deep_merge( VCAP::CloudController::YAMLConfig.safe_load_file(port_8181_overrides) ) + config['loggregator'] = { 'endpoint' => 'localhost:12345' } @cc_config_file = Tempfile.new('cc_config.yml') @cc_config_file.write(YAML.dump(config)) @@ -25,6 +22,9 @@ start_cc(debug: false, config: @cc_config_file.path) + @loggregator_server = FakeLoggregatorServer.new(12_345) + @loggregator_server.start + org = org_with_default_quota(@authed_headers) org_guid = org.json_body['metadata']['guid'] @@ -43,7 +43,7 @@ @loggregator_server.stop end - it 'send logs to the loggregator' do + it 'sends logs to the loggregator' do app = make_post_request('/v2/apps', { 'name' => 'foo_app', @@ -57,9 +57,9 @@ expect(messages.size).to eq(1) message = messages.first - expect(message.message).to eq "Created app with guid #{app_id}" - expect(message.app_id).to eq app_id - expect(message.source_type).to eq 'API' - expect(message.message_type).to eq LogMessage::MessageType::OUT + expect(message.source_id).to eq(app_id) + expect(message.log.type).to eq(:OUT) + expect(message.log.payload).to eq("Created app with guid #{app_id}") + expect(message.tags['source_type']).to eq('API') end end diff --git a/spec/support/fake_loggregator_server.rb b/spec/support/fake_loggregator_server.rb index 98be910caf7..d7a1b986cb8 100644 --- a/spec/support/fake_loggregator_server.rb +++ b/spec/support/fake_loggregator_server.rb @@ -1,32 +1,42 @@ -require 'socket' -require 'sonde' +require 'grpc' +require 'loggregator-api/v2/ingress_services_pb' class FakeLoggregatorServer - attr_reader :messages, :port, :sock + attr_reader :port def initialize(port) - @messages = [] @port = port - @sock = UDPSocket.new + @envelopes = [] + @mutex = Mutex.new end def start - @sock.bind('localhost', port) - - @thread = Thread.new do - loop do - stuff = @sock.recv(65_536) - envelope = ::Sonde::Envelope.decode(stuff) - messages << envelope.logMessage - rescue Beefcake::Message::WrongTypeError, Beefcake::Message::RequiredFieldNotSetError, Beefcake::Message::InvalidValueError => e - puts 'ERROR' - puts e - end - end + service = FakeIngressService.new(@envelopes, @mutex) + @server = GRPC::RpcServer.new + @server.add_http2_port("localhost:#{@port}", :this_port_is_insecure) + @server.handle(service) + @thread = Thread.new { @server.run } + @server.wait_till_running end def stop - Thread.kill(@thread) - @sock.close + @server.stop + @thread.join + end + + def messages + @mutex.synchronize { @envelopes.flat_map { |batch| batch.batch.to_a } } + end + + class FakeIngressService < Loggregator::V2::Ingress::Service + def initialize(envelopes, mutex) + @envelopes = envelopes + @mutex = mutex + end + + def send(envelope_batch, _call) + @mutex.synchronize { @envelopes << envelope_batch } + Loggregator::V2::SendResponse.new + end end end diff --git a/spec/unit/lib/cloud_controller/runner_spec.rb b/spec/unit/lib/cloud_controller/runner_spec.rb index 715f7151ebc..2cf316e5d5a 100644 --- a/spec/unit/lib/cloud_controller/runner_spec.rb +++ b/spec/unit/lib/cloud_controller/runner_spec.rb @@ -116,8 +116,8 @@ module VCAP::CloudController end it 'sets up loggregator emitter' do - loggregator_emitter = double(:loggregator_emitter) - expect(LoggregatorEmitter::Emitter).to receive(:new).and_return(loggregator_emitter) + loggregator_emitter = instance_double(LoggregatorEmitter::Client) + expect(LoggregatorEmitter::Client).to receive(:new).and_return(loggregator_emitter) expect(VCAP::AppLogEmitter).to receive(:emitter=).with(loggregator_emitter) subject end diff --git a/spec/unit/lib/loggregator_emitter/client_spec.rb b/spec/unit/lib/loggregator_emitter/client_spec.rb new file mode 100644 index 00000000000..b7eab756927 --- /dev/null +++ b/spec/unit/lib/loggregator_emitter/client_spec.rb @@ -0,0 +1,120 @@ +require 'rspec' +require 'loggregator_emitter/client' + +RSpec.describe LoggregatorEmitter::Client do + let(:stub) { instance_double(Loggregator::V2::Ingress::Stub) } + + describe '#initialize' do + let(:endpoint) { 'localhost:1234' } + let(:origin) { 'cloud_controller' } + let(:source_type) { 'API' } + let(:instance_id) { 0 } + + it 'raises ArgumentError when endpoint is empty' do + expect do + described_class.new(endpoint: '', origin: origin, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid endpoint') + end + + it 'raises ArgumentError when endpoint is nil' do + expect do + described_class.new(endpoint: nil, origin: origin, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid endpoint') + end + + it 'raises ArgumentError when origin is nil' do + expect do + described_class.new(endpoint: endpoint, origin: nil, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid origin') + end + + it 'raises ArgumentError when source_type is nil' do + expect do + described_class.new(endpoint: endpoint, origin: origin, source_type: nil, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid source_type') + end + + it 'creates a client with valid arguments' do + expect do + described_class.new(endpoint: endpoint, origin: origin, source_type: source_type, instance_id: instance_id) + end.not_to raise_error + end + end + + describe '#emit' do + subject(:client) do + described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + end + + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'sends an envelope with OUT type' do + client.emit('app-guid-123', 'some log message') + expect(stub).to have_received(:send) do |batch| + envelope = batch.batch.first + expect(envelope.source_id).to eq('app-guid-123') + expect(envelope.log.type).to eq(:OUT) + expect(envelope.log.payload).to eq('some log message') + end + end + end + + describe '#emit_error' do + subject(:client) do + described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + end + + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'sends an envelope with ERR type' do + client.emit_error('app-guid-123', 'some error message') + expect(stub).to have_received(:send) do |batch| + envelope = batch.batch.first + expect(envelope.source_id).to eq('app-guid-123') + expect(envelope.log.type).to eq(:ERR) + expect(envelope.log.payload).to eq('some error message') + end + end + end + + describe 'credentials' do + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'uses insecure credentials when no cert files are provided' do + client = described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + client.emit('app-guid-123', 'message') + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', :this_channel_is_insecure) + end + + it 'uses TLS credentials when all cert files are provided' do + tls_creds = instance_double(GRPC::Core::ChannelCredentials) + allow(File).to receive(:read).with('/certs/ca.crt').and_return('ca-cert-content') + allow(File).to receive(:read).with('/certs/client.key').and_return('client-key-content') + allow(File).to receive(:read).with('/certs/client.crt').and_return('client-cert-content') + allow(GRPC::Core::ChannelCredentials).to receive(:new).and_return(tls_creds) + + client = described_class.new( + endpoint: 'localhost:1234', + origin: 'cloud_controller', + source_type: 'API', + instance_id: 0, + ca_cert_file: '/certs/ca.crt', + client_cert_file: '/certs/client.crt', + client_key_file: '/certs/client.key' + ) + client.emit('app-guid-123', 'message') + + expect(GRPC::Core::ChannelCredentials).to have_received(:new).with('ca-cert-content', 'client-key-content', 'client-cert-content') + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', tls_creds) + end + end +end From 13891a2a166267ad037a41e9d9ba1abf2c783c30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Fri, 17 Apr 2026 14:12:12 +0200 Subject: [PATCH 2/6] fix: AppLogEmitter classes unit tests are fixed --- spec/unit/lib/app_log_emitter_spec.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spec/unit/lib/app_log_emitter_spec.rb b/spec/unit/lib/app_log_emitter_spec.rb index 676bebcbd2c..0c0373fe8ef 100644 --- a/spec/unit/lib/app_log_emitter_spec.rb +++ b/spec/unit/lib/app_log_emitter_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'loggregator_emitter/client' module VCAP RSpec.describe AppLogEmitter do @@ -18,12 +19,12 @@ module VCAP describe 'when no emitter is set' do it 'does not emit errors' do - expect_any_instance_of(LoggregatorEmitter::Emitter).not_to receive(:emit_error) + expect_any_instance_of(LoggregatorEmitter::Client).not_to receive(:emit_error) AppLogEmitter.emit_error('app_id', 'error message') end it 'does not emit' do - expect_any_instance_of(LoggregatorEmitter::Emitter).not_to receive(:emit) + expect_any_instance_of(LoggregatorEmitter::Client).not_to receive(:emit) AppLogEmitter.emit('app_id', 'log message') end end @@ -62,7 +63,7 @@ module VCAP let(:org) { VCAP::CloudController::Organization.make } let(:space) { VCAP::CloudController::Space.make(organization: org) } let(:app) { VCAP::CloudController::AppModel.make(space:) } - let(:emitter) { LoggregatorEmitter::Emitter.new('127.0.0.1:1234', 'cloud_controller', 'API', 1) } + let(:emitter) { instance_double(LoggregatorEmitter::Client) } before do AppLogEmitter.emitter = emitter From f144c6f1d06577d562d9567bef31143d63f8f237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Mon, 20 Apr 2026 08:54:38 +0200 Subject: [PATCH 3/6] fix: loggregator configs removed from clock config schema --- lib/cloud_controller/config_schemas/clock_schema.rb | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index e87886b87aa..cdb3b02ab2d 100644 --- a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -179,13 +179,6 @@ class ClockSchema < VCAP::Config optional(:uaa_client_secret) => String, optional(:uaa_client_scope) => String, - optional(:loggregator) => { - endpoint: String, - optional(:ca_file) => String, - optional(:cert_file) => String, - optional(:key_file) => String - }, - optional(:fluent) => { optional(:host) => String, optional(:port) => Integer From ab9ae017d90258a9af4d809f956a0df640fbb2d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Tue, 21 Apr 2026 16:07:08 +0200 Subject: [PATCH 4/6] fix: harden loggregator gRPC client for Puma and mTLS Defer GRPC::Core object creation to the first emit call so gRPC is not initialized before Puma forks. Add tls_subject_name support to allow SSL target override when the server cert CN does not match the endpoint IP (e.g. CN=metron on 127.0.0.1). Add 10s call timeout consistent with the logcache client. --- .../config_schemas/api_schema.rb | 3 ++- .../config_schemas/worker_schema.rb | 3 ++- lib/cloud_controller/runner.rb | 3 ++- lib/delayed_job/delayed_worker.rb | 3 ++- lib/loggregator_emitter/client.rb | 26 ++++++++++++------- .../lib/loggregator_emitter/client_spec.rb | 9 ++++--- 6 files changed, 31 insertions(+), 16 deletions(-) diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb index cd67c616787..cb8287f4a0e 100644 --- a/lib/cloud_controller/config_schemas/api_schema.rb +++ b/lib/cloud_controller/config_schemas/api_schema.rb @@ -319,7 +319,8 @@ class ApiSchema < VCAP::Config endpoint: String, optional(:ca_file) => String, optional(:cert_file) => String, - optional(:key_file) => String + optional(:key_file) => String, + optional(:subject_name) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/config_schemas/worker_schema.rb b/lib/cloud_controller/config_schemas/worker_schema.rb index bd9286300da..3f485715ae2 100644 --- a/lib/cloud_controller/config_schemas/worker_schema.rb +++ b/lib/cloud_controller/config_schemas/worker_schema.rb @@ -198,7 +198,8 @@ class WorkerSchema < VCAP::Config endpoint: String, optional(:ca_file) => String, optional(:cert_file) => String, - optional(:key_file) => String + optional(:key_file) => String, + optional(:subject_name) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/runner.rb b/lib/cloud_controller/runner.rb index b597bcd624d..1b59dc20812 100644 --- a/lib/cloud_controller/runner.rb +++ b/lib/cloud_controller/runner.rb @@ -186,7 +186,8 @@ def setup_app_log_emitter instance_id: @config.get(:index), ca_cert_file: @config.get(:loggregator, :ca_file), client_cert_file: @config.get(:loggregator, :cert_file), - client_key_file: @config.get(:loggregator, :key_file) + client_key_file: @config.get(:loggregator, :key_file), + tls_subject_name: @config.get(:loggregator, :subject_name) ) end diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb index cf89ecfb2ba..1501de6e476 100644 --- a/lib/delayed_job/delayed_worker.rb +++ b/lib/delayed_job/delayed_worker.rb @@ -109,7 +109,8 @@ def setup_app_log_emitter(config, logger) instance_id: config.get(:index), ca_cert_file: config.get(:loggregator, :ca_file), client_cert_file: config.get(:loggregator, :cert_file), - client_key_file: config.get(:loggregator, :key_file) + client_key_file: config.get(:loggregator, :key_file), + tls_subject_name: config.get(:loggregator, :subject_name) ) end diff --git a/lib/loggregator_emitter/client.rb b/lib/loggregator_emitter/client.rb index ab7757a6bce..3c3ddc38d33 100644 --- a/lib/loggregator_emitter/client.rb +++ b/lib/loggregator_emitter/client.rb @@ -1,15 +1,18 @@ -require 'grpc' require 'loggregator-api/v2/ingress_services_pb' +require 'loggregator-api/v2/envelope_pb' module LoggregatorEmitter class Client - def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil) + def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil, tls_subject_name: nil) raise ArgumentError.new('Must provide a valid endpoint') if endpoint.nil? || endpoint.empty? raise ArgumentError.new('Must provide a valid origin') unless origin raise ArgumentError.new('Must provide a valid source_type') unless source_type @endpoint = endpoint - @credentials = build_credentials(ca_cert_file, client_cert_file, client_key_file) + @ca_cert_file = ca_cert_file + @client_cert_file = client_cert_file + @client_key_file = client_key_file + @tls_subject_name = tls_subject_name @default_tags = { 'origin' => origin, 'source_type' => source_type } @instance_id = instance_id && instance_id.to_s end @@ -27,7 +30,12 @@ def emit_error(app_id, message, tags={}) private def stub - @stub ||= Loggregator::V2::Ingress::Stub.new(@endpoint, @credentials) + @stub ||= Loggregator::V2::Ingress::Stub.new( + @endpoint, + build_credentials, + channel_args: @tls_subject_name ? { GRPC::Core::Channel::SSL_TARGET => @tls_subject_name } : {}, + timeout: 10 + ) end def create_envelope(app_id, message, type, tags) @@ -43,13 +51,13 @@ def create_envelope(app_id, message, type, tags) ) end - def build_credentials(ca_cert_file, client_cert_file, client_key_file) - return :this_channel_is_insecure unless ca_cert_file && client_cert_file && client_key_file + def build_credentials + return :this_channel_is_insecure unless @ca_cert_file && @client_cert_file && @client_key_file GRPC::Core::ChannelCredentials.new( - File.read(ca_cert_file), - File.read(client_key_file), - File.read(client_cert_file) + File.read(@ca_cert_file), + File.read(@client_key_file), + File.read(@client_cert_file) ) end end diff --git a/spec/unit/lib/loggregator_emitter/client_spec.rb b/spec/unit/lib/loggregator_emitter/client_spec.rb index b7eab756927..e23c19167fc 100644 --- a/spec/unit/lib/loggregator_emitter/client_spec.rb +++ b/spec/unit/lib/loggregator_emitter/client_spec.rb @@ -92,7 +92,8 @@ it 'uses insecure credentials when no cert files are provided' do client = described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) client.emit('app-guid-123', 'message') - expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', :this_channel_is_insecure) + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', :this_channel_is_insecure, + channel_args: {}, timeout: 10) end it 'uses TLS credentials when all cert files are provided' do @@ -109,12 +110,14 @@ instance_id: 0, ca_cert_file: '/certs/ca.crt', client_cert_file: '/certs/client.crt', - client_key_file: '/certs/client.key' + client_key_file: '/certs/client.key', + tls_subject_name: 'metron' ) client.emit('app-guid-123', 'message') expect(GRPC::Core::ChannelCredentials).to have_received(:new).with('ca-cert-content', 'client-key-content', 'client-cert-content') - expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', tls_creds) + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', tls_creds, channel_args: { 'grpc.ssl_target_name_override': 'metron' }, + timeout: 10) end end end From 6461ed3c73ace3f22f2b9178941185d4ebbc7af7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Tue, 21 Apr 2026 16:18:48 +0200 Subject: [PATCH 5/6] fix: change parameter name to subject_name, drop prefix tls --- lib/cloud_controller/runner.rb | 2 +- lib/delayed_job/delayed_worker.rb | 2 +- lib/loggregator_emitter/client.rb | 6 +++--- spec/unit/lib/loggregator_emitter/client_spec.rb | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/cloud_controller/runner.rb b/lib/cloud_controller/runner.rb index 1b59dc20812..2b32bfea526 100644 --- a/lib/cloud_controller/runner.rb +++ b/lib/cloud_controller/runner.rb @@ -187,7 +187,7 @@ def setup_app_log_emitter ca_cert_file: @config.get(:loggregator, :ca_file), client_cert_file: @config.get(:loggregator, :cert_file), client_key_file: @config.get(:loggregator, :key_file), - tls_subject_name: @config.get(:loggregator, :subject_name) + subject_name: @config.get(:loggregator, :subject_name) ) end diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb index 1501de6e476..20629dc4a31 100644 --- a/lib/delayed_job/delayed_worker.rb +++ b/lib/delayed_job/delayed_worker.rb @@ -110,7 +110,7 @@ def setup_app_log_emitter(config, logger) ca_cert_file: config.get(:loggregator, :ca_file), client_cert_file: config.get(:loggregator, :cert_file), client_key_file: config.get(:loggregator, :key_file), - tls_subject_name: config.get(:loggregator, :subject_name) + subject_name: config.get(:loggregator, :subject_name) ) end diff --git a/lib/loggregator_emitter/client.rb b/lib/loggregator_emitter/client.rb index 3c3ddc38d33..463083b28ff 100644 --- a/lib/loggregator_emitter/client.rb +++ b/lib/loggregator_emitter/client.rb @@ -3,7 +3,7 @@ module LoggregatorEmitter class Client - def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil, tls_subject_name: nil) + def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil, subject_name: nil) raise ArgumentError.new('Must provide a valid endpoint') if endpoint.nil? || endpoint.empty? raise ArgumentError.new('Must provide a valid origin') unless origin raise ArgumentError.new('Must provide a valid source_type') unless source_type @@ -12,7 +12,7 @@ def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: @ca_cert_file = ca_cert_file @client_cert_file = client_cert_file @client_key_file = client_key_file - @tls_subject_name = tls_subject_name + @subject_name = subject_name @default_tags = { 'origin' => origin, 'source_type' => source_type } @instance_id = instance_id && instance_id.to_s end @@ -33,7 +33,7 @@ def stub @stub ||= Loggregator::V2::Ingress::Stub.new( @endpoint, build_credentials, - channel_args: @tls_subject_name ? { GRPC::Core::Channel::SSL_TARGET => @tls_subject_name } : {}, + channel_args: @subject_name ? { GRPC::Core::Channel::SSL_TARGET => @subject_name } : {}, timeout: 10 ) end diff --git a/spec/unit/lib/loggregator_emitter/client_spec.rb b/spec/unit/lib/loggregator_emitter/client_spec.rb index e23c19167fc..4df0fbf8ed2 100644 --- a/spec/unit/lib/loggregator_emitter/client_spec.rb +++ b/spec/unit/lib/loggregator_emitter/client_spec.rb @@ -111,7 +111,7 @@ ca_cert_file: '/certs/ca.crt', client_cert_file: '/certs/client.crt', client_key_file: '/certs/client.key', - tls_subject_name: 'metron' + subject_name: 'metron' ) client.emit('app-guid-123', 'message') From f13aa5d2381762a16b2c87ff2c95bf2eb85a19c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serdar=20=C3=96zer?= Date: Tue, 21 Apr 2026 16:34:33 +0200 Subject: [PATCH 6/6] fix: nil check changed to idiomatic ruby way --- lib/loggregator_emitter/client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/loggregator_emitter/client.rb b/lib/loggregator_emitter/client.rb index 463083b28ff..a548f9c899b 100644 --- a/lib/loggregator_emitter/client.rb +++ b/lib/loggregator_emitter/client.rb @@ -14,7 +14,7 @@ def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: @client_key_file = client_key_file @subject_name = subject_name @default_tags = { 'origin' => origin, 'source_type' => source_type } - @instance_id = instance_id && instance_id.to_s + @instance_id = instance_id&.to_s end def emit(app_id, message, tags={})