Skip to main content

MLflow PyTorch Integration

Introduction

PyTorch is an open-source deep learning framework developed by Meta's AI Research lab. It provides dynamic computation graphs and a Pythonic API for building neural networks, making it popular for both research and production deep learning applications.

MLflow's PyTorch integration provides experiment tracking, model versioning, and deployment capabilities for deep learning workflows.

Why MLflow + PyTorch?

Autologging

Enable comprehensive experiment tracking with one line: mlflow.pytorch.autolog() automatically logs metrics, parameters, and models.

Experiment Tracking

Track training metrics, hyperparameters, model architectures, and artifacts across all PyTorch experiments.

Model Registry

Version, stage, and deploy PyTorch models with MLflow's model registry and serving infrastructure.

Reproducibility

Capture model states, random seeds, and environments for reproducible deep learning experiments.

Getting Started

Enable comprehensive experiment tracking with one line of code:

python
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Enable autologging
mlflow.pytorch.autolog()

# Create synthetic data
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)

# Your existing PyTorch code works unchanged
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Training loop - metrics, parameters, and models logged automatically
for epoch in range(10):
    for data, target in train_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

Autologging captures training metrics, model parameters, optimizer configuration, and model checkpoints automatically.

PyTorch Lightning Support

MLflow's autologging works seamlessly with PyTorch Lightning. For vanilla PyTorch with custom training loops, use manual logging as shown in the section below.

Manual Logging

For standard PyTorch workflows, integrate MLflow logging into your training loop:

python
import mlflow
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


# Define model
class NeuralNetwork(nn.Module):
    """Minimal MLP classifier: flatten 28x28 input, one hidden ReLU layer, 10 logits."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        return self.linear_relu_stack(x)


# Training parameters
params = {
    "epochs": 5,
    "learning_rate": 1e-3,
    "batch_size": 64,
}

# Synthetic MNIST-shaped data. The original example referenced a
# `train_loader` that was never defined, which raised a NameError.
X = torch.randn(1000, 28, 28)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(
    TensorDataset(X, y), batch_size=params["batch_size"], shuffle=True
)

# Training with MLflow logging
with mlflow.start_run():
    # Log parameters
    mlflow.log_params(params)

    # Initialize model and optimizer
    model = NeuralNetwork()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=params["learning_rate"])

    # Training loop
    for epoch in range(params["epochs"]):
        model.train()
        train_loss = 0
        correct = 0
        total = 0

        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

        # Log metrics per epoch
        avg_loss = train_loss / len(train_loader)
        accuracy = 100.0 * correct / total

        mlflow.log_metrics(
            {"train_loss": avg_loss, "train_accuracy": accuracy}, step=epoch
        )

    # Log final model
    mlflow.pytorch.log_model(model, name="model")

System Metrics Tracking

Track hardware resource utilization during training to monitor GPU usage, memory consumption, and system performance:

python
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Create data and model
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
train_loader = DataLoader(TensorDataset(X, y), batch_size=32, shuffle=True)

model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Enable system metrics logging (CPU/GPU/disk/network stats)
mlflow.enable_system_metrics_logging()

with mlflow.start_run():
    mlflow.log_params({"learning_rate": 0.001, "batch_size": 32, "epochs": 10})

    # Training loop - system metrics logged automatically
    for epoch in range(10):
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        # Log the last batch's loss once per epoch
        mlflow.log_metric("loss", loss.item(), step=epoch)

    mlflow.pytorch.log_model(model, name="model")

System metrics logging automatically captures:

  • GPU Metrics: Utilization percentage, memory usage, temperature, and power consumption
  • CPU Metrics: Utilization percentage and memory usage
  • Disk I/O: Read/write throughput and utilization
  • Network I/O: Network traffic statistics

For PyTorch training, monitoring GPU metrics is especially valuable for:

  • Identifying GPU underutilization that may indicate data loading bottlenecks
  • Detecting memory issues before out-of-memory errors occur
  • Optimizing batch sizes based on GPU memory usage
  • Comparing resource efficiency across different model architectures

Advanced Configuration

You can customize system metrics collection frequency and behavior. See the System Metrics documentation for detailed configuration options.

Model Logging with Signatures

Log PyTorch models with input/output signatures for better model understanding:

python
import mlflow
import torch
import torch.nn as nn
from mlflow.models import infer_signature

model = nn.Sequential(nn.Linear(10, 50), nn.ReLU(), nn.Linear(50, 1))

# Create sample input and output for signature
input_example = torch.randn(1, 10)
predictions = model(input_example)

# Infer signature from input/output (numpy arrays, since MLflow
# signatures are framework-agnostic)
signature = infer_signature(input_example.numpy(), predictions.detach().numpy())

with mlflow.start_run():
    # Log model with signature and input example
    mlflow.pytorch.log_model(
        model,
        name="pytorch_model",
        signature=signature,
        input_example=input_example.numpy(),
    )

Checkpoint Tracking

Track model checkpoints during training with MLflow 3's checkpoint versioning. Use the step parameter to version checkpoints and link metrics to specific model versions:

python
import mlflow
import torch
import torch.nn as nn
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split


# Helper function to prepare data
def prepare_data(df):
    """Split a DataFrame into tensors: all columns but the last become
    float32 features, the last column becomes int64 labels."""
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y


# Helper function to compute accuracy
def compute_accuracy(model, X, y):
    """Return the fraction of rows where argmax(model(X)) equals y.

    Runs under torch.no_grad() so no autograd graph is built.
    """
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy


# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    """Two-layer MLP: input -> hidden (ReLU) -> raw class logits."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # No softmax here: CrossEntropyLoss expects unnormalized logits.
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df["target"] = iris.target

# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)

# Prepare training data (MLflow dataset object enables dataset lineage)
train_dataset = mlflow.data.from_pandas(train_df, name="iris_train")
X_train, y_train = prepare_data(train_dataset.df)

# Initialize model
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)
model = IrisClassifier(input_size, hidden_size, output_size)

# Training configuration
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

with mlflow.start_run() as run:
    # Log parameters once at the start
    mlflow.log_params(
        {
            "n_layers": 3,
            "activation": "ReLU",
            "criterion": "CrossEntropyLoss",
            "optimizer": "Adam",
            "learning_rate": 0.01,
        }
    )

    for epoch in range(101):
        # Training step (full-batch: the whole training set each epoch)
        out = model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint every 10 epochs
        if epoch % 10 == 0:
            # Log model checkpoint with step parameter
            model_info = mlflow.pytorch.log_model(
                pytorch_model=model,
                name=f"iris-checkpoint-{epoch}",
                step=epoch,
                input_example=X_train[:5].numpy(),
            )

            # Log metrics linked to this checkpoint and dataset
            accuracy = compute_accuracy(model, X_train, y_train)
            mlflow.log_metric(
                key="train_accuracy",
                value=accuracy,
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset,
            )

# Search and rank checkpoints by performance
ranked_checkpoints = mlflow.search_logged_models(
    filter_string=f"source_run_id='{run.info.run_id}'",
    order_by=[{"field_name": "metrics.train_accuracy", "ascending": False}],
    output_format="list",
)

best_checkpoint = ranked_checkpoints[0]
print(f"Best checkpoint: {best_checkpoint.name}")
print(f"Accuracy: {best_checkpoint.metrics[0].value}")

This approach enables you to:

  • Version checkpoints systematically with the step parameter
  • Link metrics to specific models using model_id in log_metric()
  • Associate metrics with datasets for better tracking
  • Rank and compare checkpoints using mlflow.search_logged_models()

Model Loading

Load logged PyTorch models for inference:

python
# Load as PyTorch model (replace <run_id> with a real MLflow run ID)
model_uri = "runs:/<run_id>/pytorch_model"
loaded_model = mlflow.pytorch.load_model(model_uri)

# Make predictions by calling the loaded model directly on a tensor
input_tensor = torch.randn(5, 10)
predictions = loaded_model(input_tensor)

# Load as PyFunc for generic inference (framework-agnostic predict API,
# takes numpy/pandas input rather than torch tensors)
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
predictions = pyfunc_model.predict(input_tensor.numpy())

Hyperparameter Optimization

Track hyperparameter tuning experiments with MLflow:

python
import mlflow
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Create synthetic dataset for demonstration
input_size = 784 # e.g., flattened 28x28 images
output_size = 10 # e.g., 10 classes

# Random features/labels: 1000 training and 200 validation samples
X_train = torch.randn(1000, input_size)
y_train = torch.randint(0, output_size, (1000,))
X_val = torch.randn(200, input_size)
y_val = torch.randint(0, output_size, (200,))

train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=32)


def train_and_evaluate(model, optimizer, train_loader, val_loader, epochs=5):
    """Train `model` for `epochs` passes over `train_loader`, then return
    the mean cross-entropy loss per batch over `val_loader`."""
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        for data, target in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    # Validation (once, after all training epochs)
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for data, target in val_loader:
            output = model(data)
            val_loss += criterion(output, target).item()

    return val_loss / len(val_loader)


def objective(trial):
    """Optuna objective for hyperparameter tuning.

    Samples a configuration, trains a model, logs everything to a nested
    MLflow run, and returns the validation loss to minimize.
    """

    with mlflow.start_run(nested=True):
        # Define hyperparameter search space
        params = {
            "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
            "hidden_size": trial.suggest_int("hidden_size", 32, 512),
            "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        }

        # Log parameters
        mlflow.log_params(params)

        # Create model
        model = nn.Sequential(
            nn.Linear(input_size, params["hidden_size"]),
            nn.ReLU(),
            nn.Dropout(params["dropout"]),
            nn.Linear(params["hidden_size"], output_size),
        )

        # Train model
        optimizer = optim.Adam(model.parameters(), lr=params["learning_rate"])
        val_loss = train_and_evaluate(model, optimizer, train_loader, val_loader)

        # Log validation loss
        mlflow.log_metric("val_loss", val_loss)

        return val_loss


# Run optimization under a parent run; each trial logs as a nested run
with mlflow.start_run(run_name="PyTorch HPO"):
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=50)

    # Log best parameters
    mlflow.log_params({f"best_{k}": v for k, v in study.best_params.items()})
    mlflow.log_metric("best_val_loss", study.best_value)

Model Registry Integration

Register PyTorch models for version control and deployment:

python
import mlflow
import torch.nn as nn
from mlflow import MlflowClient

client = MlflowClient()

with mlflow.start_run():
    # Create a simple model for demonstration
    model = nn.Sequential(
        nn.Conv2d(3, 32, 3),
        nn.ReLU(),
        nn.MaxPool2d(2),
        nn.Flatten(),
        nn.Linear(32 * 15 * 15, 10),
    )

    # Log model to registry (registered_model_name creates/updates the
    # registered model entry and returns the new version)
    model_info = mlflow.pytorch.log_model(
        model, name="pytorch_model", registered_model_name="ImageClassifier"
    )

    # Tag for tracking
    mlflow.set_tags(
        {"model_type": "cnn", "dataset": "imagenet", "framework": "pytorch"}
    )

    # Set alias for production deployment
    client.set_registered_model_alias(
        name="ImageClassifier",
        alias="champion",
        version=model_info.registered_model_version,
    )

Distributed Training

Track distributed PyTorch training experiments:

python
import mlflow
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset

# Create synthetic dataset (shared by train_distributed below)
X_train = torch.randn(1000, 784)
y_train = torch.randint(0, 10, (1000,))
train_dataset = TensorDataset(X_train, y_train)

# Simple model (wrapped in DDP inside train_distributed)
model = nn.Sequential(nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10))


def train_epoch(model, train_loader):
    """Run one pass over `train_loader` with Adam + cross-entropy and
    return the mean loss per batch.

    Note: a fresh optimizer is created per call, which is fine for this
    demonstration but discards optimizer state between epochs.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    total_loss = 0

    for data, target in train_loader:
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(train_loader)


def train_distributed():
    """Run DDP training on the module-level model/dataset; MLflow calls
    happen only on rank 0 to avoid duplicate runs."""
    # Initialize distributed training
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()

    # Wrap model with DDP
    model_ddp = DDP(model.to(rank), device_ids=[rank])

    # Create distributed sampler
    from torch.utils.data.distributed import DistributedSampler

    sampler = DistributedSampler(
        train_dataset, num_replicas=dist.get_world_size(), rank=rank
    )
    train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

    # Only log from rank 0
    if rank == 0:
        mlflow.start_run()
        mlflow.log_params({"world_size": dist.get_world_size(), "backend": "nccl"})

    # Training loop
    epochs = 10
    for epoch in range(epochs):
        sampler.set_epoch(epoch)  # Shuffle data differently each epoch
        train_loss = train_epoch(model_ddp, train_loader)

        # Log metrics from rank 0 only
        if rank == 0:
            mlflow.log_metric("train_loss", train_loss, step=epoch)

    # Save model from rank 0
    if rank == 0:
        mlflow.pytorch.log_model(model, name="distributed_model")
        mlflow.end_run()

Learn More