mlopsModel Monitoring and Drift Detection in Production: Metrics That Matter
Implement comprehensive monitoring for production ML models with drift detection, performance tracking, and automated retraining triggers using open-source tools.
Learn how to build your first machine learning pipeline using Kubeflow, containerize components, and orchestrate training-to-deployment workflows on Kubernetes.
Marcus Johnson
MLOps Consultant & Kubernetes Expert
If you've ever trained a machine learning model in a Jupyter notebook, you know the pain of reproducing results six months later. Which version of the data did you use? What hyperparameters gave the best accuracy? How do you deploy this model to serve predictions?
ML pipelines solve these problems by automating and documenting the entire machine learning workflow. In this beginner's guide, we'll build our first pipeline using Kubeflow Pipelines—the most popular open-source platform for orchestrating ML workflows on Kubernetes.
Fairing: Building and deploying ML containers
Today we focus on Kubeflow Pipelines, which lets you define ML workflows as directed acyclic graphs (DAGs) where each step runs in its own container.
Before we start, ensure you have:
For local development, we can use the lightweight standalone version:
1# Install the Kubeflow Pipelines SDK
2pip install kfp==2.1.0
3
4# For local testing, use the lightweight deployment
5kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources"
6kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
7kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns"
8This installs a minimal Kubeflow Pipelines deployment without the full Kubeflow suite.
Let's build a simple pipeline that:
Here's the complete pipeline definition:
1import kfp
2from kfp import dsl
3from kfp.components import create_component_from_func
4import pandas as pd
5from sklearn.datasets import load_iris
6from sklearn.model_selection import train_test_split
7from sklearn.ensemble import RandomForestClassifier
8from sklearn.metrics import accuracy_score
9import pickle
10import numpy as np
11
12# Component 1: Load data
13@create_component_from_func
14def load_data_component() -> str:
15 """Load Iris dataset and save to file."""
16 iris = load_iris()
17 df = pd.DataFrame(iris.data, columns=iris.feature_names)
18 df['target'] = iris.target
19 df.to_csv('/tmp/iris.csv', index=False)
20 return '/tmp/iris.csv'
21
22# Component 2: Preprocess data
23@create_component_from_func
24def preprocess_data_component(data_path: str) -> dict:
25 """Split data into train/test sets."""
26 df = pd.read_csv(data_path)
27 X = df.drop('target', axis=1)
28 y = df['target']
29
30 X_train, X_test, y_train, y_test = train_test_split(
31 X, y, test_size=0.2, random_state=42
32 )
33
34 # Save splits to files
35 X_train.to_csv('/tmp/X_train.csv', index=False)
36 X_test.to_csv('/tmp/X_test.csv', index=False)
37 y_train.to_csv('/tmp/y_train.csv', index=False)
38 y_test.to_csv('/tmp/y_test.csv', index=False)
39
40 return {
41 'X_train_path': '/tmp/X_train.csv',
42 'X_test_path': '/tmp/X_test.csv',
43 'y_train_path': '/tmp/y_train.csv',
44 'y_test_path': '/tmp/y_test.csv'
45 }
46
47# Component 3: Train model
48@create_component_from_func
49def train_model_component(
50 X_train_path: str,
51 y_train_path: str,
52 n_estimators: int = 100
53) -> str:
54 """Train a Random Forest classifier."""
55 X_train = pd.read_csv(X_train_path)
56 y_train = pd.read_csv(y_train_path).values.ravel()
57
58 model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
59 model.fit(X_train, y_train)
60
61 # Save model
62 model_path = '/tmp/model.pkl'
63 with open(model_path, 'wb') as f:
64 pickle.dump(model, f)
65
66 return model_path
67
68# Component 4: Evaluate model
69@create_component_from_func
70def evaluate_model_component(
71 model_path: str,
72 X_test_path: str,
73 y_test_path: str
74) -> dict:
75 """Evaluate model and return metrics."""
76 with open(model_path, 'rb') as f:
77 model = pickle.load(f)
78
79 X_test = pd.read_csv(X_test_path)
80 y_test = pd.read_csv(y_test_path).values.ravel()
81
82 y_pred = model.predict(X_test)
83 accuracy = accuracy_score(y_test, y_pred)
84
85 # Create metrics dictionary
86 metrics = {
87 'accuracy': float(accuracy),
88 'n_samples': int(len(X_test)),
89 'n_features': int(X_test.shape[1])
90 }
91
92 return metrics
93
94# Component 5: Deploy model (simplified)
95@create_component_from_func
96def deploy_model_component(
97 model_path: str,
98 metrics: dict
99) -> str:
100 """Simulate model deployment."""
101 accuracy = metrics['accuracy']
102
103 # In production, this would push to a model registry
104 # or deploy to a serving platform like KServe
105 if accuracy > 0.9:
106 deployment_status = "Model approved for production"
107 else:
108 deployment_status = "Model needs improvement"
109
110 print(f"Model accuracy: {accuracy:.4f}")
111 print(f"Deployment decision: {deployment_status}")
112
113 return deployment_status
114
115# Define the pipeline
116@dsl.pipeline(
117 name='iris-classification-pipeline',
118 description='A simple ML pipeline for Iris classification'
119)
120def iris_pipeline(n_estimators: int = adding100):
121 # Step 1: Load data
122 load_data_task = load_data_component()
123
124 # Step 2: Preprocess
125 preprocess_task = preprocess_data_component(
126 data_path=load_data_task.output
127 )
128
129 # Step 3: Train
130 train_task = train_model_component(
131 X_train_path=preprocess_task.outputs['X_train_path'],
132 y_train_path=preprocess_task.outputs['y_train_path'],
133 n_estimators=n_estimators
134 )
135
136 # Step 4: Evaluate
137 evaluate_task = evaluate_model_component(
138 model_path=train_task.output,
139 X_test_path=preprocess_task.outputs['X_test_path'],
140 y_test_path=preprocess_task.outputs['y_test_path']
141 )
142
143 # Step 5: Deploy
144 deploy_task = deploy_model_component(
145 model_path=train_task.output,
146 metrics=evaluate_task.output
147 )
148
149# Compile and run
150if __name__ == '__main__':
151 # Compile to YAML
152 kfp.compiler.Compiler().compile(
153 pipeline_func=iris_pipeline,
154 package_path='iris_pipeline.yaml'
155 )
156
157 print("Pipeline compiled to iris_pipeline.yaml")
158Each component in Kubeflow runs in its own container. While the create_component_from_func helper automatically builds lightweight containers, for production you'll want custom Docker images:
1# Dockerfile for ML training component
2FROM python:3.11-slim
3
4WORKDIR /app
5
6# Install dependencies
7COPY requirements.txt .
8RUN pip install --no-cache-dir -r requirements.txt
9
10# Copy component code
11COPY train_component.py .
12
13# Entry point for Kubeflow
14ENTRYPOINT ["python", "train_component.py"]
15Build and push to a container registry:
1docker build -t your-registry/train-component:v1 .
2docker push your-registry/train-component:v1
3Then reference it in your component:
1train_component = kfp.components.load_component_from_text('''
2name: Train Model
3inputs:
4 - {name: X_train_path, type: String}
5 - {name: y_train_path, type: String}
6 - {name: n_estimators, type: Integer, default: '100'}
7outputs:
8 - {name: model_path, type: String}
9implementation:
10 container:
11 image: your-registry/train-component:v1
12 command: [python, /app/train_component.py]
13 args: [
14 --X_train_path, {inputPath: X_train_path},
15 --y_train_path, {inputPath: y_train_path},
16 --n_estimators, {inputValue: n_estimators},
17 --model_path, {outputPath: model_path}
18 ]
19'')
20Submit your pipeline to the Kubeflow Pipelines UI:
1# Using the KFP SDK
2client = kfp.Client(host='http://localhost:8080')
3run = client.create_run_from_pipeline_func(
4 iris_pipeline,
5 arguments={'n_estimators': 150},
6 experiment_name='iris-experiment'
7)
8Once submitted, you can monitor the execution in the Kubeflow UI:

The UI shows:
For production, you'll want pipelines to run automatically. Schedule them using:
1# scheduled-pipeline.yaml
2apiVersion: argoproj.io/v1alpha1
3kind: CronWorkflow
4metadata:
5 name: iris-pipeline-daily
6spec:
7 schedule: "0 2 * * *" # Run daily at 2 AM
8 workflowSpec:
9 entrypoint: iris-pipeline
10 templates:
11 - name: iris-pipeline
12 steps:
13 - - name: run-pipeline
14 templateRef:
15 name: iris-classification-pipeline
16 template: iris-pipeline
17 arguments:
18 parameters:
19 - name: n_estimators
20 value: "100"
21Apply to your cluster:
1kubectl apply -f scheduled-pipeline.yaml
2Kubeflow passes data as files or small parameters. For large datasets, use cloud storage (S3, GCS) and pass URIs instead of file paths.
Specify resource requirements to prevent Kubernetes from scheduling CPU-intensive tasks on small nodes:
1@dsl.pipeline
2def my_pipeline():
3 train_task = train_component(...)
4 train_task.set_cpu_request("2")
5 train_task.set_cpu_limit("4")
6 train_task.set_memory_request("8Gi")
7 train_task.set_memory_limit("16Gi")
8Kubeflow caches component outputs when inputs are identical. Disable caching for components that should always run:
1train_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
2Add retry policies for transient failures:
1train_task.set_retry(
2 num_retries=3,
3 backoff_duration="60s",
4 backoff_factor=2
5)
6As you move from experimentation to production:
Now that you have your first pipeline running:
n_estimatorsKubeflow Pipelines transform ad-hoc ML experimentation into reproducible, automated workflows. By containerizing each step and orchestrating them on Kubernetes, you gain:
. Reproducibility: Same inputs = same outputs, every time . Scalability: Leverage Kubernetes for distributed training . Observability: Track every step through the UI . Collaboration: Share pipeline definitions with your team
Start with the simple pipeline above, then gradually add complexity as your needs grow. Remember: the best pipeline is the one that actually gets used, so prioritize simplicity over completeness in your first iteration.
Happy pipelining!
mlopsImplement comprehensive monitoring for production ML models with drift detection, performance tracking, and automated retraining triggers using open-source tools.
mlopsImplement GitOps workflows for ML models with automated testing, canary deployments, rollback strategies, and multi-environment promotion using ArgoCD and MLflow.
foundation-modelsLearn the fundamentals of supervised learning by implementing linear regression from scratch without libraries. Understand gradient descent, loss functions, and the mathematics behind ML.