diff --git a/CHANGELOG.md b/CHANGELOG.md index a09ab94f97..43f5dd1841 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ * [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217 * [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246 * [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253 +* [ENHANCEMENT] Ruler/Ingester: Propagate append hints to discard out of order samples on Ingester #7226 * [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278 * [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics. * [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293 diff --git a/pkg/cortexpb/cortex.pb.go b/pkg/cortexpb/cortex.pb.go index e3bd3e51bf..cb87faedba 100644 --- a/pkg/cortexpb/cortex.pb.go +++ b/pkg/cortexpb/cortex.pb.go @@ -192,6 +192,8 @@ type WriteRequest struct { Metadata []*MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty"` SkipLabelNameValidation bool `protobuf:"varint,1000,opt,name=skip_label_name_validation,json=skipLabelNameValidation,proto3" json:"skip_label_name_validation,omitempty"` MessageWithBufRef `protobuf:"bytes,1001,opt,name=Ref,proto3,embedded=Ref,customtype=MessageWithBufRef" json:"Ref"` + // When true, indicates that out-of-order samples should be discarded even if OOO is enabled. + DiscardOutOfOrder bool `protobuf:"varint,1002,opt,name=discard_out_of_order,json=discardOutOfOrder,proto3" json:"discard_out_of_order,omitempty"` } func (m *WriteRequest) Reset() { *m = WriteRequest{} } @@ -247,6 +249,13 @@ func (m *WriteRequest) GetSkipLabelNameValidation() bool { return false } +func (m *WriteRequest) GetDiscardOutOfOrder() bool { + if m != nil { + return m.DiscardOutOfOrder + } + return false +} + // refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto // The histogram and Sample are shared with PRW1. type WriteRequestV2 struct { @@ -1358,100 +1367,102 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1477 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0x1b, 0x45, - 0x14, 0xf7, 0xf8, 0x7b, 0x9f, 0x3f, 0xba, 0x99, 0xba, 0xed, 0x36, 0x6d, 0xd7, 0xa9, 0x2b, 0x20, - 0x2a, 0x55, 0x40, 0xa9, 0x28, 0x50, 0x55, 0x48, 0x76, 0xea, 0x34, 0x56, 0x6b, 0x27, 0x1a, 0x3b, - 0xa9, 0xca, 0xc5, 0xda, 0x38, 0xe3, 0x78, 0xd5, 0xfd, 0x30, 0x3b, 0xeb, 0xaa, 0xe1, 0xc4, 0x09, - 0x71, 0xe4, 0xc2, 0x85, 0x1b, 0xe2, 0xc2, 0x95, 0x33, 0xff, 0x40, 0x8f, 0xb9, 0x51, 0x55, 0x22, - 0xa2, 0xee, 0xa5, 0xdc, 0x7a, 0x80, 0x3b, 0x9a, 0xd9, 0x4f, 0xc7, 0xa9, 0x0a, 0xa8, 0x07, 0x6e, - 0x33, 0xbf, 0xf7, 0x66, 0xe6, 0x37, 0x6f, 0x7e, 0xef, 0xb7, 0x36, 0x14, 0x07, 0xb6, 0xe3, 0xd2, - 0xc7, 0x2b, 0x63, 0xc7, 0x76, 0x6d, 0x9c, 0xf7, 0x66, 0xe3, 0xdd, 0xc5, 0xca, 0xbe, 0xbd, 0x6f, - 0x0b, 0xf0, 0x03, 0x3e, 0xf2, 0xe2, 0xb5, 0xf3, 0xb0, 0xd0, 0xa6, 0x8c, 0x69, 0xfb, 0xf4, 0xbe, - 0xee, 0x8e, 0x1a, 0x93, 0x21, 0xa1, 0xc3, 0x9b, 0xe9, 0x57, 0x3f, 0x54, 0x13, 0xb5, 0x5f, 0x93, - 0x50, 0xbc, 0xef, 0xe8, 0x2e, 0x25, 0xf4, 0x8b, 0x09, 0x65, 0x2e, 0xde, 0x02, 0x70, 0x75, 0x93, - 0x32, 0xea, 0xe8, 0x94, 0x29, 0x68, 0x29, 0xb5, 0x5c, 0x58, 0xad, 0xac, 0x04, 0x07, 0xac, 0xf4, - 0x74, 0x93, 0x76, 0x45, 0xac, 0xb1, 0xf8, 0xe4, 0xa8, 0x9a, 0x78, 0x76, 0x54, 0xc5, 0x5b, 0x0e, - 0xd5, 0x0c, 0xc3, 0x1e, 0xf4, 0xc2, 0x75, 0x24, 0xb6, 0x07, 0xbe, 0x06, 0xd9, 0xae, 0x3d, 0x71, - 0x06, 0x54, 0x49, 0x2e, 0xa1, 0xe5, 0x72, 0x7c, 0x37, 0x0f, 0x6f, 0x5a, 0x13, 0x93, 0xf8, 0x39, - 0xf8, 0x26, 0xe4, 0x4d, 0xea, 0x6a, 0x7b, 0x9a, 0xab, 0x29, 0x29, 0x71, 0xba, 0x12, 0xe5, 0xb7, - 0xa9, 0xeb, 0xe8, 0x83, 0xb6, 0x1f, 0x6f, 0xa4, 0x9f, 0x1c, 0x55, 0x11, 0x09, 0xf3, 0xf1, 0x2d, - 0x58, 0x64, 0x0f, 0xf5, 0x71, 0xdf, 0xd0, 0x76, 0xa9, 0xd1, 0xb7, 0x34, 0x93, 0xf6, 0x1f, 0x69, - 0x86, 0xbe, 0xa7, 0xb9, 0xba, 0x6d, 0x29, 0x2f, 0x73, 0x4b, 0x68, 0x39, 0x4f, 0xce, 0xf1, 0x94, - 0x7b, 0x3c, 0xa3, 0xa3, 0x99, 0x74, 0x27, 0x8c, 0xe3, 0x36, 0xa4, 0x08, 0x1d, 0x2a, 0x7f, 0xf0, - 0xb4, 0xc2, 0xea, 0x85, 0xf8, 0xa9, 0xc7, 0x6a, 0xd7, 0xb8, 0xc4, 0xaf, 0x7e, 0x78, 0x54, 0x45, - 0xcf, 0x8e, 0xaa, 0xf3, 0xa5, 0x25, 0x7c, 0x9f, 0xda, 0x2f, 0x49, 0x28, 0xc7, 0x2b, 0xbb, 0xb3, - 0x8a, 0x15, 0xc8, 0xb1, 0x03, 0x73, 0xd7, 0x36, 0x98, 0x92, 0x5e, 0x4a, 0x2d, 0x4b, 0x24, 0x98, - 0xe2, 0xde, 0x4c, 0xd5, 0x33, 0xe2, 0xde, 0x67, 0x4f, 0xaa, 0xfa, 0xce, 0x6a, 0xe3, 0xa2, 0x5f, - 0xf7, 0xca, 0x7c, 0xdd, 0x77, 0x56, 0x5f, 0x53, 0xf9, 0xec, 0x3f, 0xa8, 0xfc, 0xff, 0xad, 0x7a, - 0xc5, 0xf8, 0xad, 0x71, 0x15, 0x0a, 0x82, 0x18, 0xeb, 0x3b, 0x74, 0xe8, 0x09, 0xb3, 0x44, 0xc0, - 0x83, 0x08, 0x1d, 0x32, 0xfc, 0x21, 0xe4, 0x98, 0x66, 0x8e, 0x0d, 0xca, 0x94, 0xa4, 0xa8, 0x9f, - 0x1c, 0xbb, 0xad, 0x08, 0x08, 0xbd, 0x24, 0x48, 0x90, 0x86, 0x3f, 0x05, 0x18, 0xe9, 0xcc, 0xb5, - 0xf7, 0x1d, 0xcd, 0x64, 0xbe, 0xd8, 0x4e, 0x47, 0x8b, 0x36, 0x82, 0x98, 0xbf, 0x2e, 0x96, 0x8c, - 0x3f, 0x01, 0x89, 0x3e, 0xa6, 0xe6, 0xd8, 0xd0, 0x1c, 0xef, 0x2d, 0x67, 0x9a, 0xa4, 0xe9, 0x87, - 0x76, 0x56, 0xfd, 0xa5, 0x51, 0x32, 0xbe, 0x11, 0xd3, 0x77, 0x46, 0xd4, 0xaa, 0x32, 0xa3, 0x6f, - 0x11, 0x09, 0x17, 0x46, 0xda, 0x7e, 0x1f, 0x16, 0x06, 0x0e, 0xd5, 0x5c, 0xba, 0xd7, 0x17, 0x2f, - 0xec, 0x6a, 0xe6, 0x58, 0x3c, 0x6b, 0x8a, 0xc8, 0x7e, 0xa0, 0x17, 0xe0, 0x35, 0x0d, 0x20, 0xe2, - 0xf0, 0xe6, 0xd2, 0x55, 0x20, 0xf3, 0x48, 0x33, 0x26, 0x5e, 0x83, 0x22, 0xe2, 0x4d, 0xf0, 0x45, - 0x90, 0xa2, 0x93, 0x52, 0xe2, 0xa4, 0x08, 0xe0, 0xc6, 0x01, 0x11, 0x5d, 0x7c, 0x1d, 0xd2, 0xee, - 0xc1, 0x98, 0x2a, 0x48, 0x08, 0xad, 0x7a, 0xd2, 0x95, 0xfc, 0xee, 0xed, 0x1d, 0x8c, 0x29, 0x11, - 0xc9, 0xf8, 0x3c, 0xe4, 0x47, 0xd4, 0x18, 0x73, 0x5a, 0xe2, 0x80, 0x12, 0xc9, 0xf1, 0x39, 0xa1, - 0x43, 0x1e, 0x9a, 0x58, 0xba, 0x2b, 0x42, 0x69, 0x2f, 0xc4, 0xe7, 0x5c, 0x1a, 0xbf, 0x21, 0x71, - 0xb2, 0xbf, 0x15, 0xbe, 0x00, 0xe7, 0xda, 0xcd, 0x1e, 0x69, 0xad, 0xf5, 0x7b, 0x0f, 0xb6, 0x9a, - 0xfd, 0xed, 0x4e, 0x77, 0xab, 0xb9, 0xd6, 0x5a, 0x6f, 0x35, 0x6f, 0xcb, 0x09, 0x7c, 0x0e, 0x4e, - 0xc7, 0x83, 0x6b, 0x9b, 0xdb, 0x9d, 0x5e, 0x93, 0xc8, 0x08, 0x9f, 0x81, 0x85, 0x78, 0xe0, 0x4e, - 0x7d, 0xfb, 0x4e, 0x53, 0x4e, 0xe2, 0xf3, 0x70, 0x26, 0x0e, 0x6f, 0xb4, 0xba, 0xbd, 0xcd, 0x3b, - 0xa4, 0xde, 0x96, 0x53, 0x58, 0x85, 0xc5, 0xb9, 0x15, 0x51, 0x3c, 0x7d, 0xfc, 0xa8, 0xee, 0x76, - 0xbb, 0x5d, 0x27, 0x0f, 0xe4, 0x0c, 0xae, 0x80, 0x1c, 0x0f, 0xb4, 0x3a, 0xeb, 0x9b, 0x72, 0x16, - 0x2b, 0x50, 0x99, 0x49, 0xef, 0xd5, 0x7b, 0xcd, 0x6e, 0xb3, 0x27, 0xe7, 0x6a, 0x3f, 0x23, 0xc0, - 0x5d, 0xd7, 0xa1, 0x9a, 0x39, 0x63, 0xcc, 0x8b, 0x90, 0xef, 0x51, 0x4b, 0xb3, 0xdc, 0xd6, 0x6d, - 0x51, 0x65, 0x89, 0x84, 0x73, 0xae, 0x7d, 0x3f, 0x4d, 0x3c, 0xe1, 0x8c, 0x77, 0xc4, 0x37, 0x21, - 0x41, 0x5a, 0xd0, 0xae, 0x2f, 0xdf, 0x52, 0xbb, 0x7e, 0x87, 0xa0, 0xe4, 0x1f, 0xc4, 0xc6, 0xb6, - 0xc5, 0x28, 0xc6, 0x90, 0x1e, 0xd8, 0x7b, 0x9e, 0x20, 0x32, 0x44, 0x8c, 0xb9, 0xff, 0x99, 0xde, - 0x7a, 0x41, 0x53, 0x22, 0xc1, 0x94, 0x47, 0xba, 0x7e, 0xf3, 0x7a, 0x4a, 0x0b, 0xa6, 0x58, 0x05, - 0xd8, 0x88, 0x9a, 0x34, 0x2d, 0x82, 0x31, 0x84, 0xab, 0xb4, 0x19, 0x76, 0x62, 0xc6, 0x53, 0x69, - 0x08, 0xd4, 0xfe, 0x44, 0x00, 0x91, 0x8d, 0xe0, 0x3a, 0x64, 0x3d, 0xd9, 0xfb, 0x1f, 0xb6, 0x58, - 0xb7, 0x0b, 0x4f, 0xdb, 0xd2, 0x74, 0xa7, 0x51, 0xf1, 0xfd, 0xb5, 0x28, 0xa0, 0xfa, 0x9e, 0x36, - 0x76, 0xa9, 0x43, 0xfc, 0x85, 0xff, 0xc1, 0x66, 0x6e, 0xc4, 0xbd, 0xc2, 0x73, 0x19, 0x3c, 0xef, - 0x15, 0xf3, 0x4e, 0x31, 0x6b, 0x4f, 0xe9, 0x7f, 0x61, 0x4f, 0xb5, 0x8f, 0x40, 0x0a, 0xef, 0xc3, - 0x5f, 0x82, 0x9b, 0xb9, 0x78, 0x89, 0x22, 0x11, 0xe3, 0xd9, 0x8e, 0x2f, 0xfa, 0x1d, 0x5f, 0xab, - 0x43, 0xd6, 0xbb, 0x42, 0x14, 0x47, 0x71, 0x47, 0xb8, 0x0c, 0xc5, 0xd0, 0x00, 0xfa, 0x26, 0x13, - 0x8b, 0x53, 0xa4, 0x10, 0x62, 0x6d, 0x56, 0xfb, 0x3e, 0x09, 0xe5, 0xd9, 0xaf, 0x34, 0xfe, 0x78, - 0xc6, 0x1a, 0xae, 0xbc, 0xee, 0x6b, 0x3e, 0x6f, 0x0f, 0xd7, 0x00, 0x9b, 0x02, 0xeb, 0x0f, 0x35, - 0x53, 0x37, 0x0e, 0xc4, 0x37, 0xc9, 0x57, 0x8e, 0xec, 0x45, 0xd6, 0x45, 0x80, 0x7f, 0x8a, 0xf8, - 0x35, 0xb9, 0x79, 0x08, 0x89, 0x48, 0x44, 0x8c, 0x39, 0xc6, 0x5d, 0x43, 0xe8, 0x42, 0x22, 0x62, - 0x5c, 0x3b, 0x98, 0x71, 0x8f, 0x02, 0xe4, 0xb6, 0x3b, 0x77, 0x3b, 0x9b, 0xf7, 0x3b, 0x72, 0x82, - 0x4f, 0x22, 0x87, 0x90, 0x20, 0x13, 0xb8, 0x42, 0x09, 0xa4, 0xb8, 0x13, 0x60, 0x28, 0xcf, 0x75, - 0x7f, 0x01, 0x72, 0x51, 0xc7, 0xe7, 0x21, 0xed, 0x77, 0x79, 0x11, 0xf2, 0xb1, 0xce, 0xbe, 0x0b, - 0x59, 0xef, 0xe8, 0xb7, 0x20, 0xc4, 0xda, 0xd7, 0x08, 0xf2, 0x81, 0x78, 0xde, 0x86, 0xb0, 0x4f, - 0xfe, 0x08, 0x1c, 0x7f, 0xf2, 0xd4, 0xfc, 0x93, 0xff, 0x95, 0x01, 0x29, 0x14, 0x23, 0xbe, 0x04, - 0xd2, 0xc0, 0x9e, 0x58, 0x6e, 0x5f, 0xb7, 0x5c, 0xf1, 0xe4, 0xe9, 0x8d, 0x04, 0xc9, 0x0b, 0xa8, - 0x65, 0xb9, 0xf8, 0x32, 0x14, 0xbc, 0xf0, 0xd0, 0xb0, 0x35, 0xcf, 0xad, 0xd0, 0x46, 0x82, 0x80, - 0x00, 0xd7, 0x39, 0x86, 0x65, 0x48, 0xb1, 0x89, 0x29, 0x4e, 0x42, 0x84, 0x0f, 0xf1, 0x59, 0xc8, - 0xb2, 0xc1, 0x88, 0x9a, 0x9a, 0x78, 0xdc, 0x05, 0xe2, 0xcf, 0xf0, 0x3b, 0x50, 0xfe, 0x92, 0x3a, - 0x76, 0xdf, 0x1d, 0x39, 0x94, 0x8d, 0x6c, 0x63, 0x4f, 0x3c, 0x34, 0x22, 0x25, 0x8e, 0xf6, 0x02, - 0x10, 0xbf, 0xeb, 0xa7, 0x45, 0xbc, 0xb2, 0x82, 0x17, 0x22, 0x45, 0x8e, 0xaf, 0x05, 0xdc, 0xae, - 0x82, 0x1c, 0xcb, 0xf3, 0x08, 0xe6, 0x04, 0x41, 0x44, 0xca, 0x61, 0xa6, 0x47, 0xb2, 0x0e, 0x65, - 0x8b, 0xee, 0x6b, 0xae, 0xfe, 0x88, 0xf6, 0xd9, 0x58, 0xb3, 0x98, 0x92, 0x3f, 0xfe, 0x2b, 0xa0, - 0x31, 0x19, 0x3c, 0xa4, 0x6e, 0x77, 0xac, 0x59, 0x7e, 0x87, 0x96, 0x82, 0x15, 0x1c, 0x63, 0xf8, - 0x3d, 0x38, 0x15, 0x6e, 0xb1, 0x47, 0x0d, 0x57, 0x63, 0x8a, 0xb4, 0x94, 0x5a, 0xc6, 0x24, 0xdc, - 0xf9, 0xb6, 0x40, 0x67, 0x12, 0x05, 0x37, 0xa6, 0xc0, 0x52, 0x6a, 0x19, 0x45, 0x89, 0x82, 0x18, - 0xb7, 0xb7, 0xf2, 0xd8, 0x66, 0x7a, 0x8c, 0x54, 0xe1, 0xcd, 0xa4, 0x82, 0x15, 0x21, 0xa9, 0x70, - 0x0b, 0x9f, 0x54, 0xd1, 0x23, 0x15, 0xc0, 0x11, 0xa9, 0x30, 0xd1, 0x27, 0x55, 0xf2, 0x48, 0x05, - 0xb0, 0x4f, 0xea, 0x16, 0x80, 0x43, 0x19, 0x75, 0xfb, 0x23, 0x5e, 0xf9, 0xb2, 0x30, 0x81, 0x4b, - 0x27, 0xd8, 0xd8, 0x0a, 0xe1, 0x59, 0x1b, 0xba, 0xe5, 0x12, 0xc9, 0x09, 0x86, 0x73, 0xfa, 0x3b, - 0x35, 0xa7, 0x3f, 0x7c, 0x05, 0x4a, 0x83, 0x09, 0x73, 0x6d, 0xb3, 0x2f, 0x24, 0xcb, 0x14, 0x59, - 0xf0, 0x28, 0x7a, 0xe0, 0x8e, 0xc0, 0x6a, 0x37, 0x41, 0x0a, 0xf7, 0x9f, 0x6d, 0xfa, 0x1c, 0xa4, - 0x1e, 0x34, 0xbb, 0x32, 0xc2, 0x59, 0x48, 0x76, 0x36, 0xe5, 0x64, 0xd4, 0xf8, 0xa9, 0xc5, 0xf4, - 0x37, 0x3f, 0xaa, 0xa8, 0x91, 0x83, 0x8c, 0xb8, 0x61, 0xa3, 0x08, 0x10, 0x09, 0xa4, 0x76, 0x0b, - 0x20, 0xaa, 0x26, 0xd7, 0xa8, 0x3d, 0x1c, 0x32, 0xea, 0x89, 0x7e, 0x81, 0xf8, 0x33, 0x8e, 0x1b, - 0xd4, 0xda, 0x77, 0x47, 0x42, 0xeb, 0x25, 0xe2, 0xcf, 0xae, 0x56, 0x01, 0xa2, 0xdf, 0xe0, 0x9c, - 0x44, 0x7d, 0xab, 0x25, 0x27, 0xb8, 0x75, 0x90, 0xed, 0x7b, 0x4d, 0x19, 0x35, 0x3e, 0x3b, 0x7c, - 0xae, 0x26, 0x9e, 0x3e, 0x57, 0x13, 0xaf, 0x9e, 0xab, 0xe8, 0xab, 0xa9, 0x8a, 0x7e, 0x9a, 0xaa, - 0xe8, 0xc9, 0x54, 0x45, 0x87, 0x53, 0x15, 0xfd, 0x3e, 0x55, 0xd1, 0xcb, 0xa9, 0x9a, 0x78, 0x35, - 0x55, 0xd1, 0xb7, 0x2f, 0xd4, 0xc4, 0xe1, 0x0b, 0x35, 0xf1, 0xf4, 0x85, 0x9a, 0xf8, 0x3c, 0xfc, - 0x2b, 0xb8, 0x9b, 0x15, 0xff, 0xfd, 0xae, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x68, 0x4f, 0xef, - 0xc4, 0x2b, 0x0e, 0x00, 0x00, + // 1517 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0x13, 0xc7, + 0x1b, 0xf6, 0xfa, 0xdb, 0xaf, 0x3f, 0xd8, 0x0c, 0x06, 0x36, 0x01, 0xd6, 0xc1, 0xe8, 0xf7, 0x6b, + 0x44, 0x51, 0x8a, 0x82, 0x4a, 0x5b, 0x84, 0x2a, 0xd9, 0xc1, 0x21, 0x16, 0xd8, 0x8e, 0xc6, 0x4e, + 0x10, 0xbd, 0xac, 0x36, 0xf6, 0x38, 0x5e, 0xe1, 0xdd, 0x75, 0x77, 0xc6, 0x88, 0xf4, 0xd4, 0x53, + 0xd5, 0xde, 0x7a, 0xe9, 0xa5, 0xb7, 0xaa, 0x97, 0x5e, 0x7b, 0xee, 0x3f, 0xc0, 0x31, 0xb7, 0x22, + 0xa4, 0x46, 0x25, 0x5c, 0x68, 0x4f, 0x1c, 0xda, 0x7b, 0x35, 0xb3, 0x9f, 0x8e, 0x83, 0x68, 0x2b, + 0x0e, 0xbd, 0xcd, 0x3c, 0xef, 0x3b, 0x33, 0xcf, 0xcc, 0xfb, 0xbc, 0xcf, 0xda, 0x50, 0xe8, 0xdb, + 0x0e, 0x23, 0x8f, 0x57, 0x27, 0x8e, 0xcd, 0x6c, 0x94, 0x75, 0x67, 0x93, 0xdd, 0xa5, 0xf2, 0x9e, + 0xbd, 0x67, 0x0b, 0xf0, 0x3d, 0x3e, 0x72, 0xe3, 0xd5, 0x45, 0x58, 0x68, 0x11, 0x4a, 0xf5, 0x3d, + 0x72, 0xdf, 0x60, 0xa3, 0xfa, 0x74, 0x88, 0xc9, 0xf0, 0x66, 0xf2, 0xd5, 0x77, 0x95, 0x58, 0xf5, + 0xab, 0x04, 0x14, 0xee, 0x3b, 0x06, 0x23, 0x98, 0x7c, 0x3a, 0x25, 0x94, 0xa1, 0x2d, 0x00, 0x66, + 0x98, 0x84, 0x12, 0xc7, 0x20, 0x54, 0x91, 0x96, 0x13, 0x2b, 0xf9, 0xb5, 0xf2, 0xaa, 0x7f, 0xc0, + 0x6a, 0xcf, 0x30, 0x49, 0x57, 0xc4, 0xea, 0x4b, 0x4f, 0x0e, 0x2b, 0xb1, 0x67, 0x87, 0x15, 0xb4, + 0xe5, 0x10, 0x7d, 0x3c, 0xb6, 0xfb, 0xbd, 0x60, 0x1d, 0x8e, 0xec, 0x81, 0xae, 0x42, 0xba, 0x6b, + 0x4f, 0x9d, 0x3e, 0x51, 0xe2, 0xcb, 0xd2, 0x4a, 0x29, 0xba, 0x9b, 0x8b, 0x37, 0xac, 0xa9, 0x89, + 0xbd, 0x1c, 0x74, 0x13, 0xb2, 0x26, 0x61, 0xfa, 0x40, 0x67, 0xba, 0x92, 0x10, 0xa7, 0x2b, 0x61, + 0x7e, 0x8b, 0x30, 0xc7, 0xe8, 0xb7, 0xbc, 0x78, 0x3d, 0xf9, 0xe4, 0xb0, 0x22, 0xe1, 0x20, 0x1f, + 0xdd, 0x82, 0x25, 0xfa, 0xd0, 0x98, 0x68, 0x63, 0x7d, 0x97, 0x8c, 0x35, 0x4b, 0x37, 0x89, 0xf6, + 0x48, 0x1f, 0x1b, 0x03, 0x9d, 0x19, 0xb6, 0xa5, 0xbc, 0xcc, 0x2c, 0x4b, 0x2b, 0x59, 0x7c, 0x8e, + 0xa7, 0xdc, 0xe3, 0x19, 0x6d, 0xdd, 0x24, 0x3b, 0x41, 0x1c, 0xb5, 0x20, 0x81, 0xc9, 0x50, 0xf9, + 0x8d, 0xa7, 0xe5, 0xd7, 0xce, 0x47, 0x4f, 0x3d, 0xf6, 0x76, 0xf5, 0x8b, 0xfc, 0xea, 0x07, 0x87, + 0x15, 0xe9, 0xd9, 0x61, 0x65, 0xfe, 0x69, 0x31, 0xdf, 0x07, 0x5d, 0x83, 0xf2, 0xc0, 0xa0, 0x7d, + 0xdd, 0x19, 0x68, 0xf6, 0x94, 0x69, 0xf6, 0x50, 0xb3, 0x9d, 0x01, 0x71, 0x94, 0xdf, 0x5d, 0x1a, + 0x0b, 0x5e, 0xb0, 0x33, 0x65, 0x9d, 0x61, 0x87, 0x47, 0xaa, 0x3f, 0xc5, 0xa1, 0x14, 0xad, 0xc5, + 0xce, 0x1a, 0x52, 0x20, 0x43, 0xf7, 0xcd, 0x5d, 0x7b, 0x4c, 0x95, 0xe4, 0x72, 0x62, 0x25, 0x87, + 0xfd, 0x29, 0xea, 0xcd, 0xd4, 0x29, 0x25, 0x5e, 0xea, 0xec, 0x49, 0x75, 0xda, 0x59, 0xab, 0x5f, + 0xf0, 0x2a, 0x55, 0x9e, 0xaf, 0xd4, 0xce, 0xda, 0x6b, 0x6a, 0x95, 0xfe, 0x1b, 0xb5, 0xfa, 0x2f, + 0xbd, 0x37, 0x7f, 0xbd, 0x42, 0xf4, 0xd6, 0xa8, 0x02, 0x79, 0x41, 0x8c, 0x6a, 0x0e, 0x19, 0xba, + 0x52, 0x2e, 0x62, 0x70, 0x21, 0x4c, 0x86, 0x14, 0x5d, 0x83, 0x0c, 0xd5, 0xcd, 0xc9, 0x98, 0x50, + 0x25, 0x2e, 0xde, 0x4f, 0x8e, 0xdc, 0x56, 0x04, 0x84, 0xc2, 0x62, 0xd8, 0x4f, 0x43, 0x1f, 0x01, + 0x8c, 0x0c, 0xca, 0xec, 0x3d, 0x47, 0x37, 0xa9, 0x27, 0xcf, 0xd3, 0xe1, 0xa2, 0x4d, 0x3f, 0xe6, + 0xad, 0x8b, 0x24, 0xa3, 0x0f, 0x21, 0x47, 0x1e, 0x13, 0x73, 0x32, 0xd6, 0x1d, 0xb7, 0x96, 0x33, + 0x6d, 0xd5, 0xf0, 0x42, 0x3b, 0x6b, 0xde, 0xd2, 0x30, 0x19, 0xdd, 0x88, 0x74, 0x44, 0x4a, 0xbc, + 0x55, 0x79, 0xa6, 0x23, 0x44, 0x24, 0x58, 0x18, 0x76, 0xc3, 0xbb, 0xb0, 0xd0, 0x77, 0x88, 0xce, + 0xc8, 0x40, 0x13, 0x15, 0x66, 0xba, 0x39, 0x11, 0x65, 0x4d, 0x60, 0xd9, 0x0b, 0xf4, 0x7c, 0xbc, + 0xaa, 0x03, 0x84, 0x1c, 0xde, 0xfc, 0x74, 0x65, 0x48, 0x3d, 0xd2, 0xc7, 0x53, 0xb7, 0xa5, 0x25, + 0xec, 0x4e, 0xd0, 0x05, 0xc8, 0x85, 0x27, 0x25, 0xc4, 0x49, 0x21, 0x50, 0xfd, 0x39, 0x0e, 0x10, + 0xd2, 0x45, 0xd7, 0x21, 0xc9, 0xf6, 0x27, 0x44, 0x91, 0x84, 0xd0, 0x2a, 0x27, 0x5d, 0xc9, 0xeb, + 0xf7, 0xde, 0xfe, 0x84, 0x60, 0x91, 0x8c, 0x16, 0x21, 0x3b, 0x22, 0xe3, 0x09, 0xa7, 0x25, 0x0e, + 0x28, 0xe2, 0x0c, 0x9f, 0xf3, 0x7e, 0x5b, 0x84, 0xec, 0xd4, 0x32, 0x98, 0x08, 0x25, 0xdd, 0x10, + 0x9f, 0x73, 0x69, 0xfc, 0x22, 0x89, 0x93, 0xbd, 0xad, 0xd0, 0x79, 0x38, 0xd7, 0x6a, 0xf4, 0x70, + 0x73, 0x5d, 0xeb, 0x3d, 0xd8, 0x6a, 0x68, 0xdb, 0xed, 0xee, 0x56, 0x63, 0xbd, 0xb9, 0xd1, 0x6c, + 0xdc, 0x96, 0x63, 0xe8, 0x1c, 0x9c, 0x8e, 0x06, 0xd7, 0x3b, 0xdb, 0xed, 0x5e, 0x03, 0xcb, 0x12, + 0x3a, 0x03, 0x0b, 0xd1, 0xc0, 0x9d, 0xda, 0xf6, 0x9d, 0x86, 0x1c, 0x47, 0x8b, 0x70, 0x26, 0x0a, + 0x6f, 0x36, 0xbb, 0xbd, 0xce, 0x1d, 0x5c, 0x6b, 0xc9, 0x09, 0xa4, 0xc2, 0xd2, 0xdc, 0x8a, 0x30, + 0x9e, 0x3c, 0x7e, 0x54, 0x77, 0xbb, 0xd5, 0xaa, 0xe1, 0x07, 0x72, 0x0a, 0x95, 0x41, 0x8e, 0x06, + 0x9a, 0xed, 0x8d, 0x8e, 0x9c, 0x46, 0x0a, 0x94, 0x67, 0xd2, 0x7b, 0xb5, 0x5e, 0xa3, 0xdb, 0xe8, + 0xc9, 0x99, 0xea, 0x8f, 0x12, 0xa0, 0x2e, 0x73, 0x88, 0x6e, 0xce, 0x58, 0xf9, 0x12, 0x64, 0x7b, + 0xc4, 0xd2, 0x2d, 0xd6, 0xbc, 0x2d, 0x5e, 0x39, 0x87, 0x83, 0x39, 0xd7, 0xbe, 0x97, 0x26, 0x4a, + 0x38, 0xe3, 0x1d, 0xd1, 0x4d, 0xb0, 0x9f, 0xe6, 0xb7, 0xeb, 0xcb, 0xb7, 0xd4, 0xae, 0xdf, 0x48, + 0x50, 0xf4, 0x0e, 0xa2, 0x13, 0xdb, 0xa2, 0x04, 0x21, 0x48, 0xf6, 0xed, 0x81, 0x2b, 0x88, 0x14, + 0x16, 0x63, 0xee, 0x7f, 0xa6, 0xbb, 0x5e, 0xd0, 0xcc, 0x61, 0x7f, 0xca, 0x23, 0x5d, 0xaf, 0x79, + 0x5d, 0xa5, 0xf9, 0x53, 0xa4, 0x02, 0x6c, 0x86, 0x4d, 0x9a, 0x14, 0xc1, 0x08, 0xc2, 0x55, 0xda, + 0x08, 0x3a, 0x31, 0xe5, 0xaa, 0x34, 0x00, 0xaa, 0x7f, 0x48, 0x00, 0xa1, 0x8d, 0xa0, 0x1a, 0xa4, + 0x5d, 0xd9, 0x7b, 0x9f, 0xc2, 0x48, 0xb7, 0x0b, 0x4f, 0xdb, 0xd2, 0x0d, 0xa7, 0x5e, 0xf6, 0xfc, + 0xb5, 0x20, 0xa0, 0xda, 0x40, 0x9f, 0x30, 0xe2, 0x60, 0x6f, 0xe1, 0xbf, 0xb0, 0x99, 0x1b, 0x51, + 0xaf, 0x70, 0x5d, 0x06, 0xcd, 0x7b, 0xc5, 0xbc, 0x53, 0xcc, 0xda, 0x53, 0xf2, 0x1f, 0xd8, 0x53, + 0xf5, 0x7d, 0xc8, 0x05, 0xf7, 0xe1, 0x95, 0xe0, 0x66, 0x2e, 0x2a, 0x51, 0xc0, 0x62, 0x3c, 0xdb, + 0xf1, 0x05, 0xaf, 0xe3, 0xab, 0x35, 0x48, 0xbb, 0x57, 0x08, 0xe3, 0x52, 0xd4, 0x11, 0x2e, 0x41, + 0x21, 0x30, 0x00, 0xcd, 0xa4, 0x62, 0x71, 0x02, 0xe7, 0x03, 0xac, 0x45, 0xab, 0xdf, 0xc6, 0xa1, + 0x34, 0xfb, 0x5d, 0x47, 0x1f, 0xcc, 0x58, 0xc3, 0xe5, 0xd7, 0x7d, 0xff, 0xe7, 0xed, 0xe1, 0x2a, + 0x20, 0x53, 0x60, 0xda, 0x50, 0x37, 0x8d, 0xf1, 0xbe, 0xf8, 0x26, 0x79, 0xca, 0x91, 0xdd, 0xc8, + 0x86, 0x08, 0xf0, 0x4f, 0x11, 0xbf, 0x26, 0x37, 0x0f, 0x21, 0x91, 0x1c, 0x16, 0x63, 0x8e, 0x71, + 0xd7, 0x10, 0xba, 0xc8, 0x61, 0x31, 0xae, 0xee, 0xcf, 0xb8, 0x47, 0x1e, 0x32, 0xdb, 0xed, 0xbb, + 0xed, 0xce, 0xfd, 0xb6, 0x1c, 0xe3, 0x93, 0xd0, 0x21, 0x72, 0x90, 0xf2, 0x5d, 0xa1, 0x08, 0xb9, + 0xa8, 0x13, 0x20, 0x28, 0xcd, 0x75, 0x7f, 0x1e, 0x32, 0x61, 0xc7, 0x67, 0x21, 0xe9, 0x75, 0x79, + 0x01, 0xb2, 0x91, 0xce, 0xbe, 0x0b, 0x69, 0xf7, 0xe8, 0xb7, 0x20, 0xc4, 0xea, 0x17, 0x12, 0x64, + 0x7d, 0xf1, 0xbc, 0x0d, 0x61, 0x9f, 0xfc, 0x11, 0x38, 0x5e, 0xf2, 0xc4, 0x7c, 0xc9, 0xff, 0x4c, + 0x41, 0x2e, 0x10, 0x23, 0xba, 0x08, 0xb9, 0xbe, 0x3d, 0xb5, 0x98, 0x66, 0x58, 0x4c, 0x94, 0x3c, + 0xb9, 0x19, 0xc3, 0x59, 0x01, 0x35, 0x2d, 0x86, 0x2e, 0x41, 0xde, 0x0d, 0x0f, 0xc7, 0xb6, 0xee, + 0xba, 0x95, 0xb4, 0x19, 0xc3, 0x20, 0xc0, 0x0d, 0x8e, 0x21, 0x19, 0x12, 0x74, 0x6a, 0x8a, 0x93, + 0x24, 0xcc, 0x87, 0xe8, 0x2c, 0xa4, 0x69, 0x7f, 0x44, 0x4c, 0x5d, 0x14, 0x77, 0x01, 0x7b, 0x33, + 0xf4, 0x3f, 0x28, 0x7d, 0x46, 0x1c, 0x5b, 0x63, 0x23, 0x87, 0xd0, 0x91, 0x3d, 0x1e, 0x88, 0x42, + 0x4b, 0xb8, 0xc8, 0xd1, 0x9e, 0x0f, 0xa2, 0xff, 0x7b, 0x69, 0x21, 0xaf, 0xb4, 0xe0, 0x25, 0xe1, + 0x02, 0xc7, 0xd7, 0x7d, 0x6e, 0x57, 0x40, 0x8e, 0xe4, 0xb9, 0x04, 0x33, 0x82, 0xa0, 0x84, 0x4b, + 0x41, 0xa6, 0x4b, 0xb2, 0x06, 0x25, 0x8b, 0xec, 0xe9, 0xcc, 0x78, 0x44, 0x34, 0x3a, 0xd1, 0x2d, + 0xaa, 0x64, 0x8f, 0xff, 0x0a, 0xa8, 0x4f, 0xfb, 0x0f, 0x09, 0xeb, 0x4e, 0x74, 0xcb, 0xeb, 0xd0, + 0xa2, 0xbf, 0x82, 0x63, 0x14, 0xbd, 0x03, 0xa7, 0x82, 0x2d, 0x06, 0x64, 0xcc, 0x74, 0xaa, 0xe4, + 0x96, 0x13, 0x2b, 0x08, 0x07, 0x3b, 0xdf, 0x16, 0xe8, 0x4c, 0xa2, 0xe0, 0x46, 0x15, 0x58, 0x4e, + 0xac, 0x48, 0x61, 0xa2, 0x20, 0xc6, 0xed, 0xad, 0x34, 0xb1, 0xa9, 0x11, 0x21, 0x95, 0x7f, 0x33, + 0x29, 0x7f, 0x45, 0x40, 0x2a, 0xd8, 0xc2, 0x23, 0x55, 0x70, 0x49, 0xf9, 0x70, 0x48, 0x2a, 0x48, + 0xf4, 0x48, 0x15, 0x5d, 0x52, 0x3e, 0xec, 0x91, 0xba, 0x05, 0xe0, 0x10, 0x4a, 0x98, 0x36, 0xe2, + 0x2f, 0x5f, 0x12, 0x26, 0x70, 0xf1, 0x04, 0x1b, 0x5b, 0xc5, 0x3c, 0x6b, 0xd3, 0xb0, 0x18, 0xce, + 0x39, 0xfe, 0x70, 0x4e, 0x7f, 0xa7, 0xe6, 0xf4, 0x87, 0x2e, 0x43, 0xb1, 0x3f, 0xa5, 0xcc, 0x36, + 0x35, 0x21, 0x59, 0xaa, 0xc8, 0x82, 0x47, 0xc1, 0x05, 0x77, 0x04, 0x56, 0xbd, 0x09, 0xb9, 0x60, + 0xff, 0xd9, 0xa6, 0xcf, 0x40, 0xe2, 0x41, 0xa3, 0x2b, 0x4b, 0x28, 0x0d, 0xf1, 0x76, 0x47, 0x8e, + 0x87, 0x8d, 0x9f, 0x58, 0x4a, 0x7e, 0xf9, 0xbd, 0x2a, 0xd5, 0x33, 0x90, 0x12, 0x37, 0xac, 0x17, + 0x00, 0x42, 0x81, 0x54, 0x6f, 0x01, 0x84, 0xaf, 0xc9, 0x35, 0x6a, 0x0f, 0x87, 0x94, 0xb8, 0xa2, + 0x5f, 0xc0, 0xde, 0x8c, 0xe3, 0x63, 0x62, 0xed, 0xb1, 0x91, 0xd0, 0x7a, 0x11, 0x7b, 0xb3, 0x2b, + 0x15, 0x80, 0xf0, 0x37, 0x38, 0x27, 0x51, 0xdb, 0x6a, 0xca, 0x31, 0x6e, 0x1d, 0x78, 0xfb, 0x5e, + 0x43, 0x96, 0xea, 0x1f, 0x1f, 0x3c, 0x57, 0x63, 0x4f, 0x9f, 0xab, 0xb1, 0x57, 0xcf, 0x55, 0xe9, + 0xf3, 0x23, 0x55, 0xfa, 0xe1, 0x48, 0x95, 0x9e, 0x1c, 0xa9, 0xd2, 0xc1, 0x91, 0x2a, 0xfd, 0x7a, + 0xa4, 0x4a, 0x2f, 0x8f, 0xd4, 0xd8, 0xab, 0x23, 0x55, 0xfa, 0xfa, 0x85, 0x1a, 0x3b, 0x78, 0xa1, + 0xc6, 0x9e, 0xbe, 0x50, 0x63, 0x9f, 0x04, 0x7f, 0x1e, 0x77, 0xd3, 0xe2, 0xdf, 0xe2, 0xf5, 0xbf, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb1, 0x8c, 0x07, 0xad, 0x5d, 0x0e, 0x00, 0x00, } func (x SourceEnum) String() string { @@ -1547,6 +1558,9 @@ func (this *WriteRequest) Equal(that interface{}) bool { if !this.MessageWithBufRef.Equal(that1.MessageWithBufRef) { return false } + if this.DiscardOutOfOrder != that1.DiscardOutOfOrder { + return false + } return true } func (this *WriteRequestV2) Equal(that interface{}) bool { @@ -2235,7 +2249,7 @@ func (this *WriteRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 10) s = append(s, "&cortexpb.WriteRequest{") s = append(s, "Timeseries: "+fmt.Sprintf("%#v", this.Timeseries)+",\n") s = append(s, "Source: "+fmt.Sprintf("%#v", this.Source)+",\n") @@ -2244,6 +2258,7 @@ func (this *WriteRequest) GoString() string { } s = append(s, "SkipLabelNameValidation: "+fmt.Sprintf("%#v", this.SkipLabelNameValidation)+",\n") s = append(s, "MessageWithBufRef: "+fmt.Sprintf("%#v", this.MessageWithBufRef)+",\n") + s = append(s, "DiscardOutOfOrder: "+fmt.Sprintf("%#v", this.DiscardOutOfOrder)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -2567,6 +2582,18 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.DiscardOutOfOrder { + i-- + if m.DiscardOutOfOrder { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x3e + i-- + dAtA[i] = 0xd0 + } { size := m.MessageWithBufRef.Size() i -= size @@ -3569,6 +3596,9 @@ func (m *WriteRequest) Size() (n int) { } l = m.MessageWithBufRef.Size() n += 2 + l + sovCortex(uint64(l)) + if m.DiscardOutOfOrder { + n += 3 + } return n } @@ -3995,6 +4025,7 @@ func (this *WriteRequest) String() string { `Metadata:` + repeatedStringForMetadata + `,`, `SkipLabelNameValidation:` + fmt.Sprintf("%v", this.SkipLabelNameValidation) + `,`, `MessageWithBufRef:` + fmt.Sprintf("%v", this.MessageWithBufRef) + `,`, + `DiscardOutOfOrder:` + fmt.Sprintf("%v", this.DiscardOutOfOrder) + `,`, `}`, }, "") return s @@ -4492,6 +4523,26 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 1002: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DiscardOutOfOrder", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.DiscardOutOfOrder = bool(v != 0) default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index 8e27aa1ffa..6288511695 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -24,6 +24,8 @@ message WriteRequest { bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false]; + // When true, indicates that out-of-order samples should be discarded even if OOO is enabled. + bool discard_out_of_order = 1002; } // refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index 4d780bba6a..194c61b528 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) { req.Source = 0 req.Metadata = nil req.Timeseries = nil + req.DiscardOutOfOrder = false writeRequestPool.Put(req) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9f8b25fc1d..3d74f36366 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -984,7 +984,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s } } - return d.send(localCtx, ingester, timeseries, metadata, req.Source) + return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder) }, func() { cortexpb.ReuseSlice(req.Timeseries) req.Free() @@ -1252,7 +1252,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { }) } -func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error { +func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error { h, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { return err @@ -1270,9 +1270,10 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time if d.cfg.UseStreamPush { req := &cortexpb.WriteRequest{ - Timeseries: timeseries, - Metadata: metadata, - Source: source, + Timeseries: timeseries, + Metadata: metadata, + Source: source, + DiscardOutOfOrder: discardOutOfOrder, } _, err = c.PushStreamConnection(ctx, req) } else { @@ -1280,6 +1281,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time req.Timeseries = timeseries req.Metadata = metadata req.Source = source + req.DiscardOutOfOrder = discardOutOfOrder _, err = c.PushPreAlloc(ctx, req) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4f2aa00395..2b3b91b0fe 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -430,6 +430,99 @@ func TestDistributor_Push(t *testing.T) { } } +func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "userDiscardOOO") + + tests := []struct { + name string + discardOutOfOrder bool + expectedDiscardOOO bool + useStreamPush bool + }{ + { + name: "DiscardOutOfOrder=true with regular push", + discardOutOfOrder: true, + expectedDiscardOOO: true, + useStreamPush: false, + }, + { + name: "DiscardOutOfOrder=false with regular push", + discardOutOfOrder: false, + expectedDiscardOOO: false, + useStreamPush: false, + }, + { + name: "DiscardOutOfOrder=true with stream push", + discardOutOfOrder: true, + expectedDiscardOOO: true, + useStreamPush: true, + }, + { + name: "DiscardOutOfOrder=false with stream push", + discardOutOfOrder: false, + expectedDiscardOOO: false, + useStreamPush: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + useStreamPush: tc.useStreamPush, + }) + + request := makeWriteRequest(123456789000, 5, 0, 0) + request.DiscardOutOfOrder = tc.discardOutOfOrder + + _, err := ds[0].Push(ctx, request) + require.NoError(t, err) + + // Poll to ensure all ingesters have received the push before verifying. + test.Poll(t, time.Second, nil, func() any { + for _, ing := range ingesters { + ing.Lock() + pushCalls := ing.calls["Push"] + lastDiscardOOO := ing.lastDiscardOutOfOrder + ing.Unlock() + + // Wait for all ingesters to receive the push call + if pushCalls == 0 { + return fmt.Errorf("ingester has not received push yet") + } + + // Wait for the DiscardOutOfOrder flag to match expected value + if lastDiscardOOO != tc.expectedDiscardOOO { + return fmt.Errorf("ingester has DiscardOutOfOrder=%v, expected %v", lastDiscardOOO, tc.expectedDiscardOOO) + } + } + return nil + }) + + // Final assertion: verify all ingesters received the correct DiscardOutOfOrder flag + for _, ing := range ingesters { + ing.Lock() + lastDiscardOOO := ing.lastDiscardOutOfOrder + ing.Unlock() + + assert.Equal(t, tc.expectedDiscardOOO, lastDiscardOOO, + "ingester should have received DiscardOutOfOrder=%v", tc.expectedDiscardOOO) + } + }) + } +} + func TestDistributor_MetricsCleanup(t *testing.T) { t.Parallel() dists, _, regs, r := prepare(t, prepConfig{ @@ -3604,14 +3697,15 @@ type mockIngester struct { sync.Mutex client.IngesterClient grpc_health_v1.HealthClient - happy atomic.Bool - failResp atomic.Error - stats client.UsersStatsResponse - timeseries map[uint32]*cortexpb.PreallocTimeseries - metadata map[uint32]map[cortexpb.MetricMetadata]struct{} - queryDelay time.Duration - calls map[string]int - lblsValues []string + happy atomic.Bool + failResp atomic.Error + stats client.UsersStatsResponse + timeseries map[uint32]*cortexpb.PreallocTimeseries + metadata map[uint32]map[cortexpb.MetricMetadata]struct{} + queryDelay time.Duration + calls map[string]int + lblsValues []string + lastDiscardOutOfOrder bool } func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester { @@ -3682,6 +3776,9 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt i.trackCall("Push") + // Store the DiscardOutOfOrder flag for test assertions + i.lastDiscardOutOfOrder = req.DiscardOutOfOrder + if !i.happy.Load() { return nil, i.failResp.Load() } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 86fe0e9d44..59f2abd09e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1423,6 +1423,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Walk the samples, appending them to the users database app := db.Appender(ctx).(extendedAppender) + + // Even when OOO is enabled globally, we want to reject OOO samples in some cases. + // prometheus implementation: https://github.com/prometheus/prometheus/pull/14710 + if req.DiscardOutOfOrder { + app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + } + var newSeries []labels.Labels for _, ts := range req.Timeseries { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index eb509e4e35..8f09aab987 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -7863,3 +7863,83 @@ func TestIngester_checkRegexMatcherLimits(t *testing.T) { }) } } +func TestIngester_DiscardOutOfOrderFlagIntegration(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute) + + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() any { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + // Create labels for our test metric + metricLabels := labels.FromStrings(labels.MetricName, "test_metric", "job", "test") + + currentTime := time.Now().UnixMilli() + olderTime := currentTime - 60000 // 1 minute earlier (within OOO window) + + // First, push a sample with current timestamp with discardOutOfOrder=true + req1 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 100, TimestampMs: currentTime}}, + nil, nil, cortexpb.RULE) + req1.DiscardOutOfOrder = true + + _, err = i.Push(ctx, req1) + require.NoError(t, err, "First sample push should succeed") + + // Now try to push a sample with older timestamp with discardOutOfOrder=true + // This should be discarded because DiscardOutOfOrder is true + req2 := cortexpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []cortexpb.Sample{{Value: 50, TimestampMs: olderTime}}, + nil, nil, cortexpb.RULE) + req2.DiscardOutOfOrder = true + + _, _ = i.Push(ctx, req2) + + // Query back the data to ensure only the first (current time) sample was stored + s := &mockQueryStreamServer{ctx: ctx} + err = i.QueryStream(&client.QueryRequest{ + StartTimestampMs: olderTime - 1000, + EndTimestampMs: currentTime + 1000, + Matchers: []*client.LabelMatcher{ + {Type: client.EQUAL, Name: labels.MetricName, Value: "test_metric"}, + }, + }, s) + require.NoError(t, err) + + // Verify we only have one series with one sample (the current time sample) + require.Len(t, s.series, 1, "Should have exactly one series") + + // Convert chunks to samples to verify content + series := s.series[0] + require.Len(t, series.Chunks, 1, "Should have exactly one chunk") + + chunk := series.Chunks[0] + chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data) + require.NoError(t, err) + + iter := chunkData.Iterator(nil) + sampleCount := 0 + for iter.Next() != chunkenc.ValNone { + ts, val := iter.At() + require.Equal(t, currentTime, ts, "Sample timestamp should match current time") + require.Equal(t, 100.0, val, "Sample value should match first push") + sampleCount++ + } + require.NoError(t, iter.Err()) + require.Equal(t, 1, sampleCount, "Should have exactly one sample stored") +} diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 0dc5c0210e..3a13151b4c 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -49,6 +49,7 @@ type PusherAppender struct { histogramLabels []labels.Labels histograms []cortexpb.Histogram userID string + opts *storage.AppendOptions } func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v return 0, nil } -func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {} +func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) { + a.opts = opts +} func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { // AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only. @@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error { req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE) req.AddHistogramTimeSeries(a.histogramLabels, a.histograms) + + // Set DiscardOutOfOrder flag if requested via AppendOptions + if a.opts != nil && a.opts.DiscardOutOfOrder { + req.DiscardOutOfOrder = true + } + // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. // We shouldn't call client.ReuseSlice here. _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req) diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 19d45062e7..1d0c5d2a8b 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -413,3 +414,26 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) { require.Equal(t, testutil.ToFloat64(metrics.RulerQueryChunkBytes.WithLabelValues("userID")), float64(10)) require.Equal(t, testutil.ToFloat64(metrics.RulerQueryDataBytes.WithLabelValues("userID")), float64(14)) } +func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) { + pusher := &fakePusher{response: &cortexpb.WriteResponse{}} + counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"}) + + appender := &PusherAppender{ + ctx: context.Background(), + pusher: pusher, + userID: "test-user", + totalWrites: counter, + failedWrites: counter, + labels: []labels.Labels{labels.FromStrings(labels.MetricName, "test_metric")}, + samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}}, + } + + appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true}) + + err := appender.Commit() + require.NoError(t, err) + + // Verify that DiscardOutOfOrder was set in the WriteRequest + require.NotNil(t, pusher.request, "WriteRequest should have been sent") + require.True(t, pusher.request.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest") +}