update(vm): remove pseudographic and add hetzner api calls

This commit is contained in:
Aleksandr Tcitlionok
2024-12-09 03:06:37 +00:00
parent 5d7ec5c363
commit 24ceca9e4f
3 changed files with 49 additions and 238 deletions

View File

@@ -1,7 +1,7 @@
# MetalCheck Backend
MetalCheck is a backend service that provides real-time insights into virtual machines, physical nodes, and Kubernetes clusters.
It supports deployment in an EKS environment and offers features like data aggregation, export, and pseudographics visualization.
It supports deployment in an EKS environment and offers features like data aggregation and export.
---
@@ -10,22 +10,17 @@ It supports deployment in an EKS environment and offers features like data aggre
- **Virtual Machines**: Monitor VMs from cloud providers like Hetzner.
- **Kubernetes Clusters**: Query Kubernetes clusters to gather node and namespace data.
- **Data Export**: Export collected data in JSON or YAML format.
- **Pseudographics**: Terminal-based data visualization (optional).
---
## Project Structure
## Project Structure
```plaintext
metal-check-backend/
├── app/
│ ├── __init__.py # Initialization
│ ├── main.py # FastAPI entry point
│ ├── database.py # SQLite DB setup and operations
│ ├── extras/
│ │ ├── pseudographics.py # CLI visualization tools
│ ├── routes/
│ │ ├── __init__.py # Initialization for routes
│ │ ├── metal.py # Routes for metal nodes

View File

@@ -1,231 +0,0 @@
from rich.console import Console
from rich.table import Table
from rich.progress import Progress
from database import fetch_all
from kubernetes import client, config
import requests
from datetime import datetime, timezone
# Define constants
THINK_K8S_URL = "http://localhost:8000/think/k8s"
console = Console()
# Helper functions for conversions
def calculate_time_on_duty(creation_timestamp):
"""
Calculate the time on duty in hours, days, or minutes from the creation timestamp.
"""
now = datetime.now(timezone.utc)
delta = now - creation_timestamp
if delta.days < 1 and delta.seconds < 3600:
minutes = delta.seconds // 60
return f"{minutes} minutes" if minutes > 1 else "less than a minute"
if delta.days < 1:
hours = delta.seconds // 3600
return f"{hours} hours" if hours > 1 else "1 hour"
return f"{delta.days} days" if delta.days > 1 else "1 day"
def calculate_time_on_duty_hours(hours):
"""
Convert hours into a human-readable time format (days, hours, or minutes).
"""
try:
# Ensure hours is a float
hours = float(hours)
except (ValueError, TypeError):
return "Invalid time data"
if hours < 1:
minutes = round(hours * 60)
return f"{minutes} minutes" if minutes > 1 else "less than a minute"
elif hours < 24:
return f"{round(hours)} hours" if hours > 1 else "1 hour"
else:
days = hours // 24
remaining_hours = hours % 24
if remaining_hours:
return f"{int(days)} days {int(remaining_hours)} hours"
return f"{int(days)} days"
def convert_cpu_to_cores(cpu):
if "n" in cpu:
return round(int(cpu.replace("n", "")) / 1e9, 4)
elif "u" in cpu:
return round(int(cpu.replace("u", "")) / 1e6, 4)
elif "m" in cpu:
return round(int(cpu.replace("m", "")) / 1000, 4)
return float(cpu)
def convert_memory_to_mib(memory):
if "Ki" in memory:
return int(memory.replace("Ki", "")) / 1024
elif "Mi" in memory:
return int(memory.replace("Mi", ""))
elif "Gi" in memory:
return int(memory.replace("Gi", "")) * 1024
return float(memory)
def convert_memory_to_gb(memory):
if "Ki" in memory:
return int(memory.replace("Ki", "")) / (1024 ** 2)
elif "Mi" in memory:
return int(memory.replace("Mi", "")) / 1024
elif "Gi" in memory:
return int(memory.replace("Gi", ""))
return float(memory)
# Display tables
def display_metal_nodes():
table = Table(title="🖥️ Metal Nodes", style="bold green")
table.add_column("ID", justify="right", style="cyan")
table.add_column("Name", style="magenta")
table.add_column("Location", style="white")
table.add_column("Vendor", style="green")
table.add_column("CPU", justify="right", style="yellow")
table.add_column("Memory (GB)", justify="right", style="cyan")
table.add_column("Storage", style="magenta")
table.add_column("Time on Duty", justify="right", style="magenta")
nodes = fetch_all("metal_nodes")
for node in nodes:
# Convert time_on_duty to a human-readable format
time_on_duty = calculate_time_on_duty_hours(node[7]) # Ensure index 7 corresponds to time_on_duty
table.add_row(
f"{node[0]}",
node[1],
node[2],
node[3],
f"{node[4]}",
node[5],
node[6],
time_on_duty
)
console.print(table)
def display_virtual_machines():
table = Table(title="💻 Virtual Machines", style="bold blue")
table.add_column("ID", justify="right", style="cyan")
table.add_column("Name", style="magenta")
table.add_column("Location", style="white")
table.add_column("CPU", justify="right", style="yellow")
table.add_column("Memory (GB)", justify="right", style="cyan")
table.add_column("Storage", style="magenta")
table.add_column("Type", style="green")
vms = fetch_all("virtual_machines")
for vm in vms:
table.add_row(
f"{vm[0]}",
vm[1],
vm[2],
f"{vm[3]}",
vm[4],
vm[5],
vm[6]
)
console.print(table)
def display_kubernetes_nodes():
config.load_incluster_config()
v1 = client.CoreV1Api()
table = Table(title="📦 Kubernetes Nodes", style="bold yellow")
table.add_column("Node Name", style="white")
table.add_column("CPU", justify="right", style="yellow")
table.add_column("Memory (MiB)", justify="right", style="cyan")
table.add_column("Storage (GB)", justify="right", style="green")
table.add_column("Type", style="blue")
table.add_column("Max Pods", justify="right", style="magenta")
table.add_column("Time on Duty", justify="right", style="magenta")
nodes = v1.list_node()
for node in nodes.items:
ephemeral_storage = node.status.capacity.get("ephemeral-storage", "0")
instance_type = node.metadata.labels.get("beta.kubernetes.io/instance-type", "N/A")
creation_timestamp = node.metadata.creation_timestamp
table.add_row(
node.metadata.name,
node.status.capacity.get("cpu"),
f"{round(convert_memory_to_mib(node.status.capacity.get('memory')), 2)}",
f"{round(convert_memory_to_gb(ephemeral_storage), 2)}",
instance_type,
node.status.allocatable.get("pods"),
calculate_time_on_duty(creation_timestamp) if creation_timestamp else "N/A"
)
console.print(table)
# Fetch and display AI summary
def fetch_ai_summary():
with Progress() as progress:
task = progress.add_task("[cyan]Thinking about Kubernetes...", total=100)
try:
for _ in range(10): # Simulate progress
progress.update(task, advance=10)
import time; time.sleep(0.1)
response = requests.get(THINK_K8S_URL)
progress.update(task, completed=100)
if response.status_code == 200:
data = response.json()
return data.get("summary", "No summary provided.")
else:
return f"Failed to fetch summary: {response.status_code} {response.text}"
except requests.RequestException as e:
return f"An error occurred while fetching the summary: {str(e)}"
def display_namespace_usage():
config.load_incluster_config()
metrics_client = client.CustomObjectsApi()
table = Table(title="📊 Namespace Resource Usage", style="bold magenta")
table.add_column("Namespace", style="white")
table.add_column("CPU (Cores)", justify="right", style="yellow")
table.add_column("Memory (MiB)", justify="right", style="cyan")
namespace_usage = {}
pod_metrics = metrics_client.list_cluster_custom_object(
group="metrics.k8s.io", version="v1beta1", plural="pods"
)
for pod in pod_metrics["items"]:
namespace = pod["metadata"]["namespace"]
if namespace not in namespace_usage:
namespace_usage[namespace] = {"cpu": 0, "memory": 0}
for container in pod["containers"]:
cpu_usage = container["usage"]["cpu"]
memory_usage = container["usage"]["memory"]
namespace_usage[namespace]["cpu"] += convert_cpu_to_cores(cpu_usage)
namespace_usage[namespace]["memory"] += convert_memory_to_mib(memory_usage)
for namespace, usage in namespace_usage.items():
table.add_row(
namespace,
f"{round(usage['cpu'], 4)}",
f"{round(usage['memory'], 2)}"
)
console.print(table)
def display_ai_summary():
summary = fetch_ai_summary()
console.print("\n[bold magenta]AI Summary of Kubernetes Cluster:[/bold magenta]")
console.print(f"[green]{summary}[/green]\n")
if __name__ == "__main__":
console.print("✨ [bold green]Welcome to the Metal Check Dashboard![/bold green] ✨\n")
display_metal_nodes()
display_virtual_machines()
display_kubernetes_nodes()
display_namespace_usage()
display_ai_summary()

View File

@@ -1,6 +1,8 @@
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from database import insert_virtual_machine, fetch_all
import requests
from datetime import datetime
router = APIRouter()
@@ -27,3 +29,48 @@ def add_vm_data(vm: VirtualMachine):
vm_type=vm.type
)
return {"message": f"Virtual machine '{vm.name}' added successfully."}
@router.post("/vm/import-hetzner")
def import_hetzner_vms(api_token: str):
"""
Import virtual machines from Hetzner Cloud.
"""
hetzner_url = "https://api.hetzner.cloud/v1/servers"
headers = {
"Authorization": f"Bearer {api_token}"
}
try:
response = requests.get(hetzner_url, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch data from Hetzner API: {e}")
servers = response.json().get("servers", [])
if not servers:
return {"message": "No servers found on Hetzner Cloud."}
for server in servers:
try:
name = server["name"]
location = server["datacenter"]["location"]["name"]
cpu = server["server_type"]["cores"]
memory = f"{server['server_type']['memory']} GB"
storage = f"{server['server_type']['disk']} GB"
vm_type = server["server_type"]["name"]
created_at = datetime.fromisoformat(server["created"].rstrip("Z"))
time_on_duty = round((datetime.utcnow() - created_at).total_seconds() / 3600, 2) # Convert to hours
insert_virtual_machine(
name=name,
location=location,
cpu=cpu,
memory=memory,
storage=storage,
vm_type=vm_type
)
except Exception as e:
# Log or handle individual server import errors gracefully
print(f"Failed to import server {server['name']}: {e}")
return {"message": f"Imported {len(servers)} virtual machines from Hetzner."}