Prophet with MLflow
In this comprehensive guide, we'll explore how to use Prophet with MLflow for time series forecasting, experiment tracking, and model deployment. We'll cover everything from basic forecasting workflows to advanced business scenario modeling and production deployment patterns.
Quick Start with Prophet and MLflow
Prophet works seamlessly with MLflow to track your forecasting experiments:
import mlflow
import mlflow.prophet
import pandas as pd
import numpy as np
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
# Load sample time series data (Prophet expects 'ds' and 'y' columns)
# This example uses the classic Peyton Manning Wikipedia page views dataset
url = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
df = pd.read_csv(url)
print(f"Data shape: {df.shape}")
print(f"Date range: {df['ds'].min()} to {df['ds'].max()}")
print(f"Data preview:\n{df.head()}")
with mlflow.start_run(run_name="Basic Prophet Forecast"):
    # Create Prophet model with specific parameters
    model = Prophet(
        changepoint_prior_scale=0.05,  # Flexibility of trend changes
        seasonality_prior_scale=10,  # Strength of seasonality
        holidays_prior_scale=10,  # Strength of holiday effects
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=False,
    )
    # Fit the model
    model.fit(df)
    # Extract and log model parameters
    def extract_prophet_params(prophet_model):
        """Extract Prophet model parameters for logging."""
        from prophet.serialize import SIMPLE_ATTRIBUTES
        params = {}
        for attr in SIMPLE_ATTRIBUTES:
            if hasattr(prophet_model, attr):
                value = getattr(prophet_model, attr)
                if isinstance(value, (int, float, str, bool)):
                    params[attr] = value
        return params
    params = extract_prophet_params(model)
    mlflow.log_params(params)
    # Create future dataframe for forecasting
    future = model.make_future_dataframe(periods=365)  # Forecast 1 year ahead
    forecast = model.predict(future)
    # Cross-validation for model evaluation
    cv_results = cross_validation(
        model,
        initial="730 days",  # Initial training period
        period="180 days",  # Spacing between cutoff dates
        horizon="365 days",  # Forecast horizon
        parallel="threads",  # Use threading for speed
    )
    # Calculate performance metrics
    metrics = performance_metrics(cv_results)
    avg_metrics = metrics[["mse", "rmse", "mae", "mape"]].mean().to_dict()
    mlflow.log_metrics(avg_metrics)
    # Log the model with input example
    mlflow.prophet.log_model(
        pr_model=model, name="prophet_model", input_example=df[["ds"]].head(10)
    )
    print(f"Model trained and logged successfully!")
    print(f"Average MAPE: {avg_metrics['mape']:.2f}%")
This example automatically captures:
- All Prophet model parameters and configuration
- Cross-validation performance metrics
- The trained model ready for deployment
- Sample input data for model documentation
Understanding Prophet's Data Requirements
Prophet has specific data format requirements that are important to understand:
Data Format and Preparation
import pandas as pd
from datetime import datetime, timedelta
def prepare_prophet_data(data, date_col, value_col, freq="D"):
    """
    Prepare data for Prophet training.
    Args:
        data: DataFrame with time series data
        date_col: Name of date column
        value_col: Name of value column
        freq: Frequency of the time series
    """
    # Prophet requires columns named 'ds' (datestamp) and 'y' (value)
    prophet_df = data[[date_col, value_col]].copy()
    prophet_df.columns = ["ds", "y"]
    # Ensure ds is datetime
    prophet_df["ds"] = pd.to_datetime(prophet_df["ds"])
    # Sort by date
    prophet_df = prophet_df.sort_values("ds").reset_index(drop=True)
    # Handle missing dates if needed
    if freq:
        full_date_range = pd.date_range(
            start=prophet_df["ds"].min(), end=prophet_df["ds"].max(), freq=freq
        )
        # Reindex to fill missing dates
        prophet_df = prophet_df.set_index("ds").reindex(full_date_range).reset_index()
        prophet_df.columns = ["ds", "y"]
        # Log data quality metrics
        missing_dates = prophet_df["y"].isna().sum()
        print(f"Missing dates filled: {missing_dates}")
    return prophet_df
# Example usage
# Assuming you have a DataFrame with 'date' and 'sales' columns
# df_prepared = prepare_prophet_data(raw_data, 'date', 'sales', freq='D')
Handling Different Time Series Patterns
Data Preparation Patterns
Multiple Time Series
def prepare_multiple_series(data, date_col, value_col, series_col):
    """Prepare multiple time series for separate Prophet models."""
    results = {}
    for series_name in data[series_col].unique():
        series_data = data[data[series_col] == series_name]
        prophet_data = prepare_prophet_data(series_data, date_col, value_col)
        results[series_name] = prophet_data
    return results
# Train separate models for each series
def train_multiple_prophet_models(series_dict):
    """Train Prophet models for multiple time series."""
    models = {}
    with mlflow.start_run(run_name="Multiple Series Forecasting"):
        for series_name, data in series_dict.items():
            with mlflow.start_run(run_name=f"Series_{series_name}", nested=True):
                model = Prophet()
                model.fit(data)
                # Log series-specific metrics
                mlflow.log_param("series_name", series_name)
                mlflow.log_param("data_points", len(data))
                models[series_name] = model
                # Log individual model
                mlflow.prophet.log_model(pr_model=model, name=f"model_{series_name}")
    return models
Irregular Time Series
def handle_irregular_timeseries(df, min_frequency="W"):
    """Handle irregular time series data."""
    # Aggregate to regular frequency if needed
    df["ds"] = pd.to_datetime(df["ds"])
    df.set_index("ds", inplace=True)
    # Resample to regular frequency
    if min_frequency == "W":
        df_regular = (
            df.resample("W")
            .agg(
                {
                    "y": "sum",  # or 'mean' depending on your use case
                }
            )
            .reset_index()
        )
    elif min_frequency == "M":
        df_regular = (
            df.resample("M")
            .agg(
                {
                    "y": "sum",
                }
            )
            .reset_index()
        )
    # Remove any remaining NaN values
    df_regular = df_regular.dropna()
    return df_regular
Advanced Prophet Configuration
Seasonality and Trend Configuration
def advanced_prophet_configuration():
    """Demonstrate advanced Prophet configuration options."""
    with mlflow.start_run(run_name="Advanced Prophet Configuration"):
        # Create Prophet model with advanced settings
        model = Prophet(
            # Trend configuration
            growth="logistic",  # or 'linear'
            changepoints=None,  # Let Prophet auto-detect, or specify dates
            n_changepoints=25,  # Number of potential changepoints
            changepoint_range=0.8,  # Proportion of history for changepoints
            changepoint_prior_scale=0.05,  # Flexibility of trend changes
            # Seasonality configuration
            yearly_seasonality="auto",  # or True/False/number
            weekly_seasonality="auto",
            daily_seasonality="auto",
            seasonality_mode="additive",  # or 'multiplicative'
            seasonality_prior_scale=10,
            # Holiday configuration
            holidays_prior_scale=10,
            # Uncertainty configuration
            interval_width=0.80,  # Width of uncertainty intervals
            uncertainty_samples=1000,  # Monte Carlo samples for uncertainty
            # Stan configuration
            mcmc_samples=0,  # Use MAP instead of MCMC
            stan_backend="CMDSTANPY",  # Stan backend
        )
        # For logistic growth, need to specify capacity
        if model.growth == "logistic":
            df["cap"] = df["y"].max() * 1.2  # Set capacity 20% above max observed
            df["floor"] = 0  # Optional floor
        # Fit the model
        model.fit(df)
        # Log configuration parameters
        config_params = {
            "growth": model.growth,
            "n_changepoints": model.n_changepoints,
            "changepoint_range": model.changepoint_range,
            "seasonality_mode": model.seasonality_mode,
            "interval_width": model.interval_width,
        }
        mlflow.log_params(config_params)
        return model
# Usage
advanced_model = advanced_prophet_configuration()
Custom Seasonalities and Events
def add_custom_components(model, df):
    """Add custom seasonalities and regressors to Prophet model."""
    with mlflow.start_run(run_name="Custom Prophet Components"):
        # Add custom seasonalities
        model.add_seasonality(
            name="monthly",
            period=30.5,  # Monthly seasonality
            fourier_order=5,  # Number of Fourier terms
        )
        model.add_seasonality(
            name="quarterly", period=91.25, fourier_order=8  # Quarterly seasonality
        )
        # Add conditional seasonalities (e.g., different patterns for weekdays/weekends)
        def is_weekend(ds):
            date = pd.to_datetime(ds)
            return date.weekday() >= 5
        df["weekend"] = df["ds"].apply(is_weekend)
        model.add_seasonality(
            name="weekend_seasonality",
            period=7,
            fourier_order=3,
            condition_name="weekend",
        )
        # Add external regressors
        # Example: Add economic indicator or marketing spend
        np.random.seed(42)
        df["marketing_spend"] = np.random.normal(1000, 200, len(df))
        df["economic_indicator"] = np.random.normal(50, 10, len(df))
        model.add_regressor("marketing_spend", prior_scale=0.5)
        model.add_regressor("economic_indicator", prior_scale=0.3)
        # Fit model with custom components
        model.fit(df)
        # Log custom component information
        mlflow.log_params(
            {
                "custom_seasonalities": ["monthly", "quarterly", "weekend_seasonality"],
                "external_regressors": ["marketing_spend", "economic_indicator"],
                "total_components": len(model.extra_seasonalities)
                + len(model.extra_regressors),
            }
        )
        return model, df
# Usage
model_with_custom = Prophet()
model_with_custom, enhanced_df = add_custom_components(model_with_custom, df.copy())
Holiday and Event Modeling
Built-in Holiday Support
from prophet.make_holidays import make_holidays_df
def add_holiday_effects():
    """Add holiday effects to Prophet model."""
    with mlflow.start_run(run_name="Holiday Modeling"):
        # Create holidays dataframe for specific countries
        holidays = make_holidays_df(
            year_list=range(2010, 2025),
            country="US",  # Built-in support for many countries
        )
        # Add custom holidays/events
        custom_events = pd.DataFrame(
            {
                "holiday": "black_friday",
                "ds": pd.to_datetime(
                    ["2020-11-27", "2021-11-26", "2022-11-25", "2023-11-24"]
                ),
                "lower_window": -1,  # Effect starts 1 day before
                "upper_window": 2,  # Effect lasts 2 days after
            }
        )
        # Combine built-in and custom holidays
        all_holidays = pd.concat([holidays, custom_events])
        # Create model with holidays
        model = Prophet(
            holidays=all_holidays,
            holidays_prior_scale=15,  # Increase for stronger holiday effects
        )
        model.fit(df)
        # Log holiday information
        mlflow.log_params(
            {
                "country_holidays": "US",
                "custom_events": ["black_friday"],
                "total_holidays": len(all_holidays),
                "holidays_prior_scale": 15,
            }
        )
        return model
Business Calendar Integration
def create_business_calendar(start_date, end_date):
    """Create business-specific calendar events."""
    business_events = []
    # Quarterly business reviews
    for year in range(start_date.year, end_date.year + 1):
        for quarter in [1, 2, 3, 4]:
            if quarter == 1:
                date = f"{year}-03-31"
            elif quarter == 2:
                date = f"{year}-06-30"
            elif quarter == 3:
                date = f"{year}-09-30"
            else:
                date = f"{year}-12-31"
            business_events.append(
                {
                    "holiday": "quarterly_review",
                    "ds": pd.to_datetime(date),
                    "lower_window": -7,  # Week before
                    "upper_window": 0,
                }
            )
    # Annual planning periods
    for year in range(start_date.year, end_date.year + 1):
        business_events.append(
            {
                "holiday": "annual_planning",
                "ds": pd.to_datetime(f"{year}-11-15"),
                "lower_window": -14,  # Two weeks of planning
                "upper_window": 14,
            }
        )
    return pd.DataFrame(business_events)
Model Validation and Performance Assessment
Cross-Validation Best Practices
def comprehensive_model_validation(model, df):
    """Perform comprehensive Prophet model validation."""
    with mlflow.start_run(run_name="Comprehensive Model Validation"):
        # Multiple cross-validation configurations
        cv_configs = [
            {
                "name": "short_horizon",
                "initial": "365 days",
                "period": "90 days",
                "horizon": "90 days",
            },
            {
                "name": "medium_horizon",
                "initial": "730 days",
                "period": "180 days",
                "horizon": "180 days",
            },
            {
                "name": "long_horizon",
                "initial": "1095 days",
                "period": "180 days",
                "horizon": "365 days",
            },
        ]
        all_metrics = {}
        for config in cv_configs:
            try:
                # Perform cross-validation
                cv_results = cross_validation(
                    model,
                    initial=config["initial"],
                    period=config["period"],
                    horizon=config["horizon"],
                    parallel="threads",
                )
                # Calculate metrics
                metrics = performance_metrics(cv_results)
                avg_metrics = metrics[["mse", "rmse", "mae", "mape", "coverage"]].mean()
                # Store metrics with configuration prefix
                for metric, value in avg_metrics.items():
                    metric_name = f"{config['name']}_{metric}"
                    all_metrics[metric_name] = value
                    mlflow.log_metric(metric_name, value)
                # Log additional statistics
                mlflow.log_metrics(
                    {
                        f"{config['name']}_cv_folds": len(cv_results),
                        f"{config['name']}_mape_std": metrics["mape"].std(),
                    }
                )
            except Exception as e:
                print(f"Cross-validation failed for {config['name']}: {e}")
                mlflow.log_param(f"{config['name']}_error", str(e))
        return all_metrics
# Usage
validation_metrics = comprehensive_model_validation(model, df)
Forecast Quality Assessment
import matplotlib.pyplot as plt
import seaborn as sns
def analyze_forecast_quality(model, df):
    """Analyze forecast quality with visualizations."""
    with mlflow.start_run(run_name="Forecast Quality Analysis"):
        # Generate forecast
        future = model.make_future_dataframe(periods=365)
        if model.growth == "logistic":
            future["cap"] = df["cap"].iloc[-1]  # Use last known capacity
            future["floor"] = df["floor"].iloc[-1] if "floor" in df.columns else 0
        forecast = model.predict(future)
        # Component analysis
        fig = model.plot_components(forecast, figsize=(12, 8))
        plt.tight_layout()
        plt.savefig("forecast_components.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_components.png")
        plt.close()
        # Forecast plot
        fig = model.plot(forecast, figsize=(12, 6))
        plt.title("Prophet Forecast")
        plt.tight_layout()
        plt.savefig("forecast_plot.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_plot.png")
        plt.close()
        # Residual analysis
        # Get historical predictions
        historical_forecast = forecast[forecast["ds"] <= df["ds"].max()]
        residuals = (
            df.set_index("ds")["y"] - historical_forecast.set_index("ds")["yhat"]
        )
        # Plot residuals
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        # Residuals over time
        axes[0, 0].plot(residuals.index, residuals.values)
        axes[0, 0].set_title("Residuals Over Time")
        axes[0, 0].set_xlabel("Date")
        axes[0, 0].set_ylabel("Residual")
        # Residual distribution
        axes[0, 1].hist(residuals.values, bins=30, alpha=0.7)
        axes[0, 1].set_title("Residual Distribution")
        axes[0, 1].set_xlabel("Residual")
        axes[0, 1].set_ylabel("Frequency")
        # Q-Q plot
        from scipy import stats
        stats.probplot(residuals.values, dist="norm", plot=axes[1, 0])
        axes[1, 0].set_title("Q-Q Plot")
        # Residuals vs fitted
        axes[1, 1].scatter(historical_forecast["yhat"], residuals.values, alpha=0.6)
        axes[1, 1].set_title("Residuals vs Fitted")
        axes[1, 1].set_xlabel("Fitted Values")
        axes[1, 1].set_ylabel("Residuals")
        plt.tight_layout()
        plt.savefig("residual_analysis.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("residual_analysis.png")
        plt.close()
        # Calculate residual statistics
        residual_stats = {
            "residual_mean": residuals.mean(),
            "residual_std": residuals.std(),
            "residual_skewness": stats.skew(residuals.values),
            "residual_kurtosis": stats.kurtosis(residuals.values),
            "ljung_box_pvalue": stats.diagnostic.acorr_ljungbox(
                residuals.values, lags=10, return_df=True
            )["lb_pvalue"].iloc[-1],
        }
        mlflow.log_metrics(residual_stats)
        return forecast, residual_stats
# Usage
forecast_analysis, residual_stats = analyze_forecast_quality(model, df)
Hyperparameter Optimization
Systematic Parameter Tuning
import itertools
from sklearn.metrics import mean_absolute_percentage_error
def optimize_prophet_hyperparameters(df, param_grid=None):
    """Systematic hyperparameter optimization for Prophet."""
    if param_grid is None:
        param_grid = {
            "changepoint_prior_scale": [0.001, 0.01, 0.1, 0.5],
            "seasonality_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "holidays_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "seasonality_mode": ["additive", "multiplicative"],
        }
    # Generate all parameter combinations
    param_names = list(param_grid.keys())
    param_values = list(param_grid.values())
    param_combinations = list(itertools.product(*param_values))
    results = []
    with mlflow.start_run(run_name="Prophet Hyperparameter Optimization"):
        mlflow.log_param("total_combinations", len(param_combinations))
        for i, param_combo in enumerate(param_combinations):
            param_dict = dict(zip(param_names, param_combo))
            with mlflow.start_run(run_name=f"Config_{i+1}", nested=True):
                try:
                    # Create model with current parameters
                    model = Prophet(**param_dict)
                    # Time series split for validation
                    train_size = int(len(df) * 0.8)
                    train_df = df.iloc[:train_size]
                    test_df = df.iloc[train_size:]
                    # Fit model
                    model.fit(train_df)
                    # Predict on test set
                    future = model.make_future_dataframe(periods=len(test_df))
                    if model.growth == "logistic":
                        future["cap"] = df["cap"].iloc[-1]
                    forecast = model.predict(future)
                    test_forecast = forecast.iloc[-len(test_df) :]
                    # Calculate metrics
                    mape = mean_absolute_percentage_error(
                        test_df["y"], test_forecast["yhat"]
                    )
                    mae = np.mean(np.abs(test_df["y"] - test_forecast["yhat"]))
                    rmse = np.sqrt(np.mean((test_df["y"] - test_forecast["yhat"]) ** 2))
                    # Log parameters and metrics
                    mlflow.log_params(param_dict)
                    mlflow.log_metrics(
                        {"test_mape": mape, "test_mae": mae, "test_rmse": rmse}
                    )
                    # Store results
                    result = {**param_dict, "mape": mape, "mae": mae, "rmse": rmse}
                    results.append(result)
                    print(f"Config {i+1}/{len(param_combinations)}: MAPE = {mape:.4f}")
                except Exception as e:
                    print(f"Error in configuration {i+1}: {e}")
                    mlflow.log_param("error", str(e))
        # Find best configuration
        best_result = min(results, key=lambda x: x["mape"])
        # Log best configuration
        mlflow.log_params({f"best_{k}": v for k, v in best_result.items()})
        # Train final model with best parameters
        best_params = {
            k: v for k, v in best_result.items() if k not in ["mape", "mae", "rmse"]
        }
        final_model = Prophet(**best_params)
        final_model.fit(df)
        # Log final model
        mlflow.prophet.log_model(
            pr_model=final_model,
            name="best_model",
            input_example=df[["ds"]].head(),
        )
        return final_model, best_result, results
# Usage
best_model, best_config, all_results = optimize_prophet_hyperparameters(df)
Advanced Optimization with Optuna
Bayesian Hyperparameter Optimization
import optuna
def objective(trial, df):
    """Optuna objective function for Prophet optimization."""
    # Define hyperparameter search space
    params = {
        "changepoint_prior_scale": trial.suggest_float(
            "changepoint_prior_scale", 0.001, 1.0, log=True
        ),
        "seasonality_prior_scale": trial.suggest_float(
            "seasonality_prior_scale", 0.01, 50.0, log=True
        ),
        "holidays_prior_scale": trial.suggest_float(
            "holidays_prior_scale", 0.01, 50.0, log=True
        ),
        "seasonality_mode": trial.suggest_categorical(
            "seasonality_mode", ["additive", "multiplicative"]
        ),
        "yearly_seasonality": trial.suggest_categorical(
            "yearly_seasonality", [True, False, "auto"]
        ),
        "weekly_seasonality": trial.suggest_categorical(
            "weekly_seasonality", [True, False, "auto"]
        ),
        "daily_seasonality": trial.suggest_categorical(
            "daily_seasonality", [True, False, "auto"]
        ),
    }
    with mlflow.start_run(nested=True):
        try:
            # Create and train model
            model = Prophet(**params)
            # Time series cross-validation
            cv_results = cross_validation(
                model.fit(df),
                initial="730 days",
                period="180 days",
                horizon="90 days",
                parallel="threads",
            )
            # Calculate performance metric
            metrics = performance_metrics(cv_results)
            mape = metrics["mape"].mean()
            # Log trial results
            mlflow.log_params(params)
            mlflow.log_metric("cv_mape", mape)
            return mape
        except Exception as e:
            print(f"Trial failed: {e}")
            return float("inf")
def optuna_prophet_optimization(df, n_trials=100):
    """Run Optuna optimization for Prophet."""
    with mlflow.start_run(run_name="Optuna Prophet Optimization"):
        # Create study
        study = optuna.create_study(
            direction="minimize", sampler=optuna.samplers.TPESampler(seed=42)
        )
        # Optimize
        study.optimize(
            lambda trial: objective(trial, df),
            n_trials=n_trials,
            show_progress_bar=True,
        )
        # Log best results
        best_params = study.best_params
        best_value = study.best_value
        mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
        mlflow.log_metric("best_mape", best_value)
        # Train final model
        final_model = Prophet(**best_params)
        final_model.fit(df)
        mlflow.prophet.log_model(pr_model=final_model, name="optuna_best_model")
        return final_model, study
# Usage
# optimized_model, study = optuna_prophet_optimization(df, n_trials=50)
Model Deployment and Serving
Model Loading and Prediction
def load_and_predict_prophet_model(model_uri, future_periods=30):
    """Load Prophet model and generate predictions."""
    # Load model
    loaded_model = mlflow.prophet.load_model(model_uri)
    # Generate future dataframe
    future = loaded_model.make_future_dataframe(periods=future_periods)
    # Add any required regressors or caps
    if hasattr(loaded_model, "extra_regressors") and loaded_model.extra_regressors:
        # You would need to provide values for external regressors
        # This is a simplified example
        for regressor in loaded_model.extra_regressors:
            future[regressor] = np.random.normal(1000, 100, len(future))
    if loaded_model.growth == "logistic":
        future["cap"] = 10000  # Set appropriate capacity
    # Generate predictions
    forecast = loaded_model.predict(future)
    return forecast
# Usage
# run_id = "your_run_id_here"
# model_uri = f"runs:/{run_id}/prophet_model"
# predictions = load_and_predict_prophet_model(model_uri, future_periods=365)
Production Deployment Patterns
class ProphetForecaster:
    """Production-ready Prophet forecaster class."""
    def __init__(self, model_uri):
        self.model_uri = model_uri
        self.model = None
        self.last_training_date = None
    def load_model(self):
        """Load the Prophet model."""
        self.model = mlflow.prophet.load_model(self.model_uri)
        if hasattr(self.model, "history"):
            self.last_training_date = self.model.history["ds"].max()
    def predict(self, periods=30, frequency="D", include_history=False):
        """Generate predictions."""
        if self.model is None:
            self.load_model()
        # Generate future dataframe
        future = self.model.make_future_dataframe(
            periods=periods, freq=frequency, include_history=include_history
        )
        # Handle logistic growth
        if self.model.growth == "logistic":
            future["cap"] = future.get("cap", 10000)  # Default capacity
        # Generate forecast
        forecast = self.model.predict(future)
        # Return relevant columns
        columns = ["ds", "yhat", "yhat_lower", "yhat_upper"]
        if not include_history:
            # Return only future predictions
            forecast = forecast.tail(periods)
        return forecast[columns]
    def get_components(self, periods=30):
        """Get forecast components."""
        if self.model is None:
            self.load_model()
        future = self.model.make_future_dataframe(periods=periods)
        if self.model.growth == "logistic":
            future["cap"] = future.get("cap", 10000)
        forecast = self.model.predict(future)
        # Extract components
        components = {}
        for component in ["trend", "yearly", "weekly"]:
            if component in forecast.columns:
                components[component] = forecast[["ds", component]].tail(periods)
        return components
    def check_model_freshness(self, current_date=None):
        """Check if model needs retraining."""
        if current_date is None:
            current_date = pd.Timestamp.now()
        if self.last_training_date is None:
            return False, "No training date available"
        days_since_training = (current_date - self.last_training_date).days
        # Define freshness threshold (e.g., 30 days)
        freshness_threshold = 30
        is_fresh = days_since_training < freshness_threshold
        message = f"Model is {days_since_training} days old"
        return is_fresh, message
# Usage
forecaster = ProphetForecaster("models:/ProphetForecastModel/Production")
predictions = forecaster.predict(periods=90)
components = forecaster.get_components(periods=90)
is_fresh, message = forecaster.check_model_freshness()
Batch Prediction Pipeline
def batch_prophet_predictions(model_registry_name, stage="Production"):
    """Run batch predictions for multiple time series."""
    with mlflow.start_run(run_name="Batch Prophet Predictions"):
        # Load production model
        model_uri = f"models:/{model_registry_name}/{stage}"
        model = mlflow.prophet.load_model(model_uri)
        # Generate predictions for different horizons
        horizons = [30, 90, 365]  # Days
        predictions = {}
        for horizon in horizons:
            future = model.make_future_dataframe(periods=horizon)
            if model.growth == "logistic":
                future["cap"] = 10000  # Set capacity
            forecast = model.predict(future)
            # Store predictions
            predictions[f"{horizon}_days"] = forecast[
                ["ds", "yhat", "yhat_lower", "yhat_upper"]
            ].tail(horizon)
            # Log prediction summary
            pred_summary = {
                f"{horizon}d_mean_forecast": forecast["yhat"].tail(horizon).mean(),
                f"{horizon}d_forecast_range": forecast["yhat"].tail(horizon).max()
                - forecast["yhat"].tail(horizon).min(),
            }
            mlflow.log_metrics(pred_summary)
        # Save predictions as artifacts
        for horizon, pred_df in predictions.items():
            filename = f"predictions_{horizon}.csv"
            pred_df.to_csv(filename, index=False)
            mlflow.log_artifact(filename)
        # Log batch prediction metadata
        mlflow.log_params(
            {
                "model_uri": model_uri,
                "prediction_date": pd.Timestamp.now().isoformat(),
                "horizons": horizons,
            }
        )
        return predictions
Model Monitoring and Maintenance
Forecast Accuracy Monitoring
def monitor_forecast_accuracy(model_uri, actuals_df, prediction_horizon_days=30):
    """Monitor Prophet model accuracy against actual values."""
    with mlflow.start_run(run_name="Forecast Accuracy Monitoring"):
        # Load model
        model = mlflow.prophet.load_model(model_uri)
        # Generate historical predictions for comparison
        cutoff_date = actuals_df["ds"].max() - pd.Timedelta(
            days=prediction_horizon_days
        )
        historical_data = actuals_df[actuals_df["ds"] <= cutoff_date]
        # Refit model on historical data
        temp_model = Prophet()
        temp_model.fit(historical_data)
        # Generate predictions for the monitoring period
        future = temp_model.make_future_dataframe(periods=prediction_horizon_days)
        if temp_model.growth == "logistic":
            future["cap"] = (
                historical_data["cap"].iloc[-1]
                if "cap" in historical_data.columns
                else 10000
            )
        forecast = temp_model.predict(future)
        # Get actual values for the prediction period
        actual_values = actuals_df[actuals_df["ds"] > cutoff_date]
        forecast_values = forecast[forecast["ds"] > cutoff_date]
        # Align dates
        merged = actual_values.merge(
            forecast_values[["ds", "yhat", "yhat_lower", "yhat_upper"]], on="ds"
        )
        if len(merged) > 0:
            # Calculate accuracy metrics
            mae = np.mean(np.abs(merged["y"] - merged["yhat"]))
            mape = np.mean(np.abs((merged["y"] - merged["yhat"]) / merged["y"])) * 100
            rmse = np.sqrt(np.mean((merged["y"] - merged["yhat"]) ** 2))
            # Coverage (percentage of actuals within prediction intervals)
            coverage = (
                np.mean(
                    (merged["y"] >= merged["yhat_lower"])
                    & (merged["y"] <= merged["yhat_upper"])
                )
                * 100
            )
            # Log metrics
            accuracy_metrics = {
                "monitoring_mae": mae,
                "monitoring_mape": mape,
                "monitoring_rmse": rmse,
                "prediction_coverage": coverage,
            }
            mlflow.log_metrics(accuracy_metrics)
            # Create accuracy visualization
            plt.figure(figsize=(12, 6))
            plt.plot(merged["ds"], merged["y"], label="Actual", marker="o")
            plt.plot(merged["ds"], merged["yhat"], label="Predicted", marker="s")
            plt.fill_between(
                merged["ds"],
                merged["yhat_lower"],
                merged["yhat_upper"],
                alpha=0.3,
                label="Prediction Interval",
            )
            plt.title(f"Forecast Accuracy Monitoring (MAPE: {mape:.2f}%)")
            plt.xlabel("Date")
            plt.ylabel("Value")
            plt.legend()
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig("accuracy_monitoring.png", dpi=300, bbox_inches="tight")
            mlflow.log_artifact("accuracy_monitoring.png")
            plt.close()
            return accuracy_metrics
        else:
            print("No overlapping dates found for accuracy assessment")
            return None
# Usage
# accuracy_metrics = monitor_forecast_accuracy(model_uri, new_actuals_df, prediction_horizon_days=30)
Automated Model Retraining
Production Model Update Pipeline
def automated_prophet_retraining(
    current_model_name, new_data, performance_threshold_mape=10.0, min_data_points=100
):
    """Automated Prophet model retraining pipeline."""
    with mlflow.start_run(run_name="Automated Prophet Retraining"):
        # Load current production model
        current_model_uri = f"models:/{current_model_name}/Production"
        try:
            current_model = mlflow.prophet.load_model(current_model_uri)
            mlflow.log_param("current_model_loaded", True)
        except Exception as e:
            print(f"Could not load current model: {e}")
            current_model = None
            mlflow.log_param("current_model_loaded", False)
        # Data quality checks
        data_quality_passed = True
        quality_issues = []
        # Check data quantity
        if len(new_data) < min_data_points:
            data_quality_passed = False
            quality_issues.append(
                f"Insufficient data: {len(new_data)} < {min_data_points}"
            )
        # Check for missing values
        missing_values = new_data[["ds", "y"]].isnull().sum().sum()
        if missing_values > 0:
            quality_issues.append(f"Missing values found: {missing_values}")
        # Check date continuity
        new_data = new_data.sort_values("ds")
        date_gaps = pd.to_datetime(new_data["ds"]).diff().dt.days
        large_gaps = (date_gaps > 7).sum()  # Gaps larger than 7 days
        if large_gaps > 0:
            quality_issues.append(f"Large date gaps found: {large_gaps}")
        mlflow.log_params(
            {
                "data_quality_passed": data_quality_passed,
                "data_points": len(new_data),
                "quality_issues": "; ".join(quality_issues),
            }
        )
        if not data_quality_passed:
            print("Data quality checks failed. Skipping retraining.")
            return None
        # Train new model
        new_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            changepoint_prior_scale=0.05,
        )
        new_model.fit(new_data)
        # Evaluate new model performance
        cv_results = cross_validation(
            new_model,
            initial="365 days",
            period="90 days",
            horizon="30 days",
            parallel="threads",
        )
        metrics = performance_metrics(cv_results)
        new_mape = metrics["mape"].mean()
        mlflow.log_metric("new_model_mape", new_mape)
        # Compare with current model if available
        should_deploy = True
        if current_model is not None:
            try:
                # Test current model on new data
                current_cv = cross_validation(
                    current_model,
                    initial="365 days",
                    period="90 days",
                    horizon="30 days",
                )
                current_metrics = performance_metrics(current_cv)
                current_mape = current_metrics["mape"].mean()
                mlflow.log_metric("current_model_mape", current_mape)
                # Deploy if new model is significantly better
                improvement = (current_mape - new_mape) / current_mape * 100
                mlflow.log_metric("performance_improvement_percent", improvement)
                should_deploy = improvement > 5.0  # Deploy if >5% improvement
            except Exception as e:
                print(f"Could not evaluate current model: {e}")
                should_deploy = new_mape < performance_threshold_mape
        else:
            should_deploy = new_mape < performance_threshold_mape
        mlflow.log_params(
            {
                "should_deploy": should_deploy,
                "performance_threshold": performance_threshold_mape,
            }
        )
        # Log and potentially deploy new model
        model_info = mlflow.prophet.log_model(
            pr_model=new_model,
            name="retrained_model",
            registered_model_name=current_model_name if should_deploy else None,
        )
        if should_deploy:
            # Transition to production
            client = mlflow.MlflowClient()
            latest_version = client.get_latest_versions(
                current_model_name, stages=["None"]
            )[0]
            client.transition_model_version_stage(
                name=current_model_name,
                version=latest_version.version,
                stage="Production",
            )
            print(f"New model deployed to production with MAPE: {new_mape:.2f}%")
        else:
            print(
                f"New model not deployed. MAPE: {new_mape:.2f}% did not meet criteria."
            )
        return new_model, should_deploy
Best Practices and Tips
Data Preparation Best Practices
def prophet_data_best_practices():
    """Demonstrate Prophet data preparation best practices."""
    best_practices = {
        "data_frequency": "Use consistent frequency (daily, weekly, monthly)",
        "missing_values": "Prophet handles missing values, but document them",
        "outliers": "Consider outlier detection and handling",
        "data_volume": "Minimum 2-3 seasonal cycles (2-3 years for yearly seasonality)",
        "column_names": "Always use 'ds' for dates and 'y' for values",
        "date_format": "Ensure dates are properly parsed as datetime",
        "timezone_handling": "Be consistent with timezone handling",
    }
    print("Prophet Data Preparation Best Practices:")
    for practice, description in best_practices.items():
        print(f"- {practice}: {description}")
    return best_practices
# Data validation function
def validate_prophet_data(df):
    """Validate data for Prophet modeling."""
    issues = []
    recommendations = []
    # Check required columns
    if not all(col in df.columns for col in ["ds", "y"]):
        issues.append("Missing required columns 'ds' and/or 'y'")
    # Check data types
    if "ds" in df.columns and not pd.api.types.is_datetime64_any_dtype(df["ds"]):
        issues.append("Column 'ds' is not datetime type")
        recommendations.append(
            "Convert 'ds' to datetime: df['ds'] = pd.to_datetime(df['ds'])"
        )
    # Check for sufficient data
    if len(df) < 100:
        issues.append(f"Insufficient data points: {len(df)} (recommend >100)")
    # Check for missing values
    missing_y = df["y"].isnull().sum()
    if missing_y > 0:
        recommendations.append(f"Consider handling {missing_y} missing values in 'y'")
    # Check for duplicate dates
    if "ds" in df.columns:
        duplicates = df["ds"].duplicated().sum()
        if duplicates > 0:
            issues.append(f"Found {duplicates} duplicate dates")
    # Check data range
    if "ds" in df.columns and len(df) > 0:
        date_range = (df["ds"].max() - df["ds"].min()).days
        if date_range < 365:
            recommendations.append(
                "Less than 1 year of data may limit seasonality detection"
            )
    return {
        "issues": issues,
        "recommendations": recommendations,
        "data_points": len(df),
        "date_range_days": date_range if "ds" in df.columns and len(df) > 0 else 0,
    }
# Usage
validation_results = validate_prophet_data(df)
print("Validation Results:", validation_results)
Performance Optimization
Prophet Performance Tips
def optimize_prophet_performance():
    """Tips for optimizing Prophet performance."""
    optimization_tips = {
        "parallel_processing": {
            "cross_validation": "Use parallel='threads' or 'processes' in cross_validation()",
            "multiple_models": "Use joblib.Parallel for training multiple models",
        },
        "model_configuration": {
            "mcmc_samples": "Set mcmc_samples=0 for faster MAP estimation",
            "uncertainty_samples": "Reduce uncertainty_samples for faster predictions",
            "stan_backend": "Use 'CMDSTANPY' backend for better performance",
        },
        "data_preprocessing": {
            "frequency": "Aggregate to appropriate frequency (daily vs hourly)",
            "outliers": "Remove extreme outliers before training",
            "data_size": "Consider sampling for very large datasets",
        },
    }
    return optimization_tips
# Example of parallel cross-validation
def parallel_prophet_evaluation(models_dict, df):
    """Evaluate multiple Prophet models in parallel."""
    from joblib import Parallel, delayed
    def evaluate_single_model(name, model):
        try:
            cv_results = cross_validation(
                model.fit(df),
                initial="365 days",
                period="90 days",
                horizon="30 days",
                parallel="threads",
            )
            metrics = performance_metrics(cv_results)
            return name, metrics["mape"].mean()
        except Exception as e:
            return name, float("inf")
    # Parallel evaluation
    results = Parallel(n_jobs=-1)(
        delayed(evaluate_single_model)(name, model)
        for name, model in models_dict.items()
    )
    return dict(results)
Conclusion
MLflow's Prophet integration provides a comprehensive solution for time series forecasting, experiment tracking, and model deployment. Whether you're forecasting business metrics, planning resources, or predicting future trends, the combination of Prophet's intuitive forecasting capabilities and MLflow's robust experiment management creates a powerful platform for professional time series analysis.
Key benefits of using MLflow with Prophet include:
- Simplified Forecasting Workflow: Easy model logging and experiment tracking for time series projects
- Comprehensive Validation: Built-in cross-validation and performance assessment tools
- Business-Ready Features: Holiday modeling, custom seasonalities, and interpretable components
- Production Deployment: Model registry integration with automated retraining capabilities
- Collaborative Development: Team-friendly experiment sharing and model governance
The patterns and examples in this guide provide a solid foundation for building scalable, reliable time series forecasting systems. Start with basic Prophet models for immediate insights, then gradually adopt advanced features like custom seasonalities, automated hyperparameter tuning, and production monitoring as your forecasting needs evolve.
Prophet's philosophy of making forecasting accessible to business users, combined with MLflow's enterprise-grade experiment management, creates an ideal platform for data-driven decision making through accurate, interpretable time series predictions.