Files
metalcheck-cli/cli/commands/k8s.py
2024-12-11 12:14:25 +00:00

58 lines
1.9 KiB
Python

import click
import requests
# Root URL of the backend's Kubernetes routes. Request paths are appended to
# it with an f-string (e.g. f"{BASE_URL}/data"), so it has no trailing slash.
BASE_URL = "http://localhost:8000/k8s"
@click.group()
def kubernetes_nodes() -> None:
    """Commands for managing Kubernetes Nodes."""
    # Intentionally empty: the group is only a container that subcommands are
    # registered on; the docstring above is what click prints as help text.
@kubernetes_nodes.command(name="list")
def list_command() -> None:
    # CLI entry point for `list`: forwards to the shared action dispatcher.
    handle_command(action="list")
@kubernetes_nodes.command("think")
def think_command():
    """Request an AI analysis of the Kubernetes cluster."""
    # The dispatcher names this action "analyze"; passing the CLI name "think"
    # matched no branch there, so the command used to do nothing at all.
    handle_command("analyze")
def handle_command(action):
    """Dispatch a Kubernetes-node CLI action to the function implementing it.

    Args:
        action: ``"list"`` prints the nodes; ``"analyze"`` (or its CLI alias
            ``"think"``) prints the cluster analysis.

    An unrecognised action prints an ``Error: ...`` line instead of being
    silently ignored.
    """
    # Built at call time because the handlers are defined later in the module.
    handlers = {
        "list": list_kubernetes_nodes,
        "analyze": analyze_kubernetes_cluster,
        # The `think` subcommand passes its own name, so accept it as an alias.
        "think": analyze_kubernetes_cluster,
    }
    handler = handlers.get(action)
    if handler is None:
        click.echo(f"Error: unknown action '{action}'")
        return
    handler()
def list_kubernetes_nodes():
    """Fetch the node list from the backend and print one line per node.

    Sends ``GET {BASE_URL}/data`` and reads the ``"nodes"`` list from the JSON
    body (an absent key is treated as an empty list). A non-200 status, a
    transport failure, or a body that is not valid JSON is reported with an
    ``Error: ...`` line instead of a listing.
    """
    try:
        # Bound the wait so an unreachable backend cannot hang the CLI forever.
        response = requests.get(f"{BASE_URL}/data", timeout=30)
        if response.status_code != 200:
            click.echo(f"Error: {response.status_code} - {response.text}")
            return
        # ValueError covers requests versions whose JSON decode error is not
        # a RequestException subclass.
        nodes = response.json().get("nodes", [])
    except (requests.RequestException, ValueError) as e:
        click.echo(f"Error: {e}")
        return

    fields = (
        "node_name",
        "cpu",
        "memory",
        "storage",
        "instance_type",
        "pods_allocatable",
        "time_on_duty",
    )
    click.echo("\n📦 Kubernetes Nodes:")
    for node in nodes:
        # A field the backend omitted prints as "N/A" rather than aborting the
        # whole listing with a KeyError.
        v = {key: node.get(key, "N/A") for key in fields}
        click.echo(
            f"Node Name: {v['node_name']}, CPU: {v['cpu']}, "
            f"Memory: {v['memory']} MiB, Storage: {v['storage']}, "
            f"Type: {v['instance_type']}, Max Pods: {v['pods_allocatable']}, "
            f"Time on Duty: {v['time_on_duty']}"
        )
def analyze_kubernetes_cluster():
    """Request an AI analysis of the Kubernetes cluster and print its summary.

    Sends ``GET {BASE_URL}/think/k8s`` and prints the ``"summary"`` field of the
    JSON body, or ``"No analysis provided."`` when that field is absent. A
    non-200 status, a transport failure, or a body that is not valid JSON is
    reported with an ``Error: ...`` line instead.
    """
    # NOTE(review): BASE_URL already ends in "/k8s", so this resolves to
    # ".../k8s/think/k8s" — confirm that matches the backend route.
    try:
        # Generous but finite timeout: presumably the analysis is slower than
        # a plain listing, but the CLI should still not block forever.
        response = requests.get(f"{BASE_URL}/think/k8s", timeout=120)
        if response.status_code != 200:
            click.echo(f"Error: {response.status_code} - {response.text}")
            return
        # ValueError covers requests versions whose JSON decode error is not
        # a RequestException subclass.
        summary = response.json().get("summary", "No analysis provided.")
    except (requests.RequestException, ValueError) as e:
        click.echo(f"Error: {e}")
        return

    click.echo("\n🤖 AI Analysis of Kubernetes Cluster:")
    click.echo(summary)