Implement comprehensive monitoring for production ML models with drift detection, performance tracking, and automated retraining triggers using open-source tools.
Sarah Chen
ML Engineer & Cloud AI Specialist
You deployed your fraud detection model with 98% accuracy. Six months later, transaction patterns changed, economic conditions shifted, and your model's performance silently degraded to 82% accuracy. No alerts fired. No one noticed until the CFO asked about rising fraud losses.
This scenario plays out daily in organizations worldwide. Deploying models is only half the battle; keeping them healthy in production requires continuous monitoring. In this intermediate guide, we'll build a complete monitoring stack that detects drift, tracks performance, and triggers retraining automatically.
Traditional application monitoring focuses on availability, latency, and errors. ML models need additional monitoring dimensions:
Apache Kafka: Streaming data for real-time monitoring
```yaml
# monitoring-architecture.yaml
components:
  data_stream:
    source: production_inference_api
    transport: kafka
    topics:
      - inference_inputs    # Raw features
      - inference_outputs   # Predictions + ground truth (when available)

  drift_detector:
    tool: evidently
    checks:
      - data_drift: statistical_tests
      - target_drift: when_labels_available
      - model_performance: accuracy/precision/recall

  metrics_collector:
    tool: prometheus
    exporters:
      - evidently_metrics_exporter
      - custom_ml_metrics

  visualization:
    tool: grafana
    dashboards:
      - model_health_overview
      - drift_analysis_detailed
      - performance_trends

  alerting:
    tool: alertmanager
    receivers:
      - slack
      - pagerduty
      - email

  action_pipeline:
    triggers:
      - severe_drift_detected -> retrain_model
      - performance_below_threshold -> human_review
```
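The action_pipeline.triggers entries are written as "condition -> action" strings. The following is a minimal sketch of the routing they describe. The helpers parse_trigger_rules and route_event, the log_only fallback, and the minor_drift_detected event name are all hypothetical. None of them belong to any tool in this stack.

```python
def parse_trigger_rules(rules):
    """Turn 'condition -> action' strings into a {condition: action} dict."""
    mapping = {}
    for rule in rules:
        condition, sep, action = rule.partition("->")
        if not sep:
            raise ValueError(f"Malformed trigger rule: {rule!r}")
        mapping[condition.strip()] = action.strip()
    return mapping


def route_event(event_name, mapping, default="log_only"):
    """Return the configured action for an event (hypothetical 'log_only' fallback)."""
    return mapping.get(event_name, default)


# Rules copied from action_pipeline.triggers in monitoring-architecture.yaml
rules = [
    "severe_drift_detected -> retrain_model",
    "performance_below_threshold -> human_review",
]
mapping = parse_trigger_rules(rules)
print(route_event("severe_drift_detected", mapping))  # retrain_model
print(route_event("minor_drift_detected", mapping))   # log_only
```

Events with no rule fall through to a logging-only default. Severe drift can then page a retraining job without every small fluctuation doing the same.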
Let's implement drift detection for a customer churn prediction model:
```python
import os
import tempfile

import pandas as pd
# Legacy Evidently Report API (0.4.x era); newer releases reorganized these modules
from evidently import ColumnMapping
from evidently.metrics import DataDriftTable
from evidently.report import Report

# Define reference and current datasets
# Reference: data from the model training period
# Current: latest production data


def load_reference_data():
    """Load the data used for model training."""
    # Reading s3:// paths with pandas requires the s3fs package
    return pd.read_csv('s3://models/churn-model/v1/training_data.csv')


def load_current_data(start_date, end_date):
    """Load production data from the specified period."""
    query = """
        SELECT * FROM inference_logs
        WHERE timestamp BETWEEN %(start_date)s AND %(end_date)s
    """
    params = {'start_date': start_date, 'end_date': end_date}
    # In practice: pd.read_sql(query, conn, params=params), using the placeholder
    # style your database driver expects. Here we read a local sample instead.
    return pd.read_csv('production_data_sample.csv')


# Column mapping for Evidently
column_mapping = ColumnMapping(
    numerical_features=['tenure', 'monthly_charges', 'total_charges', 'num_services'],
    categorical_features=['contract_type', 'payment_method', 'internet_service'],
    target='churn',
    prediction='prediction',
    datetime='timestamp'
)


def generate_drift_report(reference_data, current_data):
    """Generate a per-feature data drift report."""
    data_drift_report = Report(metrics=[
        DataDriftTable(),
    ])
    data_drift_report.run(
        reference_data=reference_data,
        current_data=current_data,
        column_mapping=column_mapping
    )
    return data_drift_report


# Example: monitor weekly drift
reference = load_reference_data()
current = load_current_data(start_date='2026-05-20', end_date='2026-05-27')

report = generate_drift_report(reference, current)

# Extract drift metrics. DataDriftTable returns dataset-level counts plus
# per-column results; the share of drifted columns serves as the overall score.
drift_metrics = report.as_dict()
result = drift_metrics['metrics'][0]['result']
drift_detected = result['dataset_drift']
drift_score = result['share_of_drifted_columns']
features_drifted = result['number_of_drifted_columns']

print(f"Drift detected: {drift_detected}")
print(f"Drift score: {drift_score:.3f}")
print(f"Features drifted: {features_drifted}")


def export_to_prometheus(drift_metrics, model='churn', version='v1'):
    """Convert drift metrics to the Prometheus text exposition format."""
    result = drift_metrics['metrics'][0]['result']
    labels = f'model="{model}", version="{version}"'
    lines = [
        '# TYPE ml_model_drift_score gauge',
        f'ml_model_drift_score{{{labels}}} {result["share_of_drifted_columns"]}',
        '# TYPE ml_model_drifted_features gauge',
        f'ml_model_drifted_features{{{labels}}} {result["number_of_drifted_columns"]}',
        '# TYPE ml_feature_drift_score gauge',
    ]
    # Individual feature drift: the statistic from the test Evidently chose per column
    for feature, column in result['drift_by_columns'].items():
        if column.get('drift_score') is not None:
            lines.append(
                f'ml_feature_drift_score{{{labels}, feature="{feature}"}} {column["drift_score"]}'
            )
    # The text format requires a newline after the last line
    return '\n'.join(lines) + '\n'


prometheus_output = export_to_prometheus(drift_metrics)
print("\nPrometheus metrics:")
print(prometheus_output)

# Prometheus does not read files directly: node_exporter's textfile collector
# picks up *.prom files from its --collector.textfile.directory. Write to a
# temp file and rename it so a scrape never sees a half-written file.
metrics_dir = '/metrics'
with tempfile.NamedTemporaryFile('w', dir=metrics_dir, suffix='.tmp', delete=False) as f:
    f.write(prometheus_output)
os.replace(f.name, os.path.join(metrics_dir, 'ml_metrics.prom'))
```
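The report above reduces each feature to a drift score from whichever statistical test Evidently selects. To make a per-feature score less of a black box, here is a stdlib-only sketch of the Population Stability Index (PSI). PSI is a widely used drift metric, but it is a different measure from what Evidently computes by default. The function name and the synthetic samples are illustrative assumptions.

```python
import bisect
import math


def population_stability_index(reference, current, n_bins=10, eps=1e-6):
    """PSI between two numeric samples, with bins cut at reference quantiles.

    PSI = sum over bins of (cur_share - ref_share) * ln(cur_share / ref_share).
    """
    if not reference or not current:
        raise ValueError("reference and current must be non-empty")
    ref_sorted = sorted(reference)
    # n_bins - 1 interior edges taken at evenly spaced reference quantiles
    edges = [ref_sorted[len(ref_sorted) * i // n_bins] for i in range(1, n_bins)]

    def bin_shares(values):
        counts = [0] * n_bins
        for v in values:
            # bisect_right counts the edges <= v, which is the bin index
            counts[bisect.bisect_right(edges, v)] += 1
        # eps stands in for empty bins so the log ratio stays finite
        return [max(c / len(values), eps) for c in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))


reference = list(range(1000))                # synthetic reference sample
same = list(range(1000))
shifted = [x + 500 for x in range(1000)]     # synthetic upward shift

print(population_stability_index(reference, same))            # 0.0
print(population_stability_index(reference, shifted) > 0.25)  # True
```

A commonly cited rule of thumb reads PSI below 0.1 as stable, 0.1 to 0.25 as moderate shift, and above 0.25 as significant shift. As with any threshold in this guide, calibrate it against business impact.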
Concept drift occurs when the relationship between features and target changes. Detecting it requires ground-truth labels, which in many applications arrive with a delay:
```python
import numpy as np
from scipy.stats import chi2_contingency
from sklearn.metrics import accuracy_score, f1_score


def detect_concept_drift(reference_labels, current_labels, reference_preds, current_preds,
                         alpha=0.05, max_drop=0.05):
    """Detect concept drift by comparing model performance between two periods."""
    reference_labels = np.asarray(reference_labels)
    current_labels = np.asarray(current_labels)
    reference_preds = np.asarray(reference_preds)
    current_preds = np.asarray(current_preds)

    # Calculate performance metrics
    ref_accuracy = accuracy_score(reference_labels, reference_preds)
    curr_accuracy = accuracy_score(current_labels, current_preds)

    ref_f1 = f1_score(reference_labels, reference_preds, average='weighted')
    curr_f1 = f1_score(current_labels, current_preds, average='weighted')

    # Is the error rate significantly different between the two periods?
    # McNemar's test requires paired predictions on the *same* samples, but the
    # reference and current windows are independent samples, so use a
    # chi-squared test on a 2x2 table of correct/incorrect counts per period.
    ref_correct = int(np.sum(reference_labels == reference_preds))
    curr_correct = int(np.sum(current_labels == current_preds))
    contingency = np.array([
        [ref_correct, len(reference_labels) - ref_correct],
        [curr_correct, len(current_labels) - curr_correct],
    ])

    if contingency.sum(axis=0).min() == 0:
        # Every prediction correct (or every one wrong) in both periods: nothing to test
        p_value = 1.0
    else:
        _, p_value, _, _ = chi2_contingency(contingency)

    # Performance degradation (positive values mean worse than reference)
    accuracy_drop = ref_accuracy - curr_accuracy
    f1_drop = ref_f1 - curr_f1

    # Flag a statistically significant accuracy drop, or any drop beyond max_drop
    significant_drop = (p_value < alpha) and (accuracy_drop > 0)
    drift_detected = significant_drop or (accuracy_drop > max_drop) or (f1_drop > max_drop)

    return {
        'concept_drift_detected': bool(drift_detected),
        'p_value': float(p_value),
        'accuracy_drop': accuracy_drop,
        'f1_drop': f1_drop,
        'reference_accuracy': ref_accuracy,
        'current_accuracy': curr_accuracy
    }
```
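Because labels arrive late, the "current" window for concept drift should contain only predictions old enough for their labels to have landed. Below is a minimal sketch of that filtering, under stated assumptions:

- Predictions are logged with a request_id and timestamp.
- Labels are later keyed by the same id.
- The field names and the 7-day delay are hypothetical.

The matured, labeled rows are what you would pass to detect_concept_drift as current_labels and current_preds.

```python
from datetime import datetime, timedelta


def matured_accuracy(predictions, labels, now, label_delay):
    """Accuracy over predictions old enough that their labels should have arrived.

    predictions: list of dicts with 'request_id', 'timestamp', 'prediction' (hypothetical schema)
    labels: dict mapping request_id -> ground-truth label, filled in as labels arrive
    """
    cutoff = now - label_delay
    matured = [p for p in predictions if p["timestamp"] <= cutoff]
    scored = [p for p in matured if p["request_id"] in labels]
    correct = sum(p["prediction"] == labels[p["request_id"]] for p in scored)
    return {
        "accuracy": correct / len(scored) if scored else None,
        "n_scored": len(scored),
        # Share of matured predictions that actually received a label
        "label_coverage": len(scored) / len(matured) if matured else 0.0,
    }


now = datetime(2026, 5, 27)
predictions = [
    {"request_id": "a", "timestamp": now - timedelta(days=10), "prediction": 1},
    {"request_id": "b", "timestamp": now - timedelta(days=9), "prediction": 0},
    {"request_id": "c", "timestamp": now - timedelta(days=8), "prediction": 1},  # label never arrived
    {"request_id": "d", "timestamp": now - timedelta(days=1), "prediction": 1},  # too recent
]
labels = {"a": 1, "b": 1, "d": 1}
print(matured_accuracy(predictions, labels, now, label_delay=timedelta(days=7)))
# {'accuracy': 0.5, 'n_scored': 2, 'label_coverage': 0.6666666666666666}
```

Tracking label_coverage alongside accuracy matters. If labels stop arriving, accuracy is computed on a shrinking, possibly biased subset.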
For high-volume inference, implement streaming drift detection:
```python
import json
import threading
from collections import deque
from datetime import datetime

import pandas as pd
from evidently.metric_preset import DataDriftPreset
from evidently.report import Report
from kafka import KafkaConsumer  # kafka-python

# Reuses column_mapping and load_reference_data() from the drift detection example above


class StreamingDriftDetector:
    def __init__(self, bootstrap_servers, reference_data, window_size=1000,
                 check_every=100, reference_sample_size=5000):
        self.consumer = KafkaConsumer(
            'inference_inputs',
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            group_id='drift-detector'
        )
        # Sample the reference set once so every check uses the same baseline
        self.reference_data = reference_data.sample(
            min(reference_sample_size, len(reference_data)), random_state=42
        )
        self.window = deque(maxlen=window_size)
        # A full drift report is expensive: run it every `check_every` messages
        self.check_every = check_every
        self.messages_since_check = 0
        self.drift_history = deque(maxlen=10_000)  # bounded history for trend analysis

    def process_stream(self):
        """Process incoming inference data and detect drift."""
        for message in self.consumer:
            # Add to sliding window
            self.window.append(message.value)
            self.messages_since_check += 1

            # Check once the window is full, then every `check_every` messages
            if (len(self.window) == self.window.maxlen
                    and self.messages_since_check >= self.check_every):
                self.messages_since_check = 0
                current_data = pd.DataFrame(list(self.window))
                drift_result = self.check_drift(current_data)

                if drift_result['drift_detected']:
                    self.trigger_alert(drift_result)

                # Store for trend analysis
                self.drift_history.append({
                    'timestamp': drift_result['timestamp'],
                    'drift_score': drift_result['drift_score'],
                    'drift_detected': drift_result['drift_detected']
                })

    def check_drift(self, current_data):
        """Check drift between the reference sample and the current window."""
        report = Report(metrics=[DataDriftPreset()])
        report.run(
            reference_data=self.reference_data,
            current_data=current_data,
            column_mapping=column_mapping
        )

        # The first metric in DataDriftPreset is the dataset-level drift summary
        result = report.as_dict()['metrics'][0]['result']
        return {
            'drift_detected': result['dataset_drift'],
            'drift_score': result['share_of_drifted_columns'],
            'timestamp': datetime.now()
        }

    def trigger_alert(self, drift_result):
        """Send an alert when drift is detected."""
        alert = {
            'model': 'churn-prediction',
            'version': 'v1',
            'severity': 'warning' if drift_result['drift_score'] < 0.5 else 'critical',
            'drift_score': drift_result['drift_score'],
            'timestamp': drift_result['timestamp'].isoformat(),
            'message': f"Data drift detected with score {drift_result['drift_score']:.3f}"
        }

        # In production, send to Alertmanager, Slack, PagerDuty, etc.
        print(f"ALERT: {alert}")

        # Optionally trigger the retraining pipeline
        if drift_result['drift_score'] > 0.7:
            self.trigger_retraining()

    def trigger_retraining(self):
        """Trigger the model retraining pipeline."""
        # This would kick off a Kubeflow pipeline, an AWS SageMaker training job, etc.
        # The implementation depends on your training infrastructure.
        print("Triggering model retraining pipeline...")


# Start the detector
detector = StreamingDriftDetector(
    bootstrap_servers=['localhost:9092'],
    reference_data=load_reference_data(),
    window_size=1000
)

# Run in a background thread. Daemon threads die when the main thread exits,
# so a standalone script must keep the main thread alive (here, by joining).
detector_thread = threading.Thread(target=detector.process_stream, daemon=True)
detector_thread.start()
detector_thread.join()
```
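Because the detector's windowing logic is tied to a live KafkaConsumer, it is hard to unit-test. One option is to keep the window in a transport-agnostic object that runs a check every N events. The SlidingWindowMonitor class below sketches this and is hypothetical. Its stand-in check averages the window rather than running an Evidently report.

```python
from collections import deque


class SlidingWindowMonitor:
    """Transport-agnostic sliding window that runs a check every `check_every` events."""

    def __init__(self, check_fn, window_size=1000, check_every=100):
        self.check_fn = check_fn              # callable: list of events -> result
        self.window = deque(maxlen=window_size)
        self.check_every = check_every
        self._since_last_check = 0
        self.results = []

    def add(self, event):
        """Add one event; return the check result if a check ran, else None."""
        self.window.append(event)
        self._since_last_check += 1
        window_full = len(self.window) == self.window.maxlen
        if window_full and self._since_last_check >= self.check_every:
            self._since_last_check = 0
            result = self.check_fn(list(self.window))
            self.results.append(result)
            return result
        return None


# A stand-in check: mean of the window instead of an Evidently report
monitor = SlidingWindowMonitor(lambda events: sum(events) / len(events),
                               window_size=5, check_every=2)
for value in range(10):
    monitor.add(value)
print(monitor.results)  # [2.0, 4.0, 6.0]
```

Inside process_stream, each message.value would go to monitor.add(...), with check_fn wrapping check_drift on a DataFrame built from the window. The monitor can then be tested with plain lists, no broker required.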
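Create ML monitoring dashboards in Grafana (`grafana-dashboard.json`). The `alertRules` section follows Prometheus alerting-rule conventions (`for`, `annotations`, `{{ $value }}`). Grafana does not read alert rules from dashboard JSON, so define them as Prometheus alerting rules and let Alertmanager route the notifications: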
```json
{
  "dashboard": {
    "title": "ML Model Monitoring",
    "panels": [
      {
        "title": "Data Drift Score",
        "type": "timeseries",
        "targets": [
          {
            "expr": "ml_model_drift_score{model=\"churn\", version=\"v1\"}",
            "legendFormat": "Drift Score"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 0.3},
                {"color": "red", "value": 0.5}
              ]
            }
          }
        }
      },
      {
        "title": "Drifted Features Count",
        "type": "timeseries",
        "targets": [
          {
            "expr": "ml_model_drifted_features{model=\"churn\", version=\"v1\"}",
            "legendFormat": "Drifted Features"
          }
        ]
      },
      {
        "title": "Model Performance Trends",
        "type": "timeseries",
        "targets": [
          {
            "expr": "model_accuracy{model=\"churn\", version=\"v1\"}",
            "legendFormat": "Accuracy"
          },
          {
            "expr": "model_precision{model=\"churn\", version=\"v1\"}",
            "legendFormat": "Precision"
          },
          {
            "expr": "model_recall{model=\"churn\", version=\"v1\"}",
            "legendFormat": "Recall"
          }
        ]
      },
      {
        "title": "Inference Latency & Throughput",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(model_inference_duration_seconds_sum[5m]) / rate(model_inference_duration_seconds_count[5m])",
            "legendFormat": "Avg Latency"
          },
          {
            "expr": "rate(model_inference_requests_total[5m])",
            "legendFormat": "Requests/sec"
          }
        ]
      }
    ],
    "alertRules": [
      {
        "name": "High Data Drift Alert",
        "condition": "ml_model_drift_score{model=\"churn\", version=\"v1\"} > 0.5",
        "for": "5m",
        "annotations": {
          "summary": "High data drift detected for churn model",
          "description": "Data drift score {{ $value }} exceeds threshold 0.5. Consider retraining the model."
        }
      },
      {
        "name": "Performance Degradation Alert",
        "condition": "model_accuracy{model=\"churn\", version=\"v1\"} < 0.85",
        "for": "15m",
        "annotations": {
          "summary": "Model accuracy below threshold",
          "description": "Accuracy {{ $value }} below 0.85 for 15 minutes."
        }
      }
    ]
  }
}
```
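Hand-writing PromQL inside JSON is error-prone, because every label matcher's quotes must be escaped. A sketch of generating the dashboard from Python instead follows, where json.dumps handles the escaping. The helpers selector and timeseries_panel are hypothetical. The panel dict covers only a small subset of Grafana's dashboard model.

```python
import json


def selector(metric, **labels):
    """Render a PromQL selector such as metric{model="churn", version="v1"}."""
    inner = ", ".join(f'{key}="{value}"' for key, value in labels.items())
    return f"{metric}{{{inner}}}"


def timeseries_panel(title, targets, threshold_steps=None):
    """Minimal panel dict: targets are (expr, legend) pairs, steps are (color, value) pairs."""
    panel = {
        "title": title,
        "type": "timeseries",
        "targets": [{"expr": expr, "legendFormat": legend} for expr, legend in targets],
    }
    if threshold_steps:
        panel["fieldConfig"] = {"defaults": {"thresholds": {
            "mode": "absolute",
            "steps": [{"color": color, "value": value} for color, value in threshold_steps],
        }}}
    return panel


labels = {"model": "churn", "version": "v1"}
dashboard = {"dashboard": {"title": "ML Model Monitoring", "panels": [
    timeseries_panel(
        "Data Drift Score",
        [(selector("ml_model_drift_score", **labels), "Drift Score")],
        [("green", None), ("yellow", 0.3), ("red", 0.5)],
    ),
    timeseries_panel(
        "Drifted Features Count",
        [(selector("ml_model_drifted_features", **labels), "Drifted Features")],
    ),
]}}

# json.dumps escapes the inner quotes of the PromQL label matchers
print(json.dumps(dashboard, indent=2))
```

Generating panels from one labels dict also keeps the model and version labels consistent across every query when you add a second model.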
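When monitoring detects issues, trigger retraining automatically. The Argo Workflow below checks drift, asks a decision gate whether retraining is warranted, and runs the retraining pipeline only if it is: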
```yaml
# retraining-triggers.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: model-retraining-trigger-   # unique name per triggered run
spec:
  entrypoint: evaluate-and-retrain
  arguments:
    parameters:
      - name: model_name
        value: churn-prediction
      - name: model_version
        value: v1

  templates:
    - name: evaluate-and-retrain
      steps:
        - - name: check-drift
            template: check-drift-metrics

        - - name: decide-action
            template: decision-gate
            when: "{{steps.check-drift.outputs.parameters.drift_detected}} == true"
            arguments:
              parameters:
                - name: drift_score
                  value: "{{steps.check-drift.outputs.parameters.drift_score}}"
                - name: performance_metrics
                  value: "{{steps.check-drift.outputs.parameters.performance_metrics}}"

        - - name: retrain-model
            template: retrain-pipeline
            # Reference the step name (decide-action), not the template name
            when: "{{steps.decide-action.outputs.parameters.retrain_needed}} == true"

    - name: check-drift-metrics
      container:
        image: drift-checker:latest
        command: [python, /app/check_drift.py]
        args:
          - --model-name
          - "{{workflow.parameters.model_name}}"
          - --model-version
          - "{{workflow.parameters.model_version}}"
      outputs:
        parameters:
          - name: drift_detected
            valueFrom:
              path: /tmp/drift_detected.txt
          # Hypothetical output files, also written by check_drift.py
          - name: drift_score
            valueFrom:
              path: /tmp/drift_score.txt
          - name: performance_metrics
            valueFrom:
              path: /tmp/performance_metrics.json

    - name: decision-gate
      inputs:
        parameters:
          - name: drift_score
          - name: performance_metrics
      container:
        image: decision-engine:latest
        command: [python, /app/decision_engine.py]
        args:
          - --drift-score
          - "{{inputs.parameters.drift_score}}"
          - --performance-metrics
          - "{{inputs.parameters.performance_metrics}}"
      outputs:
        parameters:
          - name: retrain_needed
            valueFrom:
              path: /tmp/retrain_needed.txt

    - name: retrain-pipeline
      dag:
        tasks:
          - name: trigger-training
            templateRef:
              name: model-training-pipeline   # a separate WorkflowTemplate
              template: train-model
            arguments:
              parameters:
                - name: data_version
                  value: latest
                - name: hyperparameters
                  value: optimized

          # The validation-step and deployment-decision templates are not shown here
          - name: validate-new-model
            template: validation-step
            dependencies: [trigger-training]

          - name: deploy-if-better
            template: deployment-decision
            dependencies: [validate-new-model]
```
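The workflow calls /app/decision_engine.py, which is not shown. The script below is a hypothetical sketch of it:

- The --drift-score and --performance-metrics flags match the workflow's args.
- The --output flag is an assumption.
- The JSON shape of the metrics (an "accuracy" key) is an assumption.
- Both thresholds are assumptions borrowed from elsewhere in this guide: 0.7 is the streaming detector's retraining trigger, and 0.85 is the accuracy alert.

```python
import argparse
import json
from pathlib import Path

# Hypothetical defaults reusing thresholds that appear elsewhere in this guide
DRIFT_RETRAIN_THRESHOLD = 0.7
MIN_ACCURACY = 0.85


def retrain_needed(drift_score, performance_metrics):
    """Return True when drift is severe or accuracy has dropped below the floor."""
    if drift_score > DRIFT_RETRAIN_THRESHOLD:
        return True
    accuracy = performance_metrics.get("accuracy")
    return accuracy is not None and accuracy < MIN_ACCURACY


def main(argv=None):
    parser = argparse.ArgumentParser(description="Decide whether to retrain (sketch).")
    parser.add_argument("--drift-score", type=float, required=True)
    # argparse applies `type` to string defaults, so "{}" becomes an empty dict
    parser.add_argument("--performance-metrics", type=json.loads, default="{}")
    parser.add_argument("--output", default="/tmp/retrain_needed.txt")
    args = parser.parse_args(argv)

    decision = retrain_needed(args.drift_score, args.performance_metrics)
    # Argo reads this file as the step's output parameter; lowercase 'true'
    # matches the `== true` comparison in the workflow's `when` clause.
    Path(args.output).write_text("true" if decision else "false")
    return decision
```

In decision_engine.py itself, wire main() to the script entry point with `if __name__ == "__main__": main()`. Keeping the decision in a plain function makes it testable without a cluster.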
Track these essential metrics for every production model:
Not all alerts are equal. Implement tiered alerting:
Tier 1 (Critical - Immediate Action):
Tier 2 (Warning - Review within hours):
Tier 3 (Info - Weekly review):
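The tiers above name urgency levels but not the rules that assign an alert to one. As an illustrative sketch only, the code below maps the guide's own numbers onto tiers:

- Drift scores of 0.3 (yellow) and 0.5 (red/critical).
- The 0.85 accuracy alert.

The mapping and the receiver choices are hypothetical. The receivers are taken from the architecture config's alerting section.

```python
from dataclasses import dataclass

# Hypothetical routing using the receivers from the architecture config
TIER_RECEIVERS = {
    1: ("pagerduty", "slack"),  # Critical: immediate action
    2: ("slack",),              # Warning: review within hours
    3: ("email",),              # Info: weekly review
}


@dataclass(frozen=True)
class Alert:
    model: str
    tier: int
    receivers: tuple


def classify_tier(drift_score, accuracy=None, accuracy_floor=0.85,
                  critical_drift=0.5, warning_drift=0.3):
    """Map monitoring readings to an alert tier (1 = most urgent). Thresholds are illustrative."""
    if accuracy is not None and accuracy < accuracy_floor:
        return 1
    if drift_score >= critical_drift:
        return 1
    if drift_score >= warning_drift:
        return 2
    return 3


def build_alert(model, drift_score, accuracy=None):
    """Build an alert routed to the receivers for its tier."""
    tier = classify_tier(drift_score, accuracy)
    return Alert(model=model, tier=tier, receivers=TIER_RECEIVERS[tier])


print(build_alert("churn", drift_score=0.35, accuracy=0.91))
# Alert(model='churn', tier=2, receivers=('slack',))
```

Accuracy is checked before drift: a confirmed performance drop is treated as more urgent than a shift in inputs that may not yet affect predictions.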
Monitoring has costs too:
Optimize by:
Phase 1 (2 weeks):
Phase 2 (4 weeks):
Phase 3 (8 weeks):
Model monitoring is not a luxury—it's a necessity for production ML systems. Without it, you're flying blind, unaware when your models decay into irrelevance.
The stack we've built—Evidently for drift detection, Prometheus for metrics, Grafana for visualization, and automated triggers for retraining—provides comprehensive monitoring at reasonable cost. Start with the basics, then add sophistication as your needs grow.
Remember: The goal isn't to detect every minor shift, but to catch significant degradation before it impacts business outcomes. Set thresholds based on business impact, not statistical purity.
Your models are living entities that need care and feeding. Give them the monitoring they deserve.