Source code for mlflow.openai

"""
The ``mlflow.openai`` module provides an API for logging and loading OpenAI models.

Credential management for OpenAI on Databricks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When this flavor logs a model on Databricks, it saves a YAML file with the following contents as
``openai.yaml`` if the ``MLFLOW_OPENAI_SECRET_SCOPE`` environment variable is set.

.. code-block:: yaml

    OPENAI_API_BASE: {scope}:openai_api_base
    OPENAI_API_KEY: {scope}:openai_api_key
    OPENAI_API_KEY_PATH: {scope}:openai_api_key_path
    OPENAI_API_TYPE: {scope}:openai_api_type
    OPENAI_ORGANIZATION: {scope}:openai_organization

- ``{scope}`` is the value of the ``MLFLOW_OPENAI_SECRET_SCOPE`` environment variable.
- The keys are the environment variables that the ``openai-python`` package uses to
  configure the API client.
- The values are the references to the secrets that store the values of the environment
  variables.

When the logged model is served on Databricks, each secret will be resolved and set as the
corresponding environment variable. See https://docs.databricks.com/security/secrets/index.html
for how to set up secrets on Databricks.
"""
import os
import yaml
import logging
from enum import Enum
from string import Formatter
import itertools

import mlflow
from mlflow import pyfunc
from mlflow.models import Model, ModelInputExample
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import ModelSignature
from mlflow.models.utils import _save_example
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.environment import (
    _mlflow_conda_env,
    _validate_env_arguments,
    _process_pip_requirements,
    _process_conda_env,
    _CONDA_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _PythonEnv,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
    _get_flavor_configuration,
    _validate_and_copy_code_paths,
    _add_code_from_conf_to_system_path,
    _validate_and_prepare_target_save_path,
)
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.utils.docstring_utils import format_docstring, LOG_MODEL_PARAM_DOCS
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.types import Schema, ColSpec
from mlflow.environment_variables import _MLFLOW_OPENAI_TESTING, MLFLOW_OPENAI_SECRET_SCOPE
from mlflow.utils.annotations import experimental
from mlflow.utils.databricks_utils import (
    check_databricks_secret_scope_access,
    is_in_databricks_runtime,
)

FLAVOR_NAME = "openai"
MODEL_FILENAME = "model.yaml"

_logger = logging.getLogger(__name__)


[docs]@experimental def get_default_pip_requirements(): """ :return: A list of default pip requirements for MLflow Models produced by this flavor. Calls to :func:`save_model()` and :func:`log_model()` produce a pip environment that, at minimum, contains these requirements. """ return list(map(_get_pinned_requirement, ["openai", "tiktoken", "tenacity"]))
[docs]@experimental def get_default_conda_env(): """ :return: The default Conda environment for MLflow Models produced by calls to :func:`save_model()` and :func:`log_model()`. """ return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())
def _get_class_to_task_mapping(): from openai.api_resources import ( Audio, ChatCompletion, Completion, Edit, Deployment, Embedding, Engine, FineTune, File, Image, Model as OpenAIModel, Moderation, ) return { Audio: Audio.OBJECT_NAME, ChatCompletion: ChatCompletion.OBJECT_NAME, Completion: Completion.OBJECT_NAME, Edit: Edit.OBJECT_NAME, Deployment: Deployment.OBJECT_NAME, Embedding: Embedding.OBJECT_NAME, Engine: Engine.OBJECT_NAME, File: File.OBJECT_NAME, Image: Image.OBJECT_NAME, FineTune: FineTune.OBJECT_NAME, OpenAIModel: OpenAIModel.OBJECT_NAME, Moderation: "moderations", } def _class_to_task(cls): task = _get_class_to_task_mapping().get(cls) if task is None: raise mlflow.MlflowException( f"Unsupported class: {cls}", error_code=INVALID_PARAMETER_VALUE ) return task def _get_model_name(model): import openai if isinstance(model, str): return model elif isinstance(model, openai.Model): return model.id else: raise mlflow.MlflowException( f"Unsupported model type: {type(model)}", error_code=INVALID_PARAMETER_VALUE ) def _get_task_name(task): if isinstance(task, str): return task elif isinstance(task, type): return _class_to_task(task) else: raise mlflow.MlflowException( f"Unsupported task type: {type(task)}", error_code=INVALID_PARAMETER_VALUE ) def _get_openai_package_version(): import openai try: return openai.__version__ except AttributeError: # openai < 0.27.5 doesn't have a __version__ attribute return openai.version.VERSION # See https://github.com/openai/openai-python/blob/cf03fe16a92cd01f2a8867537399c12e183ba58e/openai/__init__.py#L30-L38 # for the list of environment variables that openai-python uses class _OpenAIEnvVar(str, Enum): OPENAI_API_TYPE = "OPENAI_API_TYPE" OPENAI_API_BASE = "OPENAI_API_BASE" OPENAI_API_KEY = "OPENAI_API_KEY" OPENAI_API_KEY_PATH = "OPENAI_API_KEY_PATH" OPENAI_ORGANIZATION = "OPENAI_ORGANIZATION" @property def secret_key(self): return self.value.lower() @classmethod def read_environ(cls): env_vars = {} for e in _OpenAIEnvVar: if value := os.getenv(e.value): env_vars[e.value] = value return env_vars def _log_secrets_yaml(local_model_dir, scope): with open(os.path.join(local_model_dir, "openai.yaml"), "w") as f: yaml.safe_dump({e.value: f"{scope}:{e.secret_key}" for e in _OpenAIEnvVar}, f) def _parse_format_fields(s): """ Parses format fields from a given string, e.g. "Hello {name}" -> ["name"]. """ return [fn for _, fn, _, _ in Formatter().parse(s) if fn is not None] def _parse_variables(messages): """ Parses variables from a list of messages for chat completion task. For example, if messages = [{"content": "{x}", ...}, {"content": "{y}", ...}], then _parse_variables(messages) returns ["x", "y"]. """ return sorted( set( itertools.chain.from_iterable( _parse_format_fields(message.get("content")) for message in messages ) ) ) def _get_input_schema(messages): if messages: variables = _parse_variables(messages) if len(variables) == 1: return Schema([ColSpec(type="string")]) elif len(variables) > 1: return Schema([ColSpec(name=v, type="string") for v in variables]) else: return Schema([ColSpec(type="string")]) else: return Schema([ColSpec(type="string")])
[docs]@experimental @format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME)) def save_model( model, task, path, conda_env=None, code_paths=None, mlflow_model=None, signature: ModelSignature = None, input_example: ModelInputExample = None, pip_requirements=None, extra_pip_requirements=None, metadata=None, **kwargs, ): """ Save an OpenAI model to a path on the local file system. :param model: The OpenAI model name or reference instance, e.g., ``openai.Model.retrieve("gpt-3.5-turbo")``. :param task: The task the model is performing, e.g., ``openai.ChatCompletion`` or ``'chat.completions'``. :param path: Local path where the model is to be saved. :param conda_env: {{ conda_env }} :param code_paths: A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are *prepended* to the system path when the model is loaded. :param mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to. :param signature: :py:class:`ModelSignature <mlflow.models.ModelSignature>` describes model input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid model input (e.g. the training dataset with target column omitted) and valid model output (e.g. model predictions generated on the training dataset), for example: .. code-block:: python from mlflow.models.signature import infer_signature train = df.drop_column("target_label") predictions = ... # compute model predictions signature = infer_signature(train, predictions) :param input_example: Input example provides one or several instances of valid model input. The example can be used as a hint of what data to feed the model. The given example will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format. Bytes are base64-encoded. :param pip_requirements: {{ pip_requirements }} :param extra_pip_requirements: {{ extra_pip_requirements }} :param metadata: Custom metadata dictionary passed to the model and stored in the MLmodel file. .. Note:: Experimental: This parameter may change or be removed in a future release without warning. :param kwargs: Keyword arguments specific to the OpenAI task, such as the ``messages`` (see :ref:`mlflow.openai.messages` for more details on this parameter) or ``top_p`` value to use for chat completion. """ _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements) path = os.path.abspath(path) _validate_and_prepare_target_save_path(path) code_dir_subpath = _validate_and_copy_code_paths(code_paths, path) task = _get_task_name(task) if mlflow_model is None: mlflow_model = Model() if signature is not None: mlflow_model.signature = signature elif task == "chat.completions": messages = kwargs.get("messages", []) if messages and not ( all(isinstance(m, dict) for m in messages) and all(map(_has_content_and_role, messages)) ): raise mlflow.MlflowException.invalid_parameter_value( "If `messages` is provided, it must be a list of dictionaries with keys " "'role' and 'content'." ) mlflow_model.signature = ModelSignature( inputs=_get_input_schema(messages), outputs=Schema([ColSpec(type="string", name=None)]), ) if input_example is not None: _save_example(mlflow_model, input_example, path) if metadata is not None: mlflow_model.metadata = metadata model_data_path = os.path.join(path, MODEL_FILENAME) model_dict = { "model": _get_model_name(model), "task": task, **kwargs, } with open(model_data_path, "w") as f: yaml.safe_dump(model_dict, f) if task == "chat.completions": pyfunc.add_to_model( mlflow_model, loader_module="mlflow.openai", data=MODEL_FILENAME, conda_env=_CONDA_ENV_FILE_NAME, python_env=_PYTHON_ENV_FILE_NAME, code=code_dir_subpath, ) mlflow_model.add_flavor( FLAVOR_NAME, openai_version=_get_openai_package_version(), data=MODEL_FILENAME, code=code_dir_subpath, ) mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME)) if is_in_databricks_runtime(): if scope := MLFLOW_OPENAI_SECRET_SCOPE.get(): check_databricks_secret_scope_access(scope) _log_secrets_yaml(path, scope) else: _logger.info( "No secret scope specified, skipping logging of secrets for OpenAI credentials. " "See https://mlflow.org/docs/latest/python_api/openai/index.html#credential-management-for-openai-on-databricks " "for more information." ) if conda_env is None: if pip_requirements is None: default_reqs = get_default_pip_requirements() inferred_reqs = mlflow.models.infer_pip_requirements( path, FLAVOR_NAME, fallback=default_reqs ) default_reqs = sorted(set(inferred_reqs).union(default_reqs)) else: default_reqs = None conda_env, pip_requirements, pip_constraints = _process_pip_requirements( default_reqs, pip_requirements, extra_pip_requirements, ) else: conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env) with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f: yaml.safe_dump(conda_env, stream=f, default_flow_style=False) # Save `constraints.txt` if necessary if pip_constraints: write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints)) # Save `requirements.txt` write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements)) _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
[docs]@experimental @format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME)) def log_model( model, task, artifact_path, conda_env=None, code_paths=None, registered_model_name=None, signature: ModelSignature = None, input_example: ModelInputExample = None, await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS, pip_requirements=None, extra_pip_requirements=None, metadata=None, **kwargs, ): """ Log an OpenAI model as an MLflow artifact for the current run. :param model: The OpenAI model name or reference instance, e.g., ``openai.Model.retrieve("gpt-3.5-turbo")``. :param task: The task the model is performing, e.g., ``openai.ChatCompletion`` or ``'chat.completions'``. :param artifact_path: Run-relative artifact path. :param conda_env: {{ conda_env }} :param code_paths: A list of local filesystem paths to Python file dependencies (or directories containing file dependencies). These files are *prepended* to the system path when the model is loaded. :param registered_model_name: If given, create a model version under ``registered_model_name``, also creating a registered model if one with the given name does not exist. :param signature: :py:class:`ModelSignature <mlflow.models.ModelSignature>` describes model input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid model input (e.g. the training dataset with target column omitted) and valid model output (e.g. model predictions generated on the training dataset), for example: .. code-block:: python from mlflow.models.signature import infer_signature train = df.drop_column("target_label") predictions = ... # compute model predictions signature = infer_signature(train, predictions) :param input_example: Input example provides one or several instances of valid model input. The example can be used as a hint of what data to feed the model. The given example will be converted to a Pandas DataFrame and then serialized to json using the Pandas split-oriented format. Bytes are base64-encoded. :param await_registration_for: Number of seconds to wait for the model version to finish being created and is in ``READY`` status. By default, the function waits for five minutes. Specify 0 or None to skip waiting. :param pip_requirements: {{ pip_requirements }} :param extra_pip_requirements: {{ extra_pip_requirements }} :param metadata: Custom metadata dictionary passed to the model and stored in the MLmodel file. .. Note:: Experimental: This parameter may change or be removed in a future release without warning. :param kwargs: Keyword arguments specific to the OpenAI task, such as the ``messages`` (see :ref:`mlflow.openai.messages` for more details on this parameter) or ``top_p`` value to use for chat completion. :return: A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the metadata of the logged model. """ return Model.log( artifact_path=artifact_path, flavor=mlflow.openai, registered_model_name=registered_model_name, model=model, task=task, conda_env=conda_env, code_paths=code_paths, signature=signature, input_example=input_example, await_registration_for=await_registration_for, pip_requirements=pip_requirements, extra_pip_requirements=extra_pip_requirements, metadata=metadata, **kwargs, )
def _load_model(path): with open(path) as f: return yaml.safe_load(f) def _has_content_and_role(d): return "content" in d and "role" in d class _FormattableMessage: def __init__(self, message): self.content = message.get("content") self.role = message.get("role") self.variables = _parse_format_fields(self.content) def format(self, **params): if missing_params := set(self.variables) - set(params): raise mlflow.MlflowException.invalid_parameter_value( f"Expected parameters {self.variables} to be provided, " f"only got {list(params)}, {list(missing_params)} are missing." ) return { "role": self.role, "content": self.content.format(**{v: params[v] for v in self.variables}), } class _OpenAIWrapper: def __init__(self, model): if model["task"] != "chat.completions": raise mlflow.MlflowException.invalid_parameter_value( "Currently, only 'chat.completions' task is supported", ) self.model = model self.messages = self.model.get("messages", []) self.variables = _parse_variables(self.messages) self.formattable_messages = [_FormattableMessage(m) for m in self.messages] def format_messages(self, params_list): return [[m.format(**params) for m in self.formattable_messages] for params in params_list] def get_params_list(self, data): if len(self.variables) == 1: variable = self.variables[0] if variable in data.columns: return data[[variable]].to_dict(orient="records") else: iter_string_columns = (c for c, v in data.iloc[0].items() if isinstance(v, str)) first_string_column = next(iter_string_columns) return [{variable: s} for s in data[first_string_column]] else: return data[self.variables].to_dict(orient="records") def predict(self, data): from mlflow.openai.api_request_parallel_processor import process_api_requests if self.variables: messages_list = self.format_messages(self.get_params_list(data)) else: iter_string_columns = (c for c, v in data.iloc[0].items() if isinstance(v, str)) first_string_column = next(iter_string_columns) messages_list = [ [*self.messages, {"role": "user", "content": s}] for s in data[first_string_column] ] model_dict = self.model.copy() model_dict.pop("task", None) requests = [ { **model_dict, "messages": messages, } for messages in messages_list ] if _OpenAIEnvVar.OPENAI_API_KEY.value not in os.environ: raise mlflow.MlflowException( "OpenAI API key must be set in the OPENAI_API_KEY environment variable." ) results = process_api_requests(requests) return [r["choices"][0]["message"]["content"] for r in results] class _TestOpenAIWrapper(_OpenAIWrapper): """ A wrapper class that should be used for testing purposes only. """ def predict(self, data): from mlflow.openai.utils import _mock_chat_completion_request with _mock_chat_completion_request(): return super().predict(data) def _load_pyfunc(path): """ Load PyFunc implementation. Called by ``pyfunc.load_model``. :param path: Local filesystem path to the MLflow Model with the ``openai`` flavor. """ wrapper_cls = _TestOpenAIWrapper if _MLFLOW_OPENAI_TESTING.get() else _OpenAIWrapper return wrapper_cls(_load_model(path))
[docs]@experimental def load_model(model_uri, dst_path=None): """ Load an OpenAI model from a local file or a run. :param model_uri: The location, in URI format, of the MLflow model. For example: - ``/Users/me/path/to/local/model`` - ``relative/path/to/local/model`` - ``s3://my_bucket/path/to/model`` - ``runs:/<mlflow_run_id>/run-relative/path/to/model`` For more information about supported URI schemes, see `Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html# artifact-locations>`_. :param dst_path: The local filesystem path to which to download the model artifact. This directory must already exist. If unspecified, a local output path will be created. :return: A dictionary representing the OpenAI model. """ local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path) flavor_conf = _get_flavor_configuration(local_model_path, FLAVOR_NAME) _add_code_from_conf_to_system_path(local_model_path, flavor_conf) model_data_path = os.path.join(local_model_path, flavor_conf.get("data", MODEL_FILENAME)) return _load_model(model_data_path)