PyFunc in Practice
If you're looking to fully leverage the capabilities of `mlflow.pyfunc` and understand how it can be utilized in a Machine Learning project, this blog post will guide you through the process. MLflow PyFunc offers creative freedom and flexibility, allowing the development of complex systems encapsulated as models in MLflow that follow the same lifecycle as traditional ones. This blog will showcase how to create multi-model setups, seamlessly connect to databases, and implement your own custom `fit` method in your MLflow PyFunc model.
Introduction
This blog post demonstrates the capabilities of MLflow PyFunc and how it can be utilized to build a multi-model setup encapsulated as a PyFunc flavor model in MLflow. This approach allows ensemble models to follow the same lifecycle as traditional Built-In Model Flavors in MLflow.
But first, let's use an analogy to get you familiarized with the concept of ensemble models and why you should consider this solution in your next Machine Learning project.
Imagine you are in the market to buy a house. Would you make a decision based solely on the first house you visit and the advice of a single real estate agent? Of course not! The process of buying a house involves considering multiple factors and gathering information from various sources to make an informed decision.
The house buying process explained:
- Identify Your Needs: Determine whether you want a new or existing house, the type and style of house, and the year of construction.
- Research: Look for a list of available houses, check for discounts and offers, read customer reviews, and seek opinions from friends and family.
- Evaluate: Consider the performance, location, neighborhood amenities, and price range.
- Compare: Compare multiple houses to find the best fit for your needs and budget.
In short, you wouldn't jump straight to a conclusion; instead, you would weigh all the factors above before settling on the best choice.
Ensemble models in Machine Learning operate on a similar idea. Ensemble learning improves Machine Learning results by combining several models, achieving better predictive performance than any single model alone. The improvement can come from several factors, such as reducing variance by averaging multiple models or reducing bias by focusing on the errors of previous models. Several ensemble learning techniques exist, such as the following (the first two are sketched in code after the list):
- Averaging
- Weighted Averaging
- Bagging
- Boosting
- Stacking
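To make the first two techniques concrete, here is a minimal NumPy sketch (the prediction values are made up for illustration, not taken from the dataset used later) of how simple and weighted averaging combine the outputs of three regression models:

```python
import numpy as np

# Hypothetical predictions from three regression models for the same four samples
preds = np.array(
    [
        [210_000, 305_000, 150_000, 420_000],  # model A
        [205_000, 310_000, 155_000, 400_000],  # model B
        [220_000, 295_000, 160_000, 410_000],  # model C
    ],
    dtype=float,
)

# Averaging: every model contributes equally
average = preds.mean(axis=0)  # approx. [211666.67, 303333.33, 155000.0, 410000.0]

# Weighted Averaging: better-performing models get a larger say
weights = np.array([0.5, 0.3, 0.2])  # chosen arbitrarily here for illustration
weighted = np.average(preds, axis=0, weights=weights)  # [210500.0, 304500.0, 153500.0, 412000.0]
```

Bagging, Boosting, and Stacking go further, training the sub-models themselves in complementary ways rather than only combining their outputs.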
However, developing such systems requires careful management of the lifecycle of ensemble models, as integrating diverse models can be highly complex. This is where MLflow PyFunc becomes invaluable. It offers the flexibility to build complex systems, treating the entire ensemble as a model that adheres to the same lifecycle processes as traditional models. Essentially, MLflow PyFunc allows the creation of custom methods tailored to ensemble models, serving as an alternative to the built-in MLflow flavors available for popular frameworks such as scikit-learn, PyTorch, and LangChain.
This blog utilizes the house price dataset from Kaggle to demonstrate the development and management of ensemble models through MLflow.
We will leverage various tools and technologies to highlight the capabilities of MLflow PyFunc models. Before delving into the ensemble model itself, we will explore how these components integrate to create a robust and efficient Machine Learning pipeline.
Components of the Project
DuckDB
DuckDB is a high-performance analytical database system designed to be fast, reliable, portable, and easy to use. In this project, it showcases the integration of a database connection within the model context, facilitating efficient data handling directly within the model. Learn more about DuckDB.
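As a quick taste of the API, querying a file-backed DuckDB database from Python takes only a couple of lines (the file path and table name below match the strategies database created later in this post; treat this as a sketch rather than project code):

```python
import duckdb

# Open (or create) a file-backed DuckDB database
with duckdb.connect("models/strategies.db") as con:
    # Run SQL directly and fetch the result as a pandas DataFrame
    strategies = con.execute("SELECT * FROM strategies").fetchdf()
    print(strategies)
```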
scikit-learn (sklearn)
scikit-learn is a Machine Learning library for Python that provides efficient tools for data analysis and modelling. In this project, it is used to develop and evaluate various Machine Learning models that are integrated into our ensemble model. Learn more about scikit-learn.
MLflow
MLflow is an open-source platform for managing the end-to-end Machine Learning lifecycle, including experimentation, reproducibility, and deployment. In this project, it tracks experiments, manages model versions, and facilitates the deployment of MLflow PyFunc models in a similar manner to how we are familiar with individual flavors. Learn more about MLflow.
Note: To reproduce this project, please refer to the official MLflow documentation for more details on setting up a simple local MLflow Tracking Server.
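For a quick local setup, the general pattern looks like the sketch below (the port and experiment name are illustrative choices, not requirements of this project):

```python
import mlflow

# First, start a local server in a separate shell, for example:
#   mlflow server --host 127.0.0.1 --port 8080
# Then point the client at it before logging anything
mlflow.set_tracking_uri("http://127.0.0.1:8080")
mlflow.set_experiment("ensemble-house-prices")  # illustrative experiment name
```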
Creating the Ensemble Model
Creating an MLflow PyFunc ensemble model requires additional steps compared to using the built-in flavors for logging and working with popular Machine Learning frameworks.
To implement an ensemble model, you need to define an `mlflow.pyfunc` model, which involves creating a Python class that inherits from the `PythonModel` class and implementing its constructor and class methods. While a basic PyFunc model only requires implementing the `predict` method, an ensemble model needs additional methods to manage the sub-models and obtain multi-model predictions. After instantiating the ensemble model, you must use the custom `fit` method to train the ensemble's sub-models. As with an out-of-the-box MLflow model, you then log the model along with its artifacts during the training run and register it in the MLflow Model Registry. A model alias, `production`, will also be added to the model to streamline both model updates and inference. Model aliases let you assign a mutable, named reference to a specific version of a registered model; once the alias points at a particular model version, that version can be easily referenced via a model URI or the Model Registry API. This setup allows the model version used for inference to be updated seamlessly without changing the serving workload's code. For more details, refer to Deploy and Organize Models with Aliases and Tags.
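As a preview of what that looks like in code, here is a minimal sketch (the registered model name `house_price_ensemble` and the version number are hypothetical placeholders):

```python
import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# Point the mutable "production" alias at a specific registered model version
client.set_registered_model_alias(
    name="house_price_ensemble",  # hypothetical registered model name
    alias="production",
    version=1,  # hypothetical version number
)

# Serving code resolves the alias at load time, so promoting a new version
# only requires moving the alias; this line never has to change
model = mlflow.pyfunc.load_model("models:/house_price_ensemble@production")
```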
The following sections, as depicted in the diagram, detail the implementation of each method for the ensemble model, providing a comprehensive understanding of defining, managing, and utilizing an ensemble model with MLflow PyFunc.
Before delving into the detailed implementation of each method, let's first review the skeleton of our `EnsembleModel` class. This skeleton serves as a blueprint for understanding the structure of the ensemble model. The subsequent sections provide an overview and code for both the default methods provided by MLflow PyFunc and the custom methods implemented for the ensemble model.

Here is the skeleton of the `EnsembleModel` class:
```python
import mlflow


class EnsembleModel(mlflow.pyfunc.PythonModel):
    """Ensemble model class leveraging PyFunc for multi-model integration in MLflow."""

    def __init__(self):
        """Initialize the EnsembleModel instance."""
        ...

    def add_strategy_and_save_to_db(self):
        """Add strategies to the DuckDB database."""
        ...

    def feature_engineering(self):
        """Perform feature engineering on input data."""
        ...

    def initialize_models(self):
        """Initialize models and their hyperparameter grids."""
        ...

    def fit(self):
        """Train the ensemble of models."""
        ...

    def predict(self):
        """Predict using the ensemble of models."""
        ...

    def load_context(self):
        """Load the preprocessor and models from the MLflow context."""
        ...
```
Initializing the EnsembleModel
The constructor method in the ensemble model is crucial for setting up its essential elements. It establishes key attributes such as the preprocessor, a dictionary to store trained models, the path to a DuckDB database, and a pandas DataFrame for managing different ensemble strategies. Additionally, it calls the `initialize_models` method to define the sub-models integrated into the ensemble.
```python
import pandas as pd


def __init__(self):
    """
    Initializes the EnsembleModel instance.

    Sets up an empty preprocessing pipeline, a dictionary for fitted models,
    and a DataFrame to store strategies. Also calls the method to initialize sub-models.
    """
    self.preprocessor = None
    self.fitted_models = {}
    self.db_path = None
    self.strategies = pd.DataFrame(columns=["strategy", "model_list", "weights"])
    self.initialize_models()
```
Adding Strategies and Saving to the Database
The custom-defined `add_strategy_and_save_to_db` method enables the addition of new ensemble strategies to the model and their storage in a DuckDB database. It accepts a pandas DataFrame containing the strategies and the database path as inputs, appends the new strategies to the existing ones, and saves them in the database at the given path, which is also stored on the model instance. This method facilitates the management of various ensemble strategies and ensures their persistent storage for future use.
```python
import duckdb
import pandas as pd


def add_strategy_and_save_to_db(self, strategy_df: pd.DataFrame, db_path: str) -> None:
    """Add strategies from a DataFrame and save them to the DuckDB database.

    Args:
        strategy_df (pd.DataFrame): DataFrame containing strategies.
        db_path (str): Path to the DuckDB database.
    """
    # Update the instance-level database path for the current object
    self.db_path = db_path

    # Attempt to concatenate the new strategies with the existing DataFrame
    try:
        self.strategies = pd.concat([self.strategies, strategy_df], ignore_index=True)
    except Exception as e:
        # Print an error message if any exceptions occur during concatenation
        print(f"Error concatenating DataFrames: {e}")
        return  # Exit early to prevent further errors

    # Use a context manager for the database connection
    try:
        with duckdb.connect(self.db_path) as con:
            # Register the strategies DataFrame as a temporary table in DuckDB
            con.register("strategy_df", self.strategies)
            # Drop any existing strategies table and create a new one with the updated strategies
            con.execute("DROP TABLE IF EXISTS strategies")
            con.execute("CREATE TABLE strategies AS SELECT * FROM strategy_df")
    except Exception as e:
        # Print an error message if any exceptions occur during database operations
        print(f"Error executing database operations: {e}")
```
The following example demonstrates how to use this method to add strategies to the database.
```python
import pandas as pd

# Initialize the ensemble model
ensemble_model = EnsembleModel()

# Define strategies for the ensemble model
strategy_data = {
    "strategy": ["average_1"],
    "model_list": ["random_forest,xgboost,decision_tree,gradient_boosting,adaboost"],
    "weights": ["1"],
}

# Create a DataFrame to hold the strategy information
strategies_df = pd.DataFrame(strategy_data)

# Add the strategies to the database
ensemble_model.add_strategy_and_save_to_db(strategies_df, "models/strategies.db")
```
The `strategy_data` DataFrame includes:

- strategy: The name of the strategy used for combining model predictions.
- model_list: A comma-separated list of the model names included in the strategy.
- weights: A comma-separated list of weights assigned to each model in the `model_list`. If not provided, equal or default weights are implied.
| strategy | model_list | weights |
| --- | --- | --- |
| average_1 | random_forest,xgboost,decision_tree,gradient_boosting,adaboost | 1 |
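A weighted strategy can be registered the same way; for example (the strategy name and weights below are illustrative, and the weights line up positionally with the entries in `model_list`):

```python
import pandas as pd

# Illustrative weighted strategy; the predict method re-normalizes the weights anyway
weighted_strategy = pd.DataFrame(
    {
        "strategy": ["weighted_1"],
        "model_list": ["random_forest,xgboost,decision_tree,gradient_boosting,adaboost"],
        "weights": ["0.4,0.3,0.1,0.1,0.1"],
    }
)

ensemble_model.add_strategy_and_save_to_db(weighted_strategy, "models/strategies.db")
```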
Feature Engineering
The `feature_engineering` method preprocesses input data by handling missing values, scaling numerical features, and encoding categorical features. It applies separate transformations to numerical and categorical features and returns the processed features as a NumPy array. This method is crucial for preparing data in a format suitable for model training, ensuring consistency and enhancing model performance.
```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def feature_engineering(self, X: pd.DataFrame) -> np.ndarray:
    """
    Applies feature engineering to the input data X, including imputation, scaling, and encoding.

    Args:
        X (pd.DataFrame): Input features with potential categorical and numerical columns.

    Returns:
        np.ndarray: Processed feature array after transformations.
    """
    # Convert columns with 'object' dtype to 'category' dtype for proper handling of categorical features
    X = X.apply(lambda col: col.astype("category") if col.dtype == "object" else col)

    # Identify categorical and numerical features from the DataFrame
    categorical_features = X.select_dtypes(include=["category"]).columns
    numerical_features = X.select_dtypes(include=["number"]).columns

    # Define the pipeline for numerical features: imputation followed by scaling
    numeric_transformer = Pipeline(
        steps=[
            # Replace missing values with the median
            ("imputer", SimpleImputer(strategy="median")),
            # Standardize features by removing the mean and scaling to unit variance
            ("scaler", StandardScaler()),
        ]
    )

    # Define the pipeline for categorical features: imputation followed by one-hot encoding
    categorical_transformer = Pipeline(
        steps=[
            # Replace missing values with the most frequent value
            ("imputer", SimpleImputer(strategy="most_frequent")),
            # Encode categorical features as a one-hot numeric array
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    # Create a ColumnTransformer to apply the appropriate pipeline to each feature type
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numerical_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    # Fit and transform the input data using the preprocessor
    X_processed = preprocessor.fit_transform(X)

    # Store the preprocessor for future use in the predict method
    self.preprocessor = preprocessor

    return X_processed
```
Initializing Models
The `initialize_models` method sets up a dictionary of various Machine Learning models along with their hyperparameter grids. This includes models such as `RandomForest`, `XGBoost`, `DecisionTree`, `GradientBoosting`, and `AdaBoost`. This step is crucial for preparing the ensemble's sub-models and specifying the hyperparameters to tune during training, ensuring that each model is configured correctly and ready for training.
```python
from sklearn.ensemble import (
    AdaBoostRegressor,
    GradientBoostingRegressor,
    RandomForestRegressor,
)
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor


def initialize_models(self) -> None:
    """
    Initializes a dictionary of models along with their hyperparameter grids for grid search.
    """
    # Define various regression models with their respective hyperparameter grids for tuning
    self.models = {
        "random_forest": (
            RandomForestRegressor(random_state=42),
            {"n_estimators": [50, 100, 200], "max_depth": [None, 10, 20]},
        ),
        "xgboost": (
            XGBRegressor(random_state=42),
            {"n_estimators": [50, 100, 200], "max_depth": [3, 6, 10]},
        ),
        "decision_tree": (
            DecisionTreeRegressor(random_state=42),
            {"max_depth": [None, 10, 20]},
        ),
        "gradient_boosting": (
            GradientBoostingRegressor(random_state=42),
            {"n_estimators": [50, 100, 200], "max_depth": [3, 5, 7]},
        ),
        "adaboost": (
            AdaBoostRegressor(random_state=42),
            {"n_estimators": [50, 100, 200], "learning_rate": [0.01, 0.1, 1.0]},
        ),
    }
```
Defining a Custom `fit` Method to Train and Save Multi-Models
As highlighted above, a key feature of MLflow PyFunc models is the ability to define custom methods, providing significant flexibility and customization for various tasks. In the multi-model PyFunc setup, the `fit` method is essential for training and optimizing the sub-models. It manages the training and fine-tuning of algorithms such as `RandomForestRegressor`, `XGBRegressor`, `DecisionTreeRegressor`, `GradientBoostingRegressor`, and `AdaBoostRegressor`. For demonstration purposes, Grid Search is used, which, while straightforward, can be computationally intensive and time-consuming, especially for ensemble models. To improve efficiency, advanced optimization methods such as Bayesian optimization are recommended: tools like Optuna and Hyperopt leverage probabilistic models to intelligently navigate the search space, significantly reducing the number of evaluations needed to identify optimal configurations.
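As a rough illustration of that alternative, the sketch below tunes just the random forest with Optuna (this is not part of the ensemble class; `X_train_processed` and `y_train` are assumed to be the preprocessed features and target, as in the training example later in this post):

```python
import optuna
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score


def objective(trial):
    # Optuna proposes hyperparameters instead of exhaustively enumerating a grid
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 200),
        "max_depth": trial.suggest_int("max_depth", 3, 20),
    }
    model = RandomForestRegressor(random_state=42, **params)
    # X_train_processed and y_train are assumed to be defined (see the training example below)
    return cross_val_score(
        model, X_train_processed, y_train, cv=5, scoring="neg_mean_squared_error"
    ).mean()


# Maximize the (negative) MSE over a modest number of trials
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
print(study.best_params)
```

Returning to the grid-search approach used in this post, the `fit` method is implemented as follows: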
```python
import os

import joblib
import pandas as pd
from sklearn.model_selection import GridSearchCV


def fit(
    self, X_train_processed: pd.DataFrame, y_train: pd.Series, save_path: str
) -> None:
    """
    Trains the ensemble of models using the provided preprocessed training data.

    Args:
        X_train_processed (pd.DataFrame): Preprocessed feature matrix for training.
        y_train (pd.Series): Target variable for training.
        save_path (str): Directory path where trained models will be saved.
    """
    # Create the directory for saving models if it does not exist
    os.makedirs(save_path, exist_ok=True)

    # Iterate over each model and its parameter grid
    for model_name, (model, param_grid) in self.models.items():
        # Perform GridSearchCV to find the best hyperparameters for the current model
        grid_search = GridSearchCV(
            model, param_grid, cv=5, n_jobs=-1, scoring="neg_mean_squared_error"
        )
        grid_search.fit(X_train_processed, y_train)  # Fit the model with the training data

        # Keep the best estimator found by GridSearchCV
        best_model = grid_search.best_estimator_
        self.fitted_models[model_name] = best_model

        # Save the trained model to disk
        joblib.dump(best_model, os.path.join(save_path, f"{model_name}.pkl"))
```
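Putting the pieces together, training the ensemble end-to-end might look like the sketch below (the CSV path is illustrative, `SalePrice` is the target column in the Kaggle house price dataset, and `ensemble_model` is the instance created earlier):

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the Kaggle house price training data (path is illustrative)
data = pd.read_csv("data/train.csv")
X = data.drop(columns=["SalePrice"])
y = data["SalePrice"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit the preprocessor and transform the training features
X_train_processed = ensemble_model.feature_engineering(X_train)

# Grid-search, train, and persist every sub-model under models/
ensemble_model.fit(X_train_processed, y_train, save_path="models")
```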
Defining a Custom `predict` Method to Aggregate Multi-Model Predictions
To streamline the inference process, every PyFunc model defines a custom `predict` method as the single entry point for inference. This approach abstracts away the model's internal workings at inference time, whether you are dealing with a custom PyFunc model or an out-of-the-box MLflow built-in flavor for a popular ML framework.

The custom `predict` method for the ensemble model is designed to collect and combine predictions from the sub-models, supporting various aggregation strategies (e.g., average, weighted). The process involves the following steps:
- Look up the user-specified strategy for aggregating the sub-model predictions.
- Load the models to be used for inference.
- Preprocess the input data.
- Collect predictions from individual models.
- Aggregate the model predictions according to the specified strategy.
```python
import numpy as np
import pandas as pd


def predict(self, context, model_input: pd.DataFrame) -> np.ndarray:
    """
    Predicts the target variable using the ensemble of models based on the selected strategy.

    Args:
        context: MLflow context object.
        model_input (pd.DataFrame): Input features for prediction.

    Returns:
        np.ndarray: Array of predicted values.

    Raises:
        ValueError: If the strategy is unknown or no models are fitted.
    """
    # Check if the 'strategy' column is present in the input DataFrame
    if "strategy" in model_input.columns:
        # Extract the strategy and drop it from the input features
        # (reassign rather than mutate, so the caller's DataFrame is untouched)
        print(f"Strategy: {model_input['strategy'].iloc[0]}")
        strategy = model_input["strategy"].iloc[0]
        model_input = model_input.drop(columns=["strategy"])
    else:
        # Default to the 'average' strategy if none is provided
        strategy = "average"

    # Look up the strategy details in the pre-loaded strategies DataFrame
    loaded_strategy = self.strategies[self.strategies["strategy"] == strategy]
    if loaded_strategy.empty:
        # Raise an error if the specified strategy is not found
        raise ValueError(
            f"Strategy '{strategy}' not found in the pre-loaded strategies."
        )

    # Parse the list of models to be used for prediction
    model_list = loaded_strategy["model_list"].iloc[0].split(",")

    # Transform the input features using the preprocessor, if available
    if self.preprocessor is None:
        # Feature engineering is required if the preprocessor is not set
        X_processed = self.feature_engineering(model_input)
    else:
        # Use the existing preprocessor to transform the features
        X_processed = self.preprocessor.transform(model_input)

    if not self.fitted_models:
        # Raise an error if no models are fitted
        raise ValueError("No fitted models found. Please fit the models first.")

    # Collect predictions from all models specified in the strategy
    predictions = np.array(
        [self.fitted_models[model].predict(X_processed) for model in model_list]
    )

    # Apply the specified strategy to combine the model predictions
    if "average" in strategy:
        # Calculate the simple average of predictions from all models
        return np.mean(predictions, axis=0)
    elif "weighted" in strategy:
        # Extract the weights from the strategy and normalize them
        weights = np.array(
            [float(w) for w in loaded_strategy["weights"].iloc[0].split(",")]
        )
        weights /= np.sum(weights)  # Ensure the weights sum to 1
        # Compute the weighted average of the predictions
        return np.average(predictions, axis=0, weights=weights)
    else:
        # Raise an error if an unknown strategy is encountered
        raise ValueError(f"Unknown strategy: {strategy}")
```