AICloudInsider
MLOpsbeginner

Introduction to ML Pipelines with Kubeflow: From Notebook to Production

Learn how to build your first machine learning pipeline using Kubeflow, containerize components, and orchestrate training-to-deployment workflows on Kubernetes.

Marcus Johnson

Marcus Johnson

MLOps Consultant & Kubernetes Expert

12 min
CI/CD for ML

Introduction to ML Pipelines with Kubeflow: From Notebook to Production

Why ML Pipelines Matter

If you've ever trained a machine learning model in a Jupyter notebook, you know the pain of reproducing results six months later. Which version of the data did you use? What hyperparameters gave the best accuracy? How do you deploy this model to serve predictions?

ML pipelines solve these problems by automating and documenting the entire machine learning workflow. In this beginner's guide, we'll build our first pipeline using Kubeflow Pipelines—the most popular open-source platform for orchestrating ML workflows on Kubernetes.

What is Kubeflow?

Kubeflow is the machine learning toolkit for Kubernetes. It provides components for:

Kubeflow Pipelines: Workflow orchestration

Katib: Hyperparameter tuning

KServe: Model serving

Fairing: Building and deploying ML containers

Today we focus on Kubeflow Pipelines, which lets you define ML workflows as directed acyclic graphs (DAGs) where each step runs in its own container.

Prerequisites

Before we start, ensure you have:

  1. Basic Python knowledge
  2. Docker installed (for containerization)
  3. Access to a Kubernetes cluster (local options: Minikube, Kind, or cloud Kubernetes service)
  4. Kubeflow Pipelines installed (we'll use the standalone deployment)

Step 1: Install Kubeflow Pipelines Standalone

For local development, we can use the lightweight standalone version:

bash
1# Install the Kubeflow Pipelines SDK
2pip install kfp==2.1.0
3
4# For local testing, use the lightweight deployment
5kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources"
6kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
7kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns"
8

This installs a minimal Kubeflow Pipelines deployment without the full Kubeflow suite.

Step据统计2: Define Your First Pipeline

Let's build a simple pipeline that:

  1. Downloads a dataset
  2. Preprocesses the data
  3. Trains a model
  4. Evaluates the model
  5. Deploys it for serving

Here's the complete pipeline definition:

python
1import kfp
2from kfp import dsl
3from kfp.components import create_component_from_func
4import pandas as pd
5from sklearn.datasets import load_iris
6from sklearn.model_selection import train_test_split
7from sklearn.ensemble import RandomForestClassifier
8from sklearn.metrics import accuracy_score
9import pickle
10import numpy as np
11
12# Component 1: Load data
13@create_component_from_func
14def load_data_component() -> str:
15    """Load Iris dataset and save to file."""
16    iris = load_iris()
17    df = pd.DataFrame(iris.data, columns=iris.feature_names)
18    df['target'] = iris.target
19    df.to_csv('/tmp/iris.csv', index=False)
20    return '/tmp/iris.csv'
21
22# Component 2: Preprocess data
23@create_component_from_func
24def preprocess_data_component(data_path: str) -> dict:
25    """Split data into train/test sets."""
26    df = pd.read_csv(data_path)
27    X = df.drop('target', axis=1)
28    y = df['target']
29    
30    X_train, X_test, y_train, y_test = train_test_split(
31        X, y, test_size=0.2, random_state=42
32    )
33    
34    # Save splits to files
35    X_train.to_csv('/tmp/X_train.csv', index=False)
36    X_test.to_csv('/tmp/X_test.csv', index=False)
37    y_train.to_csv('/tmp/y_train.csv', index=False)
38    y_test.to_csv('/tmp/y_test.csv', index=False)
39    
40    return {
41        'X_train_path': '/tmp/X_train.csv',
42        'X_test_path': '/tmp/X_test.csv', 
43        'y_train_path': '/tmp/y_train.csv',
44        'y_test_path': '/tmp/y_test.csv'
45    }
46
47# Component 3: Train model
48@create_component_from_func
49def train_model_component(
50    X_train_path: str,
51    y_train_path: str,
52    n_estimators: int = 100
53) -> str:
54    """Train a Random Forest classifier."""
55    X_train = pd.read_csv(X_train_path)
56    y_train = pd.read_csv(y_train_path).values.ravel()
57    
58    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
59    model.fit(X_train, y_train)
60    
61    # Save model
62    model_path = '/tmp/model.pkl'
63    with open(model_path, 'wb') as f:
64        pickle.dump(model, f)
65    
66    return model_path
67
68# Component 4: Evaluate model
69@create_component_from_func
70def evaluate_model_component(
71    model_path: str,
72    X_test_path: str,
73    y_test_path: str
74) -> dict:
75    """Evaluate model and return metrics."""
76    with open(model_path, 'rb') as f:
77        model = pickle.load(f)
78    
79    X_test = pd.read_csv(X_test_path)
80    y_test = pd.read_csv(y_test_path).values.ravel()
81    
82    y_pred = model.predict(X_test)
83    accuracy = accuracy_score(y_test, y_pred)
84    
85    # Create metrics dictionary
86    metrics = {
87        'accuracy': float(accuracy),
88        'n_samples': int(len(X_test)),
89        'n_features': int(X_test.shape[1])
90    }
91    
92    return metrics
93
94# Component 5: Deploy model (simplified)
95@create_component_from_func  
96def deploy_model_component(
97    model_path: str,
98    metrics: dict
99) -> str:
100    """Simulate model deployment."""
101    accuracy = metrics['accuracy']
102    
103    # In production, this would push to a model registry
104    # or deploy to a serving platform like KServe
105    if accuracy > 0.9:
106        deployment_status = "Model approved for production"
107    else:
108        deployment_status = "Model needs improvement"
109    
110    print(f"Model accuracy: {accuracy:.4f}")
111    print(f"Deployment decision: {deployment_status}")
112    
113    return deployment_status
114
115# Define the pipeline
116@dsl.pipeline(
117    name='iris-classification-pipeline',
118    description='A simple ML pipeline for Iris classification'
119)
120def iris_pipeline(n_estimators: int = adding100):
121    # Step 1: Load data
122    load_data_task = load_data_component()
123    
124    # Step 2: Preprocess
125    preprocess_task = preprocess_data_component(
126        data_path=load_data_task.output
127    )
128    
129    # Step 3: Train
130    train_task = train_model_component(
131        X_train_path=preprocess_task.outputs['X_train_path'],
132        y_train_path=preprocess_task.outputs['y_train_path'],
133        n_estimators=n_estimators
134    )
135    
136    # Step 4: Evaluate
137    evaluate_task = evaluate_model_component(
138        model_path=train_task.output,
139        X_test_path=preprocess_task.outputs['X_test_path'],
140        y_test_path=preprocess_task.outputs['y_test_path']
141    )
142    
143    # Step 5: Deploy
144    deploy_task = deploy_model_component(
145        model_path=train_task.output,
146        metrics=evaluate_task.output
147    )
148
149# Compile and run
150if __name__ == '__main__':
151    # Compile to YAML
152    kfp.compiler.Compiler().compile(
153        pipeline_func=iris_pipeline,
154        package_path='iris_pipeline.yaml'
155    )
156    
157    print("Pipeline compiled to iris_pipeline.yaml")
158

Step 3: Containerize Components

Each component in Kubeflow runs in its own container. While the create_component_from_func helper automatically builds lightweight containers, for production you'll want custom Docker images:

dockerfile
1# Dockerfile for ML training component
2FROM python:3.11-slim
3
4WORKDIR /app
5
6# Install dependencies
7COPY requirements.txt .
8RUN pip install --no-cache-dir -r requirements.txt
9
10# Copy component code
11COPY train_component.py .
12
13# Entry point for Kubeflow
14ENTRYPOINT ["python", "train_component.py"]
15

Build and push to a container registry:

bash
1docker build -t your-registry/train-component:v1 .
2docker push your-registry/train-component:v1
3

Then reference it in your component:

python
1train_component = kfp.components.load_component_from_text('''
2name: Train Model
3inputs:
4  - {name: X_train_path, type: String}
5  - {name: y_train_path, type: String}
6  - {name: n_estimators, type: Integer, default: '100'}
7outputs:
8  - {name: model_path, type: String}
9implementation:
10  container:
11    image: your-registry/train-component:v1
12    command: [python, /app/train_component.py]
13    args: [
14      --X_train_path, {inputPath: X_train_path},
15      --y_train_path, {inputPath: y_train_path},
16      --n_estimators, {inputValue: n_estimators},
17      --model_path, {outputPath: model_path}
18    ]
19'')
20

Step 4: Run and Monitor

Submit your pipeline to the Kubeflow Pipelines UI:

bash
1# Using the KFP SDK
2client = kfp.Client(host='http://localhost:8080')
3run = client.create_run_from_pipeline_func(
4    iris_pipeline,
5    arguments={'n_estimators': 150},
6    experiment_name='iris-experiment'
7)
8

Once submitted, you can monitor the execution in the Kubeflow UI:

Kubeflow Pipeline UI

The UI shows:

  • Each component's status (pending, running, succeeded, failed)
  • Execution time for each step
  • Inputs and outputs
  • Artifacts produced (models, metrics)
  • Logs from each container

Step 5: Schedule Recurring Runs

For production, you'll want pipelines to run automatically. Schedule them using:

yaml
1# scheduled-pipeline.yaml
2apiVersion: argoproj.io/v1alpha1
3kind: CronWorkflow
4metadata:
5  name: iris-pipeline-daily
6spec:
7  schedule: "0 2 * * *"  # Run daily at 2 AM
8  workflowSpec:
9    entrypoint: iris-pipeline
10    templates:
11    - name: iris-pipeline
12      steps:
13      - - name: run-pipeline
14          templateRef:
15            name: iris-classification-pipeline
16            template: iris-pipeline
17          arguments:
18            parameters:
19            - name: n_estimators
20              value: "100"
21

Apply to your cluster:

bash
1kubectl apply -f scheduled-pipeline.yaml
2

Common Pitfalls and Solutions

1. Data Passing Between Components

Kubeflow passes data as files or small parameters. For large datasets, use cloud storage (S3, GCS) and pass URIs instead of file paths.

2. Resource Management

Specify resource requirements to prevent Kubernetes from scheduling CPU-intensive tasks on small nodes:

python
1@dsl.pipeline
2def my_pipeline():
3    train_task = train_component(...)
4    train_task.set_cpu_request("2")
5    train_task.set_cpu_limit("4")
6    train_task.set_memory_request("8Gi")
7    train_task.set_memory_limit("16Gi")
8

3. Caching and Reuse

Kubeflow caches component outputs when inputs are identical. Disable caching for components that should always run:

python
1train_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
2

4. Error Handling

Add retry policies for transient failures:

python
1train_task.set_retry(
2    num_retries=3,
3    backoff_duration="60s",
4    backoff_factor=2
5)
6

Production Considerations

As you move from experimentation to production:

  1. Version Everything: Data, code, models, and pipeline definitions
  2. Implement Testing: Unit tests for components, integration tests for pipelines
  3. Add Monitoring: Track pipeline success rates, execution times, resource usage
  4. Security: Use Kubernetes secrets for credentials, network policies for isolation
  5. Cost Control: Use spot instances for training, auto-scale down when idle

Next Steps

Now that you have your first pipeline running:

  1. Experiment with Parameters: Try the Kubeflow Pipelines UI to manually run with different n_estimators
  2. Add Hyperparameter Tuning: Integrate Katib for automated hyperparameter search
  3. Implement Model Serving: Deploy your trained model with KServe for real-time predictions
  4. Build More Complex Pipelines: Add data validation, feature engineering, and model comparison steps

Conclusion

Kubeflow Pipelines transform ad-hoc ML experimentation into reproducible, automated workflows. By containerizing each step and orchestrating them on Kubernetes, you gain:

. Reproducibility: Same inputs = same outputs, every time . Scalability: Leverage Kubernetes for distributed training . Observability: Track every step through the UI . Collaboration: Share pipeline definitions with your team

Start with the simple pipeline above, then gradually add complexity as your needs grow. Remember: the best pipeline is the one that actually gets used, so prioritize simplicity over completeness in your first iteration.

Further Reading

Happy pipelining!

Marcus Johnson

Marcus Johnson

MLOps Consultant & Kubernetes Expert

Certified Kubernetes administrator and ML platform architect. Helped 40+ companies transition from notebook experiments to production ML pipelines. Speaker at KubeCon and MLconf.

89 articles