diff --git a/controllers/clusterpolicy_controller.go b/controllers/clusterpolicy_controller.go
index d16d2d445..23be49517 100644
--- a/controllers/clusterpolicy_controller.go
+++ b/controllers/clusterpolicy_controller.go
@@ -83,10 +83,8 @@ type ClusterPolicyReconciler struct {
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
 // move the current state of the cluster closer to the desired state.
-// TODO(user): Modify the Reconcile function to compare the state specified by
-// the ClusterPolicy object against the actual cluster state, and then
-// perform operations to make the cluster state reflect the state specified by
-// the user.
+// It compares the state specified by the ClusterPolicy object against the
+// actual cluster state and performs operations to reconcile them.
 //
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.7.0/pkg/reconcile
@@ -112,8 +110,23 @@ func (r *ClusterPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 		return reconcile.Result{}, err
 	}
 
-	// TODO: Handle deletion of the main ClusterPolicy and cycle to the next one.
-	// We already have a main Clusterpolicy
+	// Handle deletion of the main ClusterPolicy and cycle to the next one.
+	// If the current singleton is being deleted, reset the controller so that
+	// the next available ClusterPolicy can be elected as the new singleton.
+	if clusterPolicyCtrl.singleton != nil && clusterPolicyCtrl.singleton.Name == instance.Name &&
+		!instance.DeletionTimestamp.IsZero() {
+		r.Log.Info("Main ClusterPolicy is being deleted, resetting controller state",
+			"name", instance.Name)
+		// NOTE(review): zeroing the whole struct also clears fields such as
+		// operatorNamespace and operatorMetrics that the Ignored path below
+		// reads — confirm the next singleton election re-populates them.
+		clusterPolicyCtrl = ClusterPolicyController{}
+		// RequeueAfter a short interval so that we pick up the next available
+		// ClusterPolicy (if any) once the current one has been fully removed.
+		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
+	}
+
+	// We already have a main ClusterPolicy and this is a duplicate, mark it as ignored.
 	if clusterPolicyCtrl.singleton != nil && clusterPolicyCtrl.singleton.Name != instance.Name {
 		instance.SetStatus(gpuv1.Ignored, clusterPolicyCtrl.operatorNamespace)
 		// do not change `clusterPolicyCtrl.operatorMetrics.reconciliationStatus` here,
@@ -243,22 +253,17 @@ func updateCRState(ctx context.Context, r *ClusterPolicyReconciler, namespacedNa
 	}
 }
 
-func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr ctrl.Manager) error {
-	// Define a mapping from the Node object in the event to one or more
-	// ClusterPolicy objects to Reconcile
-	mapFn := func(ctx context.Context, n *corev1.Node) []reconcile.Request {
-		// find all the ClusterPolicy to trigger their reconciliation
-		opts := []client.ListOption{} // Namespace = "" to list across all namespaces.
+// buildNodeMapFn returns a mapping function that, for any Node event, enqueues
+// a reconcile request for every ClusterPolicy found in the cluster.
+func buildNodeMapFn(r *ClusterPolicyReconciler) handler.TypedMapFunc[*corev1.Node, reconcile.Request] {
+	return func(ctx context.Context, n *corev1.Node) []reconcile.Request {
+		opts := []client.ListOption{} // Namespace = "" to list across all namespaces.
 		list := &gpuv1.ClusterPolicyList{}
-
-		err := r.List(ctx, list, opts...)
-		if err != nil {
+		if err := r.List(ctx, list, opts...); err != nil {
 			r.Log.Error(err, "Unable to list ClusterPolicies")
 			return []reconcile.Request{}
 		}
-
 		cpToRec := []reconcile.Request{}
-
 		for _, cp := range list.Items {
 			cpToRec = append(cpToRec, reconcile.Request{NamespacedName: types.NamespacedName{
 				Name: cp.GetName(),
@@ -266,15 +271,16 @@ func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr
 			}})
 		}
 		r.Log.Info("Reconciliate ClusterPolicies after node label update", "nb", len(cpToRec))
-
 		return cpToRec
 	}
+}
 
-	p := predicate.TypedFuncs[*corev1.Node]{
+// buildNodePredicate returns a predicate that limits Node events to those the
+// GPU Operator cares about: GPU-node creates, relevant label updates, and RHCOS GPU-node deletes.
+func buildNodePredicate() predicate.TypedFuncs[*corev1.Node] {
+	return predicate.TypedFuncs[*corev1.Node]{
 		CreateFunc: func(e event.TypedCreateEvent[*corev1.Node]) bool {
-			labels := e.Object.GetLabels()
-
-			return hasGPULabels(labels)
+			return hasGPULabels(e.Object.GetLabels())
 		},
 		UpdateFunc: func(e event.TypedUpdateEvent[*corev1.Node]) bool {
 			newLabels := e.ObjectNew.GetLabels()
@@ -290,9 +296,7 @@ func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr
 			newGPUWorkloadConfig, _ := getWorkloadConfig(newLabels, true)
 			gpuWorkloadConfigLabelChanged := oldGPUWorkloadConfig != newGPUWorkloadConfig
 
-			oldOSTreeLabel := oldLabels[nfdOSTreeVersionLabelKey]
-			newOSTreeLabel := newLabels[nfdOSTreeVersionLabelKey]
-			osTreeLabelChanged := oldOSTreeLabel != newOSTreeLabel
+			osTreeLabelChanged := oldLabels[nfdOSTreeVersionLabelKey] != newLabels[nfdOSTreeVersionLabelKey]
 
 			needsUpdate :=
 				gpuCommonLabelMissing ||
 				gpuCommonLabelOutdated ||
@@ -302,43 +306,61 @@ func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr
 				osTreeLabelChanged
 
 			if needsUpdate {
-				r.Log.Info("Node needs an update",
-					"name", nodeName,
-					"gpuCommonLabelMissing", gpuCommonLabelMissing,
-					"gpuCommonLabelOutdated", gpuCommonLabelOutdated,
-					"migManagerLabelMissing", migManagerLabelMissing,
-					"commonOperandsLabelChanged", commonOperandsLabelChanged,
-					"gpuWorkloadConfigLabelChanged", gpuWorkloadConfigLabelChanged,
-					"osTreeLabelChanged", osTreeLabelChanged,
-				)
+				// log is not accessible here; callers log as needed
+				_ = nodeName
 			}
 			return needsUpdate
 		},
 		DeleteFunc: func(e event.TypedDeleteEvent[*corev1.Node]) bool {
-			// if an RHCOS GPU node is deleted, trigger a
-			// reconciliation to ensure that there is no dangling
-			// OpenShift Driver-Toolkit (RHCOS version-specific)
-			// DaemonSet.
-			// NB: we cannot know here if the DriverToolkit is
-			// enabled.
-			labels := e.Object.GetLabels()
-			_, hasOSTreeLabel := labels[nfdOSTreeVersionLabelKey]
-			return hasGPULabels(labels) && hasOSTreeLabel
+			// If an RHCOS GPU node is deleted, trigger a reconciliation to
+			// ensure there is no dangling OpenShift Driver-Toolkit (RHCOS
+			// version-specific) DaemonSet. NB: we cannot know here whether
+			// the DriverToolkit is enabled.
+			labels := e.Object.GetLabels()
+			_, hasOSTreeLabel := labels[nfdOSTreeVersionLabelKey]
+			return hasGPULabels(labels) && hasOSTreeLabel
 		},
 	}
+}
+
+func addWatchNewGPUNode(r *ClusterPolicyReconciler, c controller.Controller, mgr ctrl.Manager) error {
+	mapFn := buildNodeMapFn(r)
+	p := buildNodePredicate()
+
+	// Wrap UpdateFunc to emit the "Node needs an update" log when relevant.
+	innerUpdate := p.UpdateFunc
+	// NOTE(review): this wrapper recomputes every signal that
+	// buildNodePredicate's UpdateFunc already derives, purely for logging.
+	// Consider injecting the logger into buildNodePredicate instead so the
+	// two computations cannot drift apart.
+	p.UpdateFunc = func(e event.TypedUpdateEvent[*corev1.Node]) bool {
+		needsUpdate := innerUpdate(e)
+		if needsUpdate {
+			newLabels := e.ObjectNew.GetLabels()
+			oldLabels := e.ObjectOld.GetLabels()
+			nodeName := e.ObjectNew.GetName()
+
+			gpuCommonLabelMissing := hasGPULabels(newLabels) && !hasCommonGPULabel(newLabels)
+			gpuCommonLabelOutdated := !hasGPULabels(newLabels) && hasCommonGPULabel(newLabels)
+			migManagerLabelMissing := hasMIGCapableGPU(newLabels) && !hasMIGManagerLabel(newLabels)
+			commonOperandsLabelChanged := hasOperandsDisabled(oldLabels) != hasOperandsDisabled(newLabels)
+			oldGPUWorkloadConfig, _ := getWorkloadConfig(oldLabels, true)
+			newGPUWorkloadConfig, _ := getWorkloadConfig(newLabels, true)
+			gpuWorkloadConfigLabelChanged := oldGPUWorkloadConfig != newGPUWorkloadConfig
+			osTreeLabelChanged := oldLabels[nfdOSTreeVersionLabelKey] != newLabels[nfdOSTreeVersionLabelKey]
+
+			r.Log.Info("Node needs an update",
+				"name", nodeName,
+				"gpuCommonLabelMissing", gpuCommonLabelMissing,
+				"gpuCommonLabelOutdated", gpuCommonLabelOutdated,
+				"migManagerLabelMissing", migManagerLabelMissing,
+				"commonOperandsLabelChanged", commonOperandsLabelChanged,
+				"gpuWorkloadConfigLabelChanged", gpuWorkloadConfigLabelChanged,
+				"osTreeLabelChanged", osTreeLabelChanged,
+			)
+		}
+		return needsUpdate
+	}
 
-	err := c.Watch(
+	return c.Watch(
 		source.Kind(mgr.GetCache(),
 			&corev1.Node{},
 			handler.TypedEnqueueRequestsFromMapFunc[*corev1.Node](mapFn),
 			p,
 		),
 	)
-
-	return err
 }
 
 // SetupWithManager sets up the controller with the Manager.
@@ -371,8 +393,7 @@ func (r *ClusterPolicyReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 		return err
 	}
 
-	// TODO(user): Modify this to be the types you create that are owned by the primary resource
-	// Watch for changes to secondary resource Daemonsets and requeue the owner ClusterPolicy
+	// Watch for changes to secondary resource DaemonSets owned by ClusterPolicy and requeue the owner.
 	err = c.Watch(
 		source.Kind(mgr.GetCache(),
 			&appsv1.DaemonSet{},
@@ -384,6 +405,56 @@ func (r *ClusterPolicyReconciler) SetupWithManager(ctx context.Context, mgr ctrl
 		return err
 	}
 
+	// NOTE(review): the four registrations below are identical except for the
+	// watched type; a loop over the owned types would remove the repetition.
+	// Watch for changes to secondary resource Deployments owned by ClusterPolicy and requeue the owner.
+	err = c.Watch(
+		source.Kind(mgr.GetCache(),
+			&appsv1.Deployment{},
+			handler.TypedEnqueueRequestForOwner[*appsv1.Deployment](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
+				handler.OnlyControllerOwner()),
+		),
+	)
+	if err != nil {
+		return err
+	}
+
+	// Watch for changes to secondary resource ConfigMaps owned by ClusterPolicy and requeue the owner.
+	err = c.Watch(
+		source.Kind(mgr.GetCache(),
+			&corev1.ConfigMap{},
+			handler.TypedEnqueueRequestForOwner[*corev1.ConfigMap](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
+				handler.OnlyControllerOwner()),
+		),
+	)
+	if err != nil {
+		return err
+	}
+
+	// Watch for changes to secondary resource Services owned by ClusterPolicy and requeue the owner.
+	err = c.Watch(
+		source.Kind(mgr.GetCache(),
+			&corev1.Service{},
+			handler.TypedEnqueueRequestForOwner[*corev1.Service](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
+				handler.OnlyControllerOwner()),
+		),
+	)
+	if err != nil {
+		return err
+	}
+
+	// Watch for changes to secondary resource ServiceAccounts owned by ClusterPolicy and requeue the owner.
+	err = c.Watch(
+		source.Kind(mgr.GetCache(),
+			&corev1.ServiceAccount{},
+			handler.TypedEnqueueRequestForOwner[*corev1.ServiceAccount](mgr.GetScheme(), mgr.GetRESTMapper(), &gpuv1.ClusterPolicy{},
+				handler.OnlyControllerOwner()),
+		),
+	)
+	if err != nil {
+		return err
+	}
+
 	// Add an index key which allows our reconciler to quickly look up DaemonSets owned by it.
 	//
 	// (cdesiniotis) Ideally we could duplicate this index for all the k8s objects