"""
The ``mlflow.pmdarima`` module provides an API for logging and loading ``pmdarima`` models.
This module exports univariate ``pmdarima`` models in the following formats:

Pmdarima format
    Serialized instance of a ``pmdarima`` model using pickle.
:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and for batch auditing
    of historical forecasts.

    .. code-block:: python
        :caption: Example

        import pandas as pd
        import mlflow
        import mlflow.pyfunc
        import pmdarima
        from pmdarima import auto_arima


        # Define a custom model class
        class PmdarimaWrapper(mlflow.pyfunc.PythonModel):
            def load_context(self, context):
                self.model = context.artifacts["model"]

            def predict(self, context, model_input):
                return self.model.predict(n_periods=model_input.shape[0])


        # Specify locations of source data and the model artifact
        SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/master/examples/example_retail_sales.csv"
        ARTIFACT_PATH = "model"

        # Read data and recode columns
        sales_data = pd.read_csv(SOURCE_DATA)
        sales_data.rename(columns={"y": "sales", "ds": "date"}, inplace=True)

        # Split the data into train/test
        train_size = int(0.8 * len(sales_data))
        train, _ = sales_data[:train_size], sales_data[train_size:]

        # Create the model
        model = pmdarima.auto_arima(train["sales"], seasonal=True, m=12)

        # Log the model
        with mlflow.start_run():
            wrapper = PmdarimaWrapper()
            mlflow.pyfunc.log_model(
                name="model",
                python_model=wrapper,
                artifacts={"model": mlflow.pyfunc.model_to_dict(model)},
            )

.. _Pmdarima:
    http://alkaline-ml.com/pmdarima/
"""
import logging
import os
import pickle
import warnings
from typing import Any
import pandas as pd
import yaml
from packaging.version import Version
import mlflow
from mlflow import pyfunc
from mlflow.environment_variables import MLFLOW_ALLOW_PICKLE_DESERIALIZATION
from mlflow.exceptions import MlflowException
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.databricks_utils import (
is_in_databricks_model_serving_environment,
is_in_databricks_runtime,
)
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
_CONDA_ENV_FILE_NAME,
_CONSTRAINTS_FILE_NAME,
_PYTHON_ENV_FILE_NAME,
_REQUIREMENTS_FILE_NAME,
_mlflow_conda_env,
_process_conda_env,
_process_pip_requirements,
_PythonEnv,
_validate_env_arguments,
)
from mlflow.utils.file_utils import get_total_file_size, write_to
from mlflow.utils.model_utils import (
_add_code_from_conf_to_system_path,
_copy_extra_files,
_get_flavor_configuration,
_validate_and_copy_code_paths,
_validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
# Flavor name: passed to ``Model.add_flavor`` when saving and to
# ``_get_flavor_configuration`` when loading.
FLAVOR_NAME = "pmdarima"
# Configuration key whose value is the file name of the serialized model.
_MODEL_BINARY_KEY = "data"
# File name, relative to the model directory, that the pickled model is written to.
_MODEL_BINARY_FILE_NAME = "model.pmd"
# Flavor-configuration key under which the saved model's class name is recorded.
_MODEL_TYPE_KEY = "model_type"
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Import-time side effect: a ``FutureWarning`` announcing the flavor's deprecation is
# issued when this module is executed. ``stacklevel=2`` attributes the warning to the
# frame above this module-level call instead of to this line.
warnings.warn(
    "pmdarima flavor is deprecated and will be removed in a future release",
    FutureWarning,
    stacklevel=2,
)
def get_default_pip_requirements():
    """
    Returns:
        A list of default pip requirements for MLflow Models produced by this flavor. Calls to
        :func:`save_model()` and :func:`log_model()` produce a pip environment that, at a minimum,
        contains these requirements.
    """
    # The only default requirement is a pinned ``pmdarima`` entry.
    return [_get_pinned_requirement("pmdarima")]
def get_default_conda_env():
    """
    Returns:
        The default Conda environment for MLflow Models produced by calls to
        :func:`save_model()` and :func:`log_model()`.
    """
    # Built from the same pip requirements that ``get_default_pip_requirements`` reports.
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    pmdarima_model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    extra_files=None,
):
    """
    Save a pmdarima ``ARIMA`` model or ``Pipeline`` object to a path on the local file system.

    Args:
        pmdarima_model: pmdarima ``ARIMA`` or ``Pipeline`` model that has been ``fit`` on a
            temporal series.
        path: Local path where the serialized model (in pickle format) is to be saved.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to.
        signature: an instance of the :py:class:`ModelSignature <mlflow.models.ModelSignature>`
            class that describes the model's inputs and outputs. If not specified but an
            ``input_example`` is supplied, a signature will be automatically inferred
            based on the supplied input example and model. To disable automatic signature
            inference when providing an input example, set ``signature`` to ``False``.
            To manually infer a model signature, call
            :py:func:`infer_signature() <mlflow.models.infer_signature>` on datasets
            with valid model inputs, such as a training dataset with the target column
            omitted, and valid model outputs, like model predictions made on the training
            dataset, for example:

            .. code-block:: python

                from mlflow.models import infer_signature

                model = pmdarima.auto_arima(data)
                predictions = model.predict(n_periods=30, return_conf_int=False)
                signature = infer_signature(data, predictions)

            .. Warning:: if utilizing confidence interval generation in the ``predict``
                method of a ``pmdarima`` model (``return_conf_int=True``), the signature
                will not be inferred due to the complex tuple return type when using the
                native ``ARIMA.predict()`` API. ``infer_schema`` will function correctly
                if using the ``pyfunc`` flavor of the model, though.

        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        extra_files: {{ extra_files }}

    .. code-block:: python
        :caption: Example

        import pandas as pd
        import mlflow
        import pmdarima

        # Specify locations of source data and the model artifact
        SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/master/examples/example_retail_sales.csv"
        ARTIFACT_PATH = "model"

        # Read data and recode columns
        sales_data = pd.read_csv(SOURCE_DATA)
        sales_data.rename(columns={"y": "sales", "ds": "date"}, inplace=True)

        # Split the data into train/test
        train_size = int(0.8 * len(sales_data))
        train = sales_data[:train_size]
        test = sales_data[train_size:]

        with mlflow.start_run():
            # Create the model
            model = pmdarima.auto_arima(train["sales"], seasonal=True, m=12)

            # Save the model to the specified path
            mlflow.pmdarima.save_model(model, "model")
    """
    # Imported lazily so that importing this module does not require pmdarima.
    import pmdarima

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    path = os.path.abspath(path)
    _validate_and_prepare_target_save_path(path)
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)

    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, path)

    if signature is None and saved_example is not None:
        # No explicit signature: infer one by running the pyfunc wrapper on the example.
        wrapped_model = _PmdarimaModelWrapper(pmdarima_model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)
    elif signature is False:
        # ``False`` is the caller's way of switching signature inference off.
        signature = None

    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata

    # Serialize the model itself.
    model_data_path = os.path.join(path, _MODEL_BINARY_FILE_NAME)
    _save_model(pmdarima_model, model_data_path)

    # The same "data" -> file-name mapping is recorded for both the pyfunc flavor and
    # the native flavor.
    model_bin_kwargs = {_MODEL_BINARY_KEY: _MODEL_BINARY_FILE_NAME}
    extra_files_config = _copy_extra_files(extra_files, path)
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.pmdarima",
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
        **model_bin_kwargs,
    )
    flavor_conf = {
        _MODEL_TYPE_KEY: pmdarima_model.__class__.__name__,
        **model_bin_kwargs,
        **extra_files_config,
    }
    mlflow_model.add_flavor(
        FLAVOR_NAME, pmdarima_version=pmdarima.__version__, code=code_dir_subpath, **flavor_conf
    )
    if size := get_total_file_size(path):
        mlflow_model.model_size_bytes = size
    mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))

    # Resolve the environment specification: either derive it from pip requirements
    # (inferring them when none were given) or take it from the supplied conda env.
    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                path, FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs, pip_requirements, extra_pip_requirements
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    # The constraints file is only written when there is something to put in it.
    if pip_constraints:
        write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))

    write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    pmdarima_model,
    artifact_path: str | None = None,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    extra_files=None,
    name: str | None = None,
    params: dict[str, Any] | None = None,
    tags: dict[str, Any] | None = None,
    model_type: str | None = None,
    step: int = 0,
    model_id: str | None = None,
    **kwargs,
):
    """
    Logs a ``pmdarima`` ``ARIMA`` or ``Pipeline`` object as an MLflow artifact for the current run.

    Args:
        pmdarima_model: pmdarima ``ARIMA`` or ``Pipeline`` model that has been ``fit`` on a
            temporal series.
        artifact_path: Deprecated. Use `name` instead.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: If given, create a model
            version under ``registered_model_name``, also creating a
            registered model if one with the given name does not exist.
        signature: an instance of the :py:class:`ModelSignature <mlflow.models.ModelSignature>`
            class that describes the model's inputs and outputs. If not specified but an
            ``input_example`` is supplied, a signature will be automatically inferred
            based on the supplied input example and model. To disable automatic signature
            inference when providing an input example, set ``signature`` to ``False``.
            To manually infer a model signature, call
            :py:func:`infer_signature() <mlflow.models.infer_signature>` on datasets
            with valid model inputs, such as a training dataset with the target column
            omitted, and valid model outputs, like model predictions made on the training
            dataset, for example:

            .. code-block:: python

                from mlflow.models import infer_signature

                model = pmdarima.auto_arima(data)
                predictions = model.predict(n_periods=30, return_conf_int=False)
                signature = infer_signature(data, predictions)

            .. Warning:: if utilizing confidence interval generation in the ``predict``
                method of a ``pmdarima`` model (``return_conf_int=True``), the signature
                will not be inferred due to the complex tuple return type when using the
                native ``ARIMA.predict()`` API. ``infer_schema`` will function correctly
                if using the ``pyfunc`` flavor of the model, though.

        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version
            to finish being created and is in ``READY`` status.
            By default, the function waits for five minutes.
            Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        extra_files: {{ extra_files }}
        name: {{ name }}
        params: {{ params }}
        tags: {{ tags }}
        model_type: {{ model_type }}
        step: {{ step }}
        model_id: {{ model_id }}
        kwargs: Additional arguments for :py:class:`mlflow.models.model.Model`

    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
        metadata of the logged model.

    .. code-block:: python
        :caption: Example

        import pandas as pd
        import mlflow
        from mlflow.models import infer_signature
        import pmdarima
        from pmdarima.metrics import smape

        # Specify locations of source data and the model artifact
        SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/master/examples/example_retail_sales.csv"
        ARTIFACT_PATH = "model"

        # Read data and recode columns
        sales_data = pd.read_csv(SOURCE_DATA)
        sales_data.rename(columns={"y": "sales", "ds": "date"}, inplace=True)

        # Split the data into train/test
        train_size = int(0.8 * len(sales_data))
        train = sales_data[:train_size]
        test = sales_data[train_size:]

        with mlflow.start_run():
            # Create the model
            model = pmdarima.auto_arima(train["sales"], seasonal=True, m=12)

            # Calculate metrics
            prediction = model.predict(n_periods=len(test))
            metrics = {"smape": smape(test["sales"], prediction)}

            # Infer signature
            input_sample = pd.DataFrame(train["sales"])
            output_sample = pd.DataFrame(model.predict(n_periods=5))
            signature = infer_signature(input_sample, output_sample)

            # Log model
            mlflow.pmdarima.log_model(model, name=ARTIFACT_PATH, signature=signature)
    """
    # Everything is delegated to ``Model.log``; this module is handed over as the
    # flavor, and every argument is forwarded unchanged.
    return Model.log(
        artifact_path=artifact_path,
        name=name,
        flavor=mlflow.pmdarima,
        registered_model_name=registered_model_name,
        pmdarima_model=pmdarima_model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        extra_files=extra_files,
        params=params,
        tags=tags,
        model_type=model_type,
        step=step,
        model_id=model_id,
        **kwargs,
    )
def load_model(model_uri, dst_path=None):
    """
    Load a ``pmdarima`` ``ARIMA`` model or ``Pipeline`` object from a local file or a run.

    Args:
        model_uri: The location, in URI format, of the MLflow model. For example:

            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``mlflow-artifacts:/path/to/model``

            For more information about supported URI schemes, see
            `Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html#
            artifact-locations>`_.
        dst_path: The local filesystem path to which to download the model artifact.
            This directory must already exist. If unspecified, a local output
            path will be created.

    Returns:
        A ``pmdarima`` model instance

    Raises:
        MlflowException: If pickle deserialization is not permitted in the current
            environment (raised by ``_load_model``).

    .. code-block:: python
        :caption: Example

        import pandas as pd
        import mlflow
        from mlflow.models import infer_signature
        import pmdarima
        from pmdarima.metrics import smape

        # Specify locations of source data and the model artifact
        SOURCE_DATA = "https://raw.githubusercontent.com/facebook/prophet/master/examples/example_retail_sales.csv"
        ARTIFACT_PATH = "model"

        # Read data and recode columns
        sales_data = pd.read_csv(SOURCE_DATA)
        sales_data.rename(columns={"y": "sales", "ds": "date"}, inplace=True)

        # Split the data into train/test
        train_size = int(0.8 * len(sales_data))
        train = sales_data[:train_size]
        test = sales_data[train_size:]

        with mlflow.start_run():
            # Create the model
            model = pmdarima.auto_arima(train["sales"], seasonal=True, m=12)

            # Calculate metrics
            prediction = model.predict(n_periods=len(test))
            metrics = {"smape": smape(test["sales"], prediction)}

            # Infer signature
            input_sample = pd.DataFrame(train["sales"])
            output_sample = pd.DataFrame(model.predict(n_periods=5))
            signature = infer_signature(input_sample, output_sample)

            # Log model
            input_example = input_sample.head()
            model_info = mlflow.pmdarima.log_model(
                model, name=ARTIFACT_PATH, signature=signature, input_example=input_example
            )

        # Load the model
        loaded_model = mlflow.pmdarima.load_model(model_info.model_uri)
        # Forecast for the next 60 days
        forecast = loaded_model.predict(n_periods=60)

        print(f"forecast: {forecast}")

    .. code-block:: text
        :caption: Output

        forecast:
        234    382452.397246
        235    380639.458720
        236    359805.611219
        ...
    """
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    flavor_conf = _get_flavor_configuration(model_path=local_model_path, flavor_name=FLAVOR_NAME)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)
    # Fall back to the default binary file name when the flavor configuration does not
    # record one under the "data" key.
    pmdarima_model_file_path = os.path.join(
        local_model_path, flavor_conf.get(_MODEL_BINARY_KEY, _MODEL_BINARY_FILE_NAME)
    )

    return _load_model(pmdarima_model_file_path)
def _save_model(model, path):
    """
    Pickle ``model`` into the file at ``path``.

    Args:
        model: The object to serialize.
        path: Destination file path; it is opened in binary write mode.
    """
    with open(path, mode="wb") as sink:
        pickle.dump(model, file=sink)
def _load_model(path):
    """
    Unpickle and return the model stored in the file at ``path``.

    Loading is permitted when ``MLFLOW_ALLOW_PICKLE_DESERIALIZATION`` is set, or when
    running in a Databricks runtime or a Databricks model serving environment.
    Note that unpickling can execute arbitrary code, so only trusted files should be
    loaded.

    Args:
        path: Path of the pickled model file.

    Returns:
        The deserialized object.

    Raises:
        MlflowException: If none of the conditions that permit pickle deserialization
            holds.
    """
    # Evaluated left to right with short-circuiting: the Databricks checks only run
    # when the environment variable does not already allow unpickling.
    pickle_permitted = (
        MLFLOW_ALLOW_PICKLE_DESERIALIZATION.get()
        or is_in_databricks_runtime()
        or is_in_databricks_model_serving_environment()
    )
    if not pickle_permitted:
        raise MlflowException(
            "Deserializing model using pickle is disallowed, but this model is saved "
            "in pickle format. The workaround is to set environment variable "
            "'MLFLOW_ALLOW_PICKLE_DESERIALIZATION' to 'true'."
        )

    with open(path, mode="rb") as source:
        model = pickle.load(source)
    return model
def _load_pyfunc(path):
    """
    Load the pickled model at ``path`` and wrap it in ``_PmdarimaModelWrapper``.

    Presumably this is the entry point the pyfunc flavor resolves through the
    ``loader_module`` recorded at save time; confirm against ``mlflow.pyfunc``.

    Args:
        path: Path of the pickled model file.

    Returns:
        A ``_PmdarimaModelWrapper`` around the deserialized model.
    """
    raw_model = _load_model(path)
    return _PmdarimaModelWrapper(raw_model)
class _PmdarimaModelWrapper:
    """
    Adapter exposing a ``pmdarima`` model through a DataFrame-in / DataFrame-out
    ``predict`` method.

    The forecast is configured by a single-row DataFrame whose columns are read as
    arguments for the wrapped model's ``predict`` call.
    """

    def __init__(self, pmdarima_model):
        # Imported lazily so that importing this module does not require pmdarima.
        import pmdarima

        self.pmdarima_model = pmdarima_model
        # Recorded once; ``predict`` uses it to decide whether ``X`` may be passed.
        self._pmdarima_version = pmdarima.__version__

    def get_raw_model(self):
        """
        Returns the underlying model.
        """
        return self.pmdarima_model

    def predict(self, dataframe, params: dict[str, Any] | None = None) -> pd.DataFrame:
        """
        Generate a forecast from a one-row configuration DataFrame.

        Args:
            dataframe: Model input data. Must contain exactly one row. The column
                ``n_periods`` (a non-zero integer) is required; the optional columns
                ``X``, ``return_conf_int`` (default ``False``) and ``alpha``
                (default ``0.05``) are forwarded to the wrapped model.
            params: Additional parameters to pass to the model for inference.
                Not used by this implementation.

        Returns:
            Model predictions: a DataFrame with a ``yhat`` column, plus ``yhat_lower``
            and ``yhat_upper`` columns when ``return_conf_int`` is truthy.

        Raises:
            MlflowException: If the DataFrame does not contain exactly one row, if
                ``n_periods`` is missing or falsy, or if it is not an integer.
        """
        df_schema = dataframe.columns.values.tolist()

        row_count = len(dataframe)
        # An empty frame is rejected here as well; it previously fell through and
        # failed with an AttributeError on the missing row.
        if row_count != 1:
            raise MlflowException(
                f"The provided prediction pd.DataFrame contains {row_count} rows. "
                "Only 1 row should be supplied.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # Take the single row by position: its index label is not guaranteed to be 0
        # (for example after slicing or filtering a larger frame).
        attrs = next(iter(dataframe.to_dict(orient="index").values()))
        n_periods = attrs.get("n_periods", None)

        if not n_periods:
            raise MlflowException(
                f"The provided prediction configuration pd.DataFrame columns ({df_schema}) do not "
                "contain the required column `n_periods` for specifying future prediction periods "
                "to generate.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if not isinstance(n_periods, int):
            raise MlflowException(
                f"The provided `n_periods` value {n_periods} must be an integer. "
                f"provided type: {type(n_periods)}",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # NB Any model that is trained with exogenous regressor elements will need to provide
        # `X` entries as a 2D array structure to the predict method.
        exogenous_regressor = attrs.get("X", None)

        # Parse the version once; it drives both the warning and the call below.
        supports_exogenous = Version(self._pmdarima_version) >= Version("1.8.0")

        # Compare against None instead of relying on truthiness, which raises for
        # multi-element array values.
        if exogenous_regressor is not None and not supports_exogenous:
            warnings.warn(
                "An exogenous regressor element was provided in column 'X'. This is "
                "supported only in pmdarima version >= 1.8.0. Installed version: "
                f"{self._pmdarima_version}",
                stacklevel=2,
            )

        return_conf_int = attrs.get("return_conf_int", False)
        alpha = attrs.get("alpha", 0.05)

        predict_kwargs = {
            "n_periods": n_periods,
            "return_conf_int": return_conf_int,
            "alpha": alpha,
        }
        # ``X`` is only passed on pmdarima >= 1.8.0; on older versions it is dropped
        # (after the warning above).
        if supports_exogenous:
            predict_kwargs["X"] = exogenous_regressor
        raw_predictions = self.pmdarima_model.predict(**predict_kwargs)

        if return_conf_int:
            # With intervals requested the result is indexed as (forecast, intervals),
            # where each interval entry unpacks into a (lower, upper) pair.
            ci_low, ci_high = zip(*raw_predictions[1])
            predictions = pd.DataFrame.from_dict(
                {
                    "yhat": raw_predictions[0],
                    "yhat_lower": ci_low,
                    "yhat_upper": ci_high,
                }
            )
        else:
            predictions = pd.DataFrame.from_dict({"yhat": raw_predictions})

        return predictions