import importlib
import importlib.metadata

from packaging.version import Version

import mlflow
from mlflow.dspy.save import FLAVOR_NAME
from mlflow.models.model import _MODEL_TRACKER
from mlflow.tracing.provider import trace_disabled
from mlflow.tracing.utils import construct_full_inputs
from mlflow.utils.annotations import experimental
from mlflow.utils.autologging_utils import (
    autologging_integration,
    get_autologging_config,
    safe_patch,
)
@experimental
def autolog(
    log_traces: bool = True,
    log_traces_from_compile: bool = False,
    log_traces_from_eval: bool = True,
    log_compiles: bool = False,
    log_evals: bool = False,
    disable: bool = False,
    silent: bool = False,
    log_models: bool = True,
):
    """
    Enables (or disables) and configures autologging from DSPy to MLflow. Currently, the
    MLflow DSPy flavor only supports autologging for tracing.

    Args:
        log_traces: If ``True``, traces are logged for DSPy models during inference.
            If ``False``, no traces are collected during inference. Default to ``True``.
        log_traces_from_compile: If ``True``, traces are logged when compiling (optimizing)
            DSPy programs. If ``False``, traces are only logged from normal model inference and
            disabled when compiling. Default to ``False``.
        log_traces_from_eval: If ``True``, traces are logged for DSPy models when running DSPy's
            `built-in evaluator <https://dspy.ai/learn/evaluation/metrics/#evaluation>`_.
            If ``False``, traces are only logged from normal model inference and disabled when
            running the evaluator. Default to ``True``.
        log_compiles: If ``True``, information about the optimization process is logged when
            `Teleprompter.compile()` is called.
        log_evals: If ``True``, information about the evaluation call is logged when
            `Evaluate.__call__()` is called.
        disable: If ``True``, disables the DSPy autologging integration. If ``False``,
            enables the DSPy autologging integration.
        silent: If ``True``, suppress all event logs and warnings from MLflow during DSPy
            autologging. If ``False``, show all events and warnings.
        log_models: If ``True``, automatically create a LoggedModel when the model
            used for inference is not already logged. The created LoggedModel contains no model
            artifacts, but it will be used to associate all traces generated by the model. If
            ``False``, no LoggedModel is created and the traces will not be associated with any
            model. Default to ``True``.

            .. Note:: Experimental: This argument may change or be removed in a future release
                without warning.
    """
    # NB: The @autologging_integration annotation is used for adding shared logic. However, one
    # caveat is that the wrapped function is NOT executed when disable=True is passed. This prevents
    # us from running cleaning up logging when autologging is turned off. To workaround this, we
    # annotate _autolog() instead of this entrypoint, and define the cleanup logic outside it.
    # This needs to be called before doing any safe-patching (otherwise safe-patch will be no-op).
    # TODO: since this implementation is inconsistent, explore a universal way to solve the issue.
    _autolog(
        log_traces=log_traces,
        log_traces_from_compile=log_traces_from_compile,
        log_traces_from_eval=log_traces_from_eval,
        log_compiles=log_compiles,
        log_evals=log_evals,
        disable=disable,
        silent=silent,
        log_models=log_models,
    )

    import dspy

    from mlflow.dspy.callback import MlflowCallback
    from mlflow.dspy.util import log_dspy_dataset, save_dspy_module_state

    # Enable tracing by setting the MlflowCallback (registered at most once);
    # on disable, strip any previously registered MlflowCallback instances.
    if not disable:
        if not any(isinstance(c, MlflowCallback) for c in dspy.settings.callbacks):
            dspy.settings.configure(callbacks=[*dspy.settings.callbacks, MlflowCallback()])
    else:
        dspy.settings.configure(
            callbacks=[c for c in dspy.settings.callbacks if not isinstance(c, MlflowCallback)]
        )

    def patch_module(original, self, *args, **kwargs):
        """Patch for dspy.Module.__call__: associate generated traces with a LoggedModel."""
        if model_id := _MODEL_TRACKER.get(id(self)):
            # This module instance already has a LoggedModel: reuse its id.
            _MODEL_TRACKER.set_active_model_id(model_id)
        elif not _MODEL_TRACKER._is_active_model_id_set and log_models:
            # Top-level call on an unlogged module: create an artifact-less external
            # LoggedModel so traces produced by this call can be attached to it.
            logged_model = mlflow.create_external_model(name=self.__class__.__name__)
            _MODEL_TRACKER.set(id(self), logged_model.model_id)
            _MODEL_TRACKER.set_active_model_id(logged_model.model_id)
        else:
            # log_models is off, or we are inside another tracked call: no model
            # association, and no need for the guard flag below.
            _MODEL_TRACKER.set_active_model_id(None)
            return original(self, *args, **kwargs)

        # This is needed because we should not create LoggedModel for internal objects:
        # sub-modules invoked while this flag is set fall into the `else` branch above.
        _MODEL_TRACKER._is_active_model_id_set = True
        try:
            return original(self, *args, **kwargs)
        finally:
            _MODEL_TRACKER._is_active_model_id_set = False

    # Patch teleprompter/evaluator not to generate traces by default
    def patch_fn(original, self, *args, **kwargs):
        # NB: Since calling mlflow.dspy.autolog() again does not unpatch a function, we need to
        # check this flag at runtime to determine if we should generate traces.

        # method to disable tracing for compile and evaluate by default
        @trace_disabled
        def _trace_disabled_fn(self, *args, **kwargs):
            return original(self, *args, **kwargs)

        def _compile_fn(self, *args, **kwargs):
            # Track nesting depth so callback state is reset only when the
            # root (outermost) compile call completes.
            if callback := _active_callback():
                callback.optimizer_stack_level += 1
            try:
                if get_autologging_config(FLAVOR_NAME, "log_traces_from_compile"):
                    result = original(self, *args, **kwargs)
                else:
                    result = _trace_disabled_fn(self, *args, **kwargs)
                return result
            finally:
                if callback:
                    callback.optimizer_stack_level -= 1
                    if callback.optimizer_stack_level == 0:
                        # Reset the callback state after the completion of root compile
                        callback.reset()

        if isinstance(self, Teleprompter):
            if not get_autologging_config(FLAVOR_NAME, "log_compiles"):
                return _compile_fn(self, *args, **kwargs)

            program = _compile_fn(self, *args, **kwargs)
            # Save the state of the best model in json format
            # so that users can see the demonstrations and instructions.
            save_dspy_module_state(program, "best_model.json")
            # Teleprompter.get_params is introduced in dspy 2.6.15
            params = (
                self.get_params()
                if Version(importlib.metadata.version("dspy")) >= Version("2.6.15")
                else {}
            )
            # Construct the dict of arguments passed to the compile call
            inputs = construct_full_inputs(original, self, *args, **kwargs)
            # Update params with the arguments passed to the compile call
            params.update(inputs)
            # BUG FIX: log the merged `params` (optimizer params + call arguments);
            # previously only `inputs` was filtered/logged, which silently dropped
            # everything gathered via get_params() and made the update above dead code.
            mlflow.log_params(
                {k: v for k, v in params.items() if isinstance(v, (int, float, str, bool))}
            )
            if trainset := inputs.get("trainset"):
                log_dspy_dataset(trainset, "trainset.json")
            if valset := inputs.get("valset"):
                log_dspy_dataset(valset, "valset.json")
            return program

        # NB: every Teleprompter instance returned above, so a previously-present
        # `isinstance(self, Teleprompter) and log_traces_from_compile` check here was
        # unreachable and has been removed.
        if isinstance(self, Evaluate) and get_autologging_config(
            FLAVOR_NAME, "log_traces_from_eval"
        ):
            return original(self, *args, **kwargs)

        return _trace_disabled_fn(self, *args, **kwargs)

    from dspy import Module
    from dspy.evaluate import Evaluate
    from dspy.teleprompt import Teleprompter

    safe_patch(
        FLAVOR_NAME,
        Module,
        "__call__",
        patch_module,
    )

    compile_patch = "compile"
    for cls in Teleprompter.__subclasses__():
        # NB: This is to avoid the abstraction inheritance of superclasses that are defined
        # only for the purposes of abstraction. The recursion behavior of the
        # __subclasses__ dunder method will target the appropriate subclasses we need to patch.
        if hasattr(cls, compile_patch):
            safe_patch(
                FLAVOR_NAME,
                cls,
                compile_patch,
                patch_fn,
                manage_run=get_autologging_config(FLAVOR_NAME, "log_compiles"),
            )

    call_patch = "__call__"
    if hasattr(Evaluate, call_patch):
        safe_patch(
            FLAVOR_NAME,
            Evaluate,
            call_patch,
            patch_fn,
        )
# This attribute is required by mlflow.autolog(), which uses it to identify this
# flavor's autologging integration; it is set at import time, before any call.
autolog.integration_name = FLAVOR_NAME
@autologging_integration(FLAVOR_NAME)
def _autolog(
    log_traces: bool,
    log_traces_from_compile: bool,
    log_traces_from_eval: bool,
    log_compiles: bool,
    log_evals: bool,
    disable: bool = False,
    silent: bool = False,
    log_models: bool = True,
):
    """No-op body wrapped by @autologging_integration.

    The decorator records the configuration (read back later via
    get_autologging_config) and applies the shared autologging plumbing; the
    actual patching lives in autolog() so that cleanup still runs when
    disable=True (the decorator skips the wrapped body in that case).
    """
def _active_callback():
    """Return the MlflowCallback registered in dspy.settings, or None if absent."""
    import dspy

    from mlflow.dspy.callback import MlflowCallback

    return next(
        (cb for cb in dspy.settings.callbacks if isinstance(cb, MlflowCallback)),
        None,
    )