diff --git a/apps/mlops-platform/Dockerfile b/apps/mlops-platform/Dockerfile new file mode 100644 index 0000000..ca83f3c --- /dev/null +++ b/apps/mlops-platform/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + MODEL_DIR=/app/models \ + MODEL_VERSION=v1 \ + MODEL_TRACK=blue + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app ./app +COPY models ./models + +RUN groupadd --system --gid 10001 mlops \ + && useradd --system --uid 10001 --gid mlops --home /app --shell /usr/sbin/nologin mlops \ + && chown -R mlops:mlops /app + +USER 10001:10001 + +EXPOSE 8080 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/apps/mlops-platform/README.md b/apps/mlops-platform/README.md new file mode 100644 index 0000000..37a416f --- /dev/null +++ b/apps/mlops-platform/README.md @@ -0,0 +1,15 @@ +# MLOps Platform Demo + +Production-shaped inference demo for the portfolio site. The model is intentionally small: logistic regression coefficients trained with scikit-learn and exported to JSON so the runtime stays light enough for the homelab. + +## Endpoints + +- `GET /healthz` reports service, track, and active model metadata. +- `POST /predict` scores service health risk from latency, error rate, CPU, memory, and queue depth. +- `GET /metrics` exposes Prometheus metrics for request count, latency, errors, model version, confidence, and drift score. + +## Model Rollout + +- `MODEL_VERSION=v1`, `MODEL_TRACK=blue` is the stable route. +- `MODEL_VERSION=v2`, `MODEL_TRACK=green` is the canary route. +- Kubernetes service selectors choose the active track, so rollback is a service selector change instead of an image rebuild. 
diff --git a/apps/mlops-platform/app/main.py b/apps/mlops-platform/app/main.py new file mode 100644 index 0000000..09a02ee --- /dev/null +++ b/apps/mlops-platform/app/main.py @@ -0,0 +1,205 @@ +import json +import math +import os +import time +from pathlib import Path +from typing import Any + +from fastapi import FastAPI, HTTPException, Request, Response +from fastapi.middleware.cors import CORSMiddleware +from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest +from pydantic import BaseModel, ConfigDict, Field + +MODEL_VERSION = os.getenv("MODEL_VERSION", "v1") +MODEL_TRACK = os.getenv("MODEL_TRACK", "blue") +MODEL_DIR = Path(os.getenv("MODEL_DIR", "/app/models")) +FALLBACK_MODEL_DIR = Path(__file__).resolve().parent.parent / "models" + +REQUESTS = Counter( + "mlops_requests_total", + "HTTP requests handled by the inference service.", + ("endpoint", "method", "status", "model_version", "track"), +) +REQUEST_LATENCY = Histogram( + "mlops_request_latency_seconds", + "HTTP request latency for the inference service.", + ("endpoint", "method", "model_version", "track"), + buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5), +) +PREDICTION_LATENCY = Histogram( + "mlops_prediction_latency_seconds", + "Prediction execution latency.", + ("model_version", "track"), + buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25), +) +PREDICTIONS = Counter( + "mlops_predictions_total", + "Predictions produced by model outcome.", + ("model_version", "track", "outcome"), +) +ERRORS = Counter( + "mlops_prediction_errors_total", + "Prediction errors by reason.", + ("model_version", "track", "reason"), +) +MODEL_INFO = Gauge( + "mlops_model_version_info", + "Active model information. 
Value is always 1 for the running model.", + ("model_version", "track", "trained_with"), +) +CONFIDENCE = Gauge( + "mlops_model_confidence", + "Confidence from the most recent prediction.", + ("model_version", "track"), +) +DRIFT = Gauge( + "mlops_model_drift_score", + "Feature drift score from the most recent prediction.", + ("model_version", "track"), +) + + +class PredictRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + latency_ms: float = Field(..., ge=0, le=60000) + error_rate: float = Field(..., ge=0, le=1) + cpu_utilization: float = Field(..., ge=0, le=1) + memory_utilization: float = Field(..., ge=0, le=1) + queue_depth: float = Field(..., ge=0, le=10000) + + +def load_model(version: str) -> dict[str, Any]: + model_path = MODEL_DIR / f"model_{version}.json" + if not model_path.exists(): + model_path = FALLBACK_MODEL_DIR / f"model_{version}.json" + + if not model_path.exists(): + raise RuntimeError(f"model artifact not found for version {version}") + + with model_path.open(encoding="utf-8") as handle: + model = json.load(handle) + + required = {"version", "features", "weights", "bias", "threshold", "baseline", "trained_with"} + missing = required.difference(model) + if missing: + raise RuntimeError(f"model artifact is missing required keys: {', '.join(sorted(missing))}") + + if len(model["features"]) != len(model["weights"]): + raise RuntimeError("model features and weights have different lengths") + + return model + + +MODEL = load_model(MODEL_VERSION) +MODEL_INFO.labels(MODEL["version"], MODEL_TRACK, MODEL["trained_with"]).set(1) + +app = FastAPI( + title="Homelab MLOps Inference Service", + version=MODEL["version"], + docs_url=None, + redoc_url=None, +) +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["GET", "POST"], + allow_headers=["*"], +) + + +@app.middleware("http") +async def record_http_metrics(request: Request, call_next: Any) -> Response: + start = time.perf_counter() + status = "500" + route = 
request.url.path + try: + response = await call_next(request) + status = str(response.status_code) + return response + finally: + endpoint = getattr(request.scope.get("route"), "path", route) + elapsed = time.perf_counter() - start + REQUESTS.labels(endpoint, request.method, status, MODEL["version"], MODEL_TRACK).inc() + REQUEST_LATENCY.labels(endpoint, request.method, MODEL["version"], MODEL_TRACK).observe(elapsed) + + +def normalized_features(features: dict[str, float]) -> list[float]: + values = [] + baseline = MODEL["baseline"] + for feature in MODEL["features"]: + mean = baseline[feature]["mean"] + stddev = baseline[feature]["stddev"] + values.append((features[feature] - mean) / stddev) + return values + + +def logistic(value: float) -> float: + return 1 / (1 + math.exp(-value)) + + +def score_prediction(features: dict[str, float]) -> float: + score = MODEL["bias"] + for weight, value in zip(MODEL["weights"], normalized_features(features), strict=True): + score += weight * value + return logistic(score) + + +def drift_score(features: dict[str, float]) -> float: + z_scores = [abs(value) for value in normalized_features(features)] + return min(sum(z_scores) / len(z_scores) / 3, 1) + + +@app.get("/") +def root() -> dict[str, Any]: + return { + "service": "homelab-mlops-platform", + "model_version": MODEL["version"], + "track": MODEL_TRACK, + "endpoints": ["/healthz", "/predict", "/metrics"], + } + + +@app.get("/healthz") +def healthz() -> dict[str, Any]: + return { + "status": "ok", + "model_version": MODEL["version"], + "track": MODEL_TRACK, + "trained_with": MODEL["trained_with"], + } + + +@app.post("/predict") +def predict(payload: PredictRequest) -> dict[str, Any]: + start = time.perf_counter() + try: + features = payload.model_dump() + probability = score_prediction(features) + drift = drift_score(features) + outcome = "at_risk" if probability >= MODEL["threshold"] else "healthy" + confidence = probability if outcome == "at_risk" else 1 - probability + + 
@app.get("/metrics")
def metrics() -> Response:
    """Return the output of ``generate_latest()`` with the Prometheus content type."""
    exposition = generate_latest()
    return Response(exposition, media_type=CONTENT_TYPE_LATEST)
def export_model(version: str, seed: int, threshold: float) -> None:
    """Fit a scaler + logistic-regression pipeline on synthetic data and write it as JSON.

    Two seeded Gaussian clusters of 160 rows each are drawn over the five
    ``FEATURES`` columns, labelled 0 (healthy) and 1 (at risk). The fitted
    scaler statistics and classifier coefficients are written to
    ``OUTPUT_DIR / model_<version>.json``.

    Args:
        version: Label stored in the artifact and used in the file name.
        seed: Seed for the NumPy generator and the classifier.
        threshold: Decision threshold copied unchanged into the artifact.
    """
    generator = np.random.default_rng(seed)
    # Draw order is part of reproducibility: healthy first, then at-risk.
    healthy_rows = generator.normal(
        [170, 0.015, 0.38, 0.45, 5],
        [45, 0.01, 0.12, 0.12, 4],
        size=(160, 5),
    )
    at_risk_rows = generator.normal(
        [420, 0.12, 0.82, 0.78, 38],
        [130, 0.08, 0.12, 0.13, 18],
        size=(160, 5),
    )
    samples = np.vstack([healthy_rows, at_risk_rows])
    labels = np.array([0] * len(healthy_rows) + [1] * len(at_risk_rows))

    model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000, random_state=seed))
    model.fit(samples, labels)
    fitted_scaler = model.named_steps["standardscaler"]
    fitted_classifier = model.named_steps["logisticregression"]

    # Per-feature scaler statistics, keyed by feature name.
    baseline = {}
    for name, center, spread in zip(FEATURES, fitted_scaler.mean_, fitted_scaler.scale_, strict=True):
        baseline[name] = {"mean": float(center), "stddev": float(spread)}

    artifact = {
        "version": version,
        "trained_with": "scikit-learn LogisticRegression",
        "threshold": threshold,
        "bias": float(fitted_classifier.intercept_[0]),
        "features": FEATURES,
        "weights": list(map(float, fitted_classifier.coef_[0])),
        "baseline": baseline,
    }

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    destination = OUTPUT_DIR / f"model_{version}.json"
    destination.write_text(json.dumps(artifact, indent=2) + "\n", encoding="utf-8")