# Source: my-homelab-configs/apps/mlops-platform/app/main.py
# Listing metadata: 206 lines, 6.6 KiB, Python

import json
import math
import os
import time
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
from pydantic import BaseModel, ConfigDict, Field
# Deployment settings, read from the environment once at import time.
MODEL_VERSION = os.environ.get("MODEL_VERSION", "v1")
MODEL_TRACK = os.environ.get("MODEL_TRACK", "blue")
MODEL_DIR = Path(os.environ.get("MODEL_DIR", "/app/models"))
# Secondary artifact location: a "models" directory that sits beside the
# directory containing this file.
FALLBACK_MODEL_DIR = Path(__file__).resolve().parent.parent.joinpath("models")
# --- Prometheus instruments -------------------------------------------------
# Every instrument carries the model version and track as labels so series
# from differently configured instances can be told apart.

# HTTP-level traffic, recorded by the request middleware.
REQUESTS = Counter(
    name="mlops_requests_total",
    documentation="HTTP requests handled by the inference service.",
    labelnames=("endpoint", "method", "status", "model_version", "track"),
)
REQUEST_LATENCY = Histogram(
    name="mlops_request_latency_seconds",
    documentation="HTTP request latency for the inference service.",
    labelnames=("endpoint", "method", "model_version", "track"),
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)

# Prediction-level timing, outcomes and failures.
PREDICTION_LATENCY = Histogram(
    name="mlops_prediction_latency_seconds",
    documentation="Prediction execution latency.",
    labelnames=("model_version", "track"),
    buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
)
PREDICTIONS = Counter(
    name="mlops_predictions_total",
    documentation="Predictions produced by model outcome.",
    labelnames=("model_version", "track", "outcome"),
)
ERRORS = Counter(
    name="mlops_prediction_errors_total",
    documentation="Prediction errors by reason.",
    labelnames=("model_version", "track", "reason"),
)

# Point-in-time gauges describing the loaded model and its latest prediction.
MODEL_INFO = Gauge(
    name="mlops_model_version_info",
    documentation="Active model information. Value is always 1 for the running model.",
    labelnames=("model_version", "track", "trained_with"),
)
CONFIDENCE = Gauge(
    name="mlops_model_confidence",
    documentation="Confidence from the most recent prediction.",
    labelnames=("model_version", "track"),
)
DRIFT = Gauge(
    name="mlops_model_drift_score",
    documentation="Feature drift score from the most recent prediction.",
    labelnames=("model_version", "track"),
)
class PredictRequest(BaseModel):
    """Request body accepted by ``POST /predict``.

    All five fields are required floats and each is range-checked
    (bounds inclusive) before the handler runs.
    """

    # Payloads containing any key other than the fields below are rejected.
    model_config = ConfigDict(extra="forbid")

    # Accepted range: 0 to 60000.
    latency_ms: float = Field(default=..., ge=0, le=60000)
    # Accepted range: 0 to 1.
    error_rate: float = Field(default=..., ge=0, le=1)
    # Accepted range: 0 to 1.
    cpu_utilization: float = Field(default=..., ge=0, le=1)
    # Accepted range: 0 to 1.
    memory_utilization: float = Field(default=..., ge=0, le=1)
    # Accepted range: 0 to 10000.
    queue_depth: float = Field(default=..., ge=0, le=10000)
def load_model(version: str) -> dict[str, Any]:
    """Load and validate the JSON model artifact for ``version``.

    The file ``model_<version>.json`` is looked up in ``MODEL_DIR`` first and
    in ``FALLBACK_MODEL_DIR`` second.

    Args:
        version: Version tag used to build the artifact file name.

    Returns:
        The parsed artifact as a dictionary.

    Raises:
        RuntimeError: If no artifact file exists, the JSON root is not an
            object, a required key is missing, ``features`` and ``weights``
            differ in length, or the baseline lacks usable ``mean``/``stddev``
            statistics for a listed feature.
    """
    filename = f"model_{version}.json"
    model_path = MODEL_DIR / filename
    if not model_path.exists():
        model_path = FALLBACK_MODEL_DIR / filename
    if not model_path.exists():
        raise RuntimeError(f"model artifact not found for version {version}")

    with model_path.open(encoding="utf-8") as handle:
        model = json.load(handle)

    # A non-object root (list, string, number) would otherwise surface later
    # as an unrelated TypeError on the first key lookup.
    if not isinstance(model, dict):
        raise RuntimeError("model artifact must be a JSON object")

    required = {"version", "features", "weights", "bias", "threshold", "baseline", "trained_with"}
    missing = required.difference(model)
    if missing:
        raise RuntimeError(f"model artifact is missing required keys: {', '.join(sorted(missing))}")
    if len(model["features"]) != len(model["weights"]):
        raise RuntimeError("model features and weights have different lengths")

    # Feature normalization divides by baseline[feature]["stddev"], so reject
    # artifacts that would fail on every prediction instead of at load time.
    baseline = model["baseline"]
    for feature in model["features"]:
        try:
            stats = baseline[feature]
            stats["mean"]
            stddev = stats["stddev"]
        except (KeyError, IndexError, TypeError) as exc:
            raise RuntimeError(
                f"model baseline is missing mean/stddev for feature {feature}"
            ) from exc
        if stddev == 0:
            raise RuntimeError(f"model baseline stddev is zero for feature {feature}")
    return model
# The artifact is loaded once at import time; load_model() raises if it is
# missing or malformed, so the module cannot be imported without a model.
MODEL = load_model(MODEL_VERSION)

# Publish the active model as an info-style gauge pinned to 1.
MODEL_INFO.labels(
    model_version=MODEL["version"],
    track=MODEL_TRACK,
    trained_with=MODEL["trained_with"],
).set(1)

# docs_url / redoc_url are None, which turns off the interactive API docs pages.
app = FastAPI(
    docs_url=None,
    redoc_url=None,
    title="Homelab MLOps Inference Service",
    version=MODEL["version"],
)

# CORS: any origin and any request header, GET and POST only.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
    allow_origins=["*"],
)
@app.middleware("http")
async def record_http_metrics(request: Request, call_next: Any) -> Response:
    """Count every HTTP request and record its latency.

    The ``endpoint`` label is the path template of the matched route (read
    from ``request.scope["route"]`` after the request has been handled).
    Requests that matched no route are grouped under the fixed label
    ``"unmatched"`` rather than their raw URL path; using the raw path would
    let arbitrary client-chosen URLs create an unbounded number of series.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The downstream response, unchanged.
    """
    start = time.perf_counter()
    # Reported when call_next raises and no response status is available.
    status = "500"
    try:
        response = await call_next(request)
        status = str(response.status_code)
        return response
    finally:
        # The "route" scope entry is only present once routing has matched.
        matched_route = request.scope.get("route")
        endpoint = getattr(matched_route, "path", "unmatched")
        elapsed = time.perf_counter() - start
        REQUESTS.labels(endpoint, request.method, status, MODEL["version"], MODEL_TRACK).inc()
        REQUEST_LATENCY.labels(endpoint, request.method, MODEL["version"], MODEL_TRACK).observe(elapsed)
def normalized_features(features: dict[str, float]) -> list[float]:
    """Return the z-score of each model feature, in ``MODEL["features"]`` order.

    Each value is ``(features[name] - mean) / stddev`` using the per-feature
    statistics stored under ``MODEL["baseline"]``.
    """
    stats = MODEL["baseline"]
    return [
        (features[name] - stats[name]["mean"]) / stats[name]["stddev"]
        for name in MODEL["features"]
    ]
def logistic(value: float) -> float:
    """Return the logistic sigmoid ``1 / (1 + e**-value)`` without overflowing.

    Evaluating ``math.exp(-value)`` directly raises ``OverflowError`` once
    ``value`` drops below roughly -709. Each branch below only exponentiates
    a non-positive number, so the result saturates at 0.0 or 1.0 instead.

    Args:
        value: The raw (log-odds) score.

    Returns:
        A probability in the closed interval [0, 1].
    """
    if value >= 0:
        return 1 / (1 + math.exp(-value))
    # Algebraically identical form for negative inputs: e**v / (1 + e**v).
    exp_value = math.exp(value)
    return exp_value / (1 + exp_value)
def score_prediction(features: dict[str, float]) -> float:
    """Return the model's probability for ``features``.

    Starts from ``MODEL["bias"]``, adds each weight multiplied by the
    matching normalized feature, and maps the total through ``logistic``.
    ``zip(strict=True)`` raises ``ValueError`` if the weights and the
    normalized values differ in length.
    """
    z_values = normalized_features(features)
    logit = MODEL["bias"]
    # Accumulate term by term, in order, so float rounding stays the same.
    for coefficient, z_value in zip(MODEL["weights"], z_values, strict=True):
        logit = logit + coefficient * z_value
    return logistic(logit)
def drift_score(features: dict[str, float]) -> float:
    """Return a drift score for ``features``, capped at 1.

    The score is the mean absolute z-score over all model features,
    divided by 3.
    """
    magnitudes = [abs(z_value) for z_value in normalized_features(features)]
    mean_magnitude = sum(magnitudes) / len(magnitudes)
    scaled = mean_magnitude / 3
    return min(scaled, 1)
@app.get("/")
def root() -> dict[str, Any]:
    """Return the service name, active model version/track and endpoint list."""
    available = ["/healthz", "/predict", "/metrics"]
    return dict(
        service="homelab-mlops-platform",
        model_version=MODEL["version"],
        track=MODEL_TRACK,
        endpoints=available,
    )
@app.get("/healthz")
def healthz() -> dict[str, Any]:
    """Return a static ``ok`` status together with the loaded model's metadata."""
    return dict(
        status="ok",
        model_version=MODEL["version"],
        track=MODEL_TRACK,
        trained_with=MODEL["trained_with"],
    )
@app.post("/predict")
def predict(payload: PredictRequest) -> dict[str, Any]:
    """Score one validated payload and report the outcome.

    The risk probability is compared against ``MODEL["threshold"]`` to choose
    between ``"at_risk"`` and ``"healthy"``; confidence is the probability of
    whichever outcome was chosen. The outcome counter, confidence and drift
    gauges and the prediction latency histogram are updated before returning.

    Raises:
        HTTPException: 500 ``"prediction failed"`` for any exception raised
            while scoring or recording metrics; the error counter is
            incremented first.
    """
    started_at = time.perf_counter()
    model_labels = (MODEL["version"], MODEL_TRACK)
    try:
        features = payload.model_dump()
        risk = score_prediction(features)
        drift = drift_score(features)

        if risk >= MODEL["threshold"]:
            outcome, confidence = "at_risk", risk
        else:
            outcome, confidence = "healthy", 1 - risk

        PREDICTIONS.labels(*model_labels, outcome).inc()
        CONFIDENCE.labels(*model_labels).set(confidence)
        DRIFT.labels(*model_labels).set(drift)
        duration = time.perf_counter() - started_at
        PREDICTION_LATENCY.labels(*model_labels).observe(duration)

        body: dict[str, Any] = {
            "model_version": MODEL["version"],
            "track": MODEL_TRACK,
            "outcome": outcome,
            "risk_probability": round(risk, 6),
            "confidence": round(confidence, 6),
            "drift_score": round(drift, 6),
            "threshold": MODEL["threshold"],
            "features": features,
        }
        return body
    except Exception as exc:
        ERRORS.labels(*model_labels, "prediction_failure").inc()
        raise HTTPException(status_code=500, detail="prediction failed") from exc
@app.get("/metrics")
def metrics() -> Response:
    """Serve the current Prometheus metrics in the text exposition format."""
    exposition = generate_latest()
    return Response(content=exposition, media_type=CONTENT_TYPE_LATEST)