"""Inference service exposing a JSON-defined logistic model over HTTP.

The model artifact (features, weights, bias, threshold, per-feature baseline)
is loaded once at import time. Requests and predictions are instrumented with
Prometheus metrics served from ``/metrics``.
"""

import json
import math
import os
import time
from pathlib import Path
from typing import Any

from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)
from pydantic import BaseModel, ConfigDict, Field

# Runtime configuration, read once from the environment at import time.
MODEL_VERSION = os.getenv("MODEL_VERSION", "v1")
MODEL_TRACK = os.getenv("MODEL_TRACK", "blue")
MODEL_DIR = Path(os.getenv("MODEL_DIR", "/app/models"))
# Used by load_model() when the artifact is not present under MODEL_DIR.
FALLBACK_MODEL_DIR = Path(__file__).resolve().parent.parent / "models"

REQUESTS = Counter(
    "mlops_requests_total",
    "HTTP requests handled by the inference service.",
    ("endpoint", "method", "status", "model_version", "track"),
)
REQUEST_LATENCY = Histogram(
    "mlops_request_latency_seconds",
    "HTTP request latency for the inference service.",
    ("endpoint", "method", "model_version", "track"),
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)
PREDICTION_LATENCY = Histogram(
    "mlops_prediction_latency_seconds",
    "Prediction execution latency.",
    ("model_version", "track"),
    buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
)
PREDICTIONS = Counter(
    "mlops_predictions_total",
    "Predictions produced by model outcome.",
    ("model_version", "track", "outcome"),
)
ERRORS = Counter(
    "mlops_prediction_errors_total",
    "Prediction errors by reason.",
    ("model_version", "track", "reason"),
)
MODEL_INFO = Gauge(
    "mlops_model_version_info",
    "Active model information. Value is always 1 for the running model.",
    ("model_version", "track", "trained_with"),
)
CONFIDENCE = Gauge(
    "mlops_model_confidence",
    "Confidence from the most recent prediction.",
    ("model_version", "track"),
)
DRIFT = Gauge(
    "mlops_model_drift_score",
    "Feature drift score from the most recent prediction.",
    ("model_version", "track"),
)


# Request body for POST /predict. Unknown fields are rejected (extra="forbid")
# and every field is required and range-checked by pydantic.
# (Documented with comments rather than a docstring so the generated schema
# description is unchanged.)
class PredictRequest(BaseModel):
    model_config = ConfigDict(extra="forbid")

    latency_ms: float = Field(..., ge=0, le=60000)
    error_rate: float = Field(..., ge=0, le=1)
    cpu_utilization: float = Field(..., ge=0, le=1)
    memory_utilization: float = Field(..., ge=0, le=1)
    queue_depth: float = Field(..., ge=0, le=10000)


def load_model(version: str) -> dict[str, Any]:
    """Load and sanity-check the JSON model artifact for *version*.

    Looks for ``model_<version>.json`` under ``MODEL_DIR`` first and then
    under ``FALLBACK_MODEL_DIR``.

    Args:
        version: Version string used to build the artifact file name.

    Returns:
        The parsed artifact as a dictionary.

    Raises:
        RuntimeError: If the artifact is found in neither directory, lacks a
            required top-level key, or lists a different number of features
            and weights.
    """
    model_path = MODEL_DIR / f"model_{version}.json"
    if not model_path.exists():
        model_path = FALLBACK_MODEL_DIR / f"model_{version}.json"
    if not model_path.exists():
        raise RuntimeError(f"model artifact not found for version {version}")

    with model_path.open(encoding="utf-8") as handle:
        model = json.load(handle)

    required = {"version", "features", "weights", "bias", "threshold", "baseline", "trained_with"}
    missing = required.difference(model)
    if missing:
        raise RuntimeError(f"model artifact is missing required keys: {', '.join(sorted(missing))}")
    if len(model["features"]) != len(model["weights"]):
        raise RuntimeError("model features and weights have different lengths")
    return model


# The model is loaded once at import; a missing or malformed artifact aborts
# startup with RuntimeError.
MODEL = load_model(MODEL_VERSION)
MODEL_INFO.labels(MODEL["version"], MODEL_TRACK, MODEL["trained_with"]).set(1)

app = FastAPI(
    title="Homelab MLOps Inference Service",
    version=MODEL["version"],
    docs_url=None,
    redoc_url=None,
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)


@app.middleware("http")
async def record_http_metrics(request: Request, call_next: Any) -> Response:
    """Count every HTTP request and record its latency.

    Metrics are emitted from a ``finally`` block, so requests whose handler
    raises are still recorded; in that case the status label keeps its
    initial value of ``"500"``.
    """
    start = time.perf_counter()
    status = "500"
    route = request.url.path
    try:
        response = await call_next(request)
        status = str(response.status_code)
        return response
    finally:
        # Prefer the matched route's path template; fall back to the raw URL
        # path when the scope carries no "route" entry.
        # NOTE(review): the raw-path fallback lets arbitrary request paths
        # become label values (unbounded cardinality) - confirm this is wanted.
        endpoint = getattr(request.scope.get("route"), "path", route)
        elapsed = time.perf_counter() - start
        REQUESTS.labels(endpoint, request.method, status, MODEL["version"], MODEL_TRACK).inc()
        REQUEST_LATENCY.labels(endpoint, request.method, MODEL["version"], MODEL_TRACK).observe(elapsed)


def normalized_features(features: dict[str, float]) -> list[float]:
    """Return z-scores of *features* in the order of ``MODEL["features"]``.

    Each value is ``(x - mean) / stddev`` using that feature's entry in
    ``MODEL["baseline"]``.

    Raises:
        KeyError: If a model feature is missing from *features* or from the
            baseline.
        ZeroDivisionError: If a baseline ``stddev`` is zero.
    """
    values = []
    baseline = MODEL["baseline"]
    for feature in MODEL["features"]:
        mean = baseline[feature]["mean"]
        stddev = baseline[feature]["stddev"]
        values.append((features[feature] - mean) / stddev)
    return values


def logistic(value: float) -> float:
    """Return the logistic sigmoid ``1 / (1 + e**-value)``.

    The negative branch is rewritten as ``e**value / (1 + e**value)`` so that
    ``math.exp`` is only ever called with a non-positive argument. The naive
    form raises ``OverflowError`` for strongly negative inputs, whereas this
    one underflows towards 0.0 instead.
    """
    if value >= 0:
        return 1 / (1 + math.exp(-value))
    exp_value = math.exp(value)
    return exp_value / (1 + exp_value)


def score_prediction(features: dict[str, float]) -> float:
    """Return the model probability for *features*.

    Computes ``bias + sum(weight * z_score)`` over the model's features and
    passes the result through :func:`logistic`.
    """
    score = MODEL["bias"]
    # strict=True turns a weights/features length mismatch into a ValueError
    # instead of silently truncating.
    for weight, value in zip(MODEL["weights"], normalized_features(features), strict=True):
        score += weight * value
    return logistic(score)


def drift_score(features: dict[str, float]) -> float:
    """Return the mean absolute z-score of *features* divided by 3, capped at 1.

    Raises:
        ZeroDivisionError: If the model defines no features.
    """
    z_scores = [abs(value) for value in normalized_features(features)]
    return min(sum(z_scores) / len(z_scores) / 3, 1)


# Service banner: identifies the service, the active model and the endpoints.
@app.get("/")
def root() -> dict[str, Any]:
    return {
        "service": "homelab-mlops-platform",
        "model_version": MODEL["version"],
        "track": MODEL_TRACK,
        "endpoints": ["/healthz", "/predict", "/metrics"],
    }


# Health endpoint: always reports "ok" along with the loaded model's metadata.
@app.get("/healthz")
def healthz() -> dict[str, Any]:
    return {
        "status": "ok",
        "model_version": MODEL["version"],
        "track": MODEL_TRACK,
        "trained_with": MODEL["trained_with"],
    }


# Scores one validated payload, updates the prediction metrics and returns the
# outcome. Any exception raised while scoring is counted in ERRORS and
# reported to the client as HTTP 500 "prediction failed".
@app.post("/predict")
def predict(payload: PredictRequest) -> dict[str, Any]:
    start = time.perf_counter()
    try:
        features = payload.model_dump()
        probability = score_prediction(features)
        drift = drift_score(features)
        outcome = "at_risk" if probability >= MODEL["threshold"] else "healthy"
        # Confidence is the probability assigned to whichever outcome was chosen.
        confidence = probability if outcome == "at_risk" else 1 - probability

        PREDICTIONS.labels(MODEL["version"], MODEL_TRACK, outcome).inc()
        CONFIDENCE.labels(MODEL["version"], MODEL_TRACK).set(confidence)
        DRIFT.labels(MODEL["version"], MODEL_TRACK).set(drift)
        # Latency is only observed for predictions that succeed.
        PREDICTION_LATENCY.labels(MODEL["version"], MODEL_TRACK).observe(time.perf_counter() - start)

        return {
            "model_version": MODEL["version"],
            "track": MODEL_TRACK,
            "outcome": outcome,
            "risk_probability": round(probability, 6),
            "confidence": round(confidence, 6),
            "drift_score": round(drift, 6),
            "threshold": MODEL["threshold"],
            "features": features,
        }
    except Exception as exc:
        ERRORS.labels(MODEL["version"], MODEL_TRACK, "prediction_failure").inc()
        raise HTTPException(status_code=500, detail="prediction failed") from exc


# Prometheus scrape endpoint: returns generate_latest() with its content type.
@app.get("/metrics")
def metrics() -> Response:
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)