"""
The ``mlflow.diviner`` module provides an API for logging, saving and loading ``diviner`` models.
Diviner wraps several popular open source time series forecasting libraries in a unified API that
permits training, back-testing cross validation, and forecasting inference for groups of related
series.
This module exports groups of univariate ``diviner`` models in the following formats:
Diviner format
    Serialized instance of a ``diviner`` model type using native diviner serializers.
    (e.g., "GroupedProphet" or "GroupedPmdarima")
:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and for batch auditing
    of historical forecasts.
.. _Diviner:
    https://databricks-diviner.readthedocs.io/en/latest/index.html
"""
import logging
import os
import pathlib
import shutil
from typing import Any, Optional
import pandas as pd
import yaml
import mlflow
from mlflow import pyfunc
from mlflow.environment_variables import MLFLOW_DFS_TMP
from mlflow.exceptions import MlflowException
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
    _CONDA_ENV_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _mlflow_conda_env,
    _process_conda_env,
    _process_pip_requirements,
    _PythonEnv,
    _validate_env_arguments,
)
from mlflow.utils.file_utils import (
    get_total_file_size,
    shutil_copytree_without_file_permissions,
    write_to,
)
from mlflow.utils.model_utils import (
    _add_code_from_conf_to_system_path,
    _get_flavor_configuration,
    _get_flavor_configuration_from_uri,
    _validate_and_copy_code_paths,
    _validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.uri import dbfs_hdfs_uri_to_fuse_path, generate_tmp_dfs_path
FLAVOR_NAME = "diviner"
_MODEL_BINARY_KEY = "data"
_MODEL_BINARY_FILE_NAME = "model.div"
_MODEL_TYPE_KEY = "model_type"
_FLAVOR_KEY = "flavors"
_SPARK_MODEL_INDICATOR = "fit_with_spark"
_logger = logging.getLogger(__name__)
[docs]def get_default_pip_requirements():
    """
    Returns:
        A list of default pip requirements for MLflow Models produced with the ``Diviner``
        flavor. Calls to :py:func:`save_model()` and :py:func:`log_model()` produce a pip
        environment that, at a minimum, contains these requirements.
    """
    return [_get_pinned_requirement("diviner")] 
[docs]def get_default_conda_env():
    """
    Returns:
        The default Conda environment for MLflow Models produced with the ``Diviner`` flavor
        that is produced by calls to :py:func:`save_model()` and :py:func:`log_model()`.
    """
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements()) 
[docs]@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    diviner_model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """Save a ``Diviner`` model object to a path on the local file system.
    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
            ``DataFrame``.
        path: Local path destination for the serialized model is to be saved.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` the flavor that this model is being added to.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model
            signature can be :py:func:`inferred <mlflow.models.infer_signature>`
            from datasets with valid model input (e.g. the training dataset with target
            column omitted) and valid model output (e.g. model predictions generated on
            the training dataset), for example:
            .. code-block:: python
                from mlflow.models import infer_signature
                model = diviner.GroupedProphet().fit(data, ("region", "state"))
                predictions = model.predict(prediction_config)
                signature = infer_signature(data, predictions)
        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        kwargs: Optional configurations for Spark DataFrame storage iff the model has
            been fit in Spark.
            Current supported options:
            - `partition_by` for setting a (or several) partition columns as a list of \
            column names. Must be a list of strings of grouping key column(s).
            - `partition_count` for setting the number of part files to write from a \
            repartition per `partition_by` group. The default part file count is 200.
            - `dfs_tmpdir` for specifying the DFS temporary location where the model will \
            be stored while copying from a local file system to a Spark-supported "dbfs:/" \
            scheme.
    """
    import diviner
    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)
    path = pathlib.Path(path).absolute()
    _validate_and_prepare_target_save_path(str(path))
    # NB: When moving to native pathlib implementations, path encoding as string will not be needed.
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, str(path))
    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, str(path))
    if signature is None and saved_example is not None:
        wrapped_model = _DivinerModelWrapper(diviner_model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)
    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata
    fit_with_spark = _save_diviner_model(diviner_model, path, **kwargs)
    flavor_conf = {_SPARK_MODEL_INDICATOR: fit_with_spark}
    model_bin_kwargs = {_MODEL_BINARY_KEY: _MODEL_BINARY_FILE_NAME}
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.diviner",
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
        **model_bin_kwargs,
    )
    flavor_conf.update({_MODEL_TYPE_KEY: diviner_model.__class__.__name__}, **model_bin_kwargs)
    mlflow_model.add_flavor(
        FLAVOR_NAME, diviner_version=diviner.__version__, code=code_dir_subpath, **flavor_conf
    )
    if size := get_total_file_size(path):
        mlflow_model.model_size_bytes = size
    mlflow_model.save(str(path.joinpath(MLMODEL_FILE_NAME)))
    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                str(path), FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs, pip_requirements, extra_pip_requirements
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)
    with path.joinpath(_CONDA_ENV_FILE_NAME).open("w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)
    if pip_constraints:
        write_to(str(path.joinpath(_CONSTRAINTS_FILE_NAME)), "\n".join(pip_constraints))
    write_to(str(path.joinpath(_REQUIREMENTS_FILE_NAME)), "\n".join(pip_requirements))
    _PythonEnv.current().to_yaml(str(path.joinpath(_PYTHON_ENV_FILE_NAME))) 
def _save_diviner_model(diviner_model, path, **kwargs) -> bool:
    """
    Saves a Diviner model to the specified path. If the model was fit by using a Pandas DataFrame
    for the training data submitted to `fit`, directly save the Diviner model object.
    If the Diviner model was fit by using a Spark DataFrame, save the model components separately.
    The metadata and ancillary files to write (JSON and Pandas DataFrames) are written directly
    to a fuse mount location, which the Spark DataFrame that contains the individual serialized
    Diviner model objects is written by using the 'dbfs:' scheme path that Spark recognizes.
    """
    save_path = str(path.joinpath(_MODEL_BINARY_FILE_NAME))
    if getattr(diviner_model, "_fit_with_spark", False):
        # Validate that the path is a relative path early in order to fail fast prior to attempting
        # to write the (large) DataFrame to a tmp DFS path first and raise a path validation
        # Exception within MLflow when attempting to copy the temporary write files from DFS to
        # the file system path provided.
        if not os.path.isabs(path):
            raise MlflowException(
                "The save path provided must be a relative path. "
                f"The path submitted, '{path}' is an absolute path."
            )
        # Create a temporary DFS location to write the Spark DataFrame containing the models to.
        tmp_path = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
        # Save the model Spark DataFrame to the temporary DFS location
        diviner_model._save_model_df_to_path(tmp_path, **kwargs)
        diviner_data_path = os.path.abspath(save_path)
        tmp_fuse_path = dbfs_hdfs_uri_to_fuse_path(tmp_path)
        shutil.move(src=tmp_fuse_path, dst=diviner_data_path)
        # Save the model metadata to the path location
        diviner_model._save_model_metadata_components_to_path(path=diviner_data_path)
        return True
    diviner_model.save(save_path)
    return False
def _load_model_fit_in_spark(local_model_path: str, flavor_conf, **kwargs):
    """
    Loads a Diviner model that has been fit (and saved) in the Spark variant.
    """
    # NB: To load the model DataFrame (which is a Spark DataFrame), Spark requires that the file
    # partitions are in DFS. In order to facilitate this, the model DataFrame (saved as parquet)
    # will be copied to a temporary DFS location. The remaining files can be read directly from
    # the local file system path, which is handled within the Diviner APIs.
    import diviner
    dfs_temp_directory = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
    dfs_fuse_directory = dbfs_hdfs_uri_to_fuse_path(dfs_temp_directory)
    os.makedirs(dfs_fuse_directory)
    shutil_copytree_without_file_permissions(src_dir=local_model_path, dst_dir=dfs_fuse_directory)
    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
    load_directory = os.path.join(dfs_fuse_directory, flavor_conf[_MODEL_BINARY_KEY])
    return diviner_instance.load(load_directory)
[docs]def load_model(model_uri, dst_path=None, **kwargs):
    """Load a ``Diviner`` object from a local file or a run.
    Args:
        model_uri: The location, in URI format, of the MLflow model. For example:
            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``mlflow-artifacts:/path/to/model``
            For more information about supported URI schemes, see
            `Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html#
            artifact-locations>`_.
        dst_path: The local filesystem path to which to download the model artifact.
            This directory must already exist if provided. If unspecified, a local output
            path will be created.
        kwargs: Optional configuration options for loading of a Diviner model. For models
            that have been fit and saved using Spark, if a specific DFS temporary directory
            is desired for loading of Diviner models, use the keyword argument
            `"dfs_tmpdir"` to define the loading temporary path for the model during loading.
    Returns:
        A ``Diviner`` model instance.
    """
    model_uri = str(model_uri)
    flavor_conf = _get_flavor_configuration_from_uri(model_uri, FLAVOR_NAME, _logger)
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)
    if flavor_conf.get(_SPARK_MODEL_INDICATOR, False):
        return _load_model_fit_in_spark(local_model_path, flavor_conf, **kwargs)
    return _load_model(local_model_path, flavor_conf) 
def _load_model(path, flavor_conf):
    """
    Loads a Diviner model instance that was not fit using Spark from a file system location.
    """
    import diviner
    local_path = pathlib.Path(path)
    diviner_model_path = local_path.joinpath(
        flavor_conf.get(_MODEL_BINARY_KEY, _MODEL_BINARY_FILE_NAME)
    )
    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
    return diviner_instance.load(str(diviner_model_path))
def _load_pyfunc(path):
    local_path = pathlib.Path(path)
    # NB: reverting the dir walk that happens with pyfunc's loading implementation
    if local_path.is_file():
        local_path = local_path.parent
    flavor_conf = _get_flavor_configuration(local_path, FLAVOR_NAME)
    if flavor_conf.get(_SPARK_MODEL_INDICATOR):
        raise MlflowException(
            "The model being loaded was fit in Spark. Diviner models fit in "
            "Spark do not support loading as pyfunc."
        )
    return _DivinerModelWrapper(_load_model(local_path, flavor_conf))
[docs]@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    diviner_model,
    artifact_path: Optional[str] = None,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    name: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
    tags: Optional[dict[str, Any]] = None,
    model_type: Optional[str] = None,
    step: int = 0,
    model_id: Optional[str] = None,
    **kwargs,
):
    """Log a ``Diviner`` object as an MLflow artifact for the current run.
    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal ``DataFrame``.
        artifact_path: Deprecated. Use `name` instead.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: If given, create a model
            version under ``registered_model_name``, also creating a
            registered model if one with the given name does not exist.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model
            signature can be :py:func:`inferred <mlflow.models.infer_signature>`
            from datasets with valid model input (e.g. the training dataset with target
            column omitted) and valid model output (e.g. model predictions generated on
            the training dataset), for example:
            .. code-block:: python
              :caption: Example
              from mlflow.models import infer_signature
              auto_arima_obj = AutoARIMA(out_of_sample_size=60, maxiter=100)
              base_auto_arima = GroupedPmdarima(model_template=auto_arima_obj).fit(
                  df=training_data,
                  group_key_columns=("region", "state"),
                  y_col="y",
                  datetime_col="ds",
                  silence_warnings=True,
              )
              predictions = model.predict(n_periods=30, alpha=0.05, return_conf_int=True)
              signature = infer_signature(data, predictions)
        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version
            to finish being created and is in ``READY`` status.
            By default, the function waits for five minutes.
            Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        name: {{ name }}
        params: {{ params }}
        tags: {{ tags }}
        model_type: {{ model_type }}
        step: {{ step }}
        model_id: {{ model_id }}
        kwargs: Additional arguments for :py:class:`mlflow.models.model.Model`
            Additionally, for models that have been fit in Spark, the following supported
            configuration options are available to set.
            Current supported options:
            - `partition_by` for setting a (or several) partition columns as a list of \
            column names. Must be a list of strings of grouping key column(s).
            - `partition_count` for setting the number of part files to write from a \
            repartition per `partition_by` group. The default part file count is 200.
            - `dfs_tmpdir` for specifying the DFS temporary location where the model will \
            be stored while copying from a local file system to a Spark-supported "dbfs:/" \
            scheme.
    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
        metadata of the logged model.
    """
    return Model.log(
        artifact_path=artifact_path,
        name=name,
        flavor=mlflow.diviner,
        registered_model_name=registered_model_name,
        diviner_model=diviner_model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        params=params,
        tags=tags,
        model_type=model_type,
        step=step,
        model_id=model_id,
        **kwargs,
    ) 
class _DivinerModelWrapper:
    def __init__(self, diviner_model):
        self.diviner_model = diviner_model
    def get_raw_model(self):
        """
        Returns the underlying model.
        """
        return self.diviner_model
    def predict(self, dataframe, params: Optional[dict[str, Any]] = None) -> pd.DataFrame:
        """A method that allows a pyfunc implementation of this flavor to generate forecasted values
        from the end of a trained Diviner model's training series per group.
        The implementation here encapsulates a config-based switch of method calling. In short:
          *  If the ``DataFrame`` supplied to this method contains a column ``groups`` whose
             first row of data is of type List[tuple[str]] (containing the series-identifying
             group keys that were generated to identify a single underlying model during training),
             the caller will resolve to the method ``predict_groups()`` in each of the underlying
             wrapped libraries (i.e., ``GroupedProphet.predict_groups()``).
          *  If the ``DataFrame`` supplied does not contain the column name ``groups``, then the
             specific forecasting method that is primitive-driven (for ``GroupedProphet``, the
             ``predict()`` method mirrors that of ``Prophet``'s, requiring a ``DataFrame``
             submitted with explicit datetime values per group which is not a tenable
             implementation for pyfunc or RESTful serving) is utilized. For ``GroupedProphet``,
             this is the ``.forecast()`` method, while for ``GroupedPmdarima``, this is the
             ``.predict()`` method.
        Args:
            dataframe: A ``pandas.DataFrame`` that contains the required configuration for the
                appropriate ``Diviner`` type.
                For example, for ``GroupedProphet.forecast()``:
                - horizon : int
                - frequency: str
                predict_conf = pd.DataFrame({"horizon": 30, "frequency": "D"}, index=[0])
                forecast = pyfunc.load_pyfunc(model_uri=model_path).predict(predict_conf)
                Will generate 30 days of forecasted values for each group that the model
                was trained on.
            params: Additional parameters to pass to the model for inference.
        Returns:
            A Pandas DataFrame containing the forecasted values for each group key that was
            either trained or declared as a subset with a ``groups`` entry in the ``dataframe``
            configuration argument.
        """
        from diviner import GroupedPmdarima, GroupedProphet
        schema = dataframe.columns.values.tolist()
        conf = dataframe.to_dict(orient="index").get(0)
        # required parameter extraction and validation
        horizon = conf.get("horizon", None)
        n_periods = conf.get("n_periods", None)
        if n_periods and horizon and n_periods != horizon:
            raise MlflowException(
                "The provided prediction configuration contains both `n_periods` and `horizon` "
                "with different values. Please provide only one of these integer values.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            if not n_periods and horizon:
                n_periods = horizon
        if not n_periods:
            raise MlflowException(
                "The provided prediction configuration Pandas DataFrame does not contain either "
                "the `n_periods` or `horizon` columns. At least one of these must be specified "
                f"with a valid integer value. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        if not isinstance(n_periods, int):
            raise MlflowException(
                "The `n_periods` column contains invalid data. Supplied type must be an integer. "
                f"Type supplied: {type(n_periods)}",
                error_code=INVALID_PARAMETER_VALUE,
            )
        frequency = conf.get("frequency", None)
        if isinstance(self.diviner_model, GroupedProphet) and not frequency:
            raise MlflowException(
                "Diviner's GroupedProphet model requires a `frequency` value to be submitted in "
                "Pandas date_range format. The submitted configuration Pandas DataFrame does not "
                f"contain a `frequency` column. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        predict_col = conf.get("predict_col", None)
        predict_groups = conf.get("groups", None)
        if predict_groups and not isinstance(predict_groups, list):
            raise MlflowException(
                "Specifying a group subset for prediction requires groups to be defined as a "
                f"[List[(Tuple|List)[<group_keys>]]. Submitted group type: {type(predict_groups)}.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        # NB: json serialization of a tuple converts the tuple to a List. Diviner requires a
        # List of Tuples to be input to the group_prediction API. This conversion is for utilizing
        # the pyfunc flavor through the serving API.
        if predict_groups and not isinstance(predict_groups[0], tuple):
            predict_groups = [tuple(group) for group in predict_groups]
        if isinstance(self.diviner_model, GroupedProphet):
            # We're wrapping two different endpoints to Diviner here for the pyfunc implementation.
            # Since we're limited by a single endpoint, we can address redirecting to the
            # method ``predict_groups()`` which will allow for a subset of groups to be forecasted
            # if the prediction configuration DataFrame contains a List[tuple[str]]] in the
            # ``groups`` column. If this column is not present, all groups will be used to generate
            # forecasts, utilizing the less computationally complex method ``forecast``.
            if not predict_groups:
                prediction_df = self.diviner_model.forecast(horizon=n_periods, frequency=frequency)
            else:
                group_kwargs = {k: v for k, v in conf.items() if k in {"predict_col", "on_error"}}
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups, horizon=n_periods, frequency=frequency, **group_kwargs
                )
            if predict_col is not None:
                prediction_df.rename(columns={"yhat": predict_col}, inplace=True)
        elif isinstance(self.diviner_model, GroupedPmdarima):
            # As above, we're redirecting the prediction request to one of two different methods
            # for ``Diviner``'s pmdarima implementation. If the ``groups`` column is present with
            # a list of tuples of keys to lookup, ``predict_groups()`` will be used. Otherwise,
            # the standard ``predict()`` method will be called to generate forecasts for all groups
            # that were trained on.
            restricted_keys = {"n_periods", "horizon", "frequency", "groups"}
            predict_conf = {k: v for k, v in conf.items() if k not in restricted_keys}
            if not predict_groups:
                prediction_df = self.diviner_model.predict(n_periods=n_periods, **predict_conf)
            else:
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups, n_periods=n_periods, **predict_conf
                )
        else:
            raise MlflowException(
                f"The Diviner model instance type '{type(self.diviner_model)}' is not supported "
                f"in version {mlflow.__version__} of MLflow.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        return prediction_df