Files
metalcheck-cli/cli/commands/k8s.py
2024-12-11 12:56:18 +00:00

57 lines
2.0 KiB
Python

import click
import requests
@click.group()
@click.pass_context
def kubernetes_nodes(ctx):
"""Commands for managing Kubernetes Nodes."""
pass
@kubernetes_nodes.command("list")
@click.pass_context
def list_command(ctx):
"""List all Kubernetes nodes."""
base_url = ctx.obj["BASE_URL"]
list_kubernetes_nodes(base_url)
@kubernetes_nodes.command("analyze")
@click.pass_context
def analyze_command(ctx):
"""Request an AI analysis of the Kubernetes cluster."""
base_url = ctx.obj["BASE_URL"]
analyze_kubernetes_cluster(base_url)
def list_kubernetes_nodes(base_url):
"""List all Kubernetes nodes."""
k8s_url = f"{base_url}/k8s/data"
try:
response = requests.get(k8s_url)
if response.status_code == 200:
nodes = response.json().get("nodes", [])
click.echo("\n📦 Kubernetes Nodes:")
for node in nodes:
click.echo(
f"Node Name: {node['node_name']}, CPU: {node['cpu']}, "
f"Memory: {node['memory']} MiB, Storage: {node['storage']}, "
f"Type: {node['instance_type']}, Max Pods: {node['pods_allocatable']}, "
f"Time on Duty: {node['time_on_duty']}"
)
else:
click.echo(f"Error: {response.status_code} - {response.text}")
except requests.RequestException as e:
click.echo(f"Error: {e}")
def analyze_kubernetes_cluster(base_url):
"""Request an AI analysis of the Kubernetes cluster."""
analyze_url = f"{base_url}/think/k8s"
try:
response = requests.get(analyze_url)
if response.status_code == 200:
summary = response.json().get("summary", "No analysis provided.")
click.echo("\n🤖 AI Analysis of Kubernetes Cluster:")
click.echo(summary)
else:
click.echo(f"Error: {response.status_code} - {response.text}")
except requests.RequestException as e:
click.echo(f"Error: {e}")