206 lines
6.6 KiB
Python
206 lines
6.6 KiB
Python
import json
import logging
import math
import os
import time
from pathlib import Path
from typing import Any

from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
# Runtime configuration, read once from the environment at import time.
# Artifact version to load; the file looked up is model_<MODEL_VERSION>.json.
MODEL_VERSION = os.environ.get("MODEL_VERSION", "v1")
# Deployment track, attached to every metric series and API response.
MODEL_TRACK = os.environ.get("MODEL_TRACK", "blue")
# Primary directory searched for model artifacts.
MODEL_DIR = Path(os.environ.get("MODEL_DIR", "/app/models"))
# Secondary search location: a "models" directory one level above the
# directory that contains this file.
FALLBACK_MODEL_DIR = Path(__file__).resolve().parent.parent / "models"
|
|
|
|
# Prometheus instruments, registered in the default registry at import time.

# HTTP traffic, recorded by the record_http_metrics middleware.
REQUESTS = Counter(
    name="mlops_requests_total",
    documentation="HTTP requests handled by the inference service.",
    labelnames=("endpoint", "method", "status", "model_version", "track"),
)
REQUEST_LATENCY = Histogram(
    name="mlops_request_latency_seconds",
    documentation="HTTP request latency for the inference service.",
    labelnames=("endpoint", "method", "model_version", "track"),
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)

# Model-level measurements, recorded by the /predict handler.
# Finer buckets than REQUEST_LATENCY: only the scoring step is timed here.
PREDICTION_LATENCY = Histogram(
    name="mlops_prediction_latency_seconds",
    documentation="Prediction execution latency.",
    labelnames=("model_version", "track"),
    buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
)
PREDICTIONS = Counter(
    name="mlops_predictions_total",
    documentation="Predictions produced by model outcome.",
    labelnames=("model_version", "track", "outcome"),
)
ERRORS = Counter(
    name="mlops_prediction_errors_total",
    documentation="Prediction errors by reason.",
    labelnames=("model_version", "track", "reason"),
)

# Info-style gauge: set to 1 for the active model; its labels carry the metadata.
MODEL_INFO = Gauge(
    name="mlops_model_version_info",
    documentation="Active model information. Value is always 1 for the running model.",
    labelnames=("model_version", "track", "trained_with"),
)

# Last-value gauges: each /predict call overwrites the previous value.
CONFIDENCE = Gauge(
    name="mlops_model_confidence",
    documentation="Confidence from the most recent prediction.",
    labelnames=("model_version", "track"),
)
DRIFT = Gauge(
    name="mlops_model_drift_score",
    documentation="Feature drift score from the most recent prediction.",
    labelnames=("model_version", "track"),
)
|
|
|
|
|
|
class PredictRequest(BaseModel):
    # Request body for POST /predict. extra="forbid" turns any unknown key into
    # a validation error instead of silently ignoring it.
    model_config = ConfigDict(extra="forbid")

    # All fields are required (default=...) and range-checked on input.
    # Declaration order matters: model_dump() preserves it, and the dump is
    # echoed back in the /predict response.
    latency_ms: float = Field(default=..., ge=0, le=60_000)
    error_rate: float = Field(default=..., ge=0, le=1)
    cpu_utilization: float = Field(default=..., ge=0, le=1)
    memory_utilization: float = Field(default=..., ge=0, le=1)
    queue_depth: float = Field(default=..., ge=0, le=10_000)
|
|
|
|
|
|
def load_model(version: str) -> dict[str, Any]:
    """Load and validate the JSON model artifact for *version*.

    ``model_<version>.json`` is looked up in MODEL_DIR first, then in
    FALLBACK_MODEL_DIR. The artifact is validated completely here, at startup,
    so a malformed file stops the service from starting. Otherwise every
    /predict call would fail later.

    Args:
        version: Version string used to build the artifact file name.

    Returns:
        The parsed artifact. It is guaranteed to contain the keys version,
        features, weights, bias, threshold, baseline and trained_with, with one
        weight per feature and a baseline mean and positive stddev per feature.

    Raises:
        RuntimeError: if no artifact file exists, or the artifact is not a JSON
            object, lacks required keys, has mismatched features/weights, or
            has a missing or unusable baseline entry for some feature.
    """
    artifact_name = f"model_{version}.json"
    model_path = MODEL_DIR / artifact_name
    if not model_path.exists():
        model_path = FALLBACK_MODEL_DIR / artifact_name

    if not model_path.exists():
        raise RuntimeError(f"model artifact not found for version {version}")

    with model_path.open(encoding="utf-8") as handle:
        model = json.load(handle)

    # Check the type before the key check: a top-level JSON array of strings
    # would otherwise satisfy `required.difference(model)` without being a mapping.
    if not isinstance(model, dict):
        raise RuntimeError("model artifact must be a JSON object")

    required = {"version", "features", "weights", "bias", "threshold", "baseline", "trained_with"}
    missing = required.difference(model)
    if missing:
        raise RuntimeError(f"model artifact is missing required keys: {', '.join(sorted(missing))}")

    if len(model["features"]) != len(model["weights"]):
        raise RuntimeError("model features and weights have different lengths")

    # normalized_features() divides each feature by its baseline stddev. Bad
    # statistics must fail here, not as a 500 on every /predict request.
    baseline = model["baseline"]
    if not isinstance(baseline, dict):
        raise RuntimeError("model baseline must be a JSON object")
    for feature in model["features"]:
        stats = baseline.get(feature)
        if not isinstance(stats, dict) or "mean" not in stats or "stddev" not in stats:
            raise RuntimeError(f"model baseline is missing mean/stddev for feature {feature!r}")
        stddev = stats["stddev"]
        # `not stddev > 0` also rejects NaN, which json.load accepts.
        if not isinstance(stddev, (int, float)) or not stddev > 0:
            raise RuntimeError(f"model baseline stddev for feature {feature!r} must be a positive number")

    return model
|
|
|
|
|
|
# Load the model artifact once, at import time. load_model raises
# RuntimeError for a missing or malformed artifact, which aborts the import.
MODEL = load_model(MODEL_VERSION)

# Info-style gauge: one series, always 1, whose labels identify the model
# that is currently serving.
MODEL_INFO.labels(
    model_version=MODEL["version"],
    track=MODEL_TRACK,
    trained_with=MODEL["trained_with"],
).set(1)

# The API metadata version follows the loaded model. Swagger UI and ReDoc
# pages are disabled.
app = FastAPI(
    title="Homelab MLOps Inference Service",
    version=MODEL["version"],
    docs_url=None,
    redoc_url=None,
)

# Permissive CORS: any origin may call the GET/POST endpoints with any headers.
# NOTE(review): consider restricting allow_origins if browsers outside the
# homelab can reach this service.
app.add_middleware(
    CORSMiddleware,
    allow_methods=["GET", "POST"],
    allow_origins=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
@app.middleware("http")
async def record_http_metrics(request: Request, call_next: Any) -> Response:
    """Count every HTTP request and observe its latency.

    The ``endpoint`` label is the path template of the route that handled the
    request (e.g. ``/predict``). Requests that matched no route are grouped
    under the single label value ``"unmatched"``. Labelling them with the raw
    URL path would create a new time series for every distinct path a client
    or scanner sends, which grows Prometheus cardinality without bound.

    If the downstream app raises, the request is recorded with status
    ``"500"`` and the exception propagates unchanged.
    """
    start = time.perf_counter()
    status = "500"  # kept if call_next raises before producing a response
    try:
        response = await call_next(request)
        status = str(response.status_code)
        return response
    finally:
        elapsed = time.perf_counter() - start
        # Read after call_next, once routing has run. This assumes the router
        # stores the matched route object (with a `.path` template) under
        # scope["route"]; when nothing matched, the key is absent.
        matched_route = request.scope.get("route")
        endpoint = getattr(matched_route, "path", "unmatched")
        REQUESTS.labels(endpoint, request.method, status, MODEL["version"], MODEL_TRACK).inc()
        REQUEST_LATENCY.labels(endpoint, request.method, MODEL["version"], MODEL_TRACK).observe(elapsed)
|
|
|
|
|
|
def normalized_features(features: dict[str, float]) -> list[float]:
    """Return the model's input features as z-scores, in model feature order.

    Each value is ``(x - mean) / stddev``, using the per-feature statistics
    stored in ``MODEL["baseline"]``. Keys of *features* that are not model
    features are ignored.

    Raises:
        KeyError: if *features* or the baseline lacks a model feature.
        ZeroDivisionError: if a baseline stddev is zero.
    """
    stats = MODEL["baseline"]
    return [
        (features[name] - stats[name]["mean"]) / stats[name]["stddev"]
        for name in MODEL["features"]
    ]
|
|
|
|
|
|
def logistic(value: float) -> float:
    """Return the logistic sigmoid ``1 / (1 + e**-value)``.

    The function uses the numerically stable two-branch form: ``math.exp`` is only
    ever called with a non-positive argument. Very negative scores, which
    out-of-baseline feature values can produce, therefore give a probability
    near 0.0 instead of raising ``OverflowError`` (``math.exp`` overflows for
    arguments above about 709).
    """
    if value >= 0:
        return 1 / (1 + math.exp(-value))
    # For value < 0: 1 / (1 + e**-v) == e**v / (1 + e**v), and e**v <= 1 here.
    exp_value = math.exp(value)
    return exp_value / (1 + exp_value)
|
|
|
|
|
|
def score_prediction(features: dict[str, float]) -> float:
    """Return the model's at-risk probability for one set of features.

    The model is linear in the z-scores: ``logistic(bias + sum(w_i * z_i))``.
    ``strict=True`` makes ``zip`` raise ValueError if the weight and feature
    counts ever disagree.
    """
    z_scores = normalized_features(features)
    linear = MODEL["bias"]
    # Accumulate term by term, starting from the bias. This keeps the exact
    # floating-point summation order (built-in sum() on floats may compensate).
    for w, z in zip(MODEL["weights"], z_scores, strict=True):
        linear += w * z
    return logistic(linear)
|
|
|
|
|
|
def drift_score(features: dict[str, float]) -> float:
    """Return a drift score in [0, 1] for one set of features.

    The score is the mean absolute z-score divided by 3, capped at 1.0. A mean
    deviation of three or more baseline standard deviations counts as full
    drift. A model with no features has nothing to drift and scores 0.0,
    rather than raising ZeroDivisionError. The result is always a float.
    """
    deviations = [abs(z) for z in normalized_features(features)]
    if not deviations:
        return 0.0
    return min(sum(deviations) / len(deviations) / 3, 1.0)
|
|
|
|
|
|
@app.get("/")
def root() -> dict[str, Any]:
    # Service landing payload: service name, active model version and track,
    # and the list of public endpoints.
    summary: dict[str, Any] = {"service": "homelab-mlops-platform"}
    summary["model_version"] = MODEL["version"]
    summary["track"] = MODEL_TRACK
    summary["endpoints"] = ["/healthz", "/predict", "/metrics"]
    return summary
|
|
|
|
|
|
@app.get("/healthz")
def healthz() -> dict[str, Any]:
    # Health-check payload. The status is always "ok": the model was already
    # loaded when the module was imported, so there is no per-request check.
    # The metadata identifies which model and track answered.
    return dict(
        status="ok",
        model_version=MODEL["version"],
        track=MODEL_TRACK,
        trained_with=MODEL["trained_with"],
    )
|
|
|
|
|
|
@app.post("/predict")
def predict(payload: PredictRequest) -> dict[str, Any]:
    # Score one validated request and return the prediction with its inputs.
    #
    # The outcome is "at_risk" when the probability reaches the model threshold,
    # otherwise "healthy". Confidence is the probability of the chosen outcome.
    # Outcome counts, last confidence, last drift score and scoring latency are
    # exported as Prometheus metrics.
    #
    # Any failure while scoring is logged with its traceback, counted in
    # mlops_prediction_errors_total, and returned to the client as a generic
    # 500 "prediction failed", without internal details.
    start = time.perf_counter()
    try:
        features = payload.model_dump()
        probability = score_prediction(features)
        drift = drift_score(features)
        outcome = "at_risk" if probability >= MODEL["threshold"] else "healthy"
        confidence = probability if outcome == "at_risk" else 1 - probability

        PREDICTIONS.labels(MODEL["version"], MODEL_TRACK, outcome).inc()
        CONFIDENCE.labels(MODEL["version"], MODEL_TRACK).set(confidence)
        DRIFT.labels(MODEL["version"], MODEL_TRACK).set(drift)
        PREDICTION_LATENCY.labels(MODEL["version"], MODEL_TRACK).observe(time.perf_counter() - start)

        return {
            "model_version": MODEL["version"],
            "track": MODEL_TRACK,
            "outcome": outcome,
            "risk_probability": round(probability, 6),
            "confidence": round(confidence, 6),
            "drift_score": round(drift, 6),
            "threshold": MODEL["threshold"],
            "features": features,
        }
    except Exception as exc:
        # Top-level request boundary. The HTTPException below becomes a plain
        # JSON 500 response, and its chained cause is presumably not logged by
        # the framework. Record the traceback here, or failures would only
        # show up as a counter increment.
        logging.getLogger(__name__).exception(
            "prediction failed (model_version=%s, track=%s)", MODEL["version"], MODEL_TRACK
        )
        ERRORS.labels(MODEL["version"], MODEL_TRACK, "prediction_failure").inc()
        raise HTTPException(status_code=500, detail="prediction failed") from exc
|
|
|
|
|
|
@app.get("/metrics")
def metrics() -> Response:
    # Prometheus scrape endpoint: serialize the default collector registry in
    # the Prometheus exposition format, with the matching content type.
    exposition = generate_latest()
    return Response(content=exposition, media_type=CONTENT_TYPE_LATEST)
|