from typing import Dict, Any
from types import FunctionType
import mlflow
import hashlib
import json
import os
import signal
from mlflow.entities.dataset_input import DatasetInput
from mlflow.entities.input_tag import InputTag
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking.client import MlflowClient
from contextlib import contextmanager
from mlflow.exceptions import MlflowException
from mlflow.utils.file_utils import TempDir
from mlflow.entities import RunTag
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils import _get_fully_qualified_class_name
from mlflow.utils.annotations import developer_stable
from mlflow.utils.class_utils import _get_class_from_string
from mlflow.utils.mlflow_tags import MLFLOW_DATASET_CONTEXT
from mlflow.utils.string_utils import generate_feature_name_if_not_string
from mlflow.utils.proto_json_utils import NumpyEncoder
from mlflow.data.dataset import Dataset
from mlflow.models.evaluation.validation import (
_MetricValidationResult,
MetricThreshold,
ModelValidationFailedException,
)
import logging
import struct
import sys
import math
import urllib
import pathlib
from collections import OrderedDict
from abc import ABCMeta, abstractmethod
import operator
from decimal import Decimal
_logger = logging.getLogger(__name__)
class _ModelType:
REGRESSOR = "regressor"
CLASSIFIER = "classifier"
QUESTION_ANSWERING = "question-answering"
TEXT_SUMMARIZATION = "text-summarization"
TEXT = "text"
# TODO: Add 'retrieval' model type
def __init__(self):
raise NotImplementedError("This class is not meant to be instantiated.")
@classmethod
def values(cls):
return (
cls.REGRESSOR,
cls.CLASSIFIER,
cls.QUESTION_ANSWERING,
cls.TEXT_SUMMARIZATION,
cls.TEXT,
)
class EvaluationMetric:
'''
A model evaluation metric.
:param eval_fn:
A function that computes the metric with the following signature:
.. code-block:: python
def eval_fn(
eval_df: Union[pandas.Dataframe, pyspark.sql.DataFrame],
builtin_metrics: Dict[str, float],
) -> float:
"""
:param eval_df:
A Pandas or Spark DataFrame containing ``prediction`` and ``target`` columns.
The ``prediction`` column contains the predictions made by the model.
The ``target`` column contains the corresponding labels for those predictions.
:param builtin_metrics:
A dictionary containing the metrics calculated by the default evaluator.
The keys are the names of the metrics and the values are the scalar values of
the metrics. Refer to the DefaultEvaluator behavior section for what metrics
will be returned based on the type of model (i.e. classifier or regressor).
:return:
The metric value.
"""
...
:param name: The name of the metric.
:param greater_is_better: Whether a higher value of the metric is better.
:param long_name: (Optional) The long name of the metric. For example,
``"mean_squared_error"`` for ``"mse"``.
'''
def __init__(self, eval_fn, name, greater_is_better, long_name=None):
self.eval_fn = eval_fn
self.name = name
self.greater_is_better = greater_is_better
self.long_name = long_name or name
def __str__(self):
if self.long_name:
return (
f"EvaluationMetric(name={self.name}, long_name={self.long_name}, "
f"greater_is_better={self.greater_is_better})"
)
else:
return f"EvaluationMetric(name={self.name}, greater_is_better={self.greater_is_better})"
def make_metric(
*,
eval_fn,
greater_is_better,
name=None,
long_name=None,
):
'''
A factory function to create an :py:class:`EvaluationMetric` object.
:param eval_fn:
A function that computes the metric with the following signature:
.. code-block:: python
def eval_fn(
eval_df: Union[pandas.Dataframe, pyspark.sql.DataFrame],
builtin_metrics: Dict[str, float],
) -> float:
"""
:param eval_df:
A Pandas or Spark DataFrame containing ``prediction`` and ``target`` columns.
The ``prediction`` column contains the predictions made by the model.
The ``target`` column contains the corresponding labels for those predictions.
:param builtin_metrics:
A dictionary containing the metrics calculated by the default evaluator.
The keys are the names of the metrics and the values are the scalar values of
the metrics. Refer to the DefaultEvaluator behavior section for what metrics
will be returned based on the type of model (i.e. classifier or regressor).
:return:
The metric value.
"""
...
:param greater_is_better: Whether a higher value of the metric is better.
:param name: The name of the metric. This argument must be specified if ``eval_fn`` is a lambda
function or the ``eval_fn.__name__`` attribute is not available.
:param long_name: (Optional) The long name of the metric. For example, ``"mean_squared_error"``
for ``"mse"``.
.. seealso::
- :py:class:`mlflow.models.EvaluationMetric`
- :py:func:`mlflow.evaluate`
'''
if name is None:
if isinstance(eval_fn, FunctionType) and eval_fn.__name__ == "<lambda>":
raise MlflowException(
"`name` must be specified if `eval_fn` is a lambda function.",
INVALID_PARAMETER_VALUE,
)
if not hasattr(eval_fn, "__name__"):
raise MlflowException(
"`name` must be specified if `eval_fn` does not have a `__name__` attribute.",
INVALID_PARAMETER_VALUE,
)
name = eval_fn.__name__
return EvaluationMetric(eval_fn, name, greater_is_better, long_name)
@developer_stable
class EvaluationArtifact(metaclass=ABCMeta):
"""
A model evaluation artifact containing an artifact URI and its content.
"""
def __init__(self, uri, content=None):
self._uri = uri
self._content = content
@abstractmethod
def _load_content_from_file(self, local_artifact_path):
"""
Abstract interface to load the content from local artifact file path,
and return the loaded content.
"""
pass
def _load(self, local_artifact_path=None):
"""
If ``local_artifact_path`` is ``None``, download artifact from the artifact uri.
Otherwise, load artifact content from the specified path. Assign the loaded content to
``self._content``, and return the loaded content.
"""
if local_artifact_path is not None:
self._content = self._load_content_from_file(local_artifact_path)
else:
with TempDir() as temp_dir:
temp_dir_path = temp_dir.path()
_download_artifact_from_uri(self._uri, temp_dir_path)
local_artifact_file = temp_dir.path(os.listdir(temp_dir_path)[0])
self._content = self._load_content_from_file(local_artifact_file)
return self._content
@abstractmethod
def _save(self, output_artifact_path):
"""Save artifact content into specified path."""
pass
@property
def content(self):
"""
The content of the artifact (representation varies)
"""
if self._content is None:
self._load()
return self._content
@property
def uri(self) -> str:
"""
The URI of the artifact
"""
return self._uri
def __repr__(self):
return f"{self.__class__.__name__}(uri='{self.uri}')"
class EvaluationResult:
"""
Represents the model evaluation outputs of an `mlflow.evaluate()` API call, containing
both scalar metrics and output artifacts such as performance plots.
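For example, evaluation results can be written to and reloaded from the local
filesystem (an illustrative sketch; ``model_uri``, ``data``, and the ``label``
column are assumed to exist):

.. code-block:: python

    result = mlflow.evaluate(model_uri, data, targets="label", model_type="classifier")
    result.save("/tmp/evaluation_results")
    reloaded = mlflow.models.EvaluationResult.load("/tmp/evaluation_results")
    print(reloaded.metrics)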
"""
def __init__(self, metrics, artifacts, baseline_model_metrics=None):
self._metrics = metrics
self._artifacts = artifacts
self._baseline_model_metrics = baseline_model_metrics if baseline_model_metrics else {}
@classmethod
def load(cls, path):
"""Load the evaluation results from the specified local filesystem path"""
with open(os.path.join(path, "metrics.json")) as fp:
metrics = json.load(fp)
with open(os.path.join(path, "artifacts_metadata.json")) as fp:
artifacts_metadata = json.load(fp)
artifacts = {}
artifacts_dir = os.path.join(path, "artifacts")
for artifact_name, meta in artifacts_metadata.items():
uri = meta["uri"]
ArtifactCls = _get_class_from_string(meta["class_name"])
artifact = ArtifactCls(uri=uri)
filename = pathlib.Path(urllib.parse.urlparse(uri).path).name
artifact._load(os.path.join(artifacts_dir, filename))
artifacts[artifact_name] = artifact
return EvaluationResult(metrics=metrics, artifacts=artifacts)
def save(self, path):
"""Write the evaluation results to the specified local filesystem path"""
os.makedirs(path, exist_ok=True)
with open(os.path.join(path, "metrics.json"), "w") as fp:
json.dump(self.metrics, fp, cls=NumpyEncoder)
artifacts_metadata = {
artifact_name: {
"uri": artifact.uri,
"class_name": _get_fully_qualified_class_name(artifact),
}
for artifact_name, artifact in self.artifacts.items()
}
with open(os.path.join(path, "artifacts_metadata.json"), "w") as fp:
json.dump(artifacts_metadata, fp)
artifacts_dir = os.path.join(path, "artifacts")
os.makedirs(artifacts_dir, exist_ok=True)
for artifact in self.artifacts.values():
filename = pathlib.Path(urllib.parse.urlparse(artifact.uri).path).name
artifact._save(os.path.join(artifacts_dir, filename))
@property
def metrics(self) -> Dict[str, Any]:
"""
A dictionary mapping scalar metric names to scalar metric values
"""
return self._metrics
@property
def artifacts(self) -> Dict[str, "mlflow.models.EvaluationArtifact"]:
"""
A dictionary mapping standardized artifact names (e.g. "roc_data") to
artifact content and location information
"""
return self._artifacts
@property
def baseline_model_metrics(self) -> Dict[str, Any]:
"""
A dictionary mapping scalar metric names to scalar metric values for the baseline model
"""
return self._baseline_model_metrics
_cached_mlflow_client = None
def _hash_uint64_ndarray_as_bytes(array):
assert len(array.shape) == 1
# see struct pack format string https://docs.python.org/3/library/struct.html#format-strings
return struct.pack(f">{array.size}Q", *array)
def _hash_ndarray_as_bytes(nd_array):
from pandas.util import hash_array
import numpy as np
return _hash_uint64_ndarray_as_bytes(
hash_array(nd_array.flatten(order="C"))
) + _hash_uint64_ndarray_as_bytes(np.array(nd_array.shape, dtype="uint64"))
def _hash_array_like_obj_as_bytes(data):
"""
Helper method to convert pandas dataframe/numpy array/list into bytes for
MD5 calculation purpose.
"""
from pandas.util import hash_pandas_object
import numpy as np
import pandas as pd
if isinstance(data, pd.DataFrame):
# Check `'pyspark' in sys.modules` to avoid importing pyspark when the user's
# code is unrelated to pyspark.
if "pyspark" in sys.modules:
from pyspark.ml.linalg import Vector as spark_vector_type
else:
spark_vector_type = None
def _hash_array_like_element_as_bytes(v):
if spark_vector_type is not None:
if isinstance(v, spark_vector_type):
return _hash_ndarray_as_bytes(v.toArray())
if isinstance(v, np.ndarray):
return _hash_ndarray_as_bytes(v)
if isinstance(v, list):
return _hash_ndarray_as_bytes(np.array(v))
return v
data = data.applymap(_hash_array_like_element_as_bytes)
return _hash_uint64_ndarray_as_bytes(hash_pandas_object(data))
elif isinstance(data, np.ndarray):
return _hash_ndarray_as_bytes(data)
elif isinstance(data, list):
return _hash_ndarray_as_bytes(np.array(data))
else:
raise ValueError("Unsupported data type.")
def _gen_md5_for_arraylike_obj(md5_gen, data):
"""
Helper method to generate an MD5 hash for an array-like object. The MD5 is computed over:
- array length
- first NUM_SAMPLE_ROWS_FOR_HASH rows content
- last NUM_SAMPLE_ROWS_FOR_HASH rows content
"""
import numpy as np
len_bytes = _hash_uint64_ndarray_as_bytes(np.array([len(data)], dtype="uint64"))
md5_gen.update(len_bytes)
if len(data) < EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH * 2:
md5_gen.update(_hash_array_like_obj_as_bytes(data))
else:
head_rows = data[: EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]
tail_rows = data[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH :]
md5_gen.update(_hash_array_like_obj_as_bytes(head_rows))
md5_gen.update(_hash_array_like_obj_as_bytes(tail_rows))
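# Illustrative usage of the hashing helpers above (a sketch, not part of the module API):
# the digest below is computed the same way EvaluationDataset computes its dataset hash,
# i.e. over the dataset length plus the head and tail sample rows.
#
#   import hashlib
#   import pandas as pd
#
#   md5_gen = hashlib.md5()
#   _gen_md5_for_arraylike_obj(md5_gen, pd.DataFrame({"x": [1, 2, 3]}))
#   digest = md5_gen.hexdigest()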
class EvaluationDataset:
"""
An input dataset for model evaluation. This is intended for use with the
:py:func:`mlflow.models.evaluate()`
API.
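For example (an illustrative sketch; the DataFrame and column names below are
hypothetical):

.. code-block:: python

    import pandas as pd

    data = pd.DataFrame({"f1": [1, 2, 3], "f2": [4, 5, 6], "label": [0, 1, 0]})
    dataset = EvaluationDataset(data, targets="label", name="my_eval_data")
    assert dataset.has_targets and dataset.targets_name == "label"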
"""
NUM_SAMPLE_ROWS_FOR_HASH = 5
SPARK_DATAFRAME_LIMIT = 10000
def __init__(self, data, *, targets=None, name=None, path=None, feature_names=None):
"""
The values of the constructor arguments come from the `evaluate` call.
"""
import numpy as np
import pandas as pd
if name is not None and '"' in name:
raise MlflowException(
message=f'Dataset name cannot include a double quote (") but got {name}',
error_code=INVALID_PARAMETER_VALUE,
)
if path is not None and '"' in path:
raise MlflowException(
message=f'Dataset path cannot include a double quote (") but got {path}',
error_code=INVALID_PARAMETER_VALUE,
)
self._user_specified_name = name
self._path = path
self._hash = None
self._supported_dataframe_types = (pd.DataFrame,)
self._spark_df_type = None
self._labels_data = None
self._targets_name = None
self._has_targets = False
try:
# Check `'pyspark' in sys.modules` to avoid importing pyspark when the user's
# code is unrelated to pyspark.
if "pyspark" in sys.modules:
from pyspark.sql import DataFrame as SparkDataFrame
self._supported_dataframe_types = (pd.DataFrame, SparkDataFrame)
self._spark_df_type = SparkDataFrame
except ImportError:
pass
if feature_names is not None and len(set(feature_names)) < len(list(feature_names)):
raise MlflowException(
message="`feature_names` argument must be a list containing unique feature names.",
error_code=INVALID_PARAMETER_VALUE,
)
has_targets = targets is not None
if has_targets:
self._has_targets = True
if isinstance(data, (np.ndarray, list)):
if has_targets and not isinstance(targets, (np.ndarray, list)):
raise MlflowException(
message="If data is a numpy array or list of evaluation features, "
"`targets` argument must be a numpy array or list of evaluation labels.",
error_code=INVALID_PARAMETER_VALUE,
)
shape_message = (
"If the `data` argument is a numpy array, it must be a 2 dimension "
"array and second dimension represent the number of features. If the `data` "
"argument is a list, each of its element must be a feature array of "
"numpy array or list and all element must has the same length."
)
if isinstance(data, list):
try:
data = np.array(data)
except ValueError as e:
raise MlflowException(
message=shape_message, error_code=INVALID_PARAMETER_VALUE
) from e
if len(data.shape) != 2:
raise MlflowException(
message=shape_message,
error_code=INVALID_PARAMETER_VALUE,
)
self._features_data = data
if has_targets:
self._labels_data = (
targets if isinstance(targets, np.ndarray) else np.array(targets)
)
if len(self._features_data) != len(self._labels_data):
raise MlflowException(
message="The input features example rows must be the same length "
"with labels array.",
error_code=INVALID_PARAMETER_VALUE,
)
num_features = data.shape[1]
if feature_names is not None:
feature_names = list(feature_names)
if num_features != len(feature_names):
raise MlflowException(
message="feature name list must be the same length with feature data.",
error_code=INVALID_PARAMETER_VALUE,
)
self._feature_names = feature_names
else:
self._feature_names = [
f"feature_{str(i + 1).zfill(math.ceil((math.log10(num_features + 1))))}"
for i in range(num_features)
]
elif isinstance(data, self._supported_dataframe_types):
if has_targets and not isinstance(targets, str):
raise MlflowException(
message="If data is a Pandas DataFrame or Spark DataFrame, `targets` argument "
"must be the name of the column which contains evaluation labels in the `data` "
"dataframe.",
error_code=INVALID_PARAMETER_VALUE,
)
if self._spark_df_type and isinstance(data, self._spark_df_type):
if data.count() > EvaluationDataset.SPARK_DATAFRAME_LIMIT:
_logger.warning(
"Specified Spark DataFrame is too large for model evaluation. Only "
f"the first {EvaluationDataset.SPARK_DATAFRAME_LIMIT} rows will be used. "
"If you want evaluate on the whole spark dataframe, please manually call "
"`spark_dataframe.toPandas()`."
)
data = data.limit(EvaluationDataset.SPARK_DATAFRAME_LIMIT).toPandas()
if has_targets:
self._labels_data = data[targets].to_numpy()
self._targets_name = targets
if feature_names is not None:
self._features_data = data[list(feature_names)]
self._feature_names = feature_names
else:
if has_targets:
self._features_data = data.drop(targets, axis=1, inplace=False)
else:
self._features_data = data
self._feature_names = [
generate_feature_name_if_not_string(c) for c in self._features_data.columns
]
else:
raise MlflowException(
message="The data argument must be a numpy array, a list or a Pandas DataFrame, or "
"spark DataFrame if pyspark package installed.",
error_code=INVALID_PARAMETER_VALUE,
)
# generate dataset hash
md5_gen = hashlib.md5()
_gen_md5_for_arraylike_obj(md5_gen, self._features_data)
if self._labels_data is not None:
_gen_md5_for_arraylike_obj(md5_gen, self._labels_data)
md5_gen.update(",".join(list(map(str, self._feature_names))).encode("UTF-8"))
self._hash = md5_gen.hexdigest()
@property
def feature_names(self):
return self._feature_names
@property
def features_data(self):
"""
Return the features data as a numpy array or a pandas DataFrame.
"""
return self._features_data
@property
def labels_data(self):
"""
Return the labels data as a numpy array.
"""
return self._labels_data
@property
def has_targets(self):
"""
Returns True if the dataset has targets, False otherwise.
"""
return self._has_targets
@property
def targets_name(self):
"""
Return the name of the targets column.
"""
return self._targets_name
@property
def name(self):
"""
Dataset name: the user-specified name, or the dataset hash if no name was specified.
"""
return self._user_specified_name if self._user_specified_name is not None else self.hash
@property
def path(self):
"""
Dataset path
"""
return self._path
@property
def hash(self):
"""
Dataset hash, computed over the dataset length and its first and last ``NUM_SAMPLE_ROWS_FOR_HASH`` rows.
"""
return self._hash
@property
def _metadata(self):
"""
Return dataset metadata containing name, hash, and optional path.
"""
metadata = {
"name": self.name,
"hash": self.hash,
}
if self.path is not None:
metadata["path"] = self.path
return metadata
def _log_dataset_tag(self, client, run_id, model_uuid):
"""
Log dataset metadata as an "mlflow.datasets" tag. If the tag already exists,
append the current dataset metadata to the existing tag content.
"""
existing_dataset_metadata_str = client.get_run(run_id).data.tags.get(
"mlflow.datasets", "[]"
)
dataset_metadata_list = json.loads(existing_dataset_metadata_str)
for metadata in dataset_metadata_list:
if (
metadata["hash"] == self.hash
and metadata["name"] == self.name
and metadata["model"] == model_uuid
):
break
else:
dataset_metadata_list.append({**self._metadata, "model": model_uuid})
dataset_metadata_str = json.dumps(dataset_metadata_list, separators=(",", ":"))
client.log_batch(
run_id,
tags=[RunTag("mlflow.datasets", dataset_metadata_str)],
)
def __hash__(self):
return hash(self.hash)
def __eq__(self, other):
import numpy as np
if not isinstance(other, EvaluationDataset):
return False
if isinstance(self._features_data, np.ndarray):
is_features_data_equal = np.array_equal(self._features_data, other._features_data)
else:
is_features_data_equal = self._features_data.equals(other._features_data)
return (
is_features_data_equal
and np.array_equal(self._labels_data, other._labels_data)
and self.name == other.name
and self.path == other.path
and self._feature_names == other._feature_names
)
@developer_stable
class ModelEvaluator(metaclass=ABCMeta):
@abstractmethod
def can_evaluate(self, *, model_type, evaluator_config, **kwargs) -> bool:
"""
:param model_type: A string describing the model type (e.g., "regressor", "classifier", …).
:param evaluator_config: A dictionary of additional configurations for
the evaluator.
:param kwargs: For forwards compatibility, a placeholder for additional arguments
that may be added to the evaluation interface in the future.
:return: True if the evaluator can evaluate the specified model on the
specified dataset. False otherwise.
"""
raise NotImplementedError()
@abstractmethod
def evaluate(
self,
*,
model,
model_type,
dataset,
run_id,
evaluator_config,
custom_metrics=None,
custom_artifacts=None,
baseline_model=None,
**kwargs,
):
"""
The abstract API to log metrics and artifacts, and return evaluation results.
:param model: A pyfunc model instance, used as the candidate_model
to be compared with baseline_model (specified by the `baseline_model` param)
for model validation.
:param model_type: A string describing the model type
(e.g., ``"regressor"``, ``"classifier"``, …).
:param dataset: An instance of `mlflow.models.evaluation.base.EvaluationDataset`
containing features and labels (optional) for model evaluation.
:param run_id: The ID of the MLflow Run to which to log results.
:param evaluator_config: A dictionary of additional configurations for
the evaluator.
:param custom_metrics: A list of :py:class:`EvaluationMetric` objects.
:param custom_artifacts: A list of callable custom artifact functions.
:param kwargs: For forwards compatibility, a placeholder for additional arguments that
may be added to the evaluation interface in the future.
:param baseline_model: (Optional) A string URI referring to an MLflow model with the pyfunc
flavor, used as a baseline model to be compared with the
candidate model (specified by the `model` param) for model
validation. (A pyfunc model instance is not allowed.)
:return: A :py:class:`mlflow.models.EvaluationResult` instance containing
evaluation metrics for the candidate and baseline models, and
artifacts for the candidate model.
"""
raise NotImplementedError()
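# A minimal sketch of a custom evaluator plugin (illustrative only; the class and
# metric names below are hypothetical and not part of MLflow):
#
#   class EchoEvaluator(ModelEvaluator):
#       def can_evaluate(self, *, model_type, evaluator_config, **kwargs):
#           return model_type in _ModelType.values()
#
#       def evaluate(self, *, model, model_type, dataset, run_id, evaluator_config, **kwargs):
#           predictions = model.predict(dataset.features_data)
#           return EvaluationResult(metrics={"num_predictions": len(predictions)}, artifacts={})
#
# Third-party packages typically expose such evaluators through the "mlflow.model_evaluator"
# entry point so that the evaluator registry (and `list_evaluators()`) can discover them.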
def list_evaluators():
"""
Return a list of the names of all available evaluators.
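For example (an illustrative sketch; a stock installation typically registers only
the "default" evaluator):

.. code-block:: python

    import mlflow

    print(mlflow.models.list_evaluators())  # e.g. ["default"]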
"""
# import _model_evaluation_registry inside the function to avoid circular imports
from mlflow.models.evaluation.evaluator_registry import _model_evaluation_registry
return list(_model_evaluation_registry._registry.keys())
@contextmanager
def _start_run_or_reuse_active_run():
"""
A context manager that yields:
- the ID of the active run, if one exists;
- otherwise, the ID of a newly started MLflow run.
"""
active_run = mlflow.active_run()
if not active_run:
# Note `mlflow.start_run` throws if `run_id` is not found.
with mlflow.start_run() as run:
yield run.info.run_id
else:
yield active_run.info.run_id
def _normalize_evaluators_and_evaluator_config_args(
evaluators,
evaluator_config,
):
from mlflow.models.evaluation.evaluator_registry import _model_evaluation_registry
def check_nesting_config_dict(_evaluator_name_list, _evaluator_name_to_conf_map):
return isinstance(_evaluator_name_to_conf_map, dict) and all(
k in _evaluator_name_list and isinstance(v, dict)
for k, v in _evaluator_name_to_conf_map.items()
)
if evaluators is None:
evaluator_name_list = list(_model_evaluation_registry._registry.keys())
if len(evaluator_name_list) > 1:
_logger.warning(
f"Multiple registered evaluators are found {evaluator_name_list} and "
"they will all be used in evaluation if they support the specified model type. "
"If you want to evaluate with one evaluator, specify the `evaluator` argument "
"and optionally specify the `evaluator_config` argument."
)
if evaluator_config is not None:
conf_dict_value_error = MlflowException(
message="If `evaluators` argument is None, all available evaluators will be used. "
"If only the default evaluator is available, the `evaluator_config` argument is "
"interpreted as the config dictionary for the default evaluator. Otherwise, the "
"`evaluator_config` argument must be a dictionary mapping each evaluator's name "
"to its own evaluator config dictionary.",
error_code=INVALID_PARAMETER_VALUE,
)
if evaluator_name_list == ["default"]:
if not isinstance(evaluator_config, dict):
raise conf_dict_value_error
elif "default" not in evaluator_config:
evaluator_name_to_conf_map = {"default": evaluator_config}
else:
evaluator_name_to_conf_map = evaluator_config
else:
if not check_nesting_config_dict(evaluator_name_list, evaluator_config):
raise conf_dict_value_error
evaluator_name_to_conf_map = evaluator_config
else:
evaluator_name_to_conf_map = {}
elif isinstance(evaluators, str):
if not (evaluator_config is None or isinstance(evaluator_config, dict)):
raise MlflowException(
message="If `evaluators` argument is the name of an evaluator, evaluator_config"
" must be None or a dict containing config items for the evaluator.",
error_code=INVALID_PARAMETER_VALUE,
)
evaluator_name_list = [evaluators]
evaluator_name_to_conf_map = {evaluators: evaluator_config}
elif isinstance(evaluators, list):
if evaluator_config is not None:
if not check_nesting_config_dict(evaluators, evaluator_config):
raise MlflowException(
message="If `evaluators` argument is an evaluator name list, evaluator_config "
"must be a dict contains mapping from evaluator name to individual "
"evaluator config dict.",
error_code=INVALID_PARAMETER_VALUE,
)
# Use `OrderedDict.fromkeys` to deduplicate elements but keep elements order.
evaluator_name_list = list(OrderedDict.fromkeys(evaluators))
evaluator_name_to_conf_map = evaluator_config or {}
else:
raise MlflowException(
message="`evaluators` argument must be None, an evaluator name string, or a list of "
"evaluator names.",
error_code=INVALID_PARAMETER_VALUE,
)
return evaluator_name_list, evaluator_name_to_conf_map
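# Illustrative examples of the normalization above (a sketch; assumes only the "default"
# evaluator is registered):
#
#   _normalize_evaluators_and_evaluator_config_args(None, None)
#       -> (["default"], {})
#   _normalize_evaluators_and_evaluator_config_args("default", {"log_model_explainability": False})
#       -> (["default"], {"default": {"log_model_explainability": False}})
#   _normalize_evaluators_and_evaluator_config_args(["default"], {"default": {"metric_prefix": "val_"}})
#       -> (["default"], {"default": {"metric_prefix": "val_"}})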
def _model_validation_contains_model_comparison(validation_thresholds):
"""
Helper function for determining whether validation_thresholds contains
thresholds for model comparison: either min_relative_change or min_absolute_change.
"""
if not validation_thresholds:
return False
thresholds = validation_thresholds.values()
return any(
threshold.min_relative_change or threshold.min_absolute_change for threshold in thresholds
)
_last_failed_evaluator = None
def _get_last_failed_evaluator():
"""
Return the evaluator name of the last failed evaluator when calling `evaluate`.
This can be used to check which evaluator failed when the `evaluate` API fails.
"""
return _last_failed_evaluator
def _validate(validation_thresholds, candidate_metrics, baseline_metrics=None):
"""
Validate the model against validation_thresholds, using the candidate model's metric
values and, where comparison thresholds are configured, a comparison between the
candidate model's metrics (candidate_metrics) and the baseline model's metrics
(baseline_metrics).
:param validation_thresholds: A dictionary from metric_name to MetricThreshold.
:param candidate_metrics: The metric evaluation result of the candidate model.
:param baseline_metrics: The metric evaluation result of the baseline model.
If the validation does not pass, a ModelValidationFailedException is raised with a detailed failure message.
"""
if not baseline_metrics:
baseline_metrics = {}
validation_results = {
metric_name: _MetricValidationResult(
metric_name,
candidate_metrics.get(metric_name, None),
threshold,
baseline_metrics.get(metric_name, None),
)
for (metric_name, threshold) in validation_thresholds.items()
}
for metric_name in validation_thresholds.keys():
metric_threshold, validation_result = (
validation_thresholds[metric_name],
validation_results[metric_name],
)
if metric_name not in candidate_metrics:
validation_result.missing_candidate = True
continue
candidate_metric_value, baseline_metric_value = (
candidate_metrics[metric_name],
baseline_metrics[metric_name] if baseline_metrics else None,
)
# If higher is better for the metric, >= is used; otherwise <= is used
# for thresholding the metric value and for model comparison.
comparator_fn = operator.__ge__ if metric_threshold.greater_is_better else operator.__le__
operator_fn = operator.add if metric_threshold.greater_is_better else operator.sub
if metric_threshold.threshold is not None:
# metric threshold fails
# - if not (metric_value >= threshold) for higher is better
# - if not (metric_value <= threshold) for lower is better
validation_result.threshold_failed = not comparator_fn(
candidate_metric_value, metric_threshold.threshold
)
if (
metric_threshold.min_relative_change or metric_threshold.min_absolute_change
) and metric_name not in baseline_metrics:
validation_result.missing_baseline = True
continue
if metric_threshold.min_absolute_change is not None:
# metric comparison absolute change fails
# - if not (metric_value >= baseline + min_absolute_change) for higher is better
# - if not (metric_value <= baseline - min_absolute_change) for lower is better
validation_result.min_absolute_change_failed = not comparator_fn(
Decimal(candidate_metric_value),
Decimal(operator_fn(baseline_metric_value, metric_threshold.min_absolute_change)),
)
if metric_threshold.min_relative_change is not None:
# If baseline metric value equals 0, fallback to simple comparison check
if baseline_metric_value == 0:
_logger.warning(
f"Cannot perform relative model comparison for metric {metric_name} as "
"baseline metric value is 0. Falling back to simple comparison: verifying "
"that candidate metric value is better than the baseline metric value."
)
validation_result.min_relative_change_failed = not comparator_fn(
Decimal(candidate_metric_value),
Decimal(operator_fn(baseline_metric_value, 1e-10)),
)
continue
# metric comparison relative change fails
# - if (metric_value - baseline) / baseline < min_relative_change for higher is better
# - if (baseline - metric_value) / baseline < min_relative_change for lower is better
if metric_threshold.greater_is_better:
relative_change = (
candidate_metric_value - baseline_metric_value
) / baseline_metric_value
else:
relative_change = (
baseline_metric_value - candidate_metric_value
) / baseline_metric_value
validation_result.min_relative_change_failed = (
relative_change < metric_threshold.min_relative_change
)
failure_messages = []
for metric_validation_result in validation_results.values():
if metric_validation_result.is_success():
continue
failure_messages.append(str(metric_validation_result))
if not failure_messages:
return
raise ModelValidationFailedException(message=os.linesep.join(failure_messages))
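# Illustrative example of the validation above (a sketch; the metric values are hypothetical):
#
#   thresholds = {"accuracy_score": MetricThreshold(threshold=0.8, greater_is_better=True)}
#   _validate(thresholds, candidate_metrics={"accuracy_score": 0.75})
#   # -> raises ModelValidationFailedException because 0.75 < 0.8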
def _evaluate(
*,
model,
model_type,
dataset,
run_id,
evaluator_name_list,
evaluator_name_to_conf_map,
custom_metrics,
custom_artifacts,
baseline_model,
):
"""
The public API "evaluate" will verify argument first, and then pass normalized arguments
to the _evaluate method.
"""
# import _model_evaluation_registry and PyFuncModel inside the function to avoid circular imports
from mlflow.models.evaluation.evaluator_registry import _model_evaluation_registry
global _last_failed_evaluator
_last_failed_evaluator = None
client = MlflowClient()
model_uuid = model.metadata.model_uuid
dataset._log_dataset_tag(client, run_id, model_uuid)
eval_results = []
for evaluator_name in evaluator_name_list:
config = evaluator_name_to_conf_map.get(evaluator_name) or {}
try:
evaluator = _model_evaluation_registry.get_evaluator(evaluator_name)
except MlflowException:
_logger.warning(f"Evaluator '{evaluator_name}' is not registered.")
continue
_last_failed_evaluator = evaluator_name
if evaluator.can_evaluate(model_type=model_type, evaluator_config=config):
_logger.info(f"Evaluating the model with the {evaluator_name} evaluator.")
eval_result = evaluator.evaluate(
model=model,
model_type=model_type,
dataset=dataset,
run_id=run_id,
evaluator_config=config,
custom_metrics=custom_metrics,
custom_artifacts=custom_artifacts,
baseline_model=baseline_model,
)
eval_results.append(eval_result)
_last_failed_evaluator = None
if len(eval_results) == 0:
raise MlflowException(
message="The model could not be evaluated by any of the registered evaluators, please "
"verify that the model type and other configs are set correctly.",
error_code=INVALID_PARAMETER_VALUE,
)
merged_eval_result = EvaluationResult({}, {}, {})
for eval_result in eval_results:
if not eval_result:
continue
merged_eval_result.metrics.update(eval_result.metrics)
merged_eval_result.artifacts.update(eval_result.artifacts)
if baseline_model and eval_result.baseline_model_metrics:
merged_eval_result.baseline_model_metrics.update(eval_result.baseline_model_metrics)
return merged_eval_result
def evaluate(
model: str,
data,
*,
model_type: str,
targets=None,
dataset_path=None,
feature_names: list = None,
evaluators=None,
evaluator_config=None,
custom_metrics=None,
custom_artifacts=None,
validation_thresholds=None,
baseline_model=None,
env_manager="local",
):
'''
Evaluate a PyFunc model on the specified dataset using one or more specified ``evaluators``, and
log resulting metrics & artifacts to MLflow Tracking. Set thresholds on the generated metrics to
validate model quality. For additional overview information, see
:ref:`the Model Evaluation documentation <model-evaluation>`.
Default Evaluator behavior:
- The default evaluator, which can be invoked with ``evaluators="default"`` or
``evaluators=None``, supports the ``"regressor"`` and ``"classifier"`` model types.
It generates a variety of model performance metrics, model performance plots, and
model explanations.
- For both the ``"regressor"`` and ``"classifier"`` model types, the default evaluator
generates model summary plots and feature importance plots using
`SHAP <https://shap.readthedocs.io/en/latest/index.html>`_.
- For regressor models, the default evaluator additionally logs:
- **metrics**: example_count, mean_absolute_error, mean_squared_error,
root_mean_squared_error, sum_on_target, mean_on_target, r2_score, max_error,
mean_absolute_percentage_error.
- For binary classifiers, the default evaluator additionally logs:
- **metrics**: true_negatives, false_positives, false_negatives, true_positives, recall,
precision, f1_score, accuracy_score, example_count, log_loss, roc_auc,
precision_recall_auc.
- **artifacts**: lift curve plot, precision-recall plot, ROC plot.
- For multiclass classifiers, the default evaluator additionally logs:
- **metrics**: accuracy_score, example_count, f1_score_micro, f1_score_macro, log_loss
- **artifacts**: A CSV file for "per_class_metrics" (per-class metrics includes
true_negatives/false_positives/false_negatives/true_positives/recall/precision/roc_auc,
precision_recall_auc), precision-recall merged curves plot, ROC merged curves plot.
- For question-answering models, the default evaluator logs:
- **metrics**: ``exact_match``.
- **artifacts**: A JSON file containing the inputs, outputs, and targets (if the ``targets``
argument is supplied) of the model in tabular format.
- For text-summarization models, the default evaluator logs:
- **metrics**: `ROUGE`_ (requires `evaluate`_, `nltk`_, and `rouge_score`_ to be installed).
- **artifacts**: A JSON file containing the inputs, outputs, and targets (if the ``targets``
argument is supplied) of the model in tabular format.
.. _ROUGE:
https://huggingface.co/spaces/evaluate-metric/rouge
.. _evaluate:
https://pypi.org/project/evaluate
.. _nltk:
https://pypi.org/project/nltk
.. _rouge_score:
https://pypi.org/project/rouge-score
- For text models, the default evaluator logs:
- **artifacts**: A JSON file containing the inputs, outputs, and targets (if the ``targets``
argument is supplied) of the model in tabular format.
- For sklearn models, the default evaluator additionally logs the model's evaluation criterion
(e.g. mean accuracy for a classifier) computed by the `model.score` method.
- The metrics/artifacts listed above are logged to the active MLflow run.
If no active run exists, a new MLflow run is created for logging these metrics and
artifacts. Note that no metrics/artifacts are logged for the ``baseline_model``.
- Additionally, information about the specified dataset - hash, name (if specified), path
(if specified), and the UUID of the model that evaluated it - is logged to the
``mlflow.datasets`` tag.
- The available ``evaluator_config`` options for the default evaluator include:
- **log_model_explainability**: A boolean value specifying whether or not to log model
explainability insights, default value is True.
- **explainability_algorithm**: A string to specify the SHAP Explainer algorithm for model
explainability. Supported algorithms include: 'exact', 'permutation', 'partition',
and 'kernel'.
If not set, ``shap.Explainer`` is used with the "auto" algorithm, which chooses the best
Explainer based on the model.
- **explainability_nsamples**: The number of sample rows to use for computing model
explainability insights. Default value is 2000.
- **explainability_kernel_link**: The kernel link function used by the SHAP kernel explainer.
Available values are "identity" and "logit". Default value is "identity".
- **max_classes_for_multiclass_roc_pr**:
For multiclass classification tasks, the maximum number of classes for which to log
the per-class ROC curve and Precision-Recall curve. If the number of classes is
larger than the configured maximum, these curves are not logged.
- **metric_prefix**: An optional prefix to prepend to the name of each metric and artifact
produced during evaluation.
- **log_metrics_with_dataset_info**: A boolean value specifying whether or not to include
information about the evaluation dataset in the name of each metric logged to MLflow
Tracking during evaluation, default value is True.
- **pos_label**: If specified, the positive label to use when computing classification
metrics such as precision, recall, f1, etc. for binary classification models. For
multiclass classification and regression models, this parameter will be ignored.
- **average**: The averaging method to use when computing classification metrics such as
precision, recall, f1, etc. for multiclass classification models
(default: ``'weighted'``). For binary classification and regression models, this
parameter will be ignored.
- **sample_weights**: Weights for each sample to apply when computing model performance
metrics.
- Limitations of evaluation dataset:
- For classification tasks, dataset labels are used to infer the total number of classes.
- For binary classification tasks, the negative label value must be 0 or -1 or False, and
the positive label value must be 1 or True.
- Limitations of metrics/artifacts computation:
- For classification tasks, some metric and artifact computations require the model to
output class probabilities. Currently, for scikit-learn models, the default evaluator
calls the ``predict_proba`` method on the underlying model to obtain probabilities. For
other model types, the default evaluator does not compute metrics/artifacts that require
probability outputs.
- Limitations of default evaluator logging model explainability insights:
- The ``shap.Explainer`` ``auto`` algorithm uses the ``Linear`` explainer for linear models
and the ``Tree`` explainer for tree models. Because SHAP's ``Linear`` and ``Tree``
explainers do not support multi-class classification, the default evaluator falls back to
using the ``Exact`` or ``Permutation`` explainers for multi-class classification tasks.
- Logging model explainability insights is not currently supported for PySpark models.
- The evaluation dataset label values must be numeric or boolean, all feature values
must be numeric, and each feature column must only contain scalar values.
- Limitations when environment restoration is enabled:
- When environment restoration is enabled for the evaluated model (i.e. a non-local
``env_manager`` is specified), the model is loaded as a client that invokes an MLflow
Model Scoring Server process in an independent Python environment with the model's
training time dependencies installed. As such, methods like ``predict_proba`` (for
probability outputs) or ``score`` (which computes the evaluation criterion for sklearn models)
of the model become inaccessible and the default evaluator does not compute metrics or
artifacts that require those methods.
- Because the model is an MLflow Model Server process, SHAP explanations are slower to
compute. As such, model explainability is disabled when a non-local ``env_manager`` is
specified, unless the ``evaluator_config`` option **log_model_explainability** is
explicitly set to ``True``.
:param model: A pyfunc model instance, or a URI referring to such a model.
:param data: One of the following:
- A numpy array or list of evaluation features, excluding labels.
- A Pandas DataFrame or Spark DataFrame, containing evaluation features and
labels. If the ``feature_names`` argument is not specified, all columns are regarded
as feature columns. Otherwise, only the column names present in ``feature_names``
are regarded as feature columns. If it is a Spark DataFrame, only the first 10000
rows in the Spark DataFrame will be used as evaluation data.
- A :py:class:`mlflow.data.dataset.Dataset` instance containing evaluation features
and labels.
:param targets: If ``data`` is a numpy array or list, a numpy array or list of evaluation
labels. If ``data`` is a DataFrame, the string name of a column from ``data``
that contains evaluation labels. Required for classifier and regressor models,
but optional for question-answering, text-summarization, and text models. If
``data`` is a :py:class:`mlflow.data.dataset.Dataset` that defines targets,
then ``targets`` is optional.
:param model_type: A string describing the model type. The default evaluator
supports the following model types:
- ``'classifier'``
- ``'regressor'``
- ``'question-answering'``
- ``'text-summarization'``
- ``'text'``
.. note::
``'question-answering'``, ``'text-summarization'``, and ``'text'``
are experimental and may be changed or removed in a future release.
:param dataset_path: (Optional) The path where the data is stored. Must not contain double
quotes (``"``). If specified, the path is logged to the ``mlflow.datasets``
tag for lineage tracking purposes.
:param feature_names: (Optional) If the ``data`` argument is a feature data numpy array or list,
``feature_names`` is a list of the feature names for each feature. If
``None``, then the ``feature_names`` are generated using the format
``feature_{feature_index}``. If the ``data`` argument is a Pandas
DataFrame or a Spark DataFrame, ``feature_names`` is a list of the names
of the feature columns in the DataFrame. If ``None``, then all columns
except the label column are regarded as feature columns.
:param evaluators: The name of the evaluator to use for model evaluation, or a list of
evaluator names. If unspecified, all evaluators capable of evaluating the
specified model on the specified dataset are used. The default evaluator
can be referred to by the name ``"default"``. To see all available
evaluators, call :py:func:`mlflow.models.list_evaluators`.
:param evaluator_config: A dictionary of additional configurations to supply to the evaluator.
If multiple evaluators are specified, each configuration should be
supplied as a nested dictionary whose key is the evaluator name.
:param custom_metrics:
(Optional) A list of :py:class:`EvaluationMetric <mlflow.models.EvaluationMetric>` objects.
.. code-block:: python
:caption: Example usage of custom metrics
import mlflow
import numpy as np
def root_mean_squared_error(eval_df, _builtin_metrics):
return np.sqrt((np.abs(eval_df["prediction"] - eval_df["target"]) ** 2).mean)
rmse_metric = mlflow.models.make_metric(
eval_fn=root_mean_squared_error,
greater_is_better=False,
)
mlflow.evaluate(..., custom_metrics=[rmse_metric])
:param custom_artifacts:
(Optional) A list of custom artifact functions with the following signature:
.. code-block:: python
def custom_artifact(
eval_df: Union[pandas.Dataframe, pyspark.sql.DataFrame],
builtin_metrics: Dict[str, float],
artifacts_dir: str,
) -> Dict[str, Any]:
"""
:param eval_df:
A Pandas or Spark DataFrame containing ``prediction`` and ``target`` columns.
The ``prediction`` column contains the predictions made by the model.
The ``target`` column contains the corresponding labels for those predictions.
:param builtin_metrics:
A dictionary containing the metrics calculated by the default evaluator.
The keys are the names of the metrics and the values are the scalar values of
the metrics. Refer to the DefaultEvaluator behavior section for what metrics
will be returned based on the type of model (i.e. classifier or regressor).
:param artifacts_dir:
A temporary directory path that can be used by the custom artifacts function to
temporarily store produced artifacts. The directory will be deleted after the
artifacts are logged.
:return:
A dictionary that maps artifact names to artifact objects
(e.g. a Matplotlib Figure) or to artifact paths within ``artifacts_dir``.
"""
...
Object types that artifacts can be represented as:
- A string uri representing the file path to the artifact. MLflow will infer the type of
the artifact based on the file extension.
- A string representation of a JSON object. This will be saved as a .json artifact.
- Pandas DataFrame. This will be resolved as a CSV artifact.
- Numpy array. This will be saved as a .npy artifact.
- Matplotlib Figure. This will be saved as an image artifact. Note that
``matplotlib.pyplot.savefig`` is called behind the scenes with default configurations.
To customize, either save the figure with the desired configurations and return its
file path or define customizations through environment variables in
``matplotlib.rcParams``.
- Other objects will be attempted to be pickled with the default protocol.
.. code-block:: python
:caption: Example usage of custom artifacts
import os
import mlflow
import matplotlib.pyplot as plt
def scatter_plot(eval_df, builtin_metrics, artifacts_dir):
plt.scatter(eval_df["prediction"], eval_df["target"])
plt.xlabel("Targets")
plt.ylabel("Predictions")
plt.title("Targets vs. Predictions")
plt.savefig(os.path.join(artifacts_dir, "example.png"))
plt.close()
return {"pred_target_scatter": os.path.join(artifacts_dir, "example.png")}
def pred_sample(eval_df, _builtin_metrics, _artifacts_dir):
return {"pred_sample": pred_sample.head(10)}
mlflow.evaluate(..., custom_artifacts=[scatter_plot, pred_sample])
:param validation_thresholds: (Optional) A dictionary of metric name to
:py:class:`mlflow.models.MetricThreshold` used for model validation. Each metric name must
either be the name of a builtin metric or the name of a custom metric defined in the
``custom_metrics`` parameter.
.. code-block:: python
:caption: Example of Model Validation
from mlflow.models import MetricThreshold
thresholds = {
"accuracy_score": MetricThreshold(
# accuracy should be >=0.8
threshold=0.8,
# accuracy should be at least 0.05 greater than baseline model accuracy
min_absolute_change=0.05,
# accuracy should be at least 5 percent greater than baseline model accuracy
min_relative_change=0.05,
greater_is_better=True,
),
}
with mlflow.start_run():
mlflow.evaluate(
    model=your_candidate_model,
    data=data,
    targets=targets,
    model_type=model_type,
    evaluators=evaluators,
    validation_thresholds=thresholds,
    baseline_model=your_baseline_model,
)
See :ref:`the Model Validation documentation <model-validation>`
for more details.
:param baseline_model: (Optional) A string URI referring to an MLflow model with the pyfunc
flavor. If specified, the candidate ``model`` is compared to this
baseline for model validation purposes.
:param env_manager: Specify an environment manager to load the candidate ``model`` and
``baseline_model`` in isolated Python environments and restore their
dependencies. Default value is ``local``, and the following values are
supported:
- ``virtualenv``: (Recommended) Use virtualenv to restore the python
environment that was used to train the model.
- ``conda``: Use Conda to restore the software environment that was used
to train the model.
- ``local``: Use the current Python environment for model inference, which
may differ from the environment used to train the model and may lead to
errors or invalid predictions.
:return: An :py:class:`mlflow.models.EvaluationResult` instance containing
metrics of the candidate model and the baseline model, and artifacts of the candidate model.
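The example below shows a minimal end-to-end evaluation of a scikit-learn classifier
(an illustrative sketch; the dataset and model are arbitrary and scikit-learn must be
installed):

.. code-block:: python
    :caption: Minimal end-to-end sketch (illustrative)

    import mlflow
    from sklearn.datasets import load_iris
    from sklearn.linear_model import LogisticRegression

    X, y = load_iris(return_X_y=True, as_frame=True)
    model = LogisticRegression(max_iter=1000).fit(X, y)
    eval_data = X.assign(label=y)

    with mlflow.start_run():
        model_info = mlflow.sklearn.log_model(model, "model")
        result = mlflow.evaluate(
            model_info.model_uri,
            eval_data,
            targets="label",
            model_type="classifier",
            evaluators="default",
        )
    print(result.metrics)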
'''
from mlflow.pyfunc import PyFuncModel, _ServedPyFuncModel, _load_model_or_server
from mlflow.utils import env_manager as _EnvManager
_EnvManager.validate(env_manager)
if model_type in [_ModelType.REGRESSOR, _ModelType.CLASSIFIER]:
if isinstance(data, Dataset):
if getattr(data, "targets", None) is not None:
targets = data.targets
else:
raise MlflowException(
message="The targets argument is required when data is a Dataset and does not "
"define targets.",
error_code=INVALID_PARAMETER_VALUE,
)
else:
if targets is None:
raise MlflowException(
f"The targets argument must be specified for {model_type} models.",
error_code=INVALID_PARAMETER_VALUE,
)
if isinstance(model, str):
model = _load_model_or_server(model, env_manager)
elif env_manager != _EnvManager.LOCAL:
raise MlflowException(
message="The model argument must be a string URI referring to an MLflow model when a "
"non-local env_manager is specified.",
error_code=INVALID_PARAMETER_VALUE,
)
elif isinstance(model, PyFuncModel):
pass
else:
raise MlflowException(
message="The model argument must be a string URI referring to an MLflow model or "
"an instance of `mlflow.pyfunc.PyFuncModel`.",
error_code=INVALID_PARAMETER_VALUE,
)
if validation_thresholds:
try:
assert type(validation_thresholds) is dict
for key in validation_thresholds.keys():
assert type(key) is str
for threshold in validation_thresholds.values():
assert isinstance(threshold, MetricThreshold)
except AssertionError:
raise MlflowException(
message="The validation thresholds argument must be a dictionary that maps strings "
"to MetricThreshold objects.",
error_code=INVALID_PARAMETER_VALUE,
)
if isinstance(baseline_model, str):
baseline_model = _load_model_or_server(baseline_model, env_manager)
elif baseline_model is not None:
raise MlflowException(
message="The baseline model argument must be a string URI referring to an "
"MLflow model.",
error_code=INVALID_PARAMETER_VALUE,
)
elif _model_validation_contains_model_comparison(validation_thresholds):
raise MlflowException(
message="The baseline model argument is None. The baseline model must be specified "
"when model comparison thresholds (min_absolute_change, min_relative_change) "
"are specified.",
error_code=INVALID_PARAMETER_VALUE,
)
(
evaluator_name_list,
evaluator_name_to_conf_map,
) = _normalize_evaluators_and_evaluator_config_args(evaluators, evaluator_config)
with _start_run_or_reuse_active_run() as run_id:
from mlflow.data.pyfunc_dataset_mixin import PyFuncConvertibleDatasetMixin
if isinstance(data, Dataset) and issubclass(data.__class__, PyFuncConvertibleDatasetMixin):
dataset = data.to_evaluation_dataset(dataset_path, feature_names)
if evaluator_name_to_conf_map and "default" in evaluator_name_to_conf_map:
context = evaluator_name_to_conf_map["default"].get("metric_prefix", None)
else:
context = None
client = MlflowClient()
tags = [InputTag(key=MLFLOW_DATASET_CONTEXT, value=context)] if context else []
dataset_input = DatasetInput(dataset=data._to_mlflow_entity(), tags=tags)
client.log_inputs(run_id, [dataset_input])
else:
dataset = EvaluationDataset(
data,
targets=targets,
path=dataset_path,
feature_names=feature_names,
)
try:
evaluate_result = _evaluate(
model=model,
model_type=model_type,
dataset=dataset,
run_id=run_id,
evaluator_name_list=evaluator_name_list,
evaluator_name_to_conf_map=evaluator_name_to_conf_map,
custom_metrics=custom_metrics,
custom_artifacts=custom_artifacts,
baseline_model=baseline_model,
)
finally:
if isinstance(model, _ServedPyFuncModel):
os.kill(model.pid, signal.SIGTERM)
if isinstance(baseline_model, _ServedPyFuncModel):
os.kill(baseline_model.pid, signal.SIGTERM)
if not validation_thresholds:
return evaluate_result
_logger.info("Validating generated model metrics")
_validate(
validation_thresholds,
evaluate_result.metrics,
evaluate_result.baseline_model_metrics,
)
_logger.info("Model validation passed!")
return evaluate_result