"""
The ``mlflow.promptflow`` module provides an API for logging and loading Promptflow models.
This module exports Promptflow models with the following flavors:

Promptflow (native) format
    This is the main flavor that can be accessed with Promptflow APIs.
:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and batch inference.

.. _Promptflow:
    https://microsoft.github.io/promptflow
"""

import logging
import os
import shutil
from pathlib import Path
from typing import Any

import pandas as pd
import yaml

import mlflow
from mlflow import pyfunc
from mlflow.entities.model_registry.prompt import Prompt
from mlflow.models import Model, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import ModelInputExample, _save_example
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
    _CONDA_ENV_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _mlflow_conda_env,
    _process_conda_env,
    _process_pip_requirements,
    _PythonEnv,
    _validate_env_arguments,
)
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
    _validate_and_copy_code_paths,
    _validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement

_logger = logging.getLogger(__name__)

FLAVOR_NAME = "promptflow"

_MODEL_FLOW_DIRECTORY = "flow"
_FLOW_ENV_REQUIREMENTS = "python_requirements_txt"
_UNSUPPORTED_MODEL_ERROR_MESSAGE = (
    "MLflow promptflow flavor only supports instances defined with a 'flow.dag.yaml' "
    "file and loaded by ~promptflow.load_flow(), found {instance_type}."
)
_INVALID_PREDICT_INPUT_ERROR_MESSAGE = (
    "Input must be a pandas DataFrame with only 1 row "
    "or a dictionary containing flow input key-value pairs."
)
_CONNECTION_PROVIDER_CONFIG_KEY = "connection_provider"
_CONNECTION_OVERRIDES_CONFIG_KEY = "connection_overrides"
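# Illustrative shape of a ``model_config`` dict recognized by this flavor; the
# connection names below are placeholders:
#
#   model_config = {
#       "connection_provider": "local",
#       "connection_overrides": {"aoai_connection": "azure_open_ai_connection"},
#   }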


def get_default_pip_requirements():
    """
    Returns:
        A list of default pip requirements for MLflow Models produced by this flavor.
        Calls to :func:`save_model()` and :func:`log_model()` produce a pip environment
        that, at a minimum, contains these requirements.
    """
    tools_package = None
    try:
        # Note: if the flow does not use any built-in tools,
        # then promptflow-tools is not a mandatory dependency.
        tools_package = _get_pinned_requirement("promptflow-tools")
    except ImportError:
        pass
    requirements = [tools_package] if tools_package else []
    return requirements + [_get_pinned_requirement("promptflow")]

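
# Illustrative sketch (not part of the original module): the defaults can be
# inspected directly or extended with extra dependencies; "some-extra-package"
# below is a hypothetical placeholder.
def _example_requirements_sketch():
    reqs = get_default_pip_requirements()
    # e.g. ["promptflow-tools==x.y.z", "promptflow==x.y.z"], depending on the
    # packages installed in the current environment.
    return reqs + ["some-extra-package"]
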
def get_default_conda_env():
    """
    Returns:
        The default Conda environment for MLflow Models produced by calls to
        :func:`save_model()` and :func:`log_model()`.
    """
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())

@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    model,
    artifact_path: str | None = None,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature=None,
    input_example=None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    model_config: dict[str, Any] | None = None,
    prompts: list[str | Prompt] | None = None,
    name: str | None = None,
    params: dict[str, Any] | None = None,
    tags: dict[str, Any] | None = None,
    model_type: str | None = None,
    step: int = 0,
    model_id: str | None = None,
):
    """
    Log a Promptflow model as an MLflow artifact for the current run.

    Args:
        model: A promptflow model loaded by `promptflow.load_flow()`.
        artifact_path: Deprecated. Use `name` instead.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: If given, create a model version under
            ``registered_model_name``, also creating a registered model if one with
            the given name does not exist.
        signature: {{ signature }}
        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version to
            finish being created and reach ``READY`` status. By default, the function
            waits for five minutes. Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        model_config: A dict of valid overrides that can be applied to a flow instance
            during inference. These arguments are used exclusively when loading the
            model as a ``pyfunc`` Model.
            They are not applied to the flow returned by a call to
            ``mlflow.promptflow.load_model()``; to override configs for a flow loaded
            with the promptflow flavor, update ``pf_model.context`` directly.

            Configs that can be overridden include:

            ``connection_provider`` - The connection provider to use for the flow. See
            https://microsoft.github.io/promptflow/how-to-guides/set-global-configs.html#connection-provider
            for more details on how to set the connection provider.

            ``connection_overrides`` - The connection name overrides to use for the
            flow. Example: ``{"aoai_connection": "azure_open_ai_connection"}``.
            A node referencing the connection 'aoai_connection' will be resolved to
            the actual connection 'azure_open_ai_connection'.

            An example of providing overrides for a model to use an Azure Machine
            Learning workspace connection:

            .. code-block:: python

                flow_folder = Path(__file__).parent / "basic"
                flow = load_flow(flow_folder)

                workspace_resource_id = (
                    "azureml://subscriptions/{your-subscription}/resourceGroups/{your-resourcegroup}"
                    "/providers/Microsoft.MachineLearningServices/workspaces/{your-workspace}"
                )
                model_config = {
                    "connection_provider": workspace_resource_id,
                    "connection_overrides": {"local_conn_name": "remote_conn_name"},
                }

                with mlflow.start_run():
                    logged_model = mlflow.promptflow.log_model(
                        flow, name="promptflow_model", model_config=model_config
                    )

        prompts: {{ prompts }}
        name: {{ name }}
        params: {{ params }}
        tags: {{ tags }}
        model_type: {{ model_type }}
        step: {{ step }}
        model_id: {{ model_id }}

    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains
        the metadata of the logged model.
    """
    return Model.log(
        artifact_path=artifact_path,
        name=name,
        flavor=mlflow.promptflow,
        registered_model_name=registered_model_name,
        model=model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        model_config=model_config,
        prompts=prompts,
        params=params,
        tags=tags,
        model_type=model_type,
        step=step,
        model_id=model_id,
    )

@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    model_config: dict[str, Any] | None = None,
):
    """
    Save a Promptflow model to a path on the local file system.

    Args:
        model: A promptflow model loaded by `promptflow.load_flow()`.
        path: Local path where the serialized model (as YAML) is to be saved.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to.
        signature: {{ signature }}
        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        model_config: A dict of valid overrides that can be applied to a flow instance
            during inference. These arguments are used exclusively when loading the
            model as a ``pyfunc`` Model.
            They are not applied to the flow returned by a call to
            ``mlflow.promptflow.load_model()``; to override configs for a flow loaded
            with the promptflow flavor, update ``pf_model.context`` directly.

            Configs that can be overridden include:

            ``connection_provider`` - The connection provider to use for the flow. See
            https://microsoft.github.io/promptflow/how-to-guides/set-global-configs.html#connection-provider
            for more details on how to set the connection provider.

            ``connection_overrides`` - The connection name overrides to use for the
            flow. Example: ``{"aoai_connection": "azure_open_ai_connection"}``.
            A node referencing the connection 'aoai_connection' will be resolved to
            the actual connection 'azure_open_ai_connection'.

            An example of providing overrides for a model to use an Azure Machine
            Learning workspace connection:

            .. code-block:: python

                flow_folder = Path(__file__).parent / "basic"
                flow = load_flow(flow_folder)

                workspace_resource_id = (
                    "azureml://subscriptions/{your-subscription}/resourceGroups/{your-resourcegroup}"
                    "/providers/Microsoft.MachineLearningServices/workspaces/{your-workspace}"
                )
                model_config = {
                    "connection_provider": workspace_resource_id,
                    "connection_overrides": {"local_conn_name": "remote_conn_name"},
                }

                with mlflow.start_run():
                    logged_model = mlflow.promptflow.log_model(
                        flow, name="promptflow_model", model_config=model_config
                    )
    """
    import promptflow
    from promptflow._sdk._mlflow import (
        DAG_FILE_NAME,
        Flow,
        _merge_local_code_and_additional_includes,
        remove_additional_includes,
    )

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    if (
        not isinstance(model, Flow)
        or not hasattr(model, "flow_dag_path")
        or not hasattr(model, "code")
    ):
        raise mlflow.MlflowException.invalid_parameter_value(
            _UNSUPPORTED_MODEL_ERROR_MESSAGE.format(instance_type=type(model).__name__)
        )

    # Check that the target save path is valid and prepare it.
    path = os.path.abspath(path)
    _validate_and_prepare_target_save_path(path)

    # Copy code paths into the 'flow' directory so they are merged with the flow files.
    code_dir_subpath = _validate_and_copy_code_paths(
        code_paths, path, default_subpath=_MODEL_FLOW_DIRECTORY
    )

    model_flow_path = os.path.join(path, _MODEL_FLOW_DIRECTORY)
    # Resolve additional includes in the flow.
    with _merge_local_code_and_additional_includes(code_path=model.code) as resolved_model_dir:
        remove_additional_includes(Path(resolved_model_dir))
        shutil.copytree(src=resolved_model_dir, dst=model_flow_path, dirs_exist_ok=True)
    # Get the flow environment declared in the flow DAG.
    flow_env = _resolve_env_from_flow(model.flow_dag_path)

    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, path)

    if signature is None and saved_example is not None:
        wrapped_model = _PromptflowModelWrapper(model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)

    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata

    # Update flavor info in mlflow_model.
    mlflow_model.add_flavor(
        FLAVOR_NAME,
        version=promptflow.__version__,
        entry=f"{_MODEL_FLOW_DIRECTORY}/{DAG_FILE_NAME}",
        **flow_env,
    )

    # Append loader_module, data, and env data to mlflow_model.
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.promptflow",
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
        model_config=model_config,
    )

    # Save mlflow_model to path/MLmodel.
    mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))

    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                path, FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs,
            pip_requirements,
            extra_pip_requirements,
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    # Save `constraints.txt` if necessary.
    if pip_constraints:
        write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))

    # Save `requirements.txt`.
    write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))

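
# For reference, a successful save_model() call produces a layout like the
# following (a sketch; exact contents depend on the flow and environment):
#
#   <path>/
#       MLmodel             # flavor configuration, signature, metadata
#       conda.yaml          # conda environment spec
#       python_env.yaml     # python environment spec
#       requirements.txt    # pinned pip requirements
#       flow/               # the copied flow, including flow.dag.yaml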
def _resolve_env_from_flow(flow_dag_path):
    with open(flow_dag_path) as f:
        flow_dict = yaml.safe_load(f)
    environment = flow_dict.get("environment", {})
    if _FLOW_ENV_REQUIREMENTS in environment:
        # Prepend the packaged flow directory to the requirements path.
        environment[_FLOW_ENV_REQUIREMENTS] = (
            f"{_MODEL_FLOW_DIRECTORY}/{environment[_FLOW_ENV_REQUIREMENTS]}"
        )
    return environment


class _PromptflowModelWrapper:
    def __init__(self, model, model_config: dict[str, Any] | None = None):
        from promptflow._sdk._mlflow import FlowInvoker

        self.model = model
        # TODO: Improve this if we have more configs afterwards
        model_config = model_config or {}
        connection_provider = model_config.get(_CONNECTION_PROVIDER_CONFIG_KEY, "local")
        _logger.info("Using connection provider: %s", connection_provider)
        connection_overrides = model_config.get(_CONNECTION_OVERRIDES_CONFIG_KEY, None)
        _logger.info("Using connection overrides: %s", connection_overrides)
        self.model_invoker = FlowInvoker(
            self.model,
            connection_provider=connection_provider,
            connections_name_overrides=connection_overrides,
        )

    def get_raw_model(self):
        """
        Returns the underlying model.
        """
        return self.model

    def predict(
        self,
        data: pd.DataFrame | dict[str, Any],
        params: dict[str, Any] | None = None,  # pylint: disable=unused-argument
    ) -> dict[str, Any] | list[Any]:
        """
        Args:
            data: Model input data. Either a pandas DataFrame with only 1 row
                or a dictionary.

                .. code-block:: python

                    loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
                    # Predict on a flow input dictionary.
                    print(loaded_model.predict({"text": "Python Hello World!"}))

            params: Additional parameters to pass to the model for inference.

        Returns:
            Model predictions. Dict type, for example
            ``{"output": "\n\nprint('Hello World!')"}``
        """
        if isinstance(data, pd.DataFrame):
            messages = data.to_dict(orient="records")
            if len(messages) > 1:
                raise mlflow.MlflowException.invalid_parameter_value(
                    _INVALID_PREDICT_INPUT_ERROR_MESSAGE
                )
            messages = messages[0]
            return [self.model_invoker.invoke(messages)]
        elif isinstance(data, dict):
            messages = data
            return self.model_invoker.invoke(messages)
        raise mlflow.MlflowException.invalid_parameter_value(_INVALID_PREDICT_INPUT_ERROR_MESSAGE)


def _load_pyfunc(path, model_config: dict[str, Any] | None = None):
    """
    Load a PyFunc implementation for Promptflow. Called by ``pyfunc.load_model``.

    Args:
        path: Local filesystem path to the MLflow Model with the ``promptflow`` flavor.
    """
    from promptflow import load_flow

    model_flow_path = os.path.join(path, _MODEL_FLOW_DIRECTORY)
    model = load_flow(model_flow_path)
    return _PromptflowModelWrapper(model=model, model_config=model_config)

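
# Note on predict() return shape (mirroring the branches above): a single-row
# DataFrame input yields a one-element list, while a dict input yields the
# flow's output dict directly. The input key "text" is a placeholder.
#
#   wrapper = _load_pyfunc("path/to/saved/model")
#   wrapper.predict(pd.DataFrame([{"text": "hi"}]))  # -> [{"output": "..."}]
#   wrapper.predict({"text": "hi"})                  # -> {"output": "..."}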
def load_model(model_uri, dst_path=None):
    """
    Load a Promptflow model from a local file or a run.

    Args:
        model_uri: The location, in URI format, of the MLflow model. For example:

            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``models:/<model_name>/<model_version>``
            - ``models:/<model_name>/<stage>``

            For more information about supported URI schemes, see
            `Referencing Artifacts <https://www.mlflow.org/docs/latest/concepts.html#
            artifact-locations>`_.
        dst_path: The local filesystem path to which to download the model artifact.
            This directory must already exist. If unspecified, a local output
            path will be created.

    Returns:
        A Promptflow model instance.
    """
    from promptflow import load_flow

    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    model_data_path = os.path.join(local_model_path, _MODEL_FLOW_DIRECTORY)
    return load_flow(model_data_path)
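
# Illustrative sketch (not part of the original module): download a logged flow
# into a pre-created local directory via ``dst_path``. The run ID in the URI is
# a placeholder.
def _example_load_to_local_dir():
    import tempfile

    dst = tempfile.mkdtemp()  # dst_path must be an existing directory
    return load_model("runs:/<mlflow_run_id>/promptflow_model", dst_path=dst)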