Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions deploy/crd/kcp.io/syncagent.kcp.io_publishedresources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,70 @@ spec:
Version is the API version of the related resource. This can be left blank to automatically
use the preferred version.
type: string
watch:
description: |-
Watch configures how the agent identifies the owning primary object when a related
resource with origin: kcp changes. When set, the agent sets up a watch on the related
resource type and uses the configured rule to enqueue the correct primary object.
Without this field, changes to origin:kcp related resources do not trigger reconciliation.
properties:
byOwner:
description: |-
ByOwner configures the watch handler to inspect the OwnerReferences of the changed
object. When an OwnerReference with the given Kind is found, the referenced owner
is enqueued as the primary object.
type: object
bySelector:
description: |-
BySelector configures the watch handler to list primary objects matching the given label
selector. When a related object changes, all primary objects matching this selector
are enqueued for reconciliation.
properties:
matchExpressions:
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
items:
description: |-
A label selector requirement is a selector that contains values, a key, and an operator that
relates the key and values.
properties:
key:
description: key is the label key that the selector applies to.
type: string
operator:
description: |-
operator represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists and DoesNotExist.
type: string
values:
description: |-
values is an array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
x-kubernetes-list-type: atomic
required:
- key
- operator
type: object
type: array
x-kubernetes-list-type: atomic
matchLabels:
additionalProperties:
type: string
description: |-
matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
map is equivalent to an element of matchExpressions, whose key field is "key", the
operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
type: object
x-kubernetes-validations:
- message: exactly one of byOwner or bySelector must be set
rule: has(self.byOwner) != has(self.bySelector)
required:
- identifier
- object
Expand Down
3 changes: 3 additions & 0 deletions hack/tools.checksums
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
boilerplate|GOARCH=amd64;GOOS=linux|6f05fc3be207ae2ed99e125509a08df677cb007e197e16607c654a434b91d47f
boilerplate|GOARCH=arm64;GOOS=darwin|3ac82c58f440ac8461746674e39311ba332d6d960966a060dd3be734b1111522
boilerplate|GOARCH=arm64;GOOS=linux|70253486ed7a803a35a9abb2bab4db2f1f7748d5266bf7a1c2ee298fda2b208a
etcd|GOARCH=amd64;GOOS=linux|435d74510f3216bab1932fb6d7a6b5fe8245301143fcd25f7e65dfb7dcf8904a
etcd|GOARCH=arm64;GOOS=linux|cc8c645e5a8df0f35f2a5c51d9b9383037eef0cf0167c52e648457b3971a7a09
gimps|GOARCH=amd64;GOOS=linux|b597efc7e2c72097a44c001b41a06ccca97610963e1f1aec74c3d99c0e0b6c11
gimps|GOARCH=arm64;GOOS=linux|2588daec997b4f4b3a8d8875f780fd6faf3c39c933519e7899e19a686476c8e4
golangci-lint|GOARCH=amd64;GOOS=linux|8a01a08dad47a14824d7d0f14af07c7144105fc079386c9c31fbe85f08f91643
golangci-lint|GOARCH=arm64;GOOS=darwin|5fd0b6a09353eb0101d3ae81d5e3cf4707b77210c66fb92ae152d7280d959419
golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306f063719898bc8fb44a7074
kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1
kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b
kubectl|GOARCH=amd64;GOOS=linux|9591f3d75e1581f3f7392e6ad119aab2f28ae7d6c6e083dc5d22469667f27253
kubectl|GOARCH=arm64;GOOS=linux|95df604e914941f3172a93fa8feeb1a1a50f4011dfbe0c01e01b660afc8f9b85
yq|GOARCH=amd64;GOOS=linux|0c2b24e645b57d8e7c0566d18643a6d4f5580feeea3878127354a46f2a1e4598
yq|GOARCH=arm64;GOOS=darwin|164e10e5f7df62990e4f3823205e7ea42ba5660523a428df07c7386c0b62e3d9
yq|GOARCH=arm64;GOOS=linux|9477ac3cc447b6c083986129e35af8122eb2b938fe55c9c3e40436fb966e5813
184 changes: 184 additions & 0 deletions internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ import (

corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -161,11 +165,191 @@ func Create(
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
}

if err := setupRelatedResourceWatches(c, localManager, remoteManager, pubRes, localDummy, remoteDummy, log); err != nil {
return nil, err
}

log.Info("Done setting up unmanaged controller.")

return c, nil
}

// setupRelatedResourceWatches sets up watches for all related resources that have a Watch
// config, on their respective origin side, so that changes trigger primary reconciliation.
func setupRelatedResourceWatches(
c mccontroller.Controller,
localManager manager.Manager,
remoteManager mcmanager.Manager,
pubRes *syncagentv1alpha1.PublishedResource,
localDummy, remoteDummy *unstructured.Unstructured,
log *zap.SugaredLogger,
) error {
// Deduplication is per-origin to allow the same GVK on both sides.
watchedKcpGVKs := sets.New[schema.GroupVersionKind]()
watchedServiceGVKs := sets.New[schema.GroupVersionKind]()

for _, relRes := range pubRes.Spec.Related {
if relRes.Watch == nil {
continue
}

gvr := schema.GroupVersionResource{
Group: relRes.Group,
Version: relRes.Version,
Resource: relRes.Resource,
}

// Use the REST mapper of the origin side: related resources may have projected GVKs
// that differ between kcp and the service cluster, so we must resolve using the
// mapper that actually knows about the GVR on that side.
var originRESTMapper meta.RESTMapper
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
originRESTMapper = remoteManager.GetLocalManager().GetRESTMapper()
} else {
originRESTMapper = localManager.GetRESTMapper()
}

gvk, err := originRESTMapper.KindFor(gvr)
if err != nil {
return fmt.Errorf("failed to determine Kind for related resource %v (origin: %s): %w", gvr, relRes.Origin, err)
}

relatedDummy := &unstructured.Unstructured{}
relatedDummy.SetGroupVersionKind(gvk)

if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
if watchedKcpGVKs.Has(gvk) {
continue
}
watchedKcpGVKs.Insert(gvk)

enqueueForRelated, err := buildKcpRelatedHandler(relRes.Watch, gvk, remoteDummy, log)
if err != nil {
return err
}

if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
return fmt.Errorf("failed to setup watch for kcp-origin related resource %v: %w", gvk, err)
}
} else {
if watchedServiceGVKs.Has(gvk) {
continue
}
watchedServiceGVKs.Insert(gvk)

enqueueForRelated, err := buildServiceRelatedHandler(relRes.Watch, gvk, localDummy, localManager, log)
if err != nil {
return err
}

if err := c.Watch(source.TypedKind(localManager.GetCache(), relatedDummy, enqueueForRelated)); err != nil {
return fmt.Errorf("failed to setup watch for service-origin related resource %v: %w", gvk, err)
}
}

log.Infow("Set up watch for related resource", "gvk", gvk, "origin", relRes.Origin)
}

return nil
}

// buildKcpRelatedHandler constructs the per-cluster event handler for a kcp-origin related resource.
func buildKcpRelatedHandler(
watch *syncagentv1alpha1.RelatedResourceWatch,
gvk schema.GroupVersionKind,
remoteDummy *unstructured.Unstructured,
log *zap.SugaredLogger,
) (mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request], error) {
switch {
case watch.ByOwner != nil:
ownerGVK := remoteDummy.GroupVersionKind()
return func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
return &byOwnerEventHandler{
clusterName: clusterName,
ownerGVK: ownerGVK,
}
}, nil

case watch.BySelector != nil:
labelSelector := watch.BySelector
primaryDummy := remoteDummy.DeepCopy()
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
return &bySelectorEventHandler{
clusterName: clusterName,
client: cl.GetClient(),
primaryDummy: primaryDummy,
labelSelector: labelSelector,
log: log,
}
}, nil

default:
return nil, fmt.Errorf("related resource %v (origin: kcp) has Watch set but neither byOwner nor bySelector configured", gvk)
}
}

// buildServiceRelatedHandler constructs the event handler for a service-cluster-origin related resource.
// It maps the changed related resource back to the remote (kcp) primary via sync metadata on the local primary.
func buildServiceRelatedHandler(
watch *syncagentv1alpha1.RelatedResourceWatch,
gvk schema.GroupVersionKind,
localDummy *unstructured.Unstructured,
localManager manager.Manager,
log *zap.SugaredLogger,
) (handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request], error) {
localClient := localManager.GetClient()

switch {
case watch.ByOwner != nil:
ownerGVK := localDummy.GroupVersionKind()
primaryDummy := localDummy.DeepCopy()
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj *unstructured.Unstructured) []mcreconcile.Request {
for _, ref := range obj.GetOwnerReferences() {
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil || refGV.Group != ownerGVK.Group || refGV.Version != ownerGVK.Version || ref.Kind != ownerGVK.Kind {
continue
}
localPrimary := primaryDummy.DeepCopy()
if err := localClient.Get(ctx, types.NamespacedName{Namespace: obj.GetNamespace(), Name: ref.Name}, localPrimary); err != nil {
log.Warnw("Failed to fetch local primary for byOwner watch", "owner", ref.Name, "error", err)
return nil
}
if req := sync.RemoteNameForLocalObject(localPrimary); req != nil {
return []mcreconcile.Request{*req}
}
return nil
}
return nil
}), nil

case watch.BySelector != nil:
selector, err := metav1.LabelSelectorAsSelector(watch.BySelector)
if err != nil {
return nil, fmt.Errorf("failed to convert bySelector for service-origin related resource %v: %w", gvk, err)
}
primaryDummy := localDummy.DeepCopy()
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, _ *unstructured.Unstructured) []mcreconcile.Request {
primaryList := &unstructured.UnstructuredList{}
primaryList.SetAPIVersion(primaryDummy.GetAPIVersion())
primaryList.SetKind(primaryDummy.GetKind() + "List")
if err := localClient.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
log.Warnw("Failed to list local primary objects for bySelector watch", "selector", selector.String(), "error", err)
return nil
}
var reqs []mcreconcile.Request
for i := range primaryList.Items {
if req := sync.RemoteNameForLocalObject(&primaryList.Items[i]); req != nil {
reqs = append(reqs, *req)
}
}
return reqs
}), nil

default:
return nil, fmt.Errorf("related resource %v (origin: service) has Watch set but neither byOwner nor bySelector configured", gvk)
}
}

func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request) (reconcile.Result, error) {
log := r.log.With("cluster", request.ClusterName, "request", request.NamespacedName)
log.Debug("Processing")
Expand Down
Loading
Loading