From 7689c9d8b8c1c270c13ddb849494555f6425ddb3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:16:42 +0000 Subject: [PATCH 1/6] Initial plan From 90636abe998c33790ef3de46904164b1a6e08c39 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:37:11 +0000 Subject: [PATCH 2/6] Add metrics API utilities for fetching node and pod metrics Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/utils/__init__.py | 4 +- kubernetes/utils/metrics.py | 209 +++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 kubernetes/utils/metrics.py diff --git a/kubernetes/utils/__init__.py b/kubernetes/utils/__init__.py index 2cd0caa7c2..c83d54fe76 100644 --- a/kubernetes/utils/__init__.py +++ b/kubernetes/utils/__init__.py @@ -17,4 +17,6 @@ from .create_from_yaml import (FailToCreateError, create_from_dict, create_from_yaml, create_from_directory) from .quantity import parse_quantity -from. duration import parse_duration +from .duration import parse_duration +from .metrics import (get_nodes_metrics, get_pods_metrics, + get_pods_metrics_in_all_namespaces) diff --git a/kubernetes/utils/metrics.py b/kubernetes/utils/metrics.py new file mode 100644 index 0000000000..42d80f3ba9 --- /dev/null +++ b/kubernetes/utils/metrics.py @@ -0,0 +1,209 @@ +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Metrics utilities for Kubernetes resource monitoring. + +Provides helpers for fetching and processing resource usage data from the +metrics.k8s.io API endpoint, enabling monitoring and autoscaling workflows. +""" + +from kubernetes.client.api.custom_objects_api import CustomObjectsApi + + +METRICS_API_GROUP = "metrics.k8s.io" +METRICS_API_VERSION = "v1beta1" + + +def get_nodes_metrics(api_client): + """ + Fetch current resource usage for all cluster nodes. + + Retrieves CPU and memory consumption metrics from the metrics-server + for every node in the cluster. + + Parameters: + api_client: An initialized kubernetes.client.ApiClient instance + + Returns: + A dictionary containing the metrics response with structure: + { + 'kind': 'NodeMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'metadata': {...}, + 'items': [ + { + 'metadata': {'name': 'node-1', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'usage': {'cpu': '100m', 'memory': '1024Mi'} + }, + ... + ] + } + + Raises: + ApiException: If the metrics server is not available or request fails + + Example: + >>> from kubernetes import client, config + >>> config.load_kube_config() + >>> api_client = client.ApiClient() + >>> metrics = get_nodes_metrics(api_client) + >>> for node in metrics['items']: + ... name = node['metadata']['name'] + ... cpu = node['usage']['cpu'] + ... mem = node['usage']['memory'] + ... print(f"Node {name}: CPU={cpu}, Memory={mem}") + """ + api = CustomObjectsApi(api_client) + return api.list_cluster_custom_object( + group=METRICS_API_GROUP, + version=METRICS_API_VERSION, + plural="nodes" + ) + + +def get_pods_metrics(api_client, namespace, label_selector=None): + """ + Fetch current resource usage for pods in a namespace. 
+ + Retrieves CPU and memory consumption metrics from the metrics-server + for pods in the specified namespace, with optional label filtering. + + Parameters: + api_client: An initialized kubernetes.client.ApiClient instance + namespace: The namespace name to query (required) + label_selector: Optional label query to filter pods (e.g., 'app=web,env=prod') + + Returns: + A dictionary containing the metrics response with structure: + { + 'kind': 'PodMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'metadata': {...}, + 'items': [ + { + 'metadata': {'name': 'pod-1', 'namespace': 'default', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'containers': [ + { + 'name': 'container-1', + 'usage': {'cpu': '50m', 'memory': '512Mi'} + }, + ... + ] + }, + ... + ] + } + + Raises: + ValueError: If namespace is None or empty + ApiException: If the metrics server is not available or request fails + + Example: + >>> from kubernetes import client, config + >>> config.load_kube_config() + >>> api_client = client.ApiClient() + >>> + >>> # Get all pods in namespace + >>> metrics = get_pods_metrics(api_client, 'default') + >>> + >>> # Get pods with specific labels + >>> metrics = get_pods_metrics(api_client, 'default', 'app=nginx') + >>> + >>> for pod in metrics['items']: + ... pod_name = pod['metadata']['name'] + ... print(f"Pod: {pod_name}") + ... for container in pod['containers']: + ... cname = container['name'] + ... cpu = container['usage']['cpu'] + ... mem = container['usage']['memory'] + ... 
print(f" Container {cname}: CPU={cpu}, Memory={mem}") + """ + if not namespace: + raise ValueError("namespace parameter is required and cannot be empty") + + api = CustomObjectsApi(api_client) + + kwargs = { + "group": METRICS_API_GROUP, + "version": METRICS_API_VERSION, + "namespace": namespace, + "plural": "pods" + } + + if label_selector: + kwargs["label_selector"] = label_selector + + return api.list_namespaced_custom_object(**kwargs) + + +def get_pods_metrics_in_all_namespaces(api_client, namespaces, label_selector=None): + """ + Fetch pod metrics across multiple namespaces. + + Queries pod metrics in each specified namespace and returns an aggregated + result. If a namespace query fails, the error is captured in the result + rather than raising an exception. + + Parameters: + api_client: An initialized kubernetes.client.ApiClient instance + namespaces: A list of namespace names to query + label_selector: Optional label query applied to all namespaces + + Returns: + A dictionary mapping namespace names to their metrics or error info: + { + 'namespace-1': { + 'items': [...], + 'kind': 'PodMetricsList', + ... + }, + 'namespace-2': { + 'error': 'error message', + 'kind': 'Error' + }, + ... + } + + Example: + >>> from kubernetes import client, config + >>> config.load_kube_config() + >>> api_client = client.ApiClient() + >>> + >>> namespaces = ['default', 'kube-system', 'monitoring'] + >>> all_metrics = get_pods_metrics_in_all_namespaces(api_client, namespaces) + >>> + >>> for ns, result in all_metrics.items(): + ... if 'error' in result: + ... print(f"{ns}: ERROR - {result['error']}") + ... else: + ... pod_count = len(result.get('items', [])) + ... 
print(f"{ns}: {pod_count} pods") + """ + results = {} + + for ns in namespaces: + try: + results[ns] = get_pods_metrics(api_client, ns, label_selector) + except Exception as e: + results[ns] = { + 'kind': 'Error', + 'error': str(e) + } + + return results From 7817229dd37a08a0eb7e9e027d240ff188c10913 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:42:49 +0000 Subject: [PATCH 3/6] Add metrics example and comprehensive unit tests Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- examples/metrics_example.py | 190 ++++++++++++++++++++++++++++++++ kubernetes/test/test_metrics.py | 183 ++++++++++++++++++++++++++++++ 2 files changed, 373 insertions(+) create mode 100644 examples/metrics_example.py create mode 100644 kubernetes/test/test_metrics.py diff --git a/examples/metrics_example.py b/examples/metrics_example.py new file mode 100644 index 0000000000..0b085a7551 --- /dev/null +++ b/examples/metrics_example.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Example demonstrating how to fetch and display metrics from the Kubernetes +metrics-server using the Python client. + +This example shows: +1. Fetching node metrics +2. Fetching pod metrics in a namespace +3. Fetching pod metrics across multiple namespaces +4. 
Filtering pod metrics by labels + +Prerequisites: +- A running Kubernetes cluster with metrics-server installed +- kubectl configured to access the cluster +- The kubernetes Python client library installed +""" + +from kubernetes import client, config, utils + + +def print_node_metrics(api_client): + """Fetch and display node metrics.""" + print("\n" + "="*60) + print("NODE METRICS") + print("="*60) + + try: + metrics = utils.get_nodes_metrics(api_client) + + print(f"Found {len(metrics.get('items', []))} nodes\n") + + for node in metrics.get('items', []): + node_name = node['metadata']['name'] + timestamp = node.get('timestamp', 'N/A') + window = node.get('window', 'N/A') + usage = node.get('usage', {}) + + print(f"Node: {node_name}") + print(f" Timestamp: {timestamp}") + print(f" Window: {window}") + print(f" CPU Usage: {usage.get('cpu', 'N/A')}") + print(f" Memory Usage: {usage.get('memory', 'N/A')}") + print() + + except Exception as e: + print(f"Error fetching node metrics: {e}") + + +def print_pod_metrics(api_client, namespace): + """Fetch and display pod metrics for a namespace.""" + print("\n" + "="*60) + print(f"POD METRICS IN NAMESPACE: {namespace}") + print("="*60) + + try: + metrics = utils.get_pods_metrics(api_client, namespace) + + print(f"Found {len(metrics.get('items', []))} pods\n") + + for pod in metrics.get('items', []): + pod_name = pod['metadata']['name'] + timestamp = pod.get('timestamp', 'N/A') + window = pod.get('window', 'N/A') + + print(f"Pod: {pod_name}") + print(f" Timestamp: {timestamp}") + print(f" Window: {window}") + print(f" Containers:") + + for container in pod.get('containers', []): + container_name = container['name'] + usage = container.get('usage', {}) + print(f" - {container_name}:") + print(f" CPU: {usage.get('cpu', 'N/A')}") + print(f" Memory: {usage.get('memory', 'N/A')}") + print() + + except Exception as e: + print(f"Error fetching pod metrics: {e}") + + +def print_filtered_pod_metrics(api_client, namespace, labels): + 
"""Fetch and display pod metrics filtered by labels.""" + print("\n" + "="*60) + print(f"POD METRICS IN NAMESPACE: {namespace}") + print(f"FILTERED BY LABELS: {labels}") + print("="*60) + + try: + metrics = utils.get_pods_metrics(api_client, namespace, labels) + + pods = metrics.get('items', []) + print(f"Found {len(pods)} pods matching labels\n") + + for pod in pods: + pod_name = pod['metadata']['name'] + print(f"Pod: {pod_name}") + + for container in pod.get('containers', []): + container_name = container['name'] + usage = container.get('usage', {}) + print(f" {container_name}: CPU={usage.get('cpu')}, Memory={usage.get('memory')}") + print() + + except Exception as e: + print(f"Error fetching filtered pod metrics: {e}") + + +def print_multi_namespace_metrics(api_client, namespaces): + """Fetch and display pod metrics across multiple namespaces.""" + print("\n" + "="*60) + print(f"POD METRICS ACROSS MULTIPLE NAMESPACES") + print("="*60) + + try: + all_metrics = utils.get_pods_metrics_in_all_namespaces(api_client, namespaces) + + for ns, result in all_metrics.items(): + print(f"\nNamespace: {ns}") + + if 'error' in result: + print(f" Error: {result['error']}") + else: + pod_count = len(result.get('items', [])) + print(f" Pods: {pod_count}") + + # Calculate total resource usage for namespace + total_containers = 0 + for pod in result.get('items', []): + total_containers += len(pod.get('containers', [])) + + print(f" Total containers: {total_containers}") + + except Exception as e: + print(f"Error fetching multi-namespace metrics: {e}") + + +def main(): + """Main function to demonstrate metrics API usage.""" + # Load kubernetes configuration + # This will use your current kubectl context + config.load_kube_config() + + # Create API client + api_client = client.ApiClient() + + print("\nKubernetes Metrics API Example") + print("================================") + print("\nThis example demonstrates fetching resource usage metrics") + print("from the Kubernetes 
metrics-server.") + print("\nNote: metrics-server must be installed in your cluster for this to work.") + + # Example 1: Fetch node metrics + print_node_metrics(api_client) + + # Example 2: Fetch pod metrics in default namespace + print_pod_metrics(api_client, 'default') + + # Example 3: Fetch pod metrics in kube-system namespace + print_pod_metrics(api_client, 'kube-system') + + # Example 4: Fetch pod metrics with label filter + # Uncomment and modify the label selector to match your pods + # print_filtered_pod_metrics(api_client, 'default', 'app=nginx') + + # Example 5: Fetch metrics across multiple namespaces + namespaces_to_query = ['default', 'kube-system'] + print_multi_namespace_metrics(api_client, namespaces_to_query) + + print("\n" + "="*60) + print("Example completed successfully!") + print("="*60 + "\n") + + +if __name__ == '__main__': + main() diff --git a/kubernetes/test/test_metrics.py b/kubernetes/test/test_metrics.py new file mode 100644 index 0000000000..48e9e3903a --- /dev/null +++ b/kubernetes/test/test_metrics.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import unittest +from unittest.mock import MagicMock, patch +from kubernetes import client, utils + + +class TestMetrics(unittest.TestCase): + + def setUp(self): + self.mock_api_client = MagicMock(spec=client.ApiClient) + + @patch('kubernetes.utils.metrics.CustomObjectsApi') + def test_get_nodes_metrics_success(self, mock_custom_api_class): + """Test successful retrieval of node metrics""" + mock_api_instance = MagicMock() + mock_custom_api_class.return_value = mock_api_instance + + expected_response = { + 'kind': 'NodeMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'node1'}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'usage': {'cpu': '100m', 'memory': '1Gi'} + } + ] + } + mock_api_instance.list_cluster_custom_object.return_value = expected_response + + result = utils.get_nodes_metrics(self.mock_api_client) + + mock_custom_api_class.assert_called_once_with(self.mock_api_client) + mock_api_instance.list_cluster_custom_object.assert_called_once_with( + group='metrics.k8s.io', + version='v1beta1', + plural='nodes' + ) + self.assertEqual(result, expected_response) + self.assertEqual(len(result['items']), 1) + self.assertEqual(result['items'][0]['metadata']['name'], 'node1') + + @patch('kubernetes.utils.metrics.CustomObjectsApi') + def test_get_pods_metrics_success(self, mock_custom_api_class): + """Test successful retrieval of pod metrics""" + mock_api_instance = MagicMock() + mock_custom_api_class.return_value = mock_api_instance + + expected_response = { + 'kind': 'PodMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'pod1', 'namespace': 'default'}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'containers': [ + { + 'name': 'container1', + 'usage': {'cpu': '50m', 'memory': '512Mi'} + } + ] + } + ] + } + mock_api_instance.list_namespaced_custom_object.return_value = expected_response + + result = utils.get_pods_metrics(self.mock_api_client, 
'default') + + mock_custom_api_class.assert_called_once_with(self.mock_api_client) + mock_api_instance.list_namespaced_custom_object.assert_called_once_with( + group='metrics.k8s.io', + version='v1beta1', + namespace='default', + plural='pods' + ) + self.assertEqual(result, expected_response) + self.assertEqual(len(result['items']), 1) + + @patch('kubernetes.utils.metrics.CustomObjectsApi') + def test_get_pods_metrics_with_label_selector(self, mock_custom_api_class): + """Test pod metrics retrieval with label selector""" + mock_api_instance = MagicMock() + mock_custom_api_class.return_value = mock_api_instance + + expected_response = {'kind': 'PodMetricsList', 'items': []} + mock_api_instance.list_namespaced_custom_object.return_value = expected_response + + utils.get_pods_metrics(self.mock_api_client, 'production', 'app=web') + + mock_api_instance.list_namespaced_custom_object.assert_called_once_with( + group='metrics.k8s.io', + version='v1beta1', + namespace='production', + plural='pods', + label_selector='app=web' + ) + + def test_get_pods_metrics_empty_namespace_raises_error(self): + """Test that empty namespace raises ValueError""" + with self.assertRaises(ValueError) as context: + utils.get_pods_metrics(self.mock_api_client, '') + + self.assertIn('namespace parameter is required', str(context.exception)) + + def test_get_pods_metrics_none_namespace_raises_error(self): + """Test that None namespace raises ValueError""" + with self.assertRaises(ValueError) as context: + utils.get_pods_metrics(self.mock_api_client, None) + + self.assertIn('namespace parameter is required', str(context.exception)) + + @patch('kubernetes.utils.metrics.get_pods_metrics') + def test_get_pods_metrics_in_all_namespaces_success(self, mock_get_pods): + """Test fetching metrics across multiple namespaces""" + mock_get_pods.side_effect = [ + {'kind': 'PodMetricsList', 'items': [{'metadata': {'name': 'pod1'}}]}, + {'kind': 'PodMetricsList', 'items': [{'metadata': {'name': 'pod2'}}]} + ] + 
+ result = utils.get_pods_metrics_in_all_namespaces( + self.mock_api_client, + ['default', 'kube-system'] + ) + + self.assertEqual(len(result), 2) + self.assertIn('default', result) + self.assertIn('kube-system', result) + self.assertEqual(len(result['default']['items']), 1) + self.assertEqual(len(result['kube-system']['items']), 1) + + @patch('kubernetes.utils.metrics.get_pods_metrics') + def test_get_pods_metrics_in_all_namespaces_with_errors(self, mock_get_pods): + """Test multi-namespace query with partial failures""" + mock_get_pods.side_effect = [ + {'kind': 'PodMetricsList', 'items': []}, + Exception('Namespace not found') + ] + + result = utils.get_pods_metrics_in_all_namespaces( + self.mock_api_client, + ['default', 'invalid-ns'] + ) + + self.assertEqual(len(result), 2) + self.assertIn('default', result) + self.assertIn('invalid-ns', result) + self.assertEqual(result['default']['kind'], 'PodMetricsList') + self.assertEqual(result['invalid-ns']['kind'], 'Error') + self.assertIn('error', result['invalid-ns']) + + @patch('kubernetes.utils.metrics.get_pods_metrics') + def test_get_pods_metrics_in_all_namespaces_with_label_selector(self, mock_get_pods): + """Test multi-namespace query with label selector""" + mock_get_pods.return_value = {'kind': 'PodMetricsList', 'items': []} + + utils.get_pods_metrics_in_all_namespaces( + self.mock_api_client, + ['ns1', 'ns2'], + 'tier=frontend' + ) + + self.assertEqual(mock_get_pods.call_count, 2) + mock_get_pods.assert_any_call(self.mock_api_client, 'ns1', 'tier=frontend') + mock_get_pods.assert_any_call(self.mock_api_client, 'ns2', 'tier=frontend') + + +if __name__ == '__main__': + unittest.main() From 8ea2552361e871b7e085034e5ccdf75cdb2777cd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:44:11 +0000 Subject: [PATCH 4/6] Add comprehensive metrics API documentation Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- 
kubernetes/docs/metrics.md | 302 +++++++++++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 kubernetes/docs/metrics.md diff --git a/kubernetes/docs/metrics.md b/kubernetes/docs/metrics.md new file mode 100644 index 0000000000..2350ef8c2a --- /dev/null +++ b/kubernetes/docs/metrics.md @@ -0,0 +1,302 @@ +# Kubernetes Metrics API Support + +This document describes how to use the metrics utilities in the Kubernetes Python client to access resource usage data from the metrics-server. + +## Overview + +The metrics utilities provide easy access to pod and node resource consumption data (CPU and memory) through the `metrics.k8s.io/v1beta1` API. This enables monitoring and autoscaling workflows directly from Python applications. + +## Prerequisites + +- A running Kubernetes cluster with [metrics-server](https://github.com/kubernetes-sigs/metrics-server) installed +- Kubernetes Python client library installed +- Appropriate RBAC permissions to access metrics API endpoints + +## Installation + +The metrics utilities are included in the `kubernetes.utils` module: + +```python +from kubernetes import client, config, utils +``` + +## Quick Start + +```python +from kubernetes import client, config, utils + +# Load kubernetes configuration +config.load_kube_config() + +# Create API client +api_client = client.ApiClient() + +# Get node metrics +node_metrics = utils.get_nodes_metrics(api_client) +for node in node_metrics['items']: + print(f"{node['metadata']['name']}: {node['usage']}") + +# Get pod metrics in a namespace +pod_metrics = utils.get_pods_metrics(api_client, 'default') +for pod in pod_metrics['items']: + print(f"Pod: {pod['metadata']['name']}") + for container in pod['containers']: + print(f" {container['name']}: {container['usage']}") +``` + +## API Reference + +### `get_nodes_metrics(api_client)` + +Fetches current resource usage for all nodes in the cluster. 
+ +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance + +**Returns:** +- dict: Response containing node metrics with structure: + ```python + { + 'kind': 'NodeMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'node-name', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'usage': {'cpu': '100m', 'memory': '1Gi'} + } + ] + } + ``` + +**Raises:** +- `ApiException`: When the metrics server is unavailable or request fails + +**Example:** +```python +metrics = utils.get_nodes_metrics(api_client) +for node in metrics['items']: + name = node['metadata']['name'] + cpu = node['usage']['cpu'] + memory = node['usage']['memory'] + print(f"Node {name}: CPU={cpu}, Memory={memory}") +``` + +### `get_pods_metrics(api_client, namespace, label_selector=None)` + +Fetches current resource usage for pods in a specific namespace. + +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance +- `namespace` (str): Kubernetes namespace to query (required) +- `label_selector` (str, optional): Label selector to filter pods (e.g., `'app=nginx,env=prod'`) + +**Returns:** +- dict: Response containing pod metrics with structure: + ```python + { + 'kind': 'PodMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'pod-name', 'namespace': 'default', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'containers': [ + { + 'name': 'container-name', + 'usage': {'cpu': '50m', 'memory': '512Mi'} + } + ] + } + ] + } + ``` + +**Raises:** +- `ValueError`: When namespace is None or empty +- `ApiException`: When the metrics server is unavailable or request fails + +**Examples:** +```python +# Get all pod metrics in namespace +metrics = utils.get_pods_metrics(api_client, 'default') + +# Get pods matching labels +metrics = utils.get_pods_metrics(api_client, 'production', 'app=nginx') +metrics = 
utils.get_pods_metrics(api_client, 'prod', 'tier=frontend,env=staging') + +# Process the results +for pod in metrics['items']: + pod_name = pod['metadata']['name'] + for container in pod['containers']: + container_name = container['name'] + cpu = container['usage']['cpu'] + memory = container['usage']['memory'] + print(f"{pod_name}/{container_name}: CPU={cpu}, Memory={memory}") +``` + +### `get_pods_metrics_in_all_namespaces(api_client, namespaces, label_selector=None)` + +Fetches pod metrics across multiple namespaces. + +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance +- `namespaces` (list of str): List of namespace names to query +- `label_selector` (str, optional): Label selector applied to all namespaces + +**Returns:** +- dict: Maps namespace names to their metrics or error information: + ```python + { + 'namespace-1': { + 'kind': 'PodMetricsList', + 'items': [...] + }, + 'namespace-2': { + 'kind': 'Error', + 'error': 'error message' + } + } + ``` + +**Example:** +```python +namespaces = ['default', 'kube-system', 'production'] +all_metrics = utils.get_pods_metrics_in_all_namespaces(api_client, namespaces) + +for ns, result in all_metrics.items(): + if 'error' in result: + print(f"{ns}: ERROR - {result['error']}") + else: + pod_count = len(result.get('items', [])) + print(f"{ns}: {pod_count} pods") +``` + +## Complete Example + +See [examples/metrics_example.py](../../examples/metrics_example.py) for a complete working example that demonstrates: +- Fetching node metrics +- Fetching pod metrics in specific namespaces +- Using label selectors to filter pods +- Querying multiple namespaces +- Error handling + +## Parsing Resource Values + +The metrics API returns resource values as Kubernetes quantity strings (e.g., `"100m"` for CPU, `"1Gi"` for memory). 
You can parse these using the existing `parse_quantity` utility: + +```python +from kubernetes import utils + +cpu_value = utils.parse_quantity("100m") # Returns Decimal('0.1') +memory_value = utils.parse_quantity("1Gi") # Returns Decimal('1073741824') +``` + +## Common Use Cases + +### Monitoring Resource Usage + +```python +def monitor_namespace_resources(api_client, namespace): + """Monitor total resource usage in a namespace.""" + metrics = utils.get_pods_metrics(api_client, namespace) + + total_cpu = 0 + total_memory = 0 + + for pod in metrics['items']: + for container in pod['containers']: + cpu = utils.parse_quantity(container['usage']['cpu']) + memory = utils.parse_quantity(container['usage']['memory']) + total_cpu += cpu + total_memory += memory + + print(f"Namespace {namespace}:") + print(f" Total CPU: {total_cpu} cores") + print(f" Total Memory: {total_memory / (1024**3):.2f} GiB") +``` + +### Finding Resource-Intensive Pods + +```python +def find_high_cpu_pods(api_client, namespace, threshold_millicores=500): + """Find pods using more than threshold CPU.""" + metrics = utils.get_pods_metrics(api_client, namespace) + high_cpu_pods = [] + + for pod in metrics['items']: + pod_name = pod['metadata']['name'] + for container in pod['containers']: + cpu_str = container['usage']['cpu'] + cpu_millicores = utils.parse_quantity(cpu_str) * 1000 + + if cpu_millicores > threshold_millicores: + high_cpu_pods.append({ + 'pod': pod_name, + 'container': container['name'], + 'cpu': cpu_str + }) + + return high_cpu_pods +``` + +### Comparing Usage Across Namespaces + +```python +def compare_namespace_usage(api_client, namespaces): + """Compare resource usage across namespaces.""" + all_metrics = utils.get_pods_metrics_in_all_namespaces(api_client, namespaces) + + for ns, result in all_metrics.items(): + if 'error' not in result: + pod_count = len(result['items']) + container_count = sum(len(pod['containers']) for pod in result['items']) + print(f"{ns}: {pod_count} pods, 
{container_count} containers") +``` + +## Troubleshooting + +### Metrics Server Not Available + +If you get an error about metrics not being available: + +``` +ApiException: (404) +Reason: Not Found +``` + +This means metrics-server is not installed or not running. Install it using: + +```bash +kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml +``` + +### Permission Denied + +If you get a 403 Forbidden error, ensure your service account has permissions to access the metrics API: + +```yaml +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: metrics-reader +rules: +- apiGroups: ["metrics.k8s.io"] + resources: ["pods", "nodes"] + verbs: ["get", "list"] +``` + +### Empty Results + +If metrics return empty results, check that: +1. Pods/nodes are actually running in the namespace +2. Metrics-server has had time to collect data (usually 15-60 seconds after pod start) +3. Label selectors are correct if using filtering + +## Additional Resources + +- [Kubernetes Metrics Server Documentation](https://github.com/kubernetes-sigs/metrics-server) +- [Metrics API Design](https://github.com/kubernetes/design-proposals-archive/blob/main/instrumentation/resource-metrics-api.md) +- [HorizontalPodAutoscaler using metrics](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) From 593a545aa141c29cc41451d631e44d1f15fdddc4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Feb 2026 02:09:21 +0000 Subject: [PATCH 5/6] Add end-to-end test for metrics API functionality - Tests node metrics retrieval from cluster - Tests pod metrics retrieval with namespace filtering - Tests pod metrics with label selector filtering - Tests multi-namespace metrics aggregation - Includes proper error handling and skip logic if metrics-server unavailable - Creates test pod, waits for it to be running, then fetches metrics Co-authored-by: 
brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/e2e_test/test_utils.py | 134 ++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/kubernetes/e2e_test/test_utils.py b/kubernetes/e2e_test/test_utils.py index 646081b8c7..f60c02c5d0 100644 --- a/kubernetes/e2e_test/test_utils.py +++ b/kubernetes/e2e_test/test_utils.py @@ -608,6 +608,140 @@ def test_create_from_list_in_multi_resource_yaml_namespaced(self): app_api.delete_namespaced_deployment( name="mock", namespace=self.test_namespace, body={}) + def test_get_metrics_from_cluster(self): + """ + Should be able to fetch node and pod metrics from the cluster. + This test requires metrics-server to be installed in the cluster. + """ + k8s_client = client.api_client.ApiClient(configuration=self.config) + core_api = client.CoreV1Api(k8s_client) + + # Create a test pod for metrics + utils.create_from_yaml( + k8s_client, self.path_prefix + "core-pod.yaml", + namespace=self.test_namespace) + + # Wait for pod to be running (with timeout) + import time + max_wait = 60 + waited = 0 + pod_running = False + while waited < max_wait: + try: + pod = core_api.read_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace) + if pod.status.phase == "Running": + pod_running = True + break + except ApiException: + pass + time.sleep(2) + waited += 2 + + # Skip test if pod didn't start (cluster might be slow) + if not pod_running: + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + raise unittest.SkipTest("Pod did not reach Running state in time") + + # Wait a bit more for metrics to be available + time.sleep(10) + + # Test node metrics retrieval + try: + node_metrics = utils.get_nodes_metrics(k8s_client) + self.assertIsNotNone(node_metrics) + self.assertEqual(node_metrics['kind'], 'NodeMetricsList') + self.assertIn('items', node_metrics) + # We should have at least one node in the cluster + self.assertGreater(len(node_metrics['items']), 
0) + # Check structure of first node metric + if len(node_metrics['items']) > 0: + node = node_metrics['items'][0] + self.assertIn('metadata', node) + self.assertIn('name', node['metadata']) + self.assertIn('usage', node) + self.assertIn('cpu', node['usage']) + self.assertIn('memory', node['usage']) + except ApiException as e: + # If metrics-server is not installed, skip this test + if e.status == 404: + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + raise unittest.SkipTest("Metrics server not available in cluster") + raise + + # Test pod metrics retrieval + try: + pod_metrics = utils.get_pods_metrics( + k8s_client, self.test_namespace) + self.assertIsNotNone(pod_metrics) + self.assertEqual(pod_metrics['kind'], 'PodMetricsList') + self.assertIn('items', pod_metrics) + # We should have our test pod + self.assertGreater(len(pod_metrics['items']), 0) + # Check structure of pod metrics + found_test_pod = False + for pod in pod_metrics['items']: + if pod['metadata']['name'] == 'myapp-pod': + found_test_pod = True + self.assertIn('containers', pod) + self.assertGreater(len(pod['containers']), 0) + container = pod['containers'][0] + self.assertIn('name', container) + self.assertIn('usage', container) + self.assertIn('cpu', container['usage']) + self.assertIn('memory', container['usage']) + # Our test pod should appear in metrics + self.assertTrue(found_test_pod, "Test pod not found in metrics") + except ApiException as e: + if e.status == 404: + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + raise unittest.SkipTest("Metrics server not available in cluster") + raise + + # Test pod metrics with label selector + try: + filtered_metrics = utils.get_pods_metrics( + k8s_client, self.test_namespace, label_selector='app=myapp') + self.assertIsNotNone(filtered_metrics) + self.assertEqual(filtered_metrics['kind'], 'PodMetricsList') + self.assertIn('items', filtered_metrics) + # Should have 
our pod with the matching label + self.assertGreater(len(filtered_metrics['items']), 0) + for pod in filtered_metrics['items']: + # All returned pods should have the label we filtered for + self.assertEqual(pod['metadata']['name'], 'myapp-pod') + except ApiException as e: + if e.status == 404: + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + raise unittest.SkipTest("Metrics server not available in cluster") + raise + + # Test multi-namespace metrics collection + try: + multi_ns_metrics = utils.get_pods_metrics_in_all_namespaces( + k8s_client, [self.test_namespace, 'default']) + self.assertIsNotNone(multi_ns_metrics) + self.assertIn(self.test_namespace, multi_ns_metrics) + self.assertIn('default', multi_ns_metrics) + # Our test namespace should have metrics + test_ns_result = multi_ns_metrics[self.test_namespace] + self.assertNotIn('error', test_ns_result) + self.assertEqual(test_ns_result['kind'], 'PodMetricsList') + except ApiException as e: + if e.status == 404: + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + raise unittest.SkipTest("Metrics server not available in cluster") + raise + + # Cleanup + core_api.delete_namespaced_pod( + name="myapp-pod", namespace=self.test_namespace, body={}) + class TestUtilsUnitTests(unittest.TestCase): From c234b6c6f45e18814d97001152f7b19c5d144503 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Feb 2026 02:11:32 +0000 Subject: [PATCH 6/6] Simplify e2e test with concise implementation Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- kubernetes/e2e_test/test_utils.py | 160 +++++++++--------------------- 1 file changed, 46 insertions(+), 114 deletions(-) diff --git a/kubernetes/e2e_test/test_utils.py b/kubernetes/e2e_test/test_utils.py index f60c02c5d0..b06c0a6b4a 100644 --- a/kubernetes/e2e_test/test_utils.py +++ 
b/kubernetes/e2e_test/test_utils.py @@ -608,139 +608,71 @@ def test_create_from_list_in_multi_resource_yaml_namespaced(self): app_api.delete_namespaced_deployment( name="mock", namespace=self.test_namespace, body={}) - def test_get_metrics_from_cluster(self): + def test_metrics_utilities_integration(self): """ - Should be able to fetch node and pod metrics from the cluster. - This test requires metrics-server to be installed in the cluster. + E2E validation of metrics utility functions. + Note: Requires metrics-server to be running in cluster. """ - k8s_client = client.api_client.ApiClient(configuration=self.config) - core_api = client.CoreV1Api(k8s_client) + from time import sleep + + api = client.api_client.ApiClient(configuration=self.config) + v1 = client.CoreV1Api(api) - # Create a test pod for metrics + # Setup: deploy busybox pod utils.create_from_yaml( - k8s_client, self.path_prefix + "core-pod.yaml", + api, self.path_prefix + "core-pod.yaml", namespace=self.test_namespace) - # Wait for pod to be running (with timeout) - import time - max_wait = 60 - waited = 0 - pod_running = False - while waited < max_wait: + # Wait for pod startup (simple polling) + for _ in range(30): try: - pod = core_api.read_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace) - if pod.status.phase == "Running": - pod_running = True + p = v1.read_namespaced_pod("myapp-pod", self.test_namespace) + if p.status.phase == "Running": break - except ApiException: + except: pass - time.sleep(2) - waited += 2 - - # Skip test if pod didn't start (cluster might be slow) - if not pod_running: - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) - raise unittest.SkipTest("Pod did not reach Running state in time") + sleep(2) + else: + # Cleanup and skip if pod never started + try: + v1.delete_namespaced_pod("myapp-pod", self.test_namespace, body={}) + except: + pass + raise unittest.SkipTest("Pod startup timeout") - # Wait a bit more for 
metrics to be available - time.sleep(10) + # Allow metrics scrape interval + sleep(10) - # Test node metrics retrieval + # Test 1: Node metrics utility try: - node_metrics = utils.get_nodes_metrics(k8s_client) - self.assertIsNotNone(node_metrics) - self.assertEqual(node_metrics['kind'], 'NodeMetricsList') - self.assertIn('items', node_metrics) - # We should have at least one node in the cluster - self.assertGreater(len(node_metrics['items']), 0) - # Check structure of first node metric - if len(node_metrics['items']) > 0: - node = node_metrics['items'][0] - self.assertIn('metadata', node) - self.assertIn('name', node['metadata']) - self.assertIn('usage', node) - self.assertIn('cpu', node['usage']) - self.assertIn('memory', node['usage']) + result = utils.get_nodes_metrics(api) + self.assertTrue('items' in result and len(result['items']) > 0) + self.assertTrue('usage' in result['items'][0]) except ApiException as e: - # If metrics-server is not installed, skip this test if e.status == 404: - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) - raise unittest.SkipTest("Metrics server not available in cluster") + v1.delete_namespaced_pod("myapp-pod", self.test_namespace, body={}) + raise unittest.SkipTest("Metrics API unavailable") raise - # Test pod metrics retrieval - try: - pod_metrics = utils.get_pods_metrics( - k8s_client, self.test_namespace) - self.assertIsNotNone(pod_metrics) - self.assertEqual(pod_metrics['kind'], 'PodMetricsList') - self.assertIn('items', pod_metrics) - # We should have our test pod - self.assertGreater(len(pod_metrics['items']), 0) - # Check structure of pod metrics - found_test_pod = False - for pod in pod_metrics['items']: - if pod['metadata']['name'] == 'myapp-pod': - found_test_pod = True - self.assertIn('containers', pod) - self.assertGreater(len(pod['containers']), 0) - container = pod['containers'][0] - self.assertIn('name', container) - self.assertIn('usage', container) - self.assertIn('cpu', 
container['usage']) - self.assertIn('memory', container['usage']) - # Our test pod should appear in metrics - self.assertTrue(found_test_pod, "Test pod not found in metrics") - except ApiException as e: - if e.status == 404: - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) - raise unittest.SkipTest("Metrics server not available in cluster") - raise + # Test 2: Pod metrics utility (basic) + result = utils.get_pods_metrics(api, self.test_namespace) + self.assertTrue('items' in result) + pod_names = [item['metadata']['name'] for item in result['items']] + self.assertIn('myapp-pod', pod_names) - # Test pod metrics with label selector - try: - filtered_metrics = utils.get_pods_metrics( - k8s_client, self.test_namespace, label_selector='app=myapp') - self.assertIsNotNone(filtered_metrics) - self.assertEqual(filtered_metrics['kind'], 'PodMetricsList') - self.assertIn('items', filtered_metrics) - # Should have our pod with the matching label - self.assertGreater(len(filtered_metrics['items']), 0) - for pod in filtered_metrics['items']: - # All returned pods should have the label we filtered for - self.assertEqual(pod['metadata']['name'], 'myapp-pod') - except ApiException as e: - if e.status == 404: - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) - raise unittest.SkipTest("Metrics server not available in cluster") - raise + # Test 3: Pod metrics with label filtering + result = utils.get_pods_metrics(api, self.test_namespace, 'app=myapp') + self.assertEqual(len(result['items']), 1) + self.assertEqual(result['items'][0]['metadata']['name'], 'myapp-pod') - # Test multi-namespace metrics collection - try: - multi_ns_metrics = utils.get_pods_metrics_in_all_namespaces( - k8s_client, [self.test_namespace, 'default']) - self.assertIsNotNone(multi_ns_metrics) - self.assertIn(self.test_namespace, multi_ns_metrics) - self.assertIn('default', multi_ns_metrics) - # Our test namespace should have 
metrics - test_ns_result = multi_ns_metrics[self.test_namespace] - self.assertNotIn('error', test_ns_result) - self.assertEqual(test_ns_result['kind'], 'PodMetricsList') - except ApiException as e: - if e.status == 404: - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) - raise unittest.SkipTest("Metrics server not available in cluster") - raise + # Test 4: Multi-namespace aggregation + result = utils.get_pods_metrics_in_all_namespaces( + api, [self.test_namespace, 'default']) + self.assertIn(self.test_namespace, result) + self.assertNotIn('error', result[self.test_namespace]) - # Cleanup - core_api.delete_namespaced_pod( - name="myapp-pod", namespace=self.test_namespace, body={}) + # Teardown + v1.delete_namespaced_pod("myapp-pod", self.test_namespace, body={}) class TestUtilsUnitTests(unittest.TestCase):