Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 122 additions & 31 deletions internal/serviceoffercontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,21 @@ func (c *Controller) enqueueOfferFromRegistration(obj any) {
if u == nil {
return
}
var request monetizeapi.RegistrationRequest
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &request); err != nil {
log.Printf("serviceoffer-controller: decode registrationrequest for parent enqueue: %v", err)
return
}
if request.Spec.ServiceOfferNamespace == "" || request.Spec.ServiceOfferName == "" {
return
for _, item := range c.offerInformer.GetStore().List() {
u := asUnstructured(item)
if u == nil {
continue
}
offer, err := decodeServiceOffer(u)
if err != nil {
log.Printf("serviceoffer-controller: decode offer for registration fan-out: %v", err)
continue
}
if offer.DeletionTimestamp != nil || offer.IsPaused() || !offer.Spec.Registration.Enabled {
continue
}
c.offerQueue.Add(offer.Namespace + "/" + offer.Name)
}
c.offerQueue.Add(request.Spec.ServiceOfferNamespace + "/" + request.Spec.ServiceOfferName)
}

func (c *Controller) enqueueDiscoveryRefresh(obj any) {
Expand Down Expand Up @@ -418,6 +424,15 @@ func (c *Controller) reconcileOffer(ctx context.Context, key string) error {
if err := c.updateOfferStatus(ctx, raw, status); err != nil {
return err
}
if offer.Spec.Registration.Enabled {
owner, err := c.registrationOwner()
if err != nil {
return err
}
if owner != nil {
c.registrationQueue.Add(owner.Namespace + "/" + registrationRequestName(owner.Name))
}
}
if !ready {
// Dependent resources like the upstream Deployment, Middleware, HTTPRoute,
// and RegistrationRequest can become ready after this reconcile completes.
Expand All @@ -441,6 +456,21 @@ func (c *Controller) reconcileDeletingOffer(ctx context.Context, offer *monetize
return err
}

if offer.Spec.Registration.Enabled {
nextOwner, err := c.registrationOwner()
if err != nil {
return err
}
if nextOwner != nil {
if err := c.deleteRegistrationRequest(ctx, offer.Namespace, offer.Name); err != nil {
return err
}
c.offerQueue.Add(nextOwner.Namespace + "/" + nextOwner.Name)
c.registrationQueue.Add(nextOwner.Namespace + "/" + registrationRequestName(nextOwner.Name))
return nil
}
}

if !offer.Spec.Registration.Enabled && strings.TrimSpace(offer.Status.AgentID) == "" {
return c.deleteRegistrationRequest(ctx, offer.Namespace, offer.Name)
}
Expand Down Expand Up @@ -565,22 +595,33 @@ func (c *Controller) reconcileRegistrationStatus(ctx context.Context, status *mo
if err != nil {
return err
}
if owner != nil && (owner.Namespace != offer.Namespace || owner.Name != offer.Name) {
if owner == nil {
setCondition(status, "Registered", "False", "Pending", "Waiting for shared registration owner")
return nil
}
if owner.Namespace != offer.Namespace || owner.Name != offer.Name {
if err := c.deleteRegistrationRequest(ctx, offer.Namespace, offer.Name); err != nil {
return err
}
setCondition(
status,
"Registered",
"False",
"SingletonConflict",
fmt.Sprintf("Registration path /.well-known/agent-registration.json is reserved by %s/%s", owner.Namespace, owner.Name),
)
log.Printf("serviceoffer-controller: registration for %s/%s blocked by singleton owner %s/%s", offer.Namespace, offer.Name, owner.Namespace, owner.Name)
return nil
}
if !isConditionTrue(*status, "RoutePublished") {
setCondition(status, "Registered", "False", "WaitingForRoute", "Waiting for route publication before registration")
raw, err := c.registrationRequests.Namespace(owner.Namespace).Get(ctx, registrationRequestName(owner.Name), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
setCondition(
status,
"Registered",
"False",
"Pending",
fmt.Sprintf("Waiting for shared registration owned by %s/%s", owner.Namespace, owner.Name),
)
return nil
}
if err != nil {
return err
}
request, err := decodeRegistrationRequest(raw)
if err != nil {
return err
}
applySharedRegistrationStatus(status, offer, owner, request)
return nil
}

Expand All @@ -603,14 +644,7 @@ func (c *Controller) reconcileRegistrationStatus(ctx context.Context, status *mo
status.AgentID = request.Status.AgentID
status.RegistrationTxHash = request.Status.RegistrationTxHash

if requestPhaseReady(request.Status.Phase) {
setCondition(status, "Registered", "True", request.Status.Phase, defaultString(request.Status.Message, "Registration reconciled"))
return nil
}

reason := defaultString(request.Status.Phase, "Pending")
message := defaultString(request.Status.Message, "Waiting for RegistrationRequest to finish")
setCondition(status, "Registered", "False", reason, message)
applySharedRegistrationStatus(status, offer, owner, request)
return nil
}

Expand Down Expand Up @@ -654,6 +688,18 @@ func (c *Controller) reconcileRegistrationRequest(ctx context.Context, key strin

offerRaw, err := c.offers.Namespace(request.Spec.ServiceOfferNamespace).Get(ctx, request.Spec.ServiceOfferName, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
owner, ownerErr := c.registrationOwner()
if ownerErr != nil {
return ownerErr
}
if owner != nil {
if err := c.deleteRegistrationRequest(ctx, namespace, request.Spec.ServiceOfferName); err != nil {
return err
}
c.offerQueue.Add(owner.Namespace + "/" + owner.Name)
c.registrationQueue.Add(owner.Namespace + "/" + registrationRequestName(owner.Name))
return nil
}
if err := c.deleteRegistrationResources(ctx, request); err != nil {
return err
}
Expand Down Expand Up @@ -689,7 +735,11 @@ func (c *Controller) reconcileRegistrationActive(ctx context.Context, raw *unstr
agentID := firstNonEmpty(status.AgentID, offer.Status.AgentID)
txHash := firstNonEmpty(status.RegistrationTxHash, offer.Status.RegistrationTxHash)

document := buildActiveRegistrationDocument(offer, baseURL, agentID)
offers, err := c.registrationOffers("", "")
if err != nil {
return err
}
document := buildActiveRegistrationDocument(offer, offers, baseURL, agentID)
documentJSON, contentHash, err := marshalRegistrationDocument(document)
if err != nil {
return err
Expand Down Expand Up @@ -1117,7 +1167,7 @@ func (c *Controller) deleteRegistrationRequest(ctx context.Context, namespace, o
return nil
}

func (c *Controller) registrationOwner() (*monetizeapi.ServiceOffer, error) {
func (c *Controller) registrationOffers(excludeNamespace, excludeName string) ([]*monetizeapi.ServiceOffer, error) {
var candidates []*monetizeapi.ServiceOffer
for _, item := range c.offerInformer.GetStore().List() {
u := asUnstructured(item)
Expand All @@ -1128,11 +1178,22 @@ func (c *Controller) registrationOwner() (*monetizeapi.ServiceOffer, error) {
if err != nil {
return nil, err
}
if offer.Namespace == excludeNamespace && offer.Name == excludeName {
continue
}
if offer.DeletionTimestamp != nil || offer.IsPaused() || !offer.Spec.Registration.Enabled {
continue
}
candidates = append(candidates, offer)
}
return candidates, nil
}

func (c *Controller) registrationOwner() (*monetizeapi.ServiceOffer, error) {
candidates, err := c.registrationOffers("", "")
if err != nil {
return nil, err
}
return selectRegistrationOwner(candidates), nil
}

Expand Down Expand Up @@ -1160,6 +1221,36 @@ func selectRegistrationOwner(offers []*monetizeapi.ServiceOffer) *monetizeapi.Se
return offers[0]
}

func applySharedRegistrationStatus(status *monetizeapi.ServiceOfferStatus, offer, owner *monetizeapi.ServiceOffer, request *monetizeapi.RegistrationRequest) {
status.AgentID = request.Status.AgentID
status.RegistrationTxHash = request.Status.RegistrationTxHash

if !isConditionTrue(*status, "RoutePublished") {
setCondition(status, "Registered", "False", "WaitingForRoute", "Waiting for route publication before shared registration")
return
}

if requestPhaseReady(request.Status.Phase) {
message := defaultString(request.Status.Message, "Registration reconciled")
if owner != nil && (owner.Namespace != offer.Namespace || owner.Name != offer.Name) {
if request.Status.AgentID != "" {
message = fmt.Sprintf("Shared registration via %s/%s recorded agent %s", owner.Namespace, owner.Name, request.Status.AgentID)
} else {
message = fmt.Sprintf("Shared registration via %s/%s is active", owner.Namespace, owner.Name)
}
}
setCondition(status, "Registered", "True", request.Status.Phase, message)
return
}

reason := defaultString(request.Status.Phase, "Pending")
message := defaultString(request.Status.Message, "Waiting for RegistrationRequest to finish")
if owner != nil && (owner.Namespace != offer.Namespace || owner.Name != offer.Name) {
message = fmt.Sprintf("Waiting for shared registration owned by %s/%s: %s", owner.Namespace, owner.Name, message)
}
setCondition(status, "Registered", "False", reason, message)
}

func (c *Controller) applyObject(ctx context.Context, resource dynamic.ResourceInterface, desired *unstructured.Unstructured) error {
payload, err := json.Marshal(desired.Object)
if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions internal/serviceoffercontroller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,44 @@ func TestPurchaseReadyRequiresRuntimePoolToMatchSpec(t *testing.T) {
t.Fatal("purchase should be ready once runtime pool matches spec")
}
}

func TestApplySharedRegistrationStatus_NonOwnerUsesSharedAgent(t *testing.T) {
status := &monetizeapi.ServiceOfferStatus{
Conditions: []monetizeapi.Condition{{Type: "RoutePublished", Status: "True"}},
}
owner := &monetizeapi.ServiceOffer{ObjectMeta: metav1.ObjectMeta{Name: "alpha", Namespace: "demo"}}
offer := &monetizeapi.ServiceOffer{ObjectMeta: metav1.ObjectMeta{Name: "beta", Namespace: "demo"}}
request := &monetizeapi.RegistrationRequest{
Status: monetizeapi.RegistrationRequestStatus{
Phase: registrationPhaseRegistered,
AgentID: "42",
RegistrationTxHash: "0xtx",
},
}

applySharedRegistrationStatus(status, offer, owner, request)

if status.AgentID != "42" || status.RegistrationTxHash != "0xtx" {
t.Fatalf("shared registration identifiers not copied: %+v", status)
}
if !isConditionTrue(*status, "Registered") {
t.Fatalf("registered condition not set true: %+v", status.Conditions)
}
}

func TestApplySharedRegistrationStatus_WaitsForRoute(t *testing.T) {
status := &monetizeapi.ServiceOfferStatus{}
owner := &monetizeapi.ServiceOffer{ObjectMeta: metav1.ObjectMeta{Name: "alpha", Namespace: "demo"}}
request := &monetizeapi.RegistrationRequest{
Status: monetizeapi.RegistrationRequestStatus{
Phase: registrationPhaseRegistered,
AgentID: "7",
},
}

applySharedRegistrationStatus(status, owner, owner, request)

if isConditionTrue(*status, "Registered") {
t.Fatalf("registered should remain false until route is published: %+v", status.Conditions)
}
}
Loading
Loading