Skip to content

Commit 5080b9d

Browse files
committed
feat(k8s): add kubelet-based pod informer
Poll local kubelet /pods endpoint instead of watching kube-apiserver to reduce API server load. Defaults to kubelet mode, configurable via kube.podInformer.mode ("kubelet" or "apiserver"). - Use singleflight to coalesce concurrent refresh calls - Re-read SA token on each refresh for projected token rotation - Propagate context for graceful shutdown cancellation - Validate podInformer.mode in config - Add NODE_IP env var and nodes/proxy RBAC to manifests - Add comprehensive tests
1 parent a8224bb commit 5080b9d

File tree

8 files changed

+945
-10
lines changed

8 files changed

+945
-10
lines changed

cmd/kepler/main.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,7 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
137137

138138
var podInformer pod.Informer
139139
if *cfg.Kube.Enabled {
140-
podInformer = pod.NewInformer(
141-
pod.WithLogger(logger),
142-
pod.WithKubeConfig(cfg.Kube.Config),
143-
pod.WithNodeName(cfg.Kube.Node),
144-
)
140+
podInformer = createPodInformer(cfg, logger)
145141
services = append(services, podInformer)
146142
}
147143
resourceInformer, err := resource.NewInformer(
@@ -231,6 +227,30 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
231227
return services, nil
232228
}
233229

230+
func createPodInformer(cfg *config.Config, logger *slog.Logger) pod.Informer {
231+
if cfg.Kube.PodInformer.Mode == "apiserver" {
232+
logger.Info("using API server pod informer")
233+
return pod.NewInformer(
234+
pod.WithLogger(logger),
235+
pod.WithKubeConfig(cfg.Kube.Config),
236+
pod.WithNodeName(cfg.Kube.Node),
237+
)
238+
}
239+
240+
// Default: kubelet-based informer
241+
logger.Info("using kubelet pod informer",
242+
"host", cfg.Kube.PodInformer.KubeletHost,
243+
"port", cfg.Kube.PodInformer.KubeletPort,
244+
"pollInterval", cfg.Kube.PodInformer.PollInterval)
245+
return pod.NewKubeletInformer(
246+
pod.WithLogger(logger),
247+
pod.WithNodeName(cfg.Kube.Node),
248+
pod.WithKubeletHost(cfg.Kube.PodInformer.KubeletHost),
249+
pod.WithKubeletPort(cfg.Kube.PodInformer.KubeletPort),
250+
pod.WithPollInterval(cfg.Kube.PodInformer.PollInterval),
251+
)
252+
}
253+
234254
func createRedfishService(logger *slog.Logger, cfg *config.Config) (*redfish.Service, error) {
235255
return redfish.NewService(cfg.Experimental.Platform.Redfish, logger, redfish.WithStaleness(cfg.Monitor.Staleness))
236256
}

config/config.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,18 @@ type (
117117
Pprof PprofDebug `yaml:"pprof"`
118118
}
119119

120+
PodInformer struct {
121+
Mode string `yaml:"mode"` // "kubelet" (default) or "apiserver"
122+
PollInterval time.Duration `yaml:"pollInterval"` // Poll interval for kubelet mode
123+
KubeletHost string `yaml:"kubeletHost"` // Kubelet host (default: localhost)
124+
KubeletPort int `yaml:"kubeletPort"` // Kubelet port (default: 10250)
125+
}
126+
120127
Kube struct {
121-
Enabled *bool `yaml:"enabled"`
122-
Config string `yaml:"config"`
123-
Node string `yaml:"nodeName"`
128+
Enabled *bool `yaml:"enabled"`
129+
Config string `yaml:"config"`
130+
Node string `yaml:"nodeName"`
131+
PodInformer PodInformer `yaml:"podInformer"`
124132
}
125133

126134
// Platform contains settings for platform power monitoring
@@ -302,6 +310,12 @@ func DefaultConfig() *Config {
302310
},
303311
Kube: Kube{
304312
Enabled: ptr.To(false),
313+
PodInformer: PodInformer{
314+
Mode: "kubelet",
315+
PollInterval: 15 * time.Second,
316+
KubeletHost: "", // resolved at runtime via NODE_IP env var
317+
KubeletPort: 10250,
318+
},
305319
},
306320

307321
// NOTE: Experimental config will be nil by default and only allocated when needed
@@ -834,6 +848,14 @@ func (c *Config) Validate(skips ...SkipValidation) error {
834848
if c.Kube.Node == "" {
835849
errs = append(errs, fmt.Sprintf("%s not supplied but %s set to true", KubeNodeNameFlag, KubernetesFlag))
836850
}
851+
852+
// Validate PodInformer mode
853+
switch c.Kube.PodInformer.Mode {
854+
case "kubelet", "apiserver":
855+
// valid
856+
default:
857+
errs = append(errs, fmt.Sprintf("invalid kube.podInformer.mode: %q, must be \"kubelet\" or \"apiserver\"", c.Kube.PodInformer.Mode))
858+
}
837859
}
838860
}
839861
// Experimental Platform validation

internal/k8s/pod/kubelet.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
// SPDX-FileCopyrightText: 2025 The Kepler Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package pod
5+
6+
import (
7+
"context"
8+
"crypto/tls"
9+
"encoding/json"
10+
"fmt"
11+
"log/slog"
12+
"net/http"
13+
"os"
14+
"sync"
15+
"time"
16+
17+
"golang.org/x/sync/singleflight"
18+
corev1 "k8s.io/api/core/v1"
19+
)
20+
21+
const (
22+
defaultKubeletPort = 10250
23+
defaultPollInterval = 15 * time.Second
24+
defaultRequestTimeout = 10 * time.Second
25+
serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
26+
)
27+
28+
// getDefaultKubeletHost returns the kubelet host from NODE_IP env var,
29+
// falling back to "localhost" if not set. The NODE_IP should be set via
30+
// downward API (status.hostIP) in the pod spec.
31+
func getDefaultKubeletHost() string {
32+
if nodeIP := os.Getenv("NODE_IP"); nodeIP != "" {
33+
return nodeIP
34+
}
35+
return "localhost"
36+
}
37+
38+
type kubeletPodInformer struct {
39+
logger *slog.Logger
40+
nodeName string
41+
kubeletHost string
42+
kubeletPort int
43+
pollInterval time.Duration
44+
tokenPath string
45+
46+
httpClient *http.Client
47+
refreshGroup singleflight.Group
48+
49+
mu sync.RWMutex
50+
cache map[string]*ContainerInfo // containerID -> ContainerInfo
51+
}
52+
53+
// NewKubeletInformer creates a new kubelet-based pod informer that polls
54+
// the local kubelet /pods endpoint instead of watching the API server.
55+
func NewKubeletInformer(opts ...OptFn) *kubeletPodInformer {
56+
opt := DefaultOpts()
57+
for _, fn := range opts {
58+
fn(&opt)
59+
}
60+
61+
host := opt.kubeletHost
62+
if host == "" {
63+
host = getDefaultKubeletHost()
64+
}
65+
port := opt.kubeletPort
66+
if port == 0 {
67+
port = defaultKubeletPort
68+
}
69+
interval := opt.pollInterval
70+
if interval == 0 {
71+
interval = defaultPollInterval
72+
}
73+
74+
return &kubeletPodInformer{
75+
logger: opt.logger.With("service", "kubeletPodInformer"),
76+
nodeName: opt.nodeName,
77+
kubeletHost: host,
78+
kubeletPort: port,
79+
pollInterval: interval,
80+
tokenPath: serviceAccountTokenPath,
81+
cache: make(map[string]*ContainerInfo),
82+
}
83+
}
84+
85+
func (i *kubeletPodInformer) Name() string {
86+
return "kubeletPodInformer"
87+
}
88+
89+
func (i *kubeletPodInformer) Init() error {
90+
if i.nodeName == "" {
91+
return fmt.Errorf("nodeName not set")
92+
}
93+
94+
// Verify token file is readable
95+
if _, err := i.readToken(); err != nil {
96+
return fmt.Errorf("failed to read SA token: %w", err)
97+
}
98+
99+
// Setup HTTP client with TLS (kubelet uses self-signed cert)
100+
i.httpClient = &http.Client{
101+
Timeout: defaultRequestTimeout,
102+
Transport: &http.Transport{
103+
TLSClientConfig: &tls.Config{
104+
InsecureSkipVerify: true, // kubelet uses self-signed cert
105+
},
106+
},
107+
}
108+
109+
// Do initial fetch
110+
if err := i.refresh(context.Background()); err != nil {
111+
return fmt.Errorf("initial kubelet fetch failed: %w", err)
112+
}
113+
114+
i.logger.Info("kubelet pod informer initialized",
115+
"nodeName", i.nodeName,
116+
"kubeletHost", i.kubeletHost,
117+
"kubeletPort", i.kubeletPort,
118+
"pollInterval", i.pollInterval)
119+
120+
return nil
121+
}
122+
123+
// readToken reads the service account token from the token file.
124+
// This is called on each refresh to handle projected token rotation.
125+
func (i *kubeletPodInformer) readToken() (string, error) {
126+
tokenBytes, err := os.ReadFile(i.tokenPath)
127+
if err != nil {
128+
return "", err
129+
}
130+
return string(tokenBytes), nil
131+
}
132+
133+
func (i *kubeletPodInformer) Run(ctx context.Context) error {
134+
i.logger.Info("starting kubelet pod informer")
135+
ticker := time.NewTicker(i.pollInterval)
136+
defer ticker.Stop()
137+
138+
for {
139+
select {
140+
case <-ctx.Done():
141+
i.logger.Info("kubelet pod informer stopped")
142+
return nil
143+
case <-ticker.C:
144+
if err := i.refresh(ctx); err != nil {
145+
i.logger.Warn("failed to refresh pods from kubelet", "error", err)
146+
}
147+
}
148+
}
149+
}
150+
151+
func (i *kubeletPodInformer) refresh(ctx context.Context) error {
152+
_, err, _ := i.refreshGroup.Do("refresh", func() (interface{}, error) {
153+
return nil, i.doRefresh(ctx)
154+
})
155+
return err
156+
}
157+
158+
func (i *kubeletPodInformer) doRefresh(ctx context.Context) error {
159+
token, err := i.readToken()
160+
if err != nil {
161+
return fmt.Errorf("failed to read SA token: %w", err)
162+
}
163+
164+
url := fmt.Sprintf("https://%s:%d/pods", i.kubeletHost, i.kubeletPort)
165+
166+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
167+
if err != nil {
168+
return err
169+
}
170+
req.Header.Set("Authorization", "Bearer "+token)
171+
172+
resp, err := i.httpClient.Do(req)
173+
if err != nil {
174+
return err
175+
}
176+
defer func() {
177+
_ = resp.Body.Close()
178+
}()
179+
180+
if resp.StatusCode != http.StatusOK {
181+
return fmt.Errorf("kubelet returned status %d", resp.StatusCode)
182+
}
183+
184+
var podList corev1.PodList
185+
if err := json.NewDecoder(resp.Body).Decode(&podList); err != nil {
186+
return fmt.Errorf("failed to decode pod list: %w", err)
187+
}
188+
189+
// Build container ID -> pod info cache
190+
newCache := make(map[string]*ContainerInfo)
191+
for _, pod := range podList.Items {
192+
i.addContainersToCache(newCache, &pod, pod.Status.ContainerStatuses)
193+
i.addContainersToCache(newCache, &pod, pod.Status.InitContainerStatuses)
194+
i.addContainersToCache(newCache, &pod, pod.Status.EphemeralContainerStatuses)
195+
}
196+
197+
i.mu.Lock()
198+
i.cache = newCache
199+
i.mu.Unlock()
200+
201+
i.logger.Debug("refreshed pod cache from kubelet",
202+
"podCount", len(podList.Items),
203+
"containerCount", len(newCache))
204+
return nil
205+
}
206+
207+
func (i *kubeletPodInformer) addContainersToCache(cache map[string]*ContainerInfo, pod *corev1.Pod, statuses []corev1.ContainerStatus) {
208+
for _, status := range statuses {
209+
if status.ContainerID == "" {
210+
continue
211+
}
212+
containerID := extractContainerID(status.ContainerID)
213+
cache[containerID] = &ContainerInfo{
214+
PodID: string(pod.UID),
215+
PodName: pod.Name,
216+
Namespace: pod.Namespace,
217+
ContainerName: status.Name,
218+
}
219+
}
220+
}
221+
222+
// LookupByContainerID retrieves pod details and container name given a containerID.
223+
// If the containerID is not found in cache, it triggers an immediate refresh.
224+
func (i *kubeletPodInformer) LookupByContainerID(containerID string) (*ContainerInfo, bool, error) {
225+
i.mu.RLock()
226+
info, found := i.cache[containerID]
227+
i.mu.RUnlock()
228+
229+
if found {
230+
return info, true, nil
231+
}
232+
233+
// Trigger immediate refresh for unknown container (coalesced via singleflight)
234+
if err := i.refresh(context.Background()); err != nil {
235+
i.logger.Warn("on-demand refresh failed", "error", err)
236+
}
237+
238+
i.mu.RLock()
239+
info, found = i.cache[containerID]
240+
i.mu.RUnlock()
241+
242+
if !found {
243+
return nil, false, nil
244+
}
245+
return info, true, nil
246+
}

0 commit comments

Comments
 (0)