Skip to content

Commit 1164cdd

Browse files
committed
feat(k8s): replace API server watch with kubelet /pods polling
Reduce API server load on large clusters by polling the local kubelet /pods endpoint instead of maintaining persistent watch connections.

- Add kubeletPodInformer that polls the kubelet at NODE_IP:10250/pods
- Use the downward API (status.hostIP) to get the node IP
- Add nodes/proxy RBAC for kubelet webhook authorization
- Keep apiserver mode as a fallback via the kube.podInformer.mode config

Signed-off-by: Vimal Kumar <vimal78@gmail.com>
1 parent a8224bb commit 1164cdd

File tree

8 files changed

+623
-10
lines changed

8 files changed

+623
-10
lines changed

cmd/kepler/main.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,7 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
137137

138138
var podInformer pod.Informer
139139
if *cfg.Kube.Enabled {
140-
podInformer = pod.NewInformer(
141-
pod.WithLogger(logger),
142-
pod.WithKubeConfig(cfg.Kube.Config),
143-
pod.WithNodeName(cfg.Kube.Node),
144-
)
140+
podInformer = createPodInformer(cfg, logger)
145141
services = append(services, podInformer)
146142
}
147143
resourceInformer, err := resource.NewInformer(
@@ -231,6 +227,30 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
231227
return services, nil
232228
}
233229

230+
func createPodInformer(cfg *config.Config, logger *slog.Logger) pod.Informer {
231+
if cfg.Kube.PodInformer.Mode == "apiserver" {
232+
logger.Info("using API server pod informer")
233+
return pod.NewInformer(
234+
pod.WithLogger(logger),
235+
pod.WithKubeConfig(cfg.Kube.Config),
236+
pod.WithNodeName(cfg.Kube.Node),
237+
)
238+
}
239+
240+
// Default: kubelet-based informer
241+
logger.Info("using kubelet pod informer",
242+
"host", cfg.Kube.PodInformer.KubeletHost,
243+
"port", cfg.Kube.PodInformer.KubeletPort,
244+
"pollInterval", cfg.Kube.PodInformer.PollInterval)
245+
return pod.NewKubeletInformer(
246+
pod.WithLogger(logger),
247+
pod.WithNodeName(cfg.Kube.Node),
248+
pod.WithKubeletHost(cfg.Kube.PodInformer.KubeletHost),
249+
pod.WithKubeletPort(cfg.Kube.PodInformer.KubeletPort),
250+
pod.WithPollInterval(cfg.Kube.PodInformer.PollInterval),
251+
)
252+
}
253+
234254
func createRedfishService(logger *slog.Logger, cfg *config.Config) (*redfish.Service, error) {
235255
return redfish.NewService(cfg.Experimental.Platform.Redfish, logger, redfish.WithStaleness(cfg.Monitor.Staleness))
236256
}

config/config.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,18 @@ type (
117117
Pprof PprofDebug `yaml:"pprof"`
118118
}
119119

120+
// PodInformer configures how pod metadata is discovered on the node.
PodInformer struct {
	// Mode selects the informer implementation: "kubelet" (default) or
	// "apiserver".
	Mode string `yaml:"mode"`

	// PollInterval is the poll interval used in kubelet mode.
	PollInterval time.Duration `yaml:"pollInterval"`

	// KubeletHost is the kubelet host. It is empty by default; the kubelet
	// informer then resolves it at runtime from the NODE_IP environment
	// variable and falls back to "localhost".
	KubeletHost string `yaml:"kubeletHost"`

	// KubeletPort is the kubelet port (default: 10250).
	KubeletPort int `yaml:"kubeletPort"`
}
126+
120127
Kube struct {
121-
Enabled *bool `yaml:"enabled"`
122-
Config string `yaml:"config"`
123-
Node string `yaml:"nodeName"`
128+
Enabled *bool `yaml:"enabled"`
129+
Config string `yaml:"config"`
130+
Node string `yaml:"nodeName"`
131+
PodInformer PodInformer `yaml:"podInformer"`
124132
}
125133

126134
// Platform contains settings for platform power monitoring
@@ -302,6 +310,12 @@ func DefaultConfig() *Config {
302310
},
303311
Kube: Kube{
304312
Enabled: ptr.To(false),
313+
PodInformer: PodInformer{
314+
Mode: "kubelet",
315+
PollInterval: 15 * time.Second,
316+
KubeletHost: "", // resolved at runtime via NODE_IP env var
317+
KubeletPort: 10250,
318+
},
305319
},
306320

307321
// NOTE: Experimental config will be nil by default and only allocated when needed

internal/k8s/pod/kubelet.go

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// SPDX-FileCopyrightText: 2025 The Kepler Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package pod
5+
6+
import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"os"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
)
19+
20+
// Defaults used by the kubelet pod informer when an option is left unset,
// plus the timeout applied to every kubelet HTTP request.
const (
	defaultKubeletPort    = 10250
	defaultPollInterval   = 15 * time.Second
	defaultRequestTimeout = 10 * time.Second
)

// serviceAccountTokenPath is the file the informer reads its bearer token
// from.
const serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
26+
27+
// getDefaultKubeletHost chooses the kubelet host used when none is configured.
// A non-empty NODE_IP environment variable wins; otherwise "localhost" is
// returned. NODE_IP is expected to be injected through the downward API
// (status.hostIP) in the pod spec.
func getDefaultKubeletHost() string {
	host, ok := os.LookupEnv("NODE_IP")
	if !ok || host == "" {
		return "localhost"
	}
	return host
}
36+
37+
type kubeletPodInformer struct {
38+
logger *slog.Logger
39+
nodeName string
40+
kubeletHost string
41+
kubeletPort int
42+
pollInterval time.Duration
43+
44+
httpClient *http.Client
45+
token string
46+
47+
mu sync.RWMutex
48+
cache map[string]*ContainerInfo // containerID -> ContainerInfo
49+
}
50+
51+
// NewKubeletInformer creates a new kubelet-based pod informer that polls
52+
// the local kubelet /pods endpoint instead of watching the API server.
53+
func NewKubeletInformer(opts ...OptFn) *kubeletPodInformer {
54+
opt := DefaultOpts()
55+
for _, fn := range opts {
56+
fn(&opt)
57+
}
58+
59+
host := opt.kubeletHost
60+
if host == "" {
61+
host = getDefaultKubeletHost()
62+
}
63+
port := opt.kubeletPort
64+
if port == 0 {
65+
port = defaultKubeletPort
66+
}
67+
interval := opt.pollInterval
68+
if interval == 0 {
69+
interval = defaultPollInterval
70+
}
71+
72+
return &kubeletPodInformer{
73+
logger: opt.logger.With("service", "kubeletPodInformer"),
74+
nodeName: opt.nodeName,
75+
kubeletHost: host,
76+
kubeletPort: port,
77+
pollInterval: interval,
78+
cache: make(map[string]*ContainerInfo),
79+
}
80+
}
81+
82+
func (i *kubeletPodInformer) Name() string {
83+
return "kubeletPodInformer"
84+
}
85+
86+
func (i *kubeletPodInformer) Init() error {
87+
if i.nodeName == "" {
88+
return fmt.Errorf("nodeName not set")
89+
}
90+
91+
// Read ServiceAccount token
92+
tokenBytes, err := os.ReadFile(serviceAccountTokenPath)
93+
if err != nil {
94+
return fmt.Errorf("failed to read SA token: %w", err)
95+
}
96+
i.token = string(tokenBytes)
97+
98+
// Setup HTTP client with TLS (kubelet uses self-signed cert)
99+
i.httpClient = &http.Client{
100+
Timeout: defaultRequestTimeout,
101+
Transport: &http.Transport{
102+
TLSClientConfig: &tls.Config{
103+
InsecureSkipVerify: true, // kubelet uses self-signed cert
104+
},
105+
},
106+
}
107+
108+
// Do initial fetch
109+
if err := i.refresh(); err != nil {
110+
return fmt.Errorf("initial kubelet fetch failed: %w", err)
111+
}
112+
113+
i.logger.Info("kubelet pod informer initialized",
114+
"nodeName", i.nodeName,
115+
"kubeletHost", i.kubeletHost,
116+
"kubeletPort", i.kubeletPort,
117+
"pollInterval", i.pollInterval)
118+
119+
return nil
120+
}
121+
122+
func (i *kubeletPodInformer) Run(ctx context.Context) error {
123+
i.logger.Info("starting kubelet pod informer")
124+
ticker := time.NewTicker(i.pollInterval)
125+
defer ticker.Stop()
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
i.logger.Info("kubelet pod informer stopped")
131+
return nil
132+
case <-ticker.C:
133+
if err := i.refresh(); err != nil {
134+
i.logger.Warn("failed to refresh pods from kubelet", "error", err)
135+
}
136+
}
137+
}
138+
}
139+
140+
func (i *kubeletPodInformer) refresh() error {
141+
url := fmt.Sprintf("https://%s:%d/pods", i.kubeletHost, i.kubeletPort)
142+
143+
req, err := http.NewRequest(http.MethodGet, url, nil)
144+
if err != nil {
145+
return err
146+
}
147+
req.Header.Set("Authorization", "Bearer "+i.token)
148+
149+
resp, err := i.httpClient.Do(req)
150+
if err != nil {
151+
return err
152+
}
153+
defer resp.Body.Close()
154+
155+
if resp.StatusCode != http.StatusOK {
156+
return fmt.Errorf("kubelet returned status %d", resp.StatusCode)
157+
}
158+
159+
var podList corev1.PodList
160+
if err := json.NewDecoder(resp.Body).Decode(&podList); err != nil {
161+
return fmt.Errorf("failed to decode pod list: %w", err)
162+
}
163+
164+
// Build container ID -> pod info cache
165+
newCache := make(map[string]*ContainerInfo)
166+
for _, pod := range podList.Items {
167+
i.addContainersToCache(newCache, &pod, pod.Status.ContainerStatuses)
168+
i.addContainersToCache(newCache, &pod, pod.Status.InitContainerStatuses)
169+
i.addContainersToCache(newCache, &pod, pod.Status.EphemeralContainerStatuses)
170+
}
171+
172+
i.mu.Lock()
173+
i.cache = newCache
174+
i.mu.Unlock()
175+
176+
i.logger.Debug("refreshed pod cache from kubelet",
177+
"podCount", len(podList.Items),
178+
"containerCount", len(newCache))
179+
return nil
180+
}
181+
182+
func (i *kubeletPodInformer) addContainersToCache(cache map[string]*ContainerInfo, pod *corev1.Pod, statuses []corev1.ContainerStatus) {
183+
for _, status := range statuses {
184+
if status.ContainerID == "" {
185+
continue
186+
}
187+
containerID := extractContainerID(status.ContainerID)
188+
cache[containerID] = &ContainerInfo{
189+
PodID: string(pod.UID),
190+
PodName: pod.Name,
191+
Namespace: pod.Namespace,
192+
ContainerName: status.Name,
193+
}
194+
}
195+
}
196+
197+
// LookupByContainerID retrieves pod details and container name given a containerID.
198+
// If the containerID is not found in cache, it triggers an immediate refresh.
199+
func (i *kubeletPodInformer) LookupByContainerID(containerID string) (*ContainerInfo, bool, error) {
200+
i.mu.RLock()
201+
info, found := i.cache[containerID]
202+
i.mu.RUnlock()
203+
204+
if found {
205+
return info, true, nil
206+
}
207+
208+
// Trigger immediate refresh for unknown container
209+
if err := i.refresh(); err != nil {
210+
i.logger.Warn("on-demand refresh failed", "error", err)
211+
}
212+
213+
i.mu.RLock()
214+
info, found = i.cache[containerID]
215+
i.mu.RUnlock()
216+
217+
if !found {
218+
return nil, false, nil
219+
}
220+
return info, true, nil
221+
}

0 commit comments

Comments
 (0)