adding mlops
Homelab Main / deploy (push) Successful in 1m33s Details

This commit is contained in:
juvdiaz 2026-05-29 09:24:47 -06:00
parent 4dab85c9d6
commit 240a55e826
7 changed files with 385 additions and 0 deletions

View File

@ -0,0 +1,25 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
MODEL_DIR=/app/models \
MODEL_VERSION=v1 \
MODEL_TRACK=blue
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
COPY models ./models
RUN groupadd --system --gid 10001 mlops \
&& useradd --system --uid 10001 --gid mlops --home /app --shell /usr/sbin/nologin mlops \
&& chown -R mlops:mlops /app
USER 10001:10001
EXPOSE 8080
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

View File

@ -0,0 +1,15 @@
# MLOps Platform Demo
Production-shaped inference demo for the portfolio site. The model is intentionally small: logistic regression coefficients trained with scikit-learn and exported to JSON so the runtime stays light enough for the homelab.
## Endpoints
- `GET /healthz` reports service, track, and active model metadata.
- `POST /predict` scores service health risk from latency, error rate, CPU, memory, and queue depth.
- `GET /metrics` exposes Prometheus metrics for request count, latency, errors, model version, confidence, and drift score.
## Model Rollout
- `MODEL_VERSION=v1`, `MODEL_TRACK=blue` is the stable route.
- `MODEL_VERSION=v2`, `MODEL_TRACK=green` is the canary route.
- Kubernetes service selectors choose the active track, so rollback is a service selector change instead of an image rebuild.

View File

@ -0,0 +1,205 @@
import json
import math
import os
import time
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest
from pydantic import BaseModel, ConfigDict, Field
MODEL_VERSION = os.getenv("MODEL_VERSION", "v1")
MODEL_TRACK = os.getenv("MODEL_TRACK", "blue")
MODEL_DIR = Path(os.getenv("MODEL_DIR", "/app/models"))
FALLBACK_MODEL_DIR = Path(__file__).resolve().parent.parent / "models"
REQUESTS = Counter(
"mlops_requests_total",
"HTTP requests handled by the inference service.",
("endpoint", "method", "status", "model_version", "track"),
)
REQUEST_LATENCY = Histogram(
"mlops_request_latency_seconds",
"HTTP request latency for the inference service.",
("endpoint", "method", "model_version", "track"),
buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)
PREDICTION_LATENCY = Histogram(
"mlops_prediction_latency_seconds",
"Prediction execution latency.",
("model_version", "track"),
buckets=(0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25),
)
PREDICTIONS = Counter(
"mlops_predictions_total",
"Predictions produced by model outcome.",
("model_version", "track", "outcome"),
)
ERRORS = Counter(
"mlops_prediction_errors_total",
"Prediction errors by reason.",
("model_version", "track", "reason"),
)
MODEL_INFO = Gauge(
"mlops_model_version_info",
"Active model information. Value is always 1 for the running model.",
("model_version", "track", "trained_with"),
)
CONFIDENCE = Gauge(
"mlops_model_confidence",
"Confidence from the most recent prediction.",
("model_version", "track"),
)
DRIFT = Gauge(
"mlops_model_drift_score",
"Feature drift score from the most recent prediction.",
("model_version", "track"),
)
class PredictRequest(BaseModel):
model_config = ConfigDict(extra="forbid")
latency_ms: float = Field(..., ge=0, le=60000)
error_rate: float = Field(..., ge=0, le=1)
cpu_utilization: float = Field(..., ge=0, le=1)
memory_utilization: float = Field(..., ge=0, le=1)
queue_depth: float = Field(..., ge=0, le=10000)
def load_model(version: str) -> dict[str, Any]:
model_path = MODEL_DIR / f"model_{version}.json"
if not model_path.exists():
model_path = FALLBACK_MODEL_DIR / f"model_{version}.json"
if not model_path.exists():
raise RuntimeError(f"model artifact not found for version {version}")
with model_path.open(encoding="utf-8") as handle:
model = json.load(handle)
required = {"version", "features", "weights", "bias", "threshold", "baseline", "trained_with"}
missing = required.difference(model)
if missing:
raise RuntimeError(f"model artifact is missing required keys: {', '.join(sorted(missing))}")
if len(model["features"]) != len(model["weights"]):
raise RuntimeError("model features and weights have different lengths")
return model
MODEL = load_model(MODEL_VERSION)
MODEL_INFO.labels(MODEL["version"], MODEL_TRACK, MODEL["trained_with"]).set(1)
app = FastAPI(
title="Homelab MLOps Inference Service",
version=MODEL["version"],
docs_url=None,
redoc_url=None,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
@app.middleware("http")
async def record_http_metrics(request: Request, call_next: Any) -> Response:
start = time.perf_counter()
status = "500"
route = request.url.path
try:
response = await call_next(request)
status = str(response.status_code)
return response
finally:
endpoint = getattr(request.scope.get("route"), "path", route)
elapsed = time.perf_counter() - start
REQUESTS.labels(endpoint, request.method, status, MODEL["version"], MODEL_TRACK).inc()
REQUEST_LATENCY.labels(endpoint, request.method, MODEL["version"], MODEL_TRACK).observe(elapsed)
def normalized_features(features: dict[str, float]) -> list[float]:
values = []
baseline = MODEL["baseline"]
for feature in MODEL["features"]:
mean = baseline[feature]["mean"]
stddev = baseline[feature]["stddev"]
values.append((features[feature] - mean) / stddev)
return values
def logistic(value: float) -> float:
return 1 / (1 + math.exp(-value))
def score_prediction(features: dict[str, float]) -> float:
score = MODEL["bias"]
for weight, value in zip(MODEL["weights"], normalized_features(features), strict=True):
score += weight * value
return logistic(score)
def drift_score(features: dict[str, float]) -> float:
z_scores = [abs(value) for value in normalized_features(features)]
return min(sum(z_scores) / len(z_scores) / 3, 1)
@app.get("/")
def root() -> dict[str, Any]:
return {
"service": "homelab-mlops-platform",
"model_version": MODEL["version"],
"track": MODEL_TRACK,
"endpoints": ["/healthz", "/predict", "/metrics"],
}
@app.get("/healthz")
def healthz() -> dict[str, Any]:
return {
"status": "ok",
"model_version": MODEL["version"],
"track": MODEL_TRACK,
"trained_with": MODEL["trained_with"],
}
@app.post("/predict")
def predict(payload: PredictRequest) -> dict[str, Any]:
start = time.perf_counter()
try:
features = payload.model_dump()
probability = score_prediction(features)
drift = drift_score(features)
outcome = "at_risk" if probability >= MODEL["threshold"] else "healthy"
confidence = probability if outcome == "at_risk" else 1 - probability
PREDICTIONS.labels(MODEL["version"], MODEL_TRACK, outcome).inc()
CONFIDENCE.labels(MODEL["version"], MODEL_TRACK).set(confidence)
DRIFT.labels(MODEL["version"], MODEL_TRACK).set(drift)
PREDICTION_LATENCY.labels(MODEL["version"], MODEL_TRACK).observe(time.perf_counter() - start)
return {
"model_version": MODEL["version"],
"track": MODEL_TRACK,
"outcome": outcome,
"risk_probability": round(probability, 6),
"confidence": round(confidence, 6),
"drift_score": round(drift, 6),
"threshold": MODEL["threshold"],
"features": features,
}
except Exception as exc:
ERRORS.labels(MODEL["version"], MODEL_TRACK, "prediction_failure").inc()
raise HTTPException(status_code=500, detail="prediction failed") from exc
@app.get("/metrics")
def metrics() -> Response:
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

View File

@ -0,0 +1,42 @@
{
"version": "v1",
"trained_with": "scikit-learn LogisticRegression",
"threshold": 0.55,
"bias": -1.2,
"features": [
"latency_ms",
"error_rate",
"cpu_utilization",
"memory_utilization",
"queue_depth"
],
"weights": [
0.85,
1.35,
0.7,
0.55,
0.6
],
"baseline": {
"latency_ms": {
"mean": 180,
"stddev": 90
},
"error_rate": {
"mean": 0.02,
"stddev": 0.04
},
"cpu_utilization": {
"mean": 0.45,
"stddev": 0.2
},
"memory_utilization": {
"mean": 0.5,
"stddev": 0.2
},
"queue_depth": {
"mean": 8,
"stddev": 12
}
}
}

View File

@ -0,0 +1,42 @@
{
"version": "v2",
"trained_with": "scikit-learn LogisticRegression",
"threshold": 0.5,
"bias": -1.05,
"features": [
"latency_ms",
"error_rate",
"cpu_utilization",
"memory_utilization",
"queue_depth"
],
"weights": [
0.7,
1.55,
0.9,
0.65,
0.75
],
"baseline": {
"latency_ms": {
"mean": 170,
"stddev": 80
},
"error_rate": {
"mean": 0.018,
"stddev": 0.035
},
"cpu_utilization": {
"mean": 0.42,
"stddev": 0.18
},
"memory_utilization": {
"mean": 0.48,
"stddev": 0.18
},
"queue_depth": {
"mean": 6,
"stddev": 10
}
}
}

View File

@ -0,0 +1,4 @@
fastapi==0.115.7
prometheus-client==0.21.1
pydantic==2.10.6
uvicorn[standard]==0.34.0

View File

@ -0,0 +1,52 @@
import json
from pathlib import Path
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
FEATURES = [
"latency_ms",
"error_rate",
"cpu_utilization",
"memory_utilization",
"queue_depth",
]
OUTPUT_DIR = Path(__file__).resolve().parent.parent / "models"
def export_model(version: str, seed: int, threshold: float) -> None:
rng = np.random.default_rng(seed)
healthy = rng.normal([170, 0.015, 0.38, 0.45, 5], [45, 0.01, 0.12, 0.12, 4], size=(160, 5))
at_risk = rng.normal([420, 0.12, 0.82, 0.78, 38], [130, 0.08, 0.12, 0.13, 18], size=(160, 5))
x = np.vstack([healthy, at_risk])
y = np.array([0] * len(healthy) + [1] * len(at_risk))
pipeline = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000, random_state=seed))
pipeline.fit(x, y)
scaler = pipeline.named_steps["standardscaler"]
classifier = pipeline.named_steps["logisticregression"]
artifact = {
"version": version,
"trained_with": "scikit-learn LogisticRegression",
"threshold": threshold,
"bias": float(classifier.intercept_[0]),
"features": FEATURES,
"weights": [float(value) for value in classifier.coef_[0]],
"baseline": {
feature: {"mean": float(mean), "stddev": float(stddev)}
for feature, mean, stddev in zip(FEATURES, scaler.mean_, scaler.scale_, strict=True)
},
}
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
with (OUTPUT_DIR / f"model_{version}.json").open("w", encoding="utf-8") as handle:
json.dump(artifact, handle, indent=2)
handle.write("\n")
if __name__ == "__main__":
export_model("v1", seed=42, threshold=0.55)
export_model("v2", seed=84, threshold=0.5)