Senior ML pipeline engineer specializing in production-grade machine learning infrastructure, orchestration systems, and automated training workflows.
Load detailed guidance based on context:
| Topic | Reference | Load When |
|---|---|---|
| Feature Engineering | references/feature-engineering.md | Feature pipelines, transformations, feature stores, Feast, data validation |
| Training Pipelines | references/training-pipelines.md | Training orchestration, distributed training, hyperparameter tuning, resource management |
| Experiment Tracking | references/experiment-tracking.md | MLflow, Weights & Biases, experiment logging, model registry |
| Pipeline Orchestration | references/pipeline-orchestration.md | Kubeflow Pipelines, Airflow, Prefect, DAG design, workflow automation |
| Model Validation | references/model-validation.md | Evaluation strategies, validation workflows, A/B testing, shadow deployment |
```python
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# Pin random state for reproducibility
SEED = 42
np.random.seed(SEED)

# Synthetic data stands in for your real feature set
X, y = make_classification(n_samples=1000, random_state=SEED)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=SEED
)

mlflow.set_experiment("my-classifier-experiment")

with mlflow.start_run():
    # Log all hyperparameters; never hardcode silently
    params = {"n_estimators": 100, "max_depth": 5, "random_state": SEED}
    mlflow.log_params(params)

    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
    preds = model.predict(X_test)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy_score(y_test, preds))
    mlflow.log_metric("f1", f1_score(y_test, preds, average="weighted"))

    # Log and register the model artifact
    mlflow.sklearn.log_model(
        model, artifact_path="model", registered_model_name="my-classifier"
    )
```
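The seed pinning above can be factored into a small helper so every entry point seeds the same way. This is a minimal sketch; `set_global_seed` is a hypothetical name, and it assumes only `random` and NumPy are in play (extend it for PyTorch or TensorFlow as needed):

```python
import random

import numpy as np


def set_global_seed(seed: int = 42) -> None:
    """Seed every RNG the training code touches (hypothetical helper)."""
    random.seed(seed)
    np.random.seed(seed)


# Re-seeding reproduces the exact same draws
set_global_seed(42)
first = np.random.rand(3)
set_global_seed(42)
second = np.random.rand(3)
assert np.array_equal(first, second)
```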
```python
from kfp.v2 import dsl
from kfp.v2.dsl import component, Input, Output, Dataset, Model, Metrics


@component(base_image="python:3.10", packages_to_install=["scikit-learn", "pandas"])
def train_model(
    train_data: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 5,
):
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    df = pd.read_csv(train_data.path)
    X, y = df.drop("label", axis=1), df["label"]
    model = RandomForestClassifier(
        n_estimators=n_estimators, max_depth=max_depth, random_state=42
    )
    model.fit(X, y)
    with open(model_output.path, "wb") as f:
        pickle.dump(model, f)
    metrics_output.log_metric("train_samples", len(df))


@dsl.pipeline(name="training-pipeline")
def training_pipeline(data_path: str, n_estimators: int = 100):
    # Materialize the training data URI as a Dataset artifact
    data = dsl.importer(
        artifact_uri=data_path, artifact_class=Dataset, reimport=False
    )
    train_step = train_model(train_data=data.output, n_estimators=n_estimators)
    # Chain additional steps (validate, register, deploy) here
```
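Under the hood, orchestrators like Kubeflow derive execution order from artifact dependencies (here, `train_model` consuming the importer's output). The same idea can be sketched with the standard library's topological sorter; the step names below are hypothetical, not part of the pipeline above:

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (hypothetical DAG)
dag = {
    "validate_data": {"ingest"},
    "train": {"validate_data"},
    "evaluate": {"train"},
    "register": {"train", "evaluate"},
}
order = list(TopologicalSorter(dag).static_order())
# Every step appears after all of its dependencies
assert order.index("ingest") < order.index("validate_data") < order.index("train")
```

This is why DAG design matters: a step's inputs, not its position in the source file, determine when it runs.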
```python
import great_expectations as ge


def validate_training_data(df):
    """Run schema and distribution checks. Raise on failure; never skip."""
    gdf = ge.from_pandas(df)
    checks = [
        gdf.expect_column_values_to_not_be_null("label"),
        gdf.expect_column_values_to_be_between("feature_1", min_value=0, max_value=1),
    ]
    failed = [c for c in checks if not c["success"]]
    if failed:
        raise ValueError(f"Data validation failed: {failed}")
    return df  # safe to proceed to training
```
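The same fail-fast contract can be expressed without Great Expectations. This plain-Python sketch mirrors the two checks above; `basic_row_checks` is a hypothetical helper operating on a list of row dicts rather than a DataFrame:

```python
def basic_row_checks(rows: list[dict]) -> list[dict]:
    """Raise on the first violated check; mirrors the expectations above."""
    for i, row in enumerate(rows):
        if row.get("label") is None:
            raise ValueError(f"row {i}: 'label' is null")
        if not 0 <= row["feature_1"] <= 1:
            raise ValueError(f"row {i}: 'feature_1' outside [0, 1]")
    return rows  # safe to proceed to training


basic_row_checks([{"label": 1, "feature_1": 0.5}])  # passes silently
```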
Always:
- Pin random seeds and log them with the run
- Log every hyperparameter and metric to the experiment tracker
- Validate training data before fitting, and raise on failure

Never:
- Hardcode hyperparameters silently
- Skip data validation to unblock a run
When implementing a pipeline, provide:
Technology expertise: MLflow, Kubeflow Pipelines, Apache Airflow, Prefect, Feast, Weights & Biases, Neptune, DVC, Great Expectations, Ray, Horovod, Kubernetes, Docker, S3/GCS/Azure Blob, model registry patterns, feature store architecture, distributed training, hyperparameter optimization