diff --git a/Gemfile b/Gemfile index b8148ea8e21..f8a1a1d2a83 100644 --- a/Gemfile +++ b/Gemfile @@ -12,7 +12,6 @@ gem 'hashdiff' gem 'httpclient' gem 'json-diff' gem 'json-schema' -gem 'loggregator_emitter', '~> 5.0' gem 'mime-types', '~> 3.7' gem 'multipart-parser' gem 'netaddr', '>= 2.0.4' diff --git a/Gemfile.lock b/Gemfile.lock index 701a7e21682..ff07d0d4e45 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -94,7 +94,6 @@ GEM ms_rest_azure (~> 0.7.0) backport (1.2.0) base64 (0.3.0) - beefcake (1.0.0) benchmark (0.5.0) bigdecimal (4.1.1) bit-struct (0.17) @@ -284,8 +283,6 @@ GEM rb-fsevent (~> 0.10, >= 0.10.3) rb-inotify (~> 0.9, >= 0.9.10) logger (1.7.0) - loggregator_emitter (5.2.0) - beefcake (~> 1.0.0) loofah (2.25.1) crass (~> 1.0.2) nokogiri (>= 1.12.0) @@ -636,7 +633,6 @@ DEPENDENCIES json-diff json-schema listen - loggregator_emitter (~> 5.0) machinist (~> 1.0.6) mime-types (~> 3.7) mock_redis diff --git a/config/cloud_controller.yml b/config/cloud_controller.yml index 1395d928f3d..43f6f384344 100644 --- a/config/cloud_controller.yml +++ b/config/cloud_controller.yml @@ -122,8 +122,7 @@ log_audit_events: false telemetry_log_path: spec/artifacts/cloud_controller_telemetry.log loggregator: - router: "127.0.0.1:3456" - internal_url: 'http://loggregator-trafficcontroller.service.cf.internal:8081' + endpoint: "127.0.0.1:3456" logcache: host: 'http://doppler.service.cf.internal' diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb index e6ab9e260b6..cb8287f4a0e 100644 --- a/lib/cloud_controller/config_schemas/api_schema.rb +++ b/lib/cloud_controller/config_schemas/api_schema.rb @@ -316,8 +316,11 @@ class ApiSchema < VCAP::Config }, optional(:loggregator) => { - router: String, - internal_url: String + endpoint: String, + optional(:ca_file) => String, + optional(:cert_file) => String, + optional(:key_file) => String, + optional(:subject_name) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/config_schemas/clock_schema.rb b/lib/cloud_controller/config_schemas/clock_schema.rb index 17a681c4823..cdb3b02ab2d 100644 --- a/lib/cloud_controller/config_schemas/clock_schema.rb +++ b/lib/cloud_controller/config_schemas/clock_schema.rb @@ -179,10 +179,6 @@ class ClockSchema < VCAP::Config optional(:uaa_client_secret) => String, optional(:uaa_client_scope) => String, - optional(:loggregator) => { - router: String - }, - optional(:fluent) => { optional(:host) => String, optional(:port) => Integer diff --git a/lib/cloud_controller/config_schemas/worker_schema.rb b/lib/cloud_controller/config_schemas/worker_schema.rb index 92f1ebad887..3f485715ae2 100644 --- a/lib/cloud_controller/config_schemas/worker_schema.rb +++ b/lib/cloud_controller/config_schemas/worker_schema.rb @@ -195,7 +195,11 @@ class WorkerSchema < VCAP::Config }, optional(:loggregator) => { - router: String + endpoint: String, + optional(:ca_file) => String, + optional(:cert_file) => String, + optional(:key_file) => String, + optional(:subject_name) => String }, optional(:fluent) => { diff --git a/lib/cloud_controller/runner.rb b/lib/cloud_controller/runner.rb index 8132025bd17..2b32bfea526 100644 --- a/lib/cloud_controller/runner.rb +++ b/lib/cloud_controller/runner.rb @@ -3,7 +3,7 @@ require 'cloud_controller/uaa/uaa_token_decoder' require 'cloud_controller/uaa/uaa_verification_keys' require 'app_log_emitter' -require 'loggregator_emitter' +require 'loggregator_emitter/client' require 'fluent_emitter' require 'cloud_controller/rack_app_builder' require 'cloud_controller/metrics/periodic_updater' @@ -174,16 +174,21 @@ def setup_blobstore end def setup_app_log_emitter + VCAP::AppLogEmitter.logger = logger VCAP::AppLogEmitter.fluent_emitter = fluent_emitter if @config.get(:fluent) - if @config.get(:loggregator) && @config.get( - :loggregator, :router + return unless @config.get(:loggregator) && @config.get(:loggregator, :endpoint) + + VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new( + endpoint: @config.get(:loggregator, :endpoint), + origin: 'cloud_controller', + source_type: 'API', + instance_id: @config.get(:index), + ca_cert_file: @config.get(:loggregator, :ca_file), + client_cert_file: @config.get(:loggregator, :cert_file), + client_key_file: @config.get(:loggregator, :key_file), + subject_name: @config.get(:loggregator, :subject_name) ) - VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(@config.get(:loggregator, :router), 'cloud_controller', 'API', - @config.get(:index)) - end - - VCAP::AppLogEmitter.logger = logger end def fluent_emitter diff --git a/lib/delayed_job/delayed_worker.rb b/lib/delayed_job/delayed_worker.rb index 7fd04df2d59..20629dc4a31 100644 --- a/lib/delayed_job/delayed_worker.rb +++ b/lib/delayed_job/delayed_worker.rb @@ -3,6 +3,7 @@ require 'puma' require 'prometheus/middleware/exporter' require 'cloud_controller/standalone_metrics_webserver' +require 'loggregator_emitter/client' class CloudController::DelayedWorker DEFAULT_READ_AHEAD_POSTGRES = 0 @@ -96,15 +97,21 @@ def get_initialized_delayed_worker(config, logger) end def setup_app_log_emitter(config, logger) + VCAP::AppLogEmitter.logger = logger VCAP::AppLogEmitter.fluent_emitter = fluent_emitter(config) if config.get(:fluent) - if config.get(:loggregator) && config.get( - :loggregator, :router - ) - VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Emitter.new(config.get(:loggregator, :router), 'cloud_controller', 'API', - config.get(:index)) - end - VCAP::AppLogEmitter.logger = logger + return unless config.get(:loggregator) && config.get(:loggregator, :endpoint) + + VCAP::AppLogEmitter.emitter = LoggregatorEmitter::Client.new( + endpoint: config.get(:loggregator, :endpoint), + origin: 'cloud_controller', + source_type: 'API', + instance_id: config.get(:index), + ca_cert_file: config.get(:loggregator, :ca_file), + client_cert_file: config.get(:loggregator, :cert_file), + client_key_file: config.get(:loggregator, :key_file), + subject_name: config.get(:loggregator, :subject_name) + ) end def fluent_emitter(config) diff --git a/lib/loggregator-api/v2/envelope_pb.rb b/lib/loggregator-api/v2/envelope_pb.rb index e510bb2125e..2c3418d27a6 100644 --- a/lib/loggregator-api/v2/envelope_pb.rb +++ b/lib/loggregator-api/v2/envelope_pb.rb @@ -7,7 +7,7 @@ descriptor_data = "\n!loggregator-api/v2/envelope.proto\x12\x0eloggregator.v2\"\xb5\x04\n\x08\x45nvelope\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x1c\n\tsource_id\x18\x02 \x01(\tR\tsource_id\x12 \n\x0binstance_id\x18\x08 \x01(\tR\x0binstance_id\x12V\n\x0f\x64\x65precated_tags\x18\x03 \x03(\x0b\x32,.loggregator.v2.Envelope.DeprecatedTagsEntryR\x0f\x64\x65precated_tags\x12\x30\n\x04tags\x18\t \x03(\x0b\x32\".loggregator.v2.Envelope.TagsEntry\x12\"\n\x03log\x18\x04 \x01(\x0b\x32\x13.loggregator.v2.LogH\x00\x12*\n\x07\x63ounter\x18\x05 \x01(\x0b\x32\x17.loggregator.v2.CounterH\x00\x12&\n\x05gauge\x18\x06 \x01(\x0b\x32\x15.loggregator.v2.GaugeH\x00\x12&\n\x05timer\x18\x07 \x01(\x0b\x32\x15.loggregator.v2.TimerH\x00\x12&\n\x05\x65vent\x18\n \x01(\x0b\x32\x15.loggregator.v2.EventH\x00\x1aL\n\x13\x44\x65precatedTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.loggregator.v2.Value:\x02\x38\x01\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07message\"8\n\rEnvelopeBatch\x12\'\n\x05\x62\x61tch\x18\x01 \x03(\x0b\x32\x18.loggregator.v2.Envelope\"E\n\x05Value\x12\x0e\n\x04text\x18\x01 \x01(\tH\x00\x12\x11\n\x07integer\x18\x02 \x01(\x03H\x00\x12\x11\n\x07\x64\x65\x63imal\x18\x03 \x01(\x01H\x00\x42\x06\n\x04\x64\x61ta\"X\n\x03Log\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12&\n\x04type\x18\x02 \x01(\x0e\x32\x18.loggregator.v2.Log.Type\"\x18\n\x04Type\x12\x07\n\x03OUT\x10\x00\x12\x07\n\x03\x45RR\x10\x01\"5\n\x07\x43ounter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65lta\x18\x02 \x01(\x04\x12\r\n\x05total\x18\x03 \x01(\x04\"\x88\x01\n\x05Gauge\x12\x33\n\x07metrics\x18\x01 \x03(\x0b\x32\".loggregator.v2.Gauge.MetricsEntry\x1aJ\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 \x01(\x0b\x32\x1a.loggregator.v2.GaugeValue:\x02\x38\x01\")\n\nGaugeValue\x12\x0c\n\x04unit\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x01\"2\n\x05Timer\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05start\x18\x02 \x01(\x03\x12\x0c\n\x04stop\x18\x03 \x01(\x03\"$\n\x05\x45vent\x12\r\n\x05title\x18\x01 \x01(\t\x12\x0c\n\x04\x62ody\x18\x02 \x01(\tBs\n\x1forg.cloudfoundry.loggregator.v2B\x13LoggregatorEnvelopeZ;code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2b\x06proto3" -pool = Google::Protobuf::DescriptorPool.generated_pool +pool = ::Google::Protobuf::DescriptorPool.generated_pool pool.add_serialized_file(descriptor_data) module Loggregator diff --git a/lib/loggregator-api/v2/ingress_pb.rb b/lib/loggregator-api/v2/ingress_pb.rb new file mode 100644 index 00000000000..be0cbf97c2c --- /dev/null +++ b/lib/loggregator-api/v2/ingress_pb.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: loggregator-api/v2/ingress.proto + +require 'google/protobuf' + +require 'loggregator-api/v2/envelope_pb' + + +descriptor_data = "\n loggregator-api/v2/ingress.proto\x12\x0eloggregator.v2\x1a!loggregator-api/v2/envelope.proto\"\x11\n\x0fIngressResponse\"\x15\n\x13\x42\x61tchSenderResponse\"\x0e\n\x0cSendResponse2\xf0\x01\n\x07Ingress\x12G\n\x06Sender\x12\x18.loggregator.v2.Envelope\x1a\x1f.loggregator.v2.IngressResponse\"\x00(\x01\x12U\n\x0b\x42\x61tchSender\x12\x1d.loggregator.v2.EnvelopeBatch\x1a#.loggregator.v2.BatchSenderResponse\"\x00(\x01\x12\x45\n\x04Send\x12\x1d.loggregator.v2.EnvelopeBatch\x1a\x1c.loggregator.v2.SendResponse\"\x00\x42r\n\x1forg.cloudfoundry.loggregator.v2B\x12LoggregatorIngressZ;code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2b\x06proto3" + +pool = ::Google::Protobuf::DescriptorPool.generated_pool +pool.add_serialized_file(descriptor_data) + +module Loggregator + module V2 + IngressResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.IngressResponse").msgclass + BatchSenderResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.BatchSenderResponse").msgclass + SendResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("loggregator.v2.SendResponse").msgclass + end +end diff --git a/lib/loggregator-api/v2/ingress_services_pb.rb b/lib/loggregator-api/v2/ingress_services_pb.rb new file mode 100644 index 00000000000..c42d8d048dc --- /dev/null +++ b/lib/loggregator-api/v2/ingress_services_pb.rb @@ -0,0 +1,26 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: loggregator-api/v2/ingress.proto for package 'loggregator.v2' + +require 'grpc' +require 'loggregator-api/v2/ingress_pb' + +module Loggregator + module V2 + module Ingress + class Service + + include ::GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'loggregator.v2.Ingress' + + rpc :Sender, stream(::Loggregator::V2::Envelope), ::Loggregator::V2::IngressResponse + rpc :BatchSender, stream(::Loggregator::V2::EnvelopeBatch), ::Loggregator::V2::BatchSenderResponse + rpc :Send, ::Loggregator::V2::EnvelopeBatch, ::Loggregator::V2::SendResponse + end + + Stub = Service.rpc_stub_class + end + end +end diff --git a/lib/loggregator_emitter/client.rb b/lib/loggregator_emitter/client.rb new file mode 100644 index 00000000000..a548f9c899b --- /dev/null +++ b/lib/loggregator_emitter/client.rb @@ -0,0 +1,64 @@ +require 'loggregator-api/v2/ingress_services_pb' +require 'loggregator-api/v2/envelope_pb' + +module LoggregatorEmitter + class Client + def initialize(endpoint:, origin:, source_type:, instance_id: nil, ca_cert_file: nil, client_cert_file: nil, client_key_file: nil, subject_name: nil) + raise ArgumentError.new('Must provide a valid endpoint') if endpoint.nil? || endpoint.empty? + raise ArgumentError.new('Must provide a valid origin') unless origin + raise ArgumentError.new('Must provide a valid source_type') unless source_type + + @endpoint = endpoint + @ca_cert_file = ca_cert_file + @client_cert_file = client_cert_file + @client_key_file = client_key_file + @subject_name = subject_name + @default_tags = { 'origin' => origin, 'source_type' => source_type } + @instance_id = instance_id&.to_s + end + + def emit(app_id, message, tags={}) + envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::OUT, tags) + stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope])) + end + + def emit_error(app_id, message, tags={}) + envelope = create_envelope(app_id, message, Loggregator::V2::Log::Type::ERR, tags) + stub.send(Loggregator::V2::EnvelopeBatch.new(batch: [envelope])) + end + + private + + def stub + @stub ||= Loggregator::V2::Ingress::Stub.new( + @endpoint, + build_credentials, + channel_args: @subject_name ? { GRPC::Core::Channel::SSL_TARGET => @subject_name } : {}, + timeout: 10 + ) + end + + def create_envelope(app_id, message, type, tags) + Loggregator::V2::Envelope.new( + source_id: app_id, + instance_id: @instance_id, + timestamp: Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond), + log: Loggregator::V2::Log.new( + payload: message.encode('UTF-8'), + type: type + ), + tags: @default_tags.merge(tags.transform_keys(&:to_s).transform_values(&:to_s)) + ) + end + + def build_credentials + return :this_channel_is_insecure unless @ca_cert_file && @client_cert_file && @client_key_file + + GRPC::Core::ChannelCredentials.new( + File.read(@ca_cert_file), + File.read(@client_key_file), + File.read(@client_cert_file) + ) + end + end +end diff --git a/spec/integration/app_log_emitter_spec.rb b/spec/integration/app_log_emitter_spec.rb index a45e81344dd..c396e819704 100644 --- a/spec/integration/app_log_emitter_spec.rb +++ b/spec/integration/app_log_emitter_spec.rb @@ -1,12 +1,8 @@ require 'spec_helper' -require 'securerandom' require 'tempfile' RSpec.describe 'Cloud controller Loggregator Integration', type: :integration do before(:all) do - @loggregator_server = FakeLoggregatorServer.new(12_345) - @loggregator_server.start - @authed_headers = { 'Authorization' => "bearer #{admin_token}", 'Accept' => 'application/json', @@ -18,6 +14,7 @@ config = VCAP::CloudController::YAMLConfig.safe_load_file(base_cc_config_file).deep_merge( VCAP::CloudController::YAMLConfig.safe_load_file(port_8181_overrides) ) + config['loggregator'] = { 'endpoint' => 'localhost:12345' } @cc_config_file = Tempfile.new('cc_config.yml') @cc_config_file.write(YAML.dump(config)) @@ -25,6 +22,9 @@ start_cc(debug: false, config: @cc_config_file.path) + @loggregator_server = FakeLoggregatorServer.new(12_345) + @loggregator_server.start + org = org_with_default_quota(@authed_headers) org_guid = org.json_body['metadata']['guid'] @@ -43,7 +43,7 @@ @loggregator_server.stop end - it 'send logs to the loggregator' do + it 'sends logs to the loggregator' do app = make_post_request('/v2/apps', { 'name' => 'foo_app', @@ -57,9 +57,9 @@ expect(messages.size).to eq(1) message = messages.first - expect(message.message).to eq "Created app with guid #{app_id}" - expect(message.app_id).to eq app_id - expect(message.source_type).to eq 'API' - expect(message.message_type).to eq LogMessage::MessageType::OUT + expect(message.source_id).to eq(app_id) + expect(message.log.type).to eq(:OUT) + expect(message.log.payload).to eq("Created app with guid #{app_id}") + expect(message.tags['source_type']).to eq('API') end end diff --git a/spec/support/fake_loggregator_server.rb b/spec/support/fake_loggregator_server.rb index 98be910caf7..d7a1b986cb8 100644 --- a/spec/support/fake_loggregator_server.rb +++ b/spec/support/fake_loggregator_server.rb @@ -1,32 +1,42 @@ -require 'socket' -require 'sonde' +require 'grpc' +require 'loggregator-api/v2/ingress_services_pb' class FakeLoggregatorServer - attr_reader :messages, :port, :sock + attr_reader :port def initialize(port) - @messages = [] @port = port - @sock = UDPSocket.new + @envelopes = [] + @mutex = Mutex.new end def start - @sock.bind('localhost', port) - - @thread = Thread.new do - loop do - stuff = @sock.recv(65_536) - envelope = ::Sonde::Envelope.decode(stuff) - messages << envelope.logMessage - rescue Beefcake::Message::WrongTypeError, Beefcake::Message::RequiredFieldNotSetError, Beefcake::Message::InvalidValueError => e - puts 'ERROR' - puts e - end - end + service = FakeIngressService.new(@envelopes, @mutex) + @server = GRPC::RpcServer.new + @server.add_http2_port("localhost:#{@port}", :this_port_is_insecure) + @server.handle(service) + @thread = Thread.new { @server.run } + @server.wait_till_running end def stop - Thread.kill(@thread) - @sock.close + @server.stop + @thread.join + end + + def messages + @mutex.synchronize { @envelopes.flat_map { |batch| batch.batch.to_a } } + end + + class FakeIngressService < Loggregator::V2::Ingress::Service + def initialize(envelopes, mutex) + @envelopes = envelopes + @mutex = mutex + end + + def send(envelope_batch, _call) + @mutex.synchronize { @envelopes << envelope_batch } + Loggregator::V2::SendResponse.new + end end end diff --git a/spec/unit/lib/app_log_emitter_spec.rb b/spec/unit/lib/app_log_emitter_spec.rb index 676bebcbd2c..0c0373fe8ef 100644 --- a/spec/unit/lib/app_log_emitter_spec.rb +++ b/spec/unit/lib/app_log_emitter_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'loggregator_emitter/client' module VCAP RSpec.describe AppLogEmitter do @@ -18,12 +19,12 @@ module VCAP describe 'when no emitter is set' do it 'does not emit errors' do - expect_any_instance_of(LoggregatorEmitter::Emitter).not_to receive(:emit_error) + expect_any_instance_of(LoggregatorEmitter::Client).not_to receive(:emit_error) AppLogEmitter.emit_error('app_id', 'error message') end it 'does not emit' do - expect_any_instance_of(LoggregatorEmitter::Emitter).not_to receive(:emit) + expect_any_instance_of(LoggregatorEmitter::Client).not_to receive(:emit) AppLogEmitter.emit('app_id', 'log message') end end @@ -62,7 +63,7 @@ module VCAP let(:org) { VCAP::CloudController::Organization.make } let(:space) { VCAP::CloudController::Space.make(organization: org) } let(:app) { VCAP::CloudController::AppModel.make(space:) } - let(:emitter) { LoggregatorEmitter::Emitter.new('127.0.0.1:1234', 'cloud_controller', 'API', 1) } + let(:emitter) { instance_double(LoggregatorEmitter::Client) } before do AppLogEmitter.emitter = emitter diff --git a/spec/unit/lib/cloud_controller/runner_spec.rb b/spec/unit/lib/cloud_controller/runner_spec.rb index 715f7151ebc..2cf316e5d5a 100644 --- a/spec/unit/lib/cloud_controller/runner_spec.rb +++ b/spec/unit/lib/cloud_controller/runner_spec.rb @@ -116,8 +116,8 @@ module VCAP::CloudController end it 'sets up loggregator emitter' do - loggregator_emitter = double(:loggregator_emitter) - expect(LoggregatorEmitter::Emitter).to receive(:new).and_return(loggregator_emitter) + loggregator_emitter = instance_double(LoggregatorEmitter::Client) + expect(LoggregatorEmitter::Client).to receive(:new).and_return(loggregator_emitter) expect(VCAP::AppLogEmitter).to receive(:emitter=).with(loggregator_emitter) subject end diff --git a/spec/unit/lib/loggregator_emitter/client_spec.rb b/spec/unit/lib/loggregator_emitter/client_spec.rb new file mode 100644 index 00000000000..4df0fbf8ed2 --- /dev/null +++ b/spec/unit/lib/loggregator_emitter/client_spec.rb @@ -0,0 +1,123 @@ +require 'rspec' +require 'loggregator_emitter/client' + +RSpec.describe LoggregatorEmitter::Client do + let(:stub) { instance_double(Loggregator::V2::Ingress::Stub) } + + describe '#initialize' do + let(:endpoint) { 'localhost:1234' } + let(:origin) { 'cloud_controller' } + let(:source_type) { 'API' } + let(:instance_id) { 0 } + + it 'raises ArgumentError when endpoint is empty' do + expect do + described_class.new(endpoint: '', origin: origin, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid endpoint') + end + + it 'raises ArgumentError when endpoint is nil' do + expect do + described_class.new(endpoint: nil, origin: origin, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid endpoint') + end + + it 'raises ArgumentError when origin is nil' do + expect do + described_class.new(endpoint: endpoint, origin: nil, source_type: source_type, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid origin') + end + + it 'raises ArgumentError when source_type is nil' do + expect do + described_class.new(endpoint: endpoint, origin: origin, source_type: nil, instance_id: instance_id) + end.to raise_error(ArgumentError, 'Must provide a valid source_type') + end + + it 'creates a client with valid arguments' do + expect do + described_class.new(endpoint: endpoint, origin: origin, source_type: source_type, instance_id: instance_id) + end.not_to raise_error + end + end + + describe '#emit' do + subject(:client) do + described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + end + + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'sends an envelope with OUT type' do + client.emit('app-guid-123', 'some log message') + expect(stub).to have_received(:send) do |batch| + envelope = batch.batch.first + expect(envelope.source_id).to eq('app-guid-123') + expect(envelope.log.type).to eq(:OUT) + expect(envelope.log.payload).to eq('some log message') + end + end + end + + describe '#emit_error' do + subject(:client) do + described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + end + + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'sends an envelope with ERR type' do + client.emit_error('app-guid-123', 'some error message') + expect(stub).to have_received(:send) do |batch| + envelope = batch.batch.first + expect(envelope.source_id).to eq('app-guid-123') + expect(envelope.log.type).to eq(:ERR) + expect(envelope.log.payload).to eq('some error message') + end + end + end + + describe 'credentials' do + before do + allow(Loggregator::V2::Ingress::Stub).to receive(:new).and_return(stub) + allow(stub).to receive(:send) + end + + it 'uses insecure credentials when no cert files are provided' do + client = described_class.new(endpoint: 'localhost:1234', origin: 'cloud_controller', source_type: 'API', instance_id: 0) + client.emit('app-guid-123', 'message') + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', :this_channel_is_insecure, + channel_args: {}, timeout: 10) + end + + it 'uses TLS credentials when all cert files are provided' do + tls_creds = instance_double(GRPC::Core::ChannelCredentials) + allow(File).to receive(:read).with('/certs/ca.crt').and_return('ca-cert-content') + allow(File).to receive(:read).with('/certs/client.key').and_return('client-key-content') + allow(File).to receive(:read).with('/certs/client.crt').and_return('client-cert-content') + allow(GRPC::Core::ChannelCredentials).to receive(:new).and_return(tls_creds) + + client = described_class.new( + endpoint: 'localhost:1234', + origin: 'cloud_controller', + source_type: 'API', + instance_id: 0, + ca_cert_file: '/certs/ca.crt', + client_cert_file: '/certs/client.crt', + client_key_file: '/certs/client.key', + subject_name: 'metron' + ) + client.emit('app-guid-123', 'message') + + expect(GRPC::Core::ChannelCredentials).to have_received(:new).with('ca-cert-content', 'client-key-content', 'client-cert-content') + expect(Loggregator::V2::Ingress::Stub).to have_received(:new).with('localhost:1234', tls_creds, channel_args: { 'grpc.ssl_target_name_override': 'metron' }, + timeout: 10) + end + end +end