Source code for mlflow.tracking.fluent

"""
Internal module implementing the fluent API, allowing management of an active
MLflow run. This module is exposed to users at the top-level :py:mod:`mlflow` module.
"""
import os

import atexit
import time
import logging
import inspect
from copy import deepcopy
from packaging.version import Version
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING

from mlflow.entities import Experiment, Run, RunInfo, RunStatus, Param, RunTag, Metric, ViewType
from mlflow.entities.lifecycle_stage import LifecycleStage
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import (
    INVALID_PARAMETER_VALUE,
    RESOURCE_DOES_NOT_EXIST,
)
from mlflow.tracking.client import MlflowClient
from mlflow.tracking import artifact_utils, _get_store
from mlflow.tracking.context import registry as context_registry
from mlflow.tracking.default_experiment import registry as default_experiment_registry
from mlflow.store.tracking import SEARCH_MAX_RESULTS_DEFAULT
from mlflow.utils import env
from mlflow.utils.autologging_utils import (
    is_testing,
    autologging_integration,
    AUTOLOGGING_INTEGRATIONS,
    autologging_is_disabled,
    AUTOLOGGING_CONF_KEY_IS_GLOBALLY_CONFIGURED,
)
from mlflow.utils.import_hooks import register_post_import_hook
from mlflow.utils.mlflow_tags import (
    MLFLOW_PARENT_RUN_ID,
    MLFLOW_RUN_NAME,
    MLFLOW_RUN_NOTE,
)
from mlflow.utils.validation import _validate_run_id, _validate_experiment_id_type

if TYPE_CHECKING:
    import pandas  # pylint: disable=unused-import
    import matplotlib  # pylint: disable=unused-import
    import matplotlib.figure
    import plotly  # pylint: disable=unused-import
    import numpy  # pylint: disable=unused-import
    import PIL  # pylint: disable=unused-import

_EXPERIMENT_ID_ENV_VAR = "MLFLOW_EXPERIMENT_ID"
_EXPERIMENT_NAME_ENV_VAR = "MLFLOW_EXPERIMENT_NAME"
_RUN_ID_ENV_VAR = "MLFLOW_RUN_ID"
_active_run_stack = []
_active_experiment_id = None
_last_active_run_id = None

SEARCH_MAX_RESULTS_PANDAS = 100000
NUM_RUNS_PER_PAGE_PANDAS = 10000

_logger = logging.getLogger(__name__)


[docs]def set_experiment(experiment_name: str = None, experiment_id: str = None) -> Experiment: """ Set the given experiment as the active experiment. The experiment must either be specified by name via `experiment_name` or by ID via `experiment_id`. The experiment name and ID cannot both be specified. :param experiment_name: Case sensitive name of the experiment to be activated. If an experiment with this name does not exist, a new experiment wth this name is created. :param experiment_id: ID of the experiment to be activated. If an experiment with this ID does not exist, an exception is thrown. :return: An instance of :py:class:`mlflow.entities.Experiment` representing the new active experiment. .. code-block:: python :caption: Example import mlflow # Set an experiment name, which must be unique and case-sensitive. experiment = mlflow.set_experiment("Social NLP Experiments") # Get Experiment Details print("Experiment_id: {}".format(experiment.experiment_id)) print("Artifact Location: {}".format(experiment.artifact_location)) print("Tags: {}".format(experiment.tags)) print("Lifecycle_stage: {}".format(experiment.lifecycle_stage)) .. code-block:: text :caption: Output Experiment_id: 1 Artifact Location: file:///.../mlruns/1 Tags: {} Lifecycle_stage: active """ if (experiment_name is not None and experiment_id is not None) or ( experiment_name is None and experiment_id is None ): raise MlflowException( message="Must specify exactly one of: `experiment_id` or `experiment_name`.", error_code=INVALID_PARAMETER_VALUE, ) client = MlflowClient() if experiment_id is None: experiment = client.get_experiment_by_name(experiment_name) if not experiment: _logger.info( "Experiment with name '%s' does not exist. Creating a new experiment.", experiment_name, ) # NB: If two simultaneous threads or processes attempt to set the same experiment # simultaneously, a race condition may be encountered here wherein experiment creation # fails experiment_id = client.create_experiment(experiment_name) experiment = client.get_experiment(experiment_id) else: experiment = client.get_experiment(experiment_id) if experiment is None: raise MlflowException( message=f"Experiment with ID '{experiment_id}' does not exist.", error_code=RESOURCE_DOES_NOT_EXIST, ) if experiment.lifecycle_stage != LifecycleStage.ACTIVE: raise MlflowException( message=( "Cannot set a deleted experiment '%s' as the active experiment." " You can restore the experiment, or permanently delete the " " experiment to create a new one." % experiment.name ), error_code=INVALID_PARAMETER_VALUE, ) global _active_experiment_id _active_experiment_id = experiment.experiment_id return experiment
[docs]class ActiveRun(Run): # pylint: disable=W0223 """Wrapper around :py:class:`mlflow.entities.Run` to enable using Python ``with`` syntax.""" def __init__(self, run): Run.__init__(self, run.info, run.data) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): status = RunStatus.FINISHED if exc_type is None else RunStatus.FAILED end_run(RunStatus.to_string(status)) return exc_type is None
[docs]def start_run( run_id: str = None, experiment_id: Optional[str] = None, run_name: Optional[str] = None, nested: bool = False, tags: Optional[Dict[str, Any]] = None, description: Optional[str] = None, ) -> ActiveRun: """ Start a new MLflow run, setting it as the active run under which metrics and parameters will be logged. The return value can be used as a context manager within a ``with`` block; otherwise, you must call ``end_run()`` to terminate the current run. If you pass a ``run_id`` or the ``MLFLOW_RUN_ID`` environment variable is set, ``start_run`` attempts to resume a run with the specified run ID and other parameters are ignored. ``run_id`` takes precedence over ``MLFLOW_RUN_ID``. If resuming an existing run, the run status is set to ``RunStatus.RUNNING``. MLflow sets a variety of default tags on the run, as defined in :ref:`MLflow system tags <system_tags>`. :param run_id: If specified, get the run with the specified UUID and log parameters and metrics under that run. The run's end time is unset and its status is set to running, but the run's other attributes (``source_version``, ``source_type``, etc.) are not changed. :param experiment_id: ID of the experiment under which to create the current run (applicable only when ``run_id`` is not specified). If ``experiment_id`` argument is unspecified, will look for valid experiment in the following order: activated using ``set_experiment``, ``MLFLOW_EXPERIMENT_NAME`` environment variable, ``MLFLOW_EXPERIMENT_ID`` environment variable, or the default experiment as defined by the tracking server. :param run_name: Name of new run (stored as a ``mlflow.runName`` tag). Used only when ``run_id`` is unspecified. :param nested: Controls whether run is nested in parent run. ``True`` creates a nested run. :param tags: An optional dictionary of string keys and values to set as tags on the run. If a run is being resumed, these tags are set on the resumed run. If a new run is being created, these tags are set on the new run. :param description: An optional string that populates the description box of the run. If a run is being resumed, the description is set on the resumed run. If a new run is being created, the description is set on the new run. :return: :py:class:`mlflow.ActiveRun` object that acts as a context manager wrapping the run's state. .. code-block:: python :caption: Example import mlflow # Create nested runs with mlflow.start_run(run_name='PARENT_RUN') as parent_run: mlflow.log_param("parent", "yes") with mlflow.start_run(run_name='CHILD_RUN', nested=True) as child_run: mlflow.log_param("child", "yes") print("parent run_id: {}".format(parent_run.info.run_id)) print("child run_id : {}".format(child_run.info.run_id)) print("--") # Search all child runs with a parent id query = "tags.mlflow.parentRunId = '{}'".format(parent_run.info.run_id) results = mlflow.search_runs(filter_string=query) print(results[["run_id", "params.child", "tags.mlflow.runName"]]) .. code-block:: text :caption: Output parent run_id: 5ec0e7ae18f54c2694ffb48c2fccf25c child run_id : 78b3b0d264b44cd29e8dc389749bb4be -- run_id params.child tags.mlflow.runName 0 78b3b0d264b44cd29e8dc389749bb4be yes CHILD_RUN """ global _active_run_stack _validate_experiment_id_type(experiment_id) # back compat for int experiment_id experiment_id = str(experiment_id) if isinstance(experiment_id, int) else experiment_id if len(_active_run_stack) > 0 and not nested: raise Exception( ( "Run with UUID {} is already active. To start a new run, first end the " + "current run with mlflow.end_run(). To start a nested " + "run, call start_run with nested=True" ).format(_active_run_stack[0].info.run_id) ) client = MlflowClient() if run_id: existing_run_id = run_id elif _RUN_ID_ENV_VAR in os.environ: existing_run_id = os.environ[_RUN_ID_ENV_VAR] del os.environ[_RUN_ID_ENV_VAR] else: existing_run_id = None if existing_run_id: _validate_run_id(existing_run_id) active_run_obj = client.get_run(existing_run_id) # Check to see if experiment_id from environment matches experiment_id from set_experiment() if ( _active_experiment_id is not None and _active_experiment_id != active_run_obj.info.experiment_id ): raise MlflowException( "Cannot start run with ID {} because active run ID " "does not match environment run ID. Make sure --experiment-name " "or --experiment-id matches experiment set with " "set_experiment(), or just use command-line " "arguments".format(existing_run_id) ) # Check to see if current run isn't deleted if active_run_obj.info.lifecycle_stage == LifecycleStage.DELETED: raise MlflowException( "Cannot start run with ID {} because it is in the " "deleted state.".format(existing_run_id) ) # Use previous end_time because a value is required for update_run_info end_time = active_run_obj.info.end_time _get_store().update_run_info( existing_run_id, run_status=RunStatus.RUNNING, end_time=end_time ) tags = tags or {} if description: if MLFLOW_RUN_NOTE in tags: raise MlflowException( f"Description is already set via the tag {MLFLOW_RUN_NOTE} in tags." f"Remove the key {MLFLOW_RUN_NOTE} from the tags or omit the description.", error_code=INVALID_PARAMETER_VALUE, ) tags[MLFLOW_RUN_NOTE] = description if tags: client.log_batch( run_id=existing_run_id, tags=[RunTag(key, str(value)) for key, value in tags.items()], ) active_run_obj = client.get_run(existing_run_id) else: if len(_active_run_stack) > 0: parent_run_id = _active_run_stack[-1].info.run_id else: parent_run_id = None exp_id_for_run = experiment_id if experiment_id is not None else _get_experiment_id() user_specified_tags = deepcopy(tags) or {} if description: if MLFLOW_RUN_NOTE in user_specified_tags: raise MlflowException( f"Description is already set via the tag {MLFLOW_RUN_NOTE} in tags." f"Remove the key {MLFLOW_RUN_NOTE} from the tags or omit the description.", error_code=INVALID_PARAMETER_VALUE, ) user_specified_tags[MLFLOW_RUN_NOTE] = description if parent_run_id is not None: user_specified_tags[MLFLOW_PARENT_RUN_ID] = parent_run_id if run_name is not None: user_specified_tags[MLFLOW_RUN_NAME] = run_name resolved_tags = context_registry.resolve_tags(user_specified_tags) active_run_obj = client.create_run(experiment_id=exp_id_for_run, tags=resolved_tags) _active_run_stack.append(ActiveRun(active_run_obj)) return _active_run_stack[-1]
[docs]def end_run(status: str = RunStatus.to_string(RunStatus.FINISHED)) -> None: """End an active MLflow run (if there is one). .. code-block:: python :caption: Example import mlflow # Start run and get status mlflow.start_run() run = mlflow.active_run() print("run_id: {}; status: {}".format(run.info.run_id, run.info.status)) # End run and get status mlflow.end_run() run = mlflow.get_run(run.info.run_id) print("run_id: {}; status: {}".format(run.info.run_id, run.info.status)) print("--") # Check for any active runs print("Active run: {}".format(mlflow.active_run())) .. code-block:: text :caption: Output run_id: b47ee4563368419880b44ad8535f6371; status: RUNNING run_id: b47ee4563368419880b44ad8535f6371; status: FINISHED -- Active run: None """ global _active_run_stack, _last_active_run_id if len(_active_run_stack) > 0: # Clear out the global existing run environment variable as well. env.unset_variable(_RUN_ID_ENV_VAR) run = _active_run_stack.pop() MlflowClient().set_terminated(run.info.run_id, status) _last_active_run_id = run.info.run_id
atexit.register(end_run)
[docs]def active_run() -> Optional[ActiveRun]: """Get the currently active ``Run``, or None if no such run exists. **Note**: You cannot access currently-active run attributes (parameters, metrics, etc.) through the run returned by ``mlflow.active_run``. In order to access such attributes, use the :py:class:`mlflow.tracking.MlflowClient` as follows: .. code-block:: python :caption: Example import mlflow mlflow.start_run() run = mlflow.active_run() print("Active run_id: {}".format(run.info.run_id)) mlflow.end_run() .. code-block:: text :caption: Output Active run_id: 6f252757005748708cd3aad75d1ff462 """ return _active_run_stack[-1] if len(_active_run_stack) > 0 else None
[docs]def last_active_run() -> Optional[Run]: """ Gets the most recent active run. Examples: .. code-block:: python :caption: To retrieve the most recent autologged run: import mlflow from sklearn.model_selection import train_test_split from sklearn.datasets import load_diabetes from sklearn.ensemble import RandomForestRegressor mlflow.autolog() db = load_diabetes() X_train, X_test, y_train, y_test = train_test_split(db.data, db.target) # Create and train models. rf = RandomForestRegressor(n_estimators = 100, max_depth = 6, max_features = 3) rf.fit(X_train, y_train) # Use the model to make predictions on the test dataset. predictions = rf.predict(X_test) autolog_run = mlflow.last_active_run() .. code-block:: python :caption: To get the most recently active run that ended: import mlflow mlflow.start_run() mlflow.end_run() run = mlflow.last_active_run() .. code-block:: python :caption: To retrieve the currently active run: import mlflow mlflow.start_run() run = mlflow.last_active_run() mlflow.end_run() :return: The active run (this is equivalent to ``mlflow.active_run()``) if one exists. Otherwise, the last run started from the current Python process that reached a terminal status (i.e. FINISHED, FAILED, or KILLED). """ _active_run = active_run() if _active_run is not None: return _active_run if _last_active_run_id is None: return None return get_run(_last_active_run_id)
[docs]def get_run(run_id: str) -> Run: """ Fetch the run from backend store. The resulting :py:class:`Run <mlflow.entities.Run>` contains a collection of run metadata -- :py:class:`RunInfo <mlflow.entities.RunInfo>`, as well as a collection of run parameters, tags, and metrics -- :py:class:`RunData <mlflow.entities.RunData>`. In the case where multiple metrics with the same key are logged for the run, the :py:class:`RunData <mlflow.entities.RunData>` contains the most recently logged value at the largest step for each metric. :param run_id: Unique identifier for the run. :return: A single :py:class:`mlflow.entities.Run` object, if the run exists. Otherwise, raises an exception. .. code-block:: python :caption: Example import mlflow with mlflow.start_run() as run: mlflow.log_param("p", 0) run_id = run.info.run_id print("run_id: {}; lifecycle_stage: {}".format(run_id, mlflow.get_run(run_id).info.lifecycle_stage)) .. code-block:: text :caption: Output run_id: 7472befefc754e388e8e922824a0cca5; lifecycle_stage: active """ return MlflowClient().get_run(run_id)
[docs]def log_param(key: str, value: Any) -> None: """ Log a parameter under the current run. If no run is active, this method will create a new active run. :param key: Parameter name (string). This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores will support keys up to length 250, but some may support larger keys. :param value: Parameter value (string, but will be string-ified if not). All backend stores will support values up to length 5000, but some may support larger values. .. code-block:: python :caption: Example import mlflow with mlflow.start_run(): mlflow.log_param("learning_rate", 0.01) """ run_id = _get_or_start_run().info.run_id MlflowClient().log_param(run_id, key, value)
[docs]def set_tag(key: str, value: Any) -> None: """ Set a tag under the current run. If no run is active, this method will create a new active run. :param key: Tag name (string). This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores will support keys up to length 250, but some may support larger keys. :param value: Tag value (string, but will be string-ified if not). All backend stores will support values up to length 5000, but some may support larger values. .. code-block:: python :caption: Example import mlflow with mlflow.start_run(): mlflow.set_tag("release.version", "2.2.0") """ run_id = _get_or_start_run().info.run_id MlflowClient().set_tag(run_id, key, value)
[docs]def delete_tag(key: str) -> None: """ Delete a tag from a run. This is irreversible. If no run is active, this method will create a new active run. :param key: Name of the tag .. code-block:: python :caption: Example import mlflow tags = {"engineering": "ML Platform", "engineering_remote": "ML Platform"} with mlflow.start_run() as run: mlflow.set_tags(tags) with mlflow.start_run(run_id=run.info.run_id): mlflow.delete_tag("engineering_remote") """ run_id = _get_or_start_run().info.run_id MlflowClient().delete_tag(run_id, key)
[docs]def log_metric(key: str, value: float, step: Optional[int] = None) -> None: """ Log a metric under the current run. If no run is active, this method will create a new active run. :param key: Metric name (string). This string may only contain alphanumerics, underscores (_), dashes (-), periods (.), spaces ( ), and slashes (/). All backend stores will support keys up to length 250, but some may support larger keys. :param value: Metric value (float). Note that some special values such as +/- Infinity may be replaced by other values depending on the store. For example, the SQLAlchemy store replaces +/- Infinity with max / min float values. All backend stores will support values up to length 5000, but some may support larger values. :param step: Metric step (int). Defaults to zero if unspecified. .. code-block:: python :caption: Example import mlflow with mlflow.start_run(): mlflow.log_metric("mse", 2500.00) """ run_id = _get_or_start_run().info.run_id MlflowClient().log_metric(run_id, key, value, int(time.time() * 1000), step or 0)
[docs]def log_metrics(metrics: Dict[str, float], step: Optional[int] = None) -> None: """ Log multiple metrics for the current run. If no run is active, this method will create a new active run. :param metrics: Dictionary of metric_name: String -> value: Float. Note that some special values such as +/- Infinity may be replaced by other values depending on the store. For example, sql based store may replace +/- Infinity with max / min float values. :param step: A single integer step at which to log the specified Metrics. If unspecified, each metric is logged at step zero. :returns: None .. code-block:: python :caption: Example import mlflow metrics = {"mse": 2500.00, "rmse": 50.00} # Log a batch of metrics with mlflow.start_run(): mlflow.log_metrics(metrics) """ run_id = _get_or_start_run().info.run_id timestamp = int(time.time() * 1000) metrics_arr = [Metric(key, value, timestamp, step or 0) for key, value in metrics.items()] MlflowClient().log_batch(run_id=run_id, metrics=metrics_arr, params=[], tags=[])
[docs]def log_params(params: Dict[str, Any]) -> None: """ Log a batch of params for the current run. If no run is active, this method will create a new active run. :param params: Dictionary of param_name: String -> value: (String, but will be string-ified if not) :returns: None .. code-block:: python :caption: Example import mlflow params = {"learning_rate": 0.01, "n_estimators": 10} # Log a batch of parameters with mlflow.start_run(): mlflow.log_params(params) """ run_id = _get_or_start_run().info.run_id params_arr = [Param(key, str(value)) for key, value in params.items()] MlflowClient().log_batch(run_id=run_id, metrics=[], params=params_arr, tags=[])
[docs]def set_tags(tags: Dict[str, Any]) -> None: """ Log a batch of tags for the current run. If no run is active, this method will create a new active run. :param tags: Dictionary of tag_name: String -> value: (String, but will be string-ified if not) :returns: None .. code-block:: python :caption: Example import mlflow tags = {"engineering": "ML Platform", "release.candidate": "RC1", "release.version": "2.2.0"} # Set a batch of tags with mlflow.start_run(): mlflow.set_tags(tags) """ run_id = _get_or_start_run().info.run_id tags_arr = [RunTag(key, str(value)) for key, value in tags.items()] MlflowClient().log_batch(run_id=run_id, metrics=[], params=[], tags=tags_arr)
[docs]def log_artifact(local_path: str, artifact_path: Optional[str] = None) -> None: """ Log a local file or directory as an artifact of the currently active run. If no run is active, this method will create a new active run. :param local_path: Path to the file to write. :param artifact_path: If provided, the directory in ``artifact_uri`` to write to. .. code-block:: python :caption: Example import mlflow # Create a features.txt artifact file features = "rooms, zipcode, median_price, school_rating, transport" with open("features.txt", 'w') as f: f.write(features) # With artifact_path=None write features.txt under # root artifact_uri/artifacts directory with mlflow.start_run(): mlflow.log_artifact("features.txt") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_artifact(run_id, local_path, artifact_path)
[docs]def log_artifacts(local_dir: str, artifact_path: Optional[str] = None) -> None: """ Log all the contents of a local directory as artifacts of the run. If no run is active, this method will create a new active run. :param local_dir: Path to the directory of files to write. :param artifact_path: If provided, the directory in ``artifact_uri`` to write to. .. code-block:: python :caption: Example import os import mlflow # Create some files to preserve as artifacts features = "rooms, zipcode, median_price, school_rating, transport" data = {"state": "TX", "Available": 25, "Type": "Detached"} # Create couple of artifact files under the directory "data" os.makedirs("data", exist_ok=True) with open("data/data.json", 'w', encoding='utf-8') as f: json.dump(data, f, indent=2) with open("data/features.txt", 'w') as f: f.write(features) # Write all files in "data" to root artifact_uri/states with mlflow.start_run(): mlflow.log_artifacts("data", artifact_path="states") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_artifacts(run_id, local_dir, artifact_path)
[docs]def log_text(text: str, artifact_file: str) -> None: """ Log text as an artifact. :param text: String containing text to log. :param artifact_file: The run-relative artifact file path in posixpath format to which the text is saved (e.g. "dir/file.txt"). .. code-block:: python :caption: Example import mlflow with mlflow.start_run(): # Log text to a file under the run's root artifact directory mlflow.log_text("text1", "file1.txt") # Log text in a subdirectory of the run's root artifact directory mlflow.log_text("text2", "dir/file2.txt") # Log HTML text mlflow.log_text("<h1>header</h1>", "index.html") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_text(run_id, text, artifact_file)
[docs]def log_dict(dictionary: Any, artifact_file: str) -> None: """ Log a JSON/YAML-serializable object (e.g. `dict`) as an artifact. The serialization format (JSON or YAML) is automatically inferred from the extension of `artifact_file`. If the file extension doesn't exist or match any of [".json", ".yml", ".yaml"], JSON format is used. :param dictionary: Dictionary to log. :param artifact_file: The run-relative artifact file path in posixpath format to which the dictionary is saved (e.g. "dir/data.json"). .. code-block:: python :caption: Example import mlflow dictionary = {"k": "v"} with mlflow.start_run(): # Log a dictionary as a JSON file under the run's root artifact directory mlflow.log_dict(dictionary, "data.json") # Log a dictionary as a YAML file in a subdirectory of the run's root artifact directory mlflow.log_dict(dictionary, "dir/data.yml") # If the file extension doesn't exist or match any of [".json", ".yaml", ".yml"], # JSON format is used. mlflow.log_dict(dictionary, "data") mlflow.log_dict(dictionary, "data.txt") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_dict(run_id, dictionary, artifact_file)
[docs]def log_figure( figure: Union["matplotlib.figure.Figure", "plotly.graph_objects.Figure"], artifact_file: str ) -> None: """ Log a figure as an artifact. The following figure objects are supported: - `matplotlib.figure.Figure`_ - `plotly.graph_objects.Figure`_ .. _matplotlib.figure.Figure: https://matplotlib.org/api/_as_gen/matplotlib.figure.Figure.html .. _plotly.graph_objects.Figure: https://plotly.com/python-api-reference/generated/plotly.graph_objects.Figure.html :param figure: Figure to log. :param artifact_file: The run-relative artifact file path in posixpath format to which the figure is saved (e.g. "dir/file.png"). .. code-block:: python :caption: Matplotlib Example import mlflow import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot([0, 1], [2, 3]) with mlflow.start_run(): mlflow.log_figure(fig, "figure.png") .. code-block:: python :caption: Plotly Example import mlflow from plotly import graph_objects as go fig = go.Figure(go.Scatter(x=[0, 1], y=[2, 3])) with mlflow.start_run(): mlflow.log_figure(fig, "figure.html") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_figure(run_id, figure, artifact_file)
[docs]def log_image(image: Union["numpy.ndarray", "PIL.Image.Image"], artifact_file: str) -> None: """ Log an image as an artifact. The following image objects are supported: - `numpy.ndarray`_ - `PIL.Image.Image`_ .. _numpy.ndarray: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.html .. _PIL.Image.Image: https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image Numpy array support - data type (( ) represents a valid value range): - bool - integer (0 ~ 255) - unsigned integer (0 ~ 255) - float (0.0 ~ 1.0) .. warning:: - Out-of-range integer values will be **clipped** to [0, 255]. - Out-of-range float values will be **clipped** to [0, 1]. - shape (H: height, W: width): - H x W (Grayscale) - H x W x 1 (Grayscale) - H x W x 3 (an RGB channel order is assumed) - H x W x 4 (an RGBA channel order is assumed) :param image: Image to log. :param artifact_file: The run-relative artifact file path in posixpath format to which the image is saved (e.g. "dir/image.png"). .. code-block:: python :caption: Numpy Example import mlflow import numpy as np image = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8) with mlflow.start_run(): mlflow.log_image(image, "image.png") .. code-block:: python :caption: Pillow Example import mlflow from PIL import Image image = Image.new("RGB", (100, 100)) with mlflow.start_run(): mlflow.log_image(image, "image.png") """ run_id = _get_or_start_run().info.run_id MlflowClient().log_image(run_id, image, artifact_file)
def _record_logged_model(mlflow_model): run_id = _get_or_start_run().info.run_id MlflowClient()._record_logged_model(run_id, mlflow_model)
[docs]def get_experiment(experiment_id: str) -> Experiment: """ Retrieve an experiment by experiment_id from the backend store :param experiment_id: The string-ified experiment ID returned from ``create_experiment``. :return: :py:class:`mlflow.entities.Experiment` .. code-block:: python :caption: Example import mlflow experiment = mlflow.get_experiment("0") print("Name: {}".format(experiment.name)) print("Artifact Location: {}".format(experiment.artifact_location)) print("Tags: {}".format(experiment.tags)) print("Lifecycle_stage: {}".format(experiment.lifecycle_stage)) .. code-block:: text :caption: Output Name: Default Artifact Location: file:///.../mlruns/0 Tags: {} Lifecycle_stage: active """ return MlflowClient().get_experiment(experiment_id)
[docs]def get_experiment_by_name(name: str) -> Optional[Experiment]: """ Retrieve an experiment by experiment name from the backend store :param name: The case senstive experiment name. :return: An instance of :py:class:`mlflow.entities.Experiment` if an experiment with the specified name exists, otherwise None. .. code-block:: python :caption: Example import mlflow # Case sensitive name experiment = mlflow.get_experiment_by_name("Default") print("Experiment_id: {}".format(experiment.experiment_id)) print("Artifact Location: {}".format(experiment.artifact_location)) print("Tags: {}".format(experiment.tags)) print("Lifecycle_stage: {}".format(experiment.lifecycle_stage)) .. code-block:: text :caption: Output Experiment_id: 0 Artifact Location: file:///.../mlruns/0 Tags: {} Lifecycle_stage: active """ return MlflowClient().get_experiment_by_name(name)
[docs]def list_experiments( view_type: int = ViewType.ACTIVE_ONLY, max_results: Optional[int] = None, ) -> List[Experiment]: """ :param view_type: Qualify requested type of experiments. :param max_results: If passed, specifies the maximum number of experiments desired. If not passed, all experiments will be returned. :return: A list of :py:class:`Experiment <mlflow.entities.Experiment>` objects. """ def pagination_wrapper_func(number_to_get, next_page_token): return MlflowClient().list_experiments( view_type=view_type, max_results=number_to_get, page_token=next_page_token, ) return _paginate(pagination_wrapper_func, SEARCH_MAX_RESULTS_DEFAULT, max_results)
[docs]def create_experiment( name: str, artifact_location: Optional[str] = None, tags: Optional[Dict[str, Any]] = None, ) -> str: """ Create an experiment. :param name: The experiment name, which must be unique and is case sensitive :param artifact_location: The location to store run artifacts. If not provided, the server picks an appropriate default. :param tags: An optional dictionary of string keys and values to set as tags on the experiment. :return: String ID of the created experiment. .. code-block:: python :caption: Example import mlflow # Create an experiment name, which must be unique and case sensitive experiment_id = mlflow.create_experiment("Social NLP Experiments") experiment = mlflow.get_experiment(experiment_id) print("Name: {}".format(experiment.name)) print("Experiment_id: {}".format(experiment.experiment_id)) print("Artifact Location: {}".format(experiment.artifact_location)) print("Tags: {}".format(experiment.tags)) print("Lifecycle_stage: {}".format(experiment.lifecycle_stage)) .. code-block:: text :caption: Output Name: Social NLP Experiments Experiment_id: 1 Artifact Location: file:///.../mlruns/1 Tags= {} Lifecycle_stage: active """ return MlflowClient().create_experiment(name, artifact_location, tags)
[docs]def delete_experiment(experiment_id: str) -> None: """ Delete an experiment from the backend store. :param experiment_id: The The string-ified experiment ID returned from ``create_experiment``. .. code-block:: python :caption: Example import mlflow experiment_id = mlflow.create_experiment("New Experiment") mlflow.delete_experiment(experiment_id) # Examine the deleted experiment details. experiment = mlflow.get_experiment(experiment_id) print("Name: {}".format(experiment.name)) print("Artifact Location: {}".format(experiment.artifact_location)) print("Lifecycle_stage: {}".format(experiment.lifecycle_stage)) .. code-block:: text :caption: Output Name: New Experiment Artifact Location: file:///.../mlruns/2 Lifecycle_stage: deleted """ MlflowClient().delete_experiment(experiment_id)
[docs]def delete_run(run_id: str) -> None: """ Deletes a run with the given ID. :param run_id: Unique identifier for the run to delete. .. code-block:: python :caption: Example import mlflow with mlflow.start_run() as run: mlflow.log_param("p", 0) run_id = run.info.run_id mlflow.delete_run(run_id) print("run_id: {}; lifecycle_stage: {}".format(run_id, mlflow.get_run(run_id).info.lifecycle_stage)) .. code-block:: text :caption: Output run_id: 45f4af3e6fd349e58579b27fcb0b8277; lifecycle_stage: deleted """ MlflowClient().delete_run(run_id)
[docs]def get_artifact_uri(artifact_path: Optional[str] = None) -> str: """ Get the absolute URI of the specified artifact in the currently active run. If `path` is not specified, the artifact root URI of the currently active run will be returned; calls to ``log_artifact`` and ``log_artifacts`` write artifact(s) to subdirectories of the artifact root URI. If no run is active, this method will create a new active run. :param artifact_path: The run-relative artifact path for which to obtain an absolute URI. For example, "path/to/artifact". If unspecified, the artifact root URI for the currently active run will be returned. :return: An *absolute* URI referring to the specified artifact or the currently adtive run's artifact root. For example, if an artifact path is provided and the currently active run uses an S3-backed store, this may be a uri of the form ``s3://<bucket_name>/path/to/artifact/root/path/to/artifact``. If an artifact path is not provided and the currently active run uses an S3-backed store, this may be a URI of the form ``s3://<bucket_name>/path/to/artifact/root``. .. code-block:: python :caption: Example import mlflow features = "rooms, zipcode, median_price, school_rating, transport" with open("features.txt", 'w') as f: f.write(features) # Log the artifact in a directory "features" under the root artifact_uri/features with mlflow.start_run(): mlflow.log_artifact("features.txt", artifact_path="features") # Fetch the artifact uri root directory artifact_uri = mlflow.get_artifact_uri() print("Artifact uri: {}".format(artifact_uri)) # Fetch a specific artifact uri artifact_uri = mlflow.get_artifact_uri(artifact_path="features/features.txt") print("Artifact uri: {}".format(artifact_uri)) .. code-block:: text :caption: Output Artifact uri: file:///.../0/a46a80f1c9644bd8f4e5dd5553fffce/artifacts Artifact uri: file:///.../0/a46a80f1c9644bd8f4e5dd5553fffce/artifacts/features/features.txt """ return artifact_utils.get_artifact_uri( run_id=_get_or_start_run().info.run_id, artifact_path=artifact_path )
[docs]def search_runs( experiment_ids: Optional[List[str]] = None, filter_string: str = "", run_view_type: int = ViewType.ACTIVE_ONLY, max_results: int = SEARCH_MAX_RESULTS_PANDAS, order_by: Optional[List[str]] = None, output_format: str = "pandas", search_all_experiments: bool = False, experiment_names: Optional[List[str]] = None, ) -> Union[List[Run], "pandas.DataFrame"]: """ Get a pandas DataFrame of runs that fit the search criteria. :param experiment_ids: List of experiment IDs. Search can work with experiment IDs or experiment names, but not both in the same call. Values other than ``None`` or ``[]`` will result in error if ``experiment_names`` is also not ``None`` or ``[]``. ``None`` will default to the active experiment if ``experiment_names`` is ``None`` or ``[]``. :param filter_string: Filter query string, defaults to searching all runs. :param run_view_type: one of enum values ``ACTIVE_ONLY``, ``DELETED_ONLY``, or ``ALL`` runs defined in :py:class:`mlflow.entities.ViewType`. :param max_results: The maximum number of runs to put in the dataframe. Default is 100,000 to avoid causing out-of-memory issues on the user's machine. :param order_by: List of columns to order by (e.g., "metrics.rmse"). The ``order_by`` column can contain an optional ``DESC`` or ``ASC`` value. The default is ``ASC``. The default ordering is to sort by ``start_time DESC``, then ``run_id``. :param output_format: The output format to be returned. If ``pandas``, a ``pandas.DataFrame`` is returned and, if ``list``, a list of :py:class:`mlflow.entities.Run` is returned. :param search_all_experiments: Boolean specifying whether all experiments should be searched. Only honored if ``experiment_ids`` is ``[]`` or ``None``. :param experiment_names: List of experiment names. Search can work with experiment IDs or experiment names, but not both in the same call. Values other than ``None`` or ``[]`` will result in error if ``experiment_ids`` is also not ``None`` or ``[]``. ``None`` will default to the active experiment if ``experiment_ids`` is ``None`` or ``[]``. :return: If output_format is ``list``: a list of :py:class:`mlflow.entities.Run`. If output_format is ``pandas``: ``pandas.DataFrame`` of runs, where each metric, parameter, and tag is expanded into its own column named metrics.*, params.*, or tags.* respectively. For runs that don't have a particular metric, parameter, or tag, the value for the corresponding column is (NumPy) ``Nan``, ``None``, or ``None`` respectively. .. code-block:: python :caption: Example import mlflow # Create an experiment and log two runs under it experiment_name = "Social NLP Experiments" experiment_id = mlflow.create_experiment(experiment_name) with mlflow.start_run(experiment_id=experiment_id): mlflow.log_metric("m", 1.55) mlflow.set_tag("s.release", "1.1.0-RC") with mlflow.start_run(experiment_id=experiment_id): mlflow.log_metric("m", 2.50) mlflow.set_tag("s.release", "1.2.0-GA") # Search for all the runs in the experiment with the given experiment ID df = mlflow.search_runs([experiment_id], order_by=["metrics.m DESC"]) print(df[["metrics.m", "tags.s.release", "run_id"]]) print("--") # Search the experiment_id using a filter_string with tag # that has a case insensitive pattern filter_string = "tags.s.release ILIKE '%rc%'" df = mlflow.search_runs([experiment_id], filter_string=filter_string) print(df[["metrics.m", "tags.s.release", "run_id"]]) print("--") # Search for all the runs in the experiment with the given experiment name df = mlflow.search_runs(experiment_names=[experiment_name], order_by=["metrics.m DESC"]) print(df[["metrics.m", "tags.s.release", "run_id"]]) .. code-block:: text :caption: Output metrics.m tags.s.release run_id 0 2.50 1.2.0-GA 147eed886ab44633902cc8e19b2267e2 1 1.55 1.1.0-RC 5cc7feaf532f496f885ad7750809c4d4 -- metrics.m tags.s.release run_id 0 1.55 1.1.0-RC 5cc7feaf532f496f885ad7750809c4d4 -- metrics.m tags.s.release run_id 0 2.50 1.2.0-GA 147eed886ab44633902cc8e19b2267e2 1 1.55 1.1.0-RC 5cc7feaf532f496f885ad7750809c4d4 """ no_ids = experiment_ids is None or len(experiment_ids) == 0 no_names = experiment_names is None or len(experiment_names) == 0 no_ids_or_names = no_ids and no_names if not no_ids and not no_names: raise MlflowException( message="Only experiment_ids or experiment_names can be used, but not both", error_code=INVALID_PARAMETER_VALUE, ) if search_all_experiments and no_ids_or_names: experiment_ids = [ exp.experiment_id for exp in list_experiments(view_type=ViewType.ACTIVE_ONLY) ] elif no_ids_or_names: experiment_ids = _get_experiment_id() elif not no_names: experiments = [get_experiment_by_name(n) for n in experiment_names if n is not None] experiment_ids = [e.experiment_id for e in experiments if e is not None] # Using an internal function as the linter doesn't like assigning a lambda, and inlining the # full thing is a mess def pagination_wrapper_func(number_to_get, next_page_token): return MlflowClient().search_runs( experiment_ids, filter_string, run_view_type, number_to_get, order_by, next_page_token, ) runs = _paginate(pagination_wrapper_func, NUM_RUNS_PER_PAGE_PANDAS, max_results) if output_format == "list": return runs # List[mlflow.entities.run.Run] elif output_format == "pandas": import numpy as np import pandas as pd info = { "run_id": [], "experiment_id": [], "status": [], "artifact_uri": [], "start_time": [], "end_time": [], } params, metrics, tags = ({}, {}, {}) PARAM_NULL, METRIC_NULL, TAG_NULL = (None, np.nan, None) for i, run in enumerate(runs): info["run_id"].append(run.info.run_id) info["experiment_id"].append(run.info.experiment_id) info["status"].append(run.info.status) info["artifact_uri"].append(run.info.artifact_uri) info["start_time"].append(pd.to_datetime(run.info.start_time, unit="ms", utc=True)) info["end_time"].append(pd.to_datetime(run.info.end_time, unit="ms", utc=True)) # Params param_keys = set(params.keys()) for key in param_keys: if key in run.data.params: params[key].append(run.data.params[key]) else: params[key].append(PARAM_NULL) new_params = set(run.data.params.keys()) - param_keys for p in new_params: params[p] = [PARAM_NULL] * i # Fill in null values for all previous runs params[p].append(run.data.params[p]) # Metrics metric_keys = set(metrics.keys()) for key in metric_keys: if key in run.data.metrics: metrics[key].append(run.data.metrics[key]) else: metrics[key].append(METRIC_NULL) new_metrics = set(run.data.metrics.keys()) - metric_keys for m in new_metrics: metrics[m] = [METRIC_NULL] * i metrics[m].append(run.data.metrics[m]) # Tags tag_keys = set(tags.keys()) for key in tag_keys: if key in run.data.tags: tags[key].append(run.data.tags[key]) else: tags[key].append(TAG_NULL) new_tags = set(run.data.tags.keys()) - tag_keys for t in new_tags: tags[t] = [TAG_NULL] * i tags[t].append(run.data.tags[t]) data = {} data.update(info) for key in metrics: data["metrics." + key] = metrics[key] for key in params: data["params." + key] = params[key] for key in tags: data["tags." + key] = tags[key] return pd.DataFrame(data) else: raise ValueError( "Unsupported output format: %s. Supported string values are 'pandas' or 'list'" % output_format )
[docs]def list_run_infos( experiment_id: str, run_view_type: int = ViewType.ACTIVE_ONLY, max_results: int = SEARCH_MAX_RESULTS_DEFAULT, order_by: Optional[List[str]] = None, ) -> List[RunInfo]: """ Return run information for runs which belong to the experiment_id. :param experiment_id: The experiment id which to search :param run_view_type: ACTIVE_ONLY, DELETED_ONLY, or ALL runs :param max_results: Maximum number of results desired. :param order_by: List of order_by clauses. Currently supported values are are ``metric.key``, ``parameter.key``, ``tag.key``, ``attribute.key``. For example, ``order_by=["tag.release ASC", "metric.click_rate DESC"]``. :return: A list of :py:class:`RunInfo <mlflow.entities.RunInfo>` objects that satisfy the search expressions. .. code-block:: python :caption: Example import mlflow from mlflow.entities import ViewType # Create two runs with mlflow.start_run() as run1: mlflow.log_param("p", 0) with mlflow.start_run() as run2: mlflow.log_param("p", 1) # Delete the last run mlflow.delete_run(run2.info.run_id) def print_run_infos(run_infos): for r in run_infos: print("- run_id: {}, lifecycle_stage: {}".format(r.run_id, r.lifecycle_stage)) print("Active runs:") print_run_infos(mlflow.list_run_infos("0", run_view_type=ViewType.ACTIVE_ONLY)) print("Deleted runs:") print_run_infos(mlflow.list_run_infos("0", run_view_type=ViewType.DELETED_ONLY)) print("All runs:") print_run_infos(mlflow.list_run_infos("0", run_view_type=ViewType.ALL)) .. code-block:: text :caption: Output Active runs: - run_id: 4937823b730640d5bed9e3e5057a2b34, lifecycle_stage: active Deleted runs: - run_id: b13f1badbed842cf9975c023d23da300, lifecycle_stage: deleted All runs: - run_id: b13f1badbed842cf9975c023d23da300, lifecycle_stage: deleted - run_id: 4937823b730640d5bed9e3e5057a2b34, lifecycle_stage: active """ # Using an internal function as the linter doesn't like assigning a lambda, and inlining the # full thing is a mess def pagination_wrapper_func(number_to_get, next_page_token): return MlflowClient().list_run_infos( experiment_id, run_view_type, number_to_get, order_by, next_page_token ) return _paginate(pagination_wrapper_func, SEARCH_MAX_RESULTS_DEFAULT, max_results)
def _paginate(paginated_fn, max_results_per_page, max_results=None): """ Intended to be a general use pagination utility. :param paginated_fn: :type paginated_fn: This function is expected to take in the number of results to retrieve per page and a pagination token, and return a PagedList object :param max_results_per_page: :type max_results_per_page: The maximum number of results to retrieve per page :param max_results: :type max_results: The maximum number of results to retrieve overall. If unspecified, all results will be retrieved. :return: Returns a list of entities, as determined by the paginated_fn parameter, with no more entities than specified by max_results :rtype: list[object] """ all_results = [] next_page_token = None returns_all = max_results is None while returns_all or len(all_results) < max_results: num_to_get = max_results_per_page if returns_all else max_results - len(all_results) if num_to_get < max_results_per_page: page_results = paginated_fn(num_to_get, next_page_token) else: page_results = paginated_fn(max_results_per_page, next_page_token) all_results.extend(page_results) if hasattr(page_results, "token") and page_results.token: next_page_token = page_results.token else: break return all_results def _get_or_start_run(): if len(_active_run_stack) > 0: return _active_run_stack[-1] return start_run() def _get_experiment_id_from_env(): experiment_name = env.get_env(_EXPERIMENT_NAME_ENV_VAR) if experiment_name is not None: exp = MlflowClient().get_experiment_by_name(experiment_name) return exp.experiment_id if exp else None return env.get_env(_EXPERIMENT_ID_ENV_VAR) def _get_experiment_id(): return ( _active_experiment_id or _get_experiment_id_from_env() or default_experiment_registry.get_experiment_id() )
[docs]@autologging_integration("mlflow") def autolog( log_input_examples: bool = False, log_model_signatures: bool = True, log_models: bool = True, disable: bool = False, exclusive: bool = False, disable_for_unsupported_versions: bool = False, silent: bool = False, # pylint: disable=unused-argument ) -> None: """ Enables (or disables) and configures autologging for all supported integrations. The parameters are passed to any autologging integrations that support them. See the :ref:`tracking docs <automatic-logging>` for a list of supported autologging integrations. Note that framework-specific configurations set at any point will take precedence over any configurations set by this function. For example: .. code-block:: python mlflow.autolog(log_models=False, exclusive=True) import sklearn would enable autologging for `sklearn` with `log_models=False` and `exclusive=True`, but .. code-block:: python mlflow.autolog(log_models=False, exclusive=True) import sklearn mlflow.sklearn.autolog(log_models=True) would enable autologging for `sklearn` with `log_models=True` and `exclusive=False`, the latter resulting from the default value for `exclusive` in `mlflow.sklearn.autolog`; other framework autolog functions (e.g. `mlflow.tensorflow.autolog`) would use the configurations set by `mlflow.autolog` (in this instance, `log_models=False`, `exclusive=True`), until they are explicitly called by the user. :param log_input_examples: If ``True``, input examples from training datasets are collected and logged along with model artifacts during training. If ``False``, input examples are not logged. Note: Input examples are MLflow model attributes and are only collected if ``log_models`` is also ``True``. :param log_model_signatures: If ``True``, :py:class:`ModelSignatures <mlflow.models.ModelSignature>` describing model inputs and outputs are collected and logged along with model artifacts during training. If ``False``, signatures are not logged. Note: Model signatures are MLflow model attributes and are only collected if ``log_models`` is also ``True``. :param log_models: If ``True``, trained models are logged as MLflow model artifacts. If ``False``, trained models are not logged. Input examples and model signatures, which are attributes of MLflow models, are also omitted when ``log_models`` is ``False``. :param disable: If ``True``, disables all supported autologging integrations. If ``False``, enables all supported autologging integrations. :param exclusive: If ``True``, autologged content is not logged to user-created fluent runs. If ``False``, autologged content is logged to the active fluent run, which may be user-created. :param disable_for_unsupported_versions: If ``True``, disable autologging for versions of all integration libraries that have not been tested against this version of the MLflow client or are incompatible. :param silent: If ``True``, suppress all event logs and warnings from MLflow during autologging setup and training execution. If ``False``, show all events and warnings during autologging setup and training execution. .. code-block:: python :caption: Example import numpy as np import mlflow.sklearn from mlflow.tracking import MlflowClient from sklearn.linear_model import LinearRegression def print_auto_logged_info(r): tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")} artifacts = [f.path for f in MlflowClient().list_artifacts(r.info.run_id, "model")] print("run_id: {}".format(r.info.run_id)) print("artifacts: {}".format(artifacts)) print("params: {}".format(r.data.params)) print("metrics: {}".format(r.data.metrics)) print("tags: {}".format(tags)) # prepare training data X = np.array([[1, 1], [1, 2], [2, 2], [2, 3]]) y = np.dot(X, np.array([1, 2])) + 3 # Auto log all the parameters, metrics, and artifacts mlflow.autolog() model = LinearRegression() with mlflow.start_run() as run: model.fit(X, y) # fetch the auto logged parameters and metrics for ended run print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id)) .. code-block:: text :caption: Output run_id: fd10a17d028c47399a55ab8741721ef7 artifacts: ['model/MLmodel', 'model/conda.yaml', 'model/model.pkl'] params: {'copy_X': 'True', 'normalize': 'False', 'fit_intercept': 'True', 'n_jobs': 'None'} metrics: {'training_score': 1.0, 'training_rmse': 4.440892098500626e-16, 'training_r2_score': 1.0, 'training_mae': 2.220446049250313e-16, 'training_mse': 1.9721522630525295e-31} tags: {'estimator_class': 'sklearn.linear_model._base.LinearRegression', 'estimator_name': 'LinearRegression'} """ from mlflow import ( tensorflow, keras, gluon, xgboost, lightgbm, pyspark, statsmodels, spark, sklearn, fastai, pytorch, ) locals_copy = locals().items() # Mapping of library module name to specific autolog function # eg: mxnet.gluon is the actual library, mlflow.gluon.autolog is our autolog function for it LIBRARY_TO_AUTOLOG_FN = { "tensorflow": tensorflow.autolog, "keras": keras.autolog, "mxnet.gluon": gluon.autolog, "xgboost": xgboost.autolog, "lightgbm": lightgbm.autolog, "statsmodels": statsmodels.autolog, "sklearn": sklearn.autolog, "fastai": fastai.autolog, "pyspark": spark.autolog, "pyspark.ml": pyspark.ml.autolog, # TODO: Broaden this beyond pytorch_lightning as we add autologging support for more # Pytorch frameworks under mlflow.pytorch.autolog "pytorch_lightning": pytorch.autolog, } def get_autologging_params(autolog_fn): try: needed_params = list(inspect.signature(autolog_fn).parameters.keys()) return {k: v for k, v in locals_copy if k in needed_params} except Exception: return {} def setup_autologging(module): try: autolog_fn = LIBRARY_TO_AUTOLOG_FN[module.__name__] # Only call integration's autolog function with `mlflow.autolog` configs # if the integration's autolog function has not already been called by the user. # Logic is as follows: # - if a previous_config exists, that means either `mlflow.autolog` or # `mlflow.integration.autolog` was called. # - if the config contains `AUTOLOGGING_CONF_KEY_IS_GLOBALLY_CONFIGURED`, the # configuration was set by `mlflow.autolog`, and so we can safely call `autolog_fn` # with `autologging_params`. # - if the config doesn't contain this key, the configuration was set by an # `mlflow.integration.autolog` call, so we should not call `autolog_fn` with # new configs. prev_config = AUTOLOGGING_INTEGRATIONS.get(autolog_fn.integration_name) if prev_config and not prev_config.get( AUTOLOGGING_CONF_KEY_IS_GLOBALLY_CONFIGURED, False ): return autologging_params = get_autologging_params(autolog_fn) autolog_fn(**autologging_params) AUTOLOGGING_INTEGRATIONS[autolog_fn.integration_name][ AUTOLOGGING_CONF_KEY_IS_GLOBALLY_CONFIGURED ] = True if not autologging_is_disabled( autolog_fn.integration_name ) and not autologging_params.get("silent", False): _logger.info("Autologging successfully enabled for %s.", module.__name__) except Exception as e: if is_testing(): # Raise unexpected exceptions in test mode in order to detect # errors within dependent autologging integrations raise elif not autologging_params.get("silent", False): _logger.warning( "Exception raised while enabling autologging for %s: %s", module.__name__, str(e), ) # for each autolog library (except pyspark), register a post-import hook. # this way, we do not send any errors to the user until we know they are using the library. # the post-import hook also retroactively activates for previously-imported libraries. for module in list( set(LIBRARY_TO_AUTOLOG_FN.keys()) - set(["tensorflow", "keras", "pyspark", "pyspark.ml"]) ): register_post_import_hook(setup_autologging, module, overwrite=True) FULLY_IMPORTED_KERAS = False TF_AUTOLOG_SETUP_CALLED = False def conditionally_set_up_keras_autologging(keras_module): nonlocal FULLY_IMPORTED_KERAS, TF_AUTOLOG_SETUP_CALLED FULLY_IMPORTED_KERAS = True if Version(keras_module.__version__) >= Version("2.6.0"): # NB: Keras unconditionally depends on TensorFlow beginning with Version 2.6.0, and # many classes defined in the `keras` module are aliases of classes in the `tf.keras` # module. Accordingly, TensorFlow autologging serves as a replacement for Keras # autologging in Keras >= 2.6.0 try: import tensorflow setup_autologging(tensorflow) TF_AUTOLOG_SETUP_CALLED = True except Exception as e: _logger.debug( "Failed to set up TensorFlow autologging for tf.keras models upon" " Keras library import: %s", str(e), ) raise else: setup_autologging(keras_module) register_post_import_hook(conditionally_set_up_keras_autologging, "keras", overwrite=True) def set_up_tensorflow_autologging(tensorflow_module): import sys nonlocal FULLY_IMPORTED_KERAS, TF_AUTOLOG_SETUP_CALLED if "keras" in sys.modules and not FULLY_IMPORTED_KERAS: # In Keras >= 2.6.0, importing Keras imports the TensorFlow library, which can # trigger this autologging import hook for TensorFlow before the entire Keras import # procedure is completed. Attempting to set up autologging before the Keras import # procedure has completed will result in a failure due to the unavailability of # certain modules. In this case, we terminate the TensorFlow autologging import hook # and rely on the Keras autologging import hook to successfully set up TensorFlow # autologging for tf.keras models once the Keras import procedure has completed return # By design, in Keras >= 2.6.0, Keras needs to enable tensorflow autologging so that # tf.keras models always use tensorflow autologging, rather than vanilla keras autologging. # As a result, Keras autologging must call `mlflow.tensorflow.autolog()` in Keras >= 2.6.0. # Accordingly, we insert this check to ensure that importing tensorflow, which may import # keras, does not enable tensorflow autologging twice. if not TF_AUTOLOG_SETUP_CALLED: setup_autologging(tensorflow_module) register_post_import_hook(set_up_tensorflow_autologging, "tensorflow", overwrite=True) # for pyspark, we activate autologging immediately, without waiting for a module import. # this is because on Databricks a SparkSession already exists and the user can directly # interact with it, and this activity should be logged. try: import pyspark as pyspark_module import pyspark.ml as pyspark_ml_module setup_autologging(pyspark_module) setup_autologging(pyspark_ml_module) except ImportError as ie: # if pyspark isn't installed, a user could potentially install it in the middle # of their session so we want to enable autologging once they do if "pyspark" in str(ie): register_post_import_hook(setup_autologging, "pyspark", overwrite=True) register_post_import_hook(setup_autologging, "pyspark.ml", overwrite=True) except Exception as e: if is_testing(): # Raise unexpected exceptions in test mode in order to detect # errors within dependent autologging integrations raise else: _logger.warning("Exception raised while enabling autologging for spark: %s", str(e))