diff --git a/examples/metrics_example.py b/examples/metrics_example.py new file mode 100644 index 0000000000..0b085a7551 --- /dev/null +++ b/examples/metrics_example.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Example demonstrating how to fetch and display metrics from the Kubernetes +metrics-server using the Python client. + +This example shows: +1. Fetching node metrics +2. Fetching pod metrics in a namespace +3. Fetching pod metrics across multiple namespaces +4. 
BANNER_WIDTH = 60


def _print_banner(*titles):
    """Print a section banner: a rule, one line per title, then a rule."""
    rule = "=" * BANNER_WIDTH
    print("\n" + rule)
    for title in titles:
        print(title)
    print(rule)


def print_node_metrics(api_client):
    """Fetch and display node metrics.

    Parameters:
        api_client: Client object handed to ``utils.get_nodes_metrics``.

    Returns:
        True if the metrics were fetched and printed, False if an error
        was reported instead.
    """
    _print_banner("NODE METRICS")

    try:
        nodes = utils.get_nodes_metrics(api_client).get('items', [])
        print(f"Found {len(nodes)} nodes\n")

        for node in nodes:
            usage = node.get('usage', {})
            print(f"Node: {node['metadata']['name']}")
            print(f"  Timestamp: {node.get('timestamp', 'N/A')}")
            print(f"  Window: {node.get('window', 'N/A')}")
            print(f"  CPU Usage: {usage.get('cpu', 'N/A')}")
            print(f"  Memory Usage: {usage.get('memory', 'N/A')}")
            print()
    except Exception as e:
        # Example-script boundary: report the problem and let the remaining
        # examples run instead of aborting the whole script.
        print(f"Error fetching node metrics: {e}")
        return False
    return True


def print_pod_metrics(api_client, namespace):
    """Fetch and display pod metrics for a namespace.

    Parameters:
        api_client: Client object handed to ``utils.get_pods_metrics``.
        namespace: Name of the namespace whose pods are listed.

    Returns:
        True if the metrics were fetched and printed, False if an error
        was reported instead.
    """
    _print_banner(f"POD METRICS IN NAMESPACE: {namespace}")

    try:
        pods = utils.get_pods_metrics(api_client, namespace).get('items', [])
        print(f"Found {len(pods)} pods\n")

        for pod in pods:
            print(f"Pod: {pod['metadata']['name']}")
            print(f"  Timestamp: {pod.get('timestamp', 'N/A')}")
            print(f"  Window: {pod.get('window', 'N/A')}")
            print("  Containers:")

            for container in pod.get('containers', []):
                usage = container.get('usage', {})
                print(f"    - {container['name']}:")
                print(f"      CPU: {usage.get('cpu', 'N/A')}")
                print(f"      Memory: {usage.get('memory', 'N/A')}")
            print()
    except Exception as e:
        # Report and continue; see print_node_metrics for the rationale.
        print(f"Error fetching pod metrics: {e}")
        return False
    return True


def print_filtered_pod_metrics(api_client, namespace, labels):
    """Fetch and display pod metrics filtered by labels.

    Parameters:
        api_client: Client object handed to ``utils.get_pods_metrics``.
        namespace: Name of the namespace whose pods are listed.
        labels: Label selector string, e.g. ``'app=nginx'``.

    Returns:
        True if the metrics were fetched and printed, False if an error
        was reported instead.
    """
    _print_banner(f"POD METRICS IN NAMESPACE: {namespace}",
                  f"FILTERED BY LABELS: {labels}")

    try:
        metrics = utils.get_pods_metrics(api_client, namespace, labels)
        pods = metrics.get('items', [])
        print(f"Found {len(pods)} pods matching labels\n")

        for pod in pods:
            print(f"Pod: {pod['metadata']['name']}")

            for container in pod.get('containers', []):
                usage = container.get('usage', {})
                # Same 'N/A' fallback as the other printers, so a missing
                # value never shows up as the literal text "None".
                cpu = usage.get('cpu', 'N/A')
                memory = usage.get('memory', 'N/A')
                print(f"  {container['name']}: CPU={cpu}, Memory={memory}")
            print()
    except Exception as e:
        print(f"Error fetching filtered pod metrics: {e}")
        return False
    return True


def print_multi_namespace_metrics(api_client, namespaces):
    """Fetch and display pod metrics across multiple namespaces.

    Parameters:
        api_client: Client object handed to
            ``utils.get_pods_metrics_in_all_namespaces``.
        namespaces: List of namespace names to query.

    Returns:
        True if every namespace was fetched without error, False if the
        call failed or any namespace result carried an ``'error'`` entry.
    """
    _print_banner("POD METRICS ACROSS MULTIPLE NAMESPACES")

    try:
        all_metrics = utils.get_pods_metrics_in_all_namespaces(
            api_client, namespaces)
        all_ok = True

        for ns, result in all_metrics.items():
            print(f"\nNamespace: {ns}")

            if 'error' in result:
                print(f"  Error: {result['error']}")
                all_ok = False
                continue

            pods = result.get('items', [])
            print(f"  Pods: {len(pods)}")

            # Count the containers reported across all pods in the namespace.
            total_containers = sum(
                len(pod.get('containers', [])) for pod in pods)
            print(f"  Total containers: {total_containers}")
    except Exception as e:
        print(f"Error fetching multi-namespace metrics: {e}")
        return False
    return all_ok


def main():
    """Run every metrics example and report whether all of them succeeded."""
    # Load kubernetes configuration.
    # This will use your current kubectl context.
    config.load_kube_config()

    # Create API client
    api_client = client.ApiClient()

    print("\nKubernetes Metrics API Example")
    print("================================")
    print("\nThis example demonstrates fetching resource usage metrics")
    print("from the Kubernetes metrics-server.")
    print("\nNote: metrics-server must be installed in your cluster "
          "for this to work.")

    outcomes = [
        # Example 1: Fetch node metrics
        print_node_metrics(api_client),
        # Example 2: Fetch pod metrics in default namespace
        print_pod_metrics(api_client, 'default'),
        # Example 3: Fetch pod metrics in kube-system namespace
        print_pod_metrics(api_client, 'kube-system'),
        # Example 4: Fetch pod metrics with label filter.
        # To try it, add a call such as:
        #     print_filtered_pod_metrics(api_client, 'default', 'app=nginx')
        # with a label selector that matches your pods.
        # Example 5: Fetch metrics across multiple namespaces
        print_multi_namespace_metrics(api_client, ['default', 'kube-system']),
    ]
    failures = outcomes.count(False)

    _print_banner(
        "Example completed successfully!" if not failures
        else f"Example completed with {failures} failed step(s); "
             "see errors above.")
    print()


if __name__ == '__main__':
    main()
+ +## Prerequisites + +- A running Kubernetes cluster with [metrics-server](https://github.com/kubernetes-sigs/metrics-server) installed +- Kubernetes Python client library installed +- Appropriate RBAC permissions to access metrics API endpoints + +## Installation + +The metrics utilities are included in the `kubernetes.utils` module: + +```python +from kubernetes import client, config, utils +``` + +## Quick Start + +```python +from kubernetes import client, config, utils + +# Load kubernetes configuration +config.load_kube_config() + +# Create API client +api_client = client.ApiClient() + +# Get node metrics +node_metrics = utils.get_nodes_metrics(api_client) +for node in node_metrics['items']: + print(f"{node['metadata']['name']}: {node['usage']}") + +# Get pod metrics in a namespace +pod_metrics = utils.get_pods_metrics(api_client, 'default') +for pod in pod_metrics['items']: + print(f"Pod: {pod['metadata']['name']}") + for container in pod['containers']: + print(f" {container['name']}: {container['usage']}") +``` + +## API Reference + +### `get_nodes_metrics(api_client)` + +Fetches current resource usage for all nodes in the cluster. 
+ +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance + +**Returns:** +- dict: Response containing node metrics with structure: + ```python + { + 'kind': 'NodeMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'node-name', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'usage': {'cpu': '100m', 'memory': '1Gi'} + } + ] + } + ``` + +**Raises:** +- `ApiException`: When the metrics server is unavailable or request fails + +**Example:** +```python +metrics = utils.get_nodes_metrics(api_client) +for node in metrics['items']: + name = node['metadata']['name'] + cpu = node['usage']['cpu'] + memory = node['usage']['memory'] + print(f"Node {name}: CPU={cpu}, Memory={memory}") +``` + +### `get_pods_metrics(api_client, namespace, label_selector=None)` + +Fetches current resource usage for pods in a specific namespace. + +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance +- `namespace` (str): Kubernetes namespace to query (required) +- `label_selector` (str, optional): Label selector to filter pods (e.g., `'app=nginx,env=prod'`) + +**Returns:** +- dict: Response containing pod metrics with structure: + ```python + { + 'kind': 'PodMetricsList', + 'apiVersion': 'metrics.k8s.io/v1beta1', + 'items': [ + { + 'metadata': {'name': 'pod-name', 'namespace': 'default', ...}, + 'timestamp': '2024-01-01T00:00:00Z', + 'window': '30s', + 'containers': [ + { + 'name': 'container-name', + 'usage': {'cpu': '50m', 'memory': '512Mi'} + } + ] + } + ] + } + ``` + +**Raises:** +- `ValueError`: When namespace is None or empty +- `ApiException`: When the metrics server is unavailable or request fails + +**Examples:** +```python +# Get all pod metrics in namespace +metrics = utils.get_pods_metrics(api_client, 'default') + +# Get pods matching labels +metrics = utils.get_pods_metrics(api_client, 'production', 'app=nginx') +metrics = 
utils.get_pods_metrics(api_client, 'prod', 'tier=frontend,env=staging') + +# Process the results +for pod in metrics['items']: + pod_name = pod['metadata']['name'] + for container in pod['containers']: + container_name = container['name'] + cpu = container['usage']['cpu'] + memory = container['usage']['memory'] + print(f"{pod_name}/{container_name}: CPU={cpu}, Memory={memory}") +``` + +### `get_pods_metrics_in_all_namespaces(api_client, namespaces, label_selector=None)` + +Fetches pod metrics across multiple namespaces. + +**Parameters:** +- `api_client` (kubernetes.client.ApiClient): Configured API client instance +- `namespaces` (list of str): List of namespace names to query +- `label_selector` (str, optional): Label selector applied to all namespaces + +**Returns:** +- dict: Maps namespace names to their metrics or error information: + ```python + { + 'namespace-1': { + 'kind': 'PodMetricsList', + 'items': [...] + }, + 'namespace-2': { + 'kind': 'Error', + 'error': 'error message' + } + } + ``` + +**Example:** +```python +namespaces = ['default', 'kube-system', 'production'] +all_metrics = utils.get_pods_metrics_in_all_namespaces(api_client, namespaces) + +for ns, result in all_metrics.items(): + if 'error' in result: + print(f"{ns}: ERROR - {result['error']}") + else: + pod_count = len(result.get('items', [])) + print(f"{ns}: {pod_count} pods") +``` + +## Complete Example + +See [examples/metrics_example.py](../../examples/metrics_example.py) for a complete working example that demonstrates: +- Fetching node metrics +- Fetching pod metrics in specific namespaces +- Using label selectors to filter pods +- Querying multiple namespaces +- Error handling + +## Parsing Resource Values + +The metrics API returns resource values as Kubernetes quantity strings (e.g., `"100m"` for CPU, `"1Gi"` for memory). 
You can parse these using the existing `parse_quantity` utility: + +```python +from kubernetes import utils + +cpu_value = utils.parse_quantity("100m") # Returns Decimal('0.1') +memory_value = utils.parse_quantity("1Gi") # Returns Decimal('1073741824') +``` + +## Common Use Cases + +### Monitoring Resource Usage + +```python +def monitor_namespace_resources(api_client, namespace): + """Monitor total resource usage in a namespace.""" + metrics = utils.get_pods_metrics(api_client, namespace) + + total_cpu = 0 + total_memory = 0 + + for pod in metrics['items']: + for container in pod['containers']: + cpu = utils.parse_quantity(container['usage']['cpu']) + memory = utils.parse_quantity(container['usage']['memory']) + total_cpu += cpu + total_memory += memory + + print(f"Namespace {namespace}:") + print(f" Total CPU: {total_cpu} cores") + print(f" Total Memory: {total_memory / (1024**3):.2f} GiB") +``` + +### Finding Resource-Intensive Pods + +```python +def find_high_cpu_pods(api_client, namespace, threshold_millicores=500): + """Find pods using more than threshold CPU.""" + metrics = utils.get_pods_metrics(api_client, namespace) + high_cpu_pods = [] + + for pod in metrics['items']: + pod_name = pod['metadata']['name'] + for container in pod['containers']: + cpu_str = container['usage']['cpu'] + cpu_millicores = utils.parse_quantity(cpu_str) * 1000 + + if cpu_millicores > threshold_millicores: + high_cpu_pods.append({ + 'pod': pod_name, + 'container': container['name'], + 'cpu': cpu_str + }) + + return high_cpu_pods +``` + +### Comparing Usage Across Namespaces + +```python +def compare_namespace_usage(api_client, namespaces): + """Compare resource usage across namespaces.""" + all_metrics = utils.get_pods_metrics_in_all_namespaces(api_client, namespaces) + + for ns, result in all_metrics.items(): + if 'error' not in result: + pod_count = len(result['items']) + container_count = sum(len(pod['containers']) for pod in result['items']) + print(f"{ns}: {pod_count} pods, 
def test_metrics_utilities_integration(self):
    """
    E2E validation of metrics utility functions.

    Deploys the pod from core-pod.yaml, waits for it to run, then checks
    get_nodes_metrics, get_pods_metrics (with and without a label
    selector) and get_pods_metrics_in_all_namespaces against the cluster.

    Note: Requires metrics-server to be running in cluster. The test is
    skipped when the pod never reaches Running or when the metrics API
    answers 404.
    """
    from time import sleep

    pod_name = "myapp-pod"
    api = client.api_client.ApiClient(configuration=self.config)
    v1 = client.CoreV1Api(api)

    def delete_pod():
        # Best-effort teardown: a pod that is already gone is fine.
        try:
            v1.delete_namespaced_pod(
                pod_name, self.test_namespace, body={})
        except ApiException as e:
            if e.status != 404:
                raise

    # Registered before creation so the pod is removed on every exit path
    # (assertion failure, skip, unexpected error), not only on success.
    self.addCleanup(delete_pod)

    # Setup: deploy busybox pod
    utils.create_from_yaml(
        api, self.path_prefix + "core-pod.yaml",
        namespace=self.test_namespace)

    # Wait for pod startup (simple polling, up to ~60s)
    for _ in range(30):
        try:
            pod = v1.read_namespaced_pod(pod_name, self.test_namespace)
        except ApiException:
            # The API server may not serve the pod yet; retry.
            pod = None
        if pod is not None and pod.status.phase == "Running":
            break
        sleep(2)
    else:
        raise unittest.SkipTest("Pod startup timeout")

    # Test 1: Node metrics utility
    try:
        result = utils.get_nodes_metrics(api)
    except ApiException as e:
        if e.status == 404:
            raise unittest.SkipTest("Metrics API unavailable")
        raise
    self.assertIn('items', result)
    self.assertGreater(len(result['items']), 0)
    self.assertIn('usage', result['items'][0])

    # Test 2: Pod metrics utility (basic)
    # Poll instead of sleeping a fixed time: the pod only shows up after
    # metrics-server has scraped it at least once.
    for _ in range(30):
        result = utils.get_pods_metrics(api, self.test_namespace)
        pod_names = [item['metadata']['name']
                     for item in result.get('items', [])]
        if pod_name in pod_names:
            break
        sleep(2)
    self.assertIn('items', result)
    self.assertIn(pod_name, pod_names)

    # Test 3: Pod metrics with label filtering
    result = utils.get_pods_metrics(api, self.test_namespace, 'app=myapp')
    self.assertEqual(len(result['items']), 1)
    self.assertEqual(result['items'][0]['metadata']['name'], pod_name)

    # Test 4: Multi-namespace aggregation
    result = utils.get_pods_metrics_in_all_namespaces(
        api, [self.test_namespace, 'default'])
    self.assertIn(self.test_namespace, result)
    self.assertNotIn('error', result[self.test_namespace])
class TestMetrics(unittest.TestCase):
    """Unit tests for the metrics helpers exposed through kubernetes.utils.

    CustomObjectsApi (or get_pods_metrics, for the multi-namespace helper)
    is patched inside kubernetes.utils.metrics, so no cluster is contacted.
    """

    CUSTOM_API_TARGET = 'kubernetes.utils.metrics.CustomObjectsApi'
    GET_PODS_TARGET = 'kubernetes.utils.metrics.get_pods_metrics'

    def setUp(self):
        self.mock_api_client = MagicMock(spec=client.ApiClient)

    def _assert_namespace_rejected(self, namespace):
        """Check that get_pods_metrics refuses the given namespace value."""
        with self.assertRaises(ValueError) as context:
            utils.get_pods_metrics(self.mock_api_client, namespace)

        self.assertIn('namespace parameter is required',
                      str(context.exception))

    def test_get_nodes_metrics_success(self):
        """Test successful retrieval of node metrics"""
        payload = {
            'kind': 'NodeMetricsList',
            'apiVersion': 'metrics.k8s.io/v1beta1',
            'items': [
                {
                    'metadata': {'name': 'node1'},
                    'timestamp': '2024-01-01T00:00:00Z',
                    'window': '30s',
                    'usage': {'cpu': '100m', 'memory': '1Gi'}
                }
            ]
        }

        with patch(self.CUSTOM_API_TARGET) as api_cls:
            api = api_cls.return_value
            api.list_cluster_custom_object.return_value = payload
            fetched = utils.get_nodes_metrics(self.mock_api_client)

        api_cls.assert_called_once_with(self.mock_api_client)
        api.list_cluster_custom_object.assert_called_once_with(
            group='metrics.k8s.io',
            version='v1beta1',
            plural='nodes'
        )
        self.assertEqual(fetched, payload)
        self.assertEqual(len(fetched['items']), 1)
        self.assertEqual(fetched['items'][0]['metadata']['name'], 'node1')

    def test_get_pods_metrics_success(self):
        """Test successful retrieval of pod metrics"""
        payload = {
            'kind': 'PodMetricsList',
            'apiVersion': 'metrics.k8s.io/v1beta1',
            'items': [
                {
                    'metadata': {'name': 'pod1', 'namespace': 'default'},
                    'timestamp': '2024-01-01T00:00:00Z',
                    'window': '30s',
                    'containers': [
                        {
                            'name': 'container1',
                            'usage': {'cpu': '50m', 'memory': '512Mi'}
                        }
                    ]
                }
            ]
        }

        with patch(self.CUSTOM_API_TARGET) as api_cls:
            api = api_cls.return_value
            api.list_namespaced_custom_object.return_value = payload
            fetched = utils.get_pods_metrics(self.mock_api_client, 'default')

        api_cls.assert_called_once_with(self.mock_api_client)
        api.list_namespaced_custom_object.assert_called_once_with(
            group='metrics.k8s.io',
            version='v1beta1',
            namespace='default',
            plural='pods'
        )
        self.assertEqual(fetched, payload)
        self.assertEqual(len(fetched['items']), 1)

    def test_get_pods_metrics_with_label_selector(self):
        """Test pod metrics retrieval with label selector"""
        with patch(self.CUSTOM_API_TARGET) as api_cls:
            api = api_cls.return_value
            api.list_namespaced_custom_object.return_value = {
                'kind': 'PodMetricsList', 'items': []}
            utils.get_pods_metrics(
                self.mock_api_client, 'production', 'app=web')

        api.list_namespaced_custom_object.assert_called_once_with(
            group='metrics.k8s.io',
            version='v1beta1',
            namespace='production',
            plural='pods',
            label_selector='app=web'
        )

    def test_get_pods_metrics_empty_namespace_raises_error(self):
        """Test that empty namespace raises ValueError"""
        self._assert_namespace_rejected('')

    def test_get_pods_metrics_none_namespace_raises_error(self):
        """Test that None namespace raises ValueError"""
        self._assert_namespace_rejected(None)

    def test_get_pods_metrics_in_all_namespaces_success(self):
        """Test fetching metrics across multiple namespaces"""
        per_namespace = [
            {'kind': 'PodMetricsList',
             'items': [{'metadata': {'name': 'pod1'}}]},
            {'kind': 'PodMetricsList',
             'items': [{'metadata': {'name': 'pod2'}}]}
        ]

        with patch(self.GET_PODS_TARGET, side_effect=per_namespace):
            combined = utils.get_pods_metrics_in_all_namespaces(
                self.mock_api_client,
                ['default', 'kube-system']
            )

        self.assertEqual(len(combined), 2)
        for ns in ('default', 'kube-system'):
            self.assertIn(ns, combined)
        self.assertEqual(len(combined['default']['items']), 1)
        self.assertEqual(len(combined['kube-system']['items']), 1)

    def test_get_pods_metrics_in_all_namespaces_with_errors(self):
        """Test multi-namespace query with partial failures"""
        outcomes = [
            {'kind': 'PodMetricsList', 'items': []},
            Exception('Namespace not found')
        ]

        with patch(self.GET_PODS_TARGET, side_effect=outcomes):
            combined = utils.get_pods_metrics_in_all_namespaces(
                self.mock_api_client,
                ['default', 'invalid-ns']
            )

        self.assertEqual(len(combined), 2)
        self.assertIn('default', combined)
        self.assertIn('invalid-ns', combined)
        self.assertEqual(combined['default']['kind'], 'PodMetricsList')
        self.assertEqual(combined['invalid-ns']['kind'], 'Error')
        self.assertIn('error', combined['invalid-ns'])

    def test_get_pods_metrics_in_all_namespaces_with_label_selector(self):
        """Test multi-namespace query with label selector"""
        empty_list = {'kind': 'PodMetricsList', 'items': []}

        with patch(self.GET_PODS_TARGET,
                   return_value=empty_list) as get_pods:
            utils.get_pods_metrics_in_all_namespaces(
                self.mock_api_client,
                ['ns1', 'ns2'],
                'tier=frontend'
            )

        self.assertEqual(get_pods.call_count, 2)
        for ns in ('ns1', 'ns2'):
            get_pods.assert_any_call(
                self.mock_api_client, ns, 'tier=frontend')


if __name__ == '__main__':
    unittest.main()
METRICS_API_GROUP = "metrics.k8s.io"
METRICS_API_VERSION = "v1beta1"


def get_nodes_metrics(api_client):
    """
    Fetch current resource usage for all cluster nodes.

    Lists the cluster-scoped ``nodes`` resource of the
    ``metrics.k8s.io/v1beta1`` API through ``CustomObjectsApi``.

    Parameters:
        api_client: An initialized kubernetes.client.ApiClient instance

    Returns:
        The response of ``list_cluster_custom_object``. For the metrics
        API this is expected to look like:
        {
            'kind': 'NodeMetricsList',
            'apiVersion': 'metrics.k8s.io/v1beta1',
            'metadata': {...},
            'items': [
                {
                    'metadata': {'name': 'node-1', ...},
                    'timestamp': '2024-01-01T00:00:00Z',
                    'window': '30s',
                    'usage': {'cpu': '100m', 'memory': '1024Mi'}
                },
                ...
            ]
        }

    Raises:
        ApiException: If the metrics server is not available or the
            request fails

    Example:
        >>> from kubernetes import client, config
        >>> config.load_kube_config()
        >>> api_client = client.ApiClient()
        >>> metrics = get_nodes_metrics(api_client)
        >>> for node in metrics['items']:
        ...     name = node['metadata']['name']
        ...     cpu = node['usage']['cpu']
        ...     mem = node['usage']['memory']
        ...     print(f"Node {name}: CPU={cpu}, Memory={mem}")
    """
    api = CustomObjectsApi(api_client)
    return api.list_cluster_custom_object(
        group=METRICS_API_GROUP,
        version=METRICS_API_VERSION,
        plural="nodes"
    )


def get_pods_metrics(api_client, namespace, label_selector=None):
    """
    Fetch current resource usage for pods in a namespace.

    Lists the namespaced ``pods`` resource of the
    ``metrics.k8s.io/v1beta1`` API through ``CustomObjectsApi``, with
    optional label filtering.

    Parameters:
        api_client: An initialized kubernetes.client.ApiClient instance
        namespace: The namespace name to query (required)
        label_selector: Optional label query to filter pods
            (e.g., 'app=web,env=prod'). Omitted from the request when
            None or empty.

    Returns:
        The response of ``list_namespaced_custom_object``. For the metrics
        API this is expected to look like:
        {
            'kind': 'PodMetricsList',
            'apiVersion': 'metrics.k8s.io/v1beta1',
            'metadata': {...},
            'items': [
                {
                    'metadata': {'name': 'pod-1', 'namespace': 'default'},
                    'timestamp': '2024-01-01T00:00:00Z',
                    'window': '30s',
                    'containers': [
                        {
                            'name': 'container-1',
                            'usage': {'cpu': '50m', 'memory': '512Mi'}
                        },
                        ...
                    ]
                },
                ...
            ]
        }

    Raises:
        ValueError: If namespace is None or empty
        ApiException: If the metrics server is not available or the
            request fails

    Example:
        >>> from kubernetes import client, config
        >>> config.load_kube_config()
        >>> api_client = client.ApiClient()
        >>>
        >>> # Get all pods in namespace
        >>> metrics = get_pods_metrics(api_client, 'default')
        >>>
        >>> # Get pods with specific labels
        >>> metrics = get_pods_metrics(api_client, 'default', 'app=nginx')
        >>>
        >>> for pod in metrics['items']:
        ...     print(f"Pod: {pod['metadata']['name']}")
        ...     for container in pod['containers']:
        ...         cname = container['name']
        ...         cpu = container['usage']['cpu']
        ...         mem = container['usage']['memory']
        ...         print(f"  Container {cname}: CPU={cpu}, Memory={mem}")
    """
    # Validate before building the API object so bad input fails fast and
    # without touching the client.
    if not namespace:
        raise ValueError("namespace parameter is required and cannot be empty")

    api = CustomObjectsApi(api_client)

    kwargs = {
        "group": METRICS_API_GROUP,
        "version": METRICS_API_VERSION,
        "namespace": namespace,
        "plural": "pods"
    }

    # Only forward the selector when one was given, so the unfiltered call
    # carries no label_selector argument at all.
    if label_selector:
        kwargs["label_selector"] = label_selector

    return api.list_namespaced_custom_object(**kwargs)


def get_pods_metrics_in_all_namespaces(api_client, namespaces,
                                       label_selector=None):
    """
    Fetch pod metrics across multiple namespaces.

    Despite the name, only the namespaces passed in are queried. Each one
    is fetched with get_pods_metrics; if a namespace query fails, the
    error is captured in the result rather than raised.

    Parameters:
        api_client: An initialized kubernetes.client.ApiClient instance
        namespaces: A list of namespace names to query. A single string is
            treated as one namespace name rather than being iterated
            character by character.
        label_selector: Optional label query applied to all namespaces

    Returns:
        A dictionary mapping namespace names to their metrics or error info:
        {
            'namespace-1': {
                'items': [...],
                'kind': 'PodMetricsList',
                ...
            },
            'namespace-2': {
                'error': 'error message',
                'kind': 'Error'
            },
            ...
        }

    Example:
        >>> from kubernetes import client, config
        >>> config.load_kube_config()
        >>> api_client = client.ApiClient()
        >>>
        >>> namespaces = ['default', 'kube-system', 'monitoring']
        >>> all_metrics = get_pods_metrics_in_all_namespaces(
        ...     api_client, namespaces)
        >>>
        >>> for ns, result in all_metrics.items():
        ...     if 'error' in result:
        ...         print(f"{ns}: ERROR - {result['error']}")
        ...     else:
        ...         pod_count = len(result.get('items', []))
        ...         print(f"{ns}: {pod_count} pods")
    """
    # A bare string is iterable, so 'default' would otherwise be queried as
    # the namespaces 'd', 'e', 'f', ... - wrap it instead.
    if isinstance(namespaces, str):
        namespaces = [namespaces]

    results = {}

    for ns in namespaces:
        try:
            results[ns] = get_pods_metrics(api_client, ns, label_selector)
        except Exception as e:
            # Deliberate best-effort: one failing namespace must not hide
            # the results of the others, so record the failure and go on.
            results[ns] = {
                'kind': 'Error',
                'error': str(e)
            }

    return results