
"""
The ``mlflow.openai`` module provides an API for logging and loading OpenAI models.

Credential management for OpenAI on Databricks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When this flavor logs a model on Databricks, it saves a YAML file named ``openai.yaml`` with
the following contents, provided that the ``MLFLOW_OPENAI_SECRET_SCOPE`` environment variable
is set.

.. code-block:: yaml

    OPENAI_API_BASE: {scope}:openai_api_base
    OPENAI_API_KEY: {scope}:openai_api_key
    OPENAI_API_KEY_PATH: {scope}:openai_api_key_path
    OPENAI_API_TYPE: {scope}:openai_api_type
    OPENAI_ORGANIZATION: {scope}:openai_organization

- ``{scope}`` is the value of the ``MLFLOW_OPENAI_SECRET_SCOPE`` environment variable.
- The keys are the environment variables that the ``openai-python`` package uses to
  configure the API client.
- The values are references, in ``{scope}:{key}`` form, to the Databricks secrets that store
  the values of the corresponding environment variables.

When the logged model is served on Databricks, each secret will be resolved and set as the
corresponding environment variable. See https://docs.databricks.com/security/secrets/index.html
for how to set up secrets on Databricks.
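
For example, with a hypothetical secret scope named ``openai`` that already contains the
required secrets (e.g. ``openai_api_key``), setting the environment variable before logging
is enough to enable this behavior:

.. code-block:: python

    import os

    # Hypothetical scope name; the referenced secrets must already exist in it.
    os.environ["MLFLOW_OPENAI_SECRET_SCOPE"] = "openai"

    # Logging a model now also writes ``openai.yaml`` with entries such as
    # ``OPENAI_API_KEY: openai:openai_api_key``.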
"""
import itertools
import logging
import os
from enum import Enum
from string import Formatter
from typing import Any, Dict, Optional

import yaml

import mlflow
from mlflow import pyfunc
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.utils import _save_example
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.environment import (
    _mlflow_conda_env,
    _validate_env_arguments,
    _process_pip_requirements,
    _process_conda_env,
    _CONDA_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _PythonEnv,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
    _get_flavor_configuration,
    _validate_and_copy_code_paths,
    _add_code_from_conf_to_system_path,
    _validate_and_prepare_target_save_path,
)
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.utils.docstring_utils import format_docstring, LOG_MODEL_PARAM_DOCS
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.types import Schema, ColSpec, TensorSpec
from mlflow.environment_variables import _MLFLOW_TESTING, MLFLOW_OPENAI_SECRET_SCOPE
from mlflow.utils.annotations import experimental
from mlflow.utils.databricks_utils import (
    check_databricks_secret_scope_access,
    is_in_databricks_runtime,
)

FLAVOR_NAME = "openai"
MODEL_FILENAME = "model.yaml"
_PYFUNC_SUPPORTED_TASKS = ("chat.completions", "embeddings")

_logger = logging.getLogger(__name__)


@experimental
def get_default_pip_requirements():
    """
    :return: A list of default pip requirements for MLflow Models produced by this flavor.
             Calls to :func:`save_model()` and :func:`log_model()` produce a pip environment
             that, at minimum, contains these requirements.
    """
    return list(map(_get_pinned_requirement, ["openai", "tiktoken", "tenacity"]))
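

# Illustrative only (not part of the original source); the exact pins depend on
# the versions installed in the current environment:
#
#   get_default_pip_requirements()
#   -> ["openai==<installed version>", "tiktoken==<installed version>",
#       "tenacity==<installed version>"]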


@experimental
def get_default_conda_env():
    """
    :return: The default Conda environment for MLflow Models produced by calls to
             :func:`save_model()` and :func:`log_model()`.
    """
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())


def _get_class_to_task_mapping():
    from openai.api_resources import (
        Audio,
        ChatCompletion,
        Completion,
        Edit,
        Deployment,
        Embedding,
        Engine,
        FineTune,
        File,
        Image,
        Model as OpenAIModel,
        Moderation,
    )

    return {
        Audio: Audio.OBJECT_NAME,
        ChatCompletion: ChatCompletion.OBJECT_NAME,
        Completion: Completion.OBJECT_NAME,
        Edit: Edit.OBJECT_NAME,
        Deployment: Deployment.OBJECT_NAME,
        Embedding: Embedding.OBJECT_NAME,
        Engine: Engine.OBJECT_NAME,
        File: File.OBJECT_NAME,
        Image: Image.OBJECT_NAME,
        FineTune: FineTune.OBJECT_NAME,
        OpenAIModel: OpenAIModel.OBJECT_NAME,
        Moderation: "moderations",
    }


def _class_to_task(cls):
    task = _get_class_to_task_mapping().get(cls)
    if task is None:
        raise mlflow.MlflowException(
            f"Unsupported class: {cls}", error_code=INVALID_PARAMETER_VALUE
        )
    return task


def _get_model_name(model):
    import openai

    if isinstance(model, str):
        return model
    elif isinstance(model, openai.Model):
        return model.id
    else:
        raise mlflow.MlflowException(
            f"Unsupported model type: {type(model)}", error_code=INVALID_PARAMETER_VALUE
        )


def _get_task_name(task):
    if isinstance(task, str):
        return task
    elif isinstance(task, type):
        return _class_to_task(task)
    else:
        raise mlflow.MlflowException(
            f"Unsupported task type: {type(task)}", error_code=INVALID_PARAMETER_VALUE
        )


def _get_openai_package_version():
    import openai

    try:
        return openai.__version__
    except AttributeError:
        # openai < 0.27.5 doesn't have a __version__ attribute
        return openai.version.VERSION


# See https://github.com/openai/openai-python/blob/cf03fe16a92cd01f2a8867537399c12e183ba58e/openai/__init__.py#L30-L38
# for the list of environment variables that openai-python uses
class _OpenAIEnvVar(str, Enum):
    OPENAI_API_TYPE = "OPENAI_API_TYPE"
    OPENAI_API_BASE = "OPENAI_API_BASE"
    OPENAI_API_KEY = "OPENAI_API_KEY"
    OPENAI_API_KEY_PATH = "OPENAI_API_KEY_PATH"
    OPENAI_ORGANIZATION = "OPENAI_ORGANIZATION"

    @property
    def secret_key(self):
        return self.value.lower()

    @classmethod
    def read_environ(cls):
        env_vars = {}
        for e in _OpenAIEnvVar:
            if value := os.getenv(e.value):
                env_vars[e.value] = value
        return env_vars


def _log_secrets_yaml(local_model_dir, scope):
    with open(os.path.join(local_model_dir, "openai.yaml"), "w") as f:
        yaml.safe_dump({e.value: f"{scope}:{e.secret_key}" for e in _OpenAIEnvVar}, f)


def _parse_format_fields(s):
    """
    Parses format fields from a given string, e.g. "Hello {name}" -> ["name"].
    """
    return [fn for _, fn, _, _ in Formatter().parse(s) if fn is not None]


def _parse_variables(messages):
    """
    Parses variables from a list of messages for the chat completion task. For example,
    if ``messages = [{"content": "{x}", ...}, {"content": "{y}", ...}]``, then
    ``_parse_variables(messages)`` returns ``["x", "y"]``.
    """
    return sorted(
        set(
            itertools.chain.from_iterable(
                _parse_format_fields(message.get("content")) for message in messages
            )
        )
    )


def _get_input_schema(messages):
    if messages:
        variables = _parse_variables(messages)
        if len(variables) == 1:
            return Schema([ColSpec(type="string")])
        elif len(variables) > 1:
            return Schema([ColSpec(name=v, type="string") for v in variables])
        else:
            return Schema([ColSpec(type="string")])
    else:
        return Schema([ColSpec(type="string")])
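

# Illustration (not part of the original source) of how a chat model's input
# schema is derived from its prompt template; the variable names are
# hypothetical:
#
#   _parse_variables([{"role": "user", "content": "Tell me about {topic}."}])
#   -> ["topic"]    # one variable: a single unnamed string column
#
#   _parse_variables([{"role": "user", "content": "{x} vs {y}"}])
#   -> ["x", "y"]   # multiple variables: one named string column per variable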


@experimental
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    model,
    task,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """
    Save an OpenAI model to a path on the local file system.

    :param model: The OpenAI model name or reference instance, e.g.,
                  ``openai.Model.retrieve("gpt-3.5-turbo")``.
    :param task: The task the model is performing, e.g., ``openai.ChatCompletion`` or
                 ``'chat.completions'``.
    :param path: Local path where the model is to be saved.
    :param conda_env: {{ conda_env }}
    :param code_paths: A list of local filesystem paths to Python file dependencies (or directories
                       containing file dependencies). These files are *prepended* to the system
                       path when the model is loaded.
    :param mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to.
    :param signature: :py:class:`ModelSignature <mlflow.models.ModelSignature>` describes model
                      input and output :py:class:`Schema <mlflow.types.Schema>`. The model
                      signature can be :py:func:`inferred <mlflow.models.infer_signature>` from
                      datasets with valid model input (e.g. the training dataset with target
                      column omitted) and valid model output (e.g. model predictions generated on
                      the training dataset), for example:

                      .. code-block:: python

                          from mlflow.models import infer_signature

                          train = df.drop_column("target_label")
                          predictions = ...  # compute model predictions
                          signature = infer_signature(train, predictions)
    :param input_example: Input example provides one or several instances of valid model input.
                          The example can be used as a hint of what data to feed the model. The
                          given example will be converted to a Pandas DataFrame and then
                          serialized to json using the Pandas split-oriented format. Bytes are
                          base64-encoded.
    :param pip_requirements: {{ pip_requirements }}
    :param extra_pip_requirements: {{ extra_pip_requirements }}
    :param metadata: Custom metadata dictionary passed to the model and stored in the MLmodel
                     file.

                     .. Note:: Experimental: This parameter may change or be removed in a future
                                             release without warning.
    :param kwargs: Keyword arguments specific to the OpenAI task, such as the ``messages`` (see
                   :ref:`mlflow.openai.messages` for more details on this parameter) or ``top_p``
                   value to use for chat completion.

    .. code-block:: python

        import mlflow
        import openai

        # Chat
        mlflow.openai.save_model(
            model="gpt-3.5-turbo",
            task=openai.ChatCompletion,
            messages=[{"role": "user", "content": "Tell me a joke."}],
            path="model",
        )

        # Embeddings
        mlflow.openai.save_model(
            model="text-embedding-ada-002",
            task=openai.Embedding,
            path="model",
        )
    """
    import numpy as np

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    path = os.path.abspath(path)
    _validate_and_prepare_target_save_path(path)
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)
    task = _get_task_name(task)

    if mlflow_model is None:
        mlflow_model = Model()
    if signature is not None:
        mlflow_model.signature = signature
    elif task == "chat.completions":
        messages = kwargs.get("messages", [])
        if messages and not (
            all(isinstance(m, dict) for m in messages)
            and all(map(_has_content_and_role, messages))
        ):
            raise mlflow.MlflowException.invalid_parameter_value(
                "If `messages` is provided, it must be a list of dictionaries with keys "
                "'role' and 'content'."
            )
        mlflow_model.signature = ModelSignature(
            inputs=_get_input_schema(messages),
            outputs=Schema([ColSpec(type="string", name=None)]),
        )
    elif task == "embeddings":
        mlflow_model.signature = ModelSignature(
            inputs=Schema([ColSpec(type="string", name=None)]),
            outputs=Schema([TensorSpec(type=np.dtype("float64"), shape=(-1,))]),
        )

    if input_example is not None:
        _save_example(mlflow_model, input_example, path)
    if metadata is not None:
        mlflow_model.metadata = metadata
    model_data_path = os.path.join(path, MODEL_FILENAME)
    model_dict = {
        "model": _get_model_name(model),
        "task": task,
        **kwargs,
    }
    with open(model_data_path, "w") as f:
        yaml.safe_dump(model_dict, f)

    if task in _PYFUNC_SUPPORTED_TASKS:
        pyfunc.add_to_model(
            mlflow_model,
            loader_module="mlflow.openai",
            data=MODEL_FILENAME,
            conda_env=_CONDA_ENV_FILE_NAME,
            python_env=_PYTHON_ENV_FILE_NAME,
            code=code_dir_subpath,
        )
    mlflow_model.add_flavor(
        FLAVOR_NAME,
        openai_version=_get_openai_package_version(),
        data=MODEL_FILENAME,
        code=code_dir_subpath,
    )
    mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))

    if is_in_databricks_runtime():
        if scope := MLFLOW_OPENAI_SECRET_SCOPE.get():
            check_databricks_secret_scope_access(scope)
            _log_secrets_yaml(path, scope)
        else:
            _logger.info(
                "No secret scope specified, skipping logging of secrets for OpenAI credentials. "
                "See https://mlflow.org/docs/latest/python_api/openai/index.html"
                "#credential-management-for-openai-on-databricks for more information."
            )

    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                path, FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs,
            pip_requirements,
            extra_pip_requirements,
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    # Save `constraints.txt` if necessary
    if pip_constraints:
        write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))

    # Save `requirements.txt`
    write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
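

# Minimal sketch (not part of the original source) of consuming a saved model
# via the pyfunc flavor; the path and column name are hypothetical, and
# OPENAI_API_KEY must be set in the environment:
#
#   import pandas as pd
#
#   mlflow.openai.save_model(
#       model="gpt-3.5-turbo",
#       task="chat.completions",
#       messages=[{"role": "user", "content": "Tell me a joke about {animal}."}],
#       path="model",
#   )
#   loaded = mlflow.pyfunc.load_model("model")
#   loaded.predict(pd.DataFrame({"animal": ["cats"]}))  # -> one completion string per row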


@experimental
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    model,
    task,
    artifact_path,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """
    Log an OpenAI model as an MLflow artifact for the current run.

    :param model: The OpenAI model name or reference instance, e.g.,
                  ``openai.Model.retrieve("gpt-3.5-turbo")``.
    :param task: The task the model is performing, e.g., ``openai.ChatCompletion`` or
                 ``'chat.completions'``.
    :param artifact_path: Run-relative artifact path.
    :param conda_env: {{ conda_env }}
    :param code_paths: A list of local filesystem paths to Python file dependencies (or directories
                       containing file dependencies). These files are *prepended* to the system
                       path when the model is loaded.
    :param registered_model_name: If given, create a model version under
                                  ``registered_model_name``, also creating a registered model if
                                  one with the given name does not exist.
    :param signature: :py:class:`ModelSignature <mlflow.models.ModelSignature>` describes model
                      input and output :py:class:`Schema <mlflow.types.Schema>`. The model
                      signature can be :py:func:`inferred <mlflow.models.infer_signature>` from
                      datasets with valid model input (e.g. the training dataset with target
                      column omitted) and valid model output (e.g. model predictions generated on
                      the training dataset), for example:

                      .. code-block:: python

                          from mlflow.models import infer_signature

                          train = df.drop_column("target_label")
                          predictions = ...  # compute model predictions
                          signature = infer_signature(train, predictions)
    :param input_example: Input example provides one or several instances of valid model input.
                          The example can be used as a hint of what data to feed the model. The
                          given example will be converted to a Pandas DataFrame and then
                          serialized to json using the Pandas split-oriented format. Bytes are
                          base64-encoded.
    :param await_registration_for: Number of seconds to wait for the model version to finish
                                   being created and to reach the ``READY`` status. By default,
                                   the function waits for five minutes. Specify 0 or None to skip
                                   waiting.
    :param pip_requirements: {{ pip_requirements }}
    :param extra_pip_requirements: {{ extra_pip_requirements }}
    :param metadata: Custom metadata dictionary passed to the model and stored in the MLmodel
                     file.

                     .. Note:: Experimental: This parameter may change or be removed in a future
                                             release without warning.
    :param kwargs: Keyword arguments specific to the OpenAI task, such as the ``messages`` (see
                   :ref:`mlflow.openai.messages` for more details on this parameter) or ``top_p``
                   value to use for chat completion.
    :return: A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
             metadata of the logged model.

    .. code-block:: python

        import mlflow
        import openai
        import pandas as pd

        # Chat
        with mlflow.start_run():
            info = mlflow.openai.log_model(
                model="gpt-3.5-turbo",
                task=openai.ChatCompletion,
                messages=[{"role": "user", "content": "Tell me a joke about {animal}."}],
                artifact_path="model",
            )
            model = mlflow.pyfunc.load_model(info.model_uri)
            df = pd.DataFrame({"animal": ["cats", "dogs"]})
            print(model.predict(df))

        # Embeddings
        with mlflow.start_run():
            info = mlflow.openai.log_model(
                model="text-embedding-ada-002",
                task=openai.Embedding,
                artifact_path="embeddings",
            )
            model = mlflow.pyfunc.load_model(info.model_uri)
            print(model.predict(["hello", "world"]))
    """
    return Model.log(
        artifact_path=artifact_path,
        flavor=mlflow.openai,
        registered_model_name=registered_model_name,
        model=model,
        task=task,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        **kwargs,
    )


def _load_model(path):
    with open(path) as f:
        return yaml.safe_load(f)


def _has_content_and_role(d):
    return "content" in d and "role" in d


class _FormattableMessage:
    def __init__(self, message):
        self.content = message.get("content")
        self.role = message.get("role")
        self.variables = _parse_format_fields(self.content)

    def format(self, **params):
        if missing_params := set(self.variables) - set(params):
            raise mlflow.MlflowException.invalid_parameter_value(
                f"Expected parameters {self.variables} to be provided, "
                f"only got {list(params)}, {list(missing_params)} are missing."
            )
        return {
            "role": self.role,
            "content": self.content.format(**{v: params[v] for v in self.variables}),
        }


def _first_string_column(pdf):
    iter_str_cols = (c for c, v in pdf.iloc[0].items() if isinstance(v, str))
    col = next(iter_str_cols, None)
    if col is None:
        raise mlflow.MlflowException.invalid_parameter_value(
            f"Could not find a string column in the input data: {pdf.dtypes.to_dict()}"
        )
    return col


class _OpenAIWrapper:
    def __init__(self, model):
        task = model["task"]
        if task not in _PYFUNC_SUPPORTED_TASKS:
            raise mlflow.MlflowException.invalid_parameter_value(
                f"Unsupported task: {task}. Supported tasks: {_PYFUNC_SUPPORTED_TASKS}."
            )
        self.model = model
        self.task = task
        self.messages = None
        self.variables = None
        self.formattable_messages = None
        if self.task == "chat.completions":
            self._setup_chat()

    def _setup_chat(self):
        self.messages = self.model.get("messages", [])
        self.variables = _parse_variables(self.messages)
        self.formattable_messages = [_FormattableMessage(m) for m in self.messages]

    def format_messages(self, params_list):
        return [
            [m.format(**params) for m in self.formattable_messages] for params in params_list
        ]

    def get_params_list(self, data):
        if len(self.variables) == 1:
            variable = self.variables[0]
            if variable in data.columns:
                return data[[variable]].to_dict(orient="records")
            else:
                first_string_column = _first_string_column(data)
                return [{variable: s} for s in data[first_string_column]]
        else:
            return data[self.variables].to_dict(orient="records")

    def _predict_chat(self, data):
        from mlflow.openai.api_request_parallel_processor import process_api_requests

        if self.variables:
            messages_list = self.format_messages(self.get_params_list(data))
        else:
            first_string_column = _first_string_column(data)
            messages_list = [
                [*self.messages, {"role": "user", "content": s}]
                for s in data[first_string_column]
            ]

        model_dict = self.model.copy()
        model_dict.pop("task", None)
        requests = [
            {
                **model_dict,
                "messages": messages,
            }
            for messages in messages_list
        ]
        results = process_api_requests(requests)
        return [r["choices"][0]["message"]["content"] for r in results]

    def _predict_embeddings(self, data):
        import openai

        kwargs = self.model.copy()
        kwargs.pop("task", None)
        first_string_column = _first_string_column(data)
        texts = data[first_string_column].tolist()
        res = []
        # The maximum batch size is 2048:
        # https://github.com/openai/openai-python/blob/b82a3f7e4c462a8a10fa445193301a3cefef9a4a/openai/embeddings_utils.py#L43
        # We use a smaller batch size to be safe.
        batch_size = 1024
        for i in range(0, len(texts), batch_size):
            res.extend(
                d["embedding"]
                for d in openai.Embedding.create(input=texts[i : i + batch_size], **kwargs)[
                    "data"
                ]
            )
        return res

    def predict(
        self, data, params: Optional[Dict[str, Any]] = None  # pylint: disable=unused-argument
    ):
        """
        :param data: Model input data.
        :param params: Additional parameters to pass to the model for inference.

                       .. Note:: Experimental: This parameter may change or be removed in a
                                               future release without warning.

        :return: Model predictions.
        """
        if _OpenAIEnvVar.OPENAI_API_KEY.value not in os.environ:
            raise mlflow.MlflowException(
                "OpenAI API key must be set in the OPENAI_API_KEY environment variable."
            )

        if self.task == "chat.completions":
            return self._predict_chat(data)
        elif self.task == "embeddings":
            return self._predict_embeddings(data)


class _TestOpenAIWrapper(_OpenAIWrapper):
    """
    A wrapper class that should be used for testing purposes only.
    """

    def predict(
        self, data, params: Optional[Dict[str, Any]] = None  # pylint: disable=unused-argument
    ):
        """
        :param data: Model input data.
        :param params: Additional parameters to pass to the model for inference.

                       .. Note:: Experimental: This parameter may change or be removed in a
                                               future release without warning.

        :return: Model predictions.
        """
        from mlflow.openai.utils import _mock_openai_request

        with _mock_openai_request():
            return super().predict(data)


def _load_pyfunc(path):
    """
    Load PyFunc implementation. Called by ``pyfunc.load_model``.

    :param path: Local filesystem path to the MLflow Model with the ``openai`` flavor.
    """
    wrapper_cls = _TestOpenAIWrapper if _MLFLOW_TESTING.get() else _OpenAIWrapper
    return wrapper_cls(_load_model(path))
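

# Sketch (not part of the original source) of how chat prompts are materialized
# at predict time; the variable name "animal" is hypothetical:
#
#   msg = _FormattableMessage({"role": "user", "content": "Tell me a joke about {animal}."})
#   msg.format(animal="cats")
#   -> {"role": "user", "content": "Tell me a joke about cats."}
#
# When the logged messages contain no template variables, each value of the
# first string column is instead appended as a new user message.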


@experimental
def load_model(model_uri, dst_path=None):
    """
    Load an OpenAI model from a local file or a run.

    :param model_uri: The location, in URI format, of the MLflow model. For example:

                      - ``/Users/me/path/to/local/model``
                      - ``relative/path/to/local/model``
                      - ``s3://my_bucket/path/to/model``
                      - ``runs:/<mlflow_run_id>/run-relative/path/to/model``

                      For more information about supported URI schemes, see
                      `Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html#
                      artifact-locations>`_.
    :param dst_path: The local filesystem path to which to download the model artifact.
                     This directory must already exist. If unspecified, a local output
                     path will be created.
    :return: A dictionary representing the OpenAI model.
    """
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    flavor_conf = _get_flavor_configuration(local_model_path, FLAVOR_NAME)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)
    model_data_path = os.path.join(local_model_path, flavor_conf.get("data", MODEL_FILENAME))
    return _load_model(model_data_path)
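

# Example usage (illustrative; the run ID is elided):
#
#   model = mlflow.openai.load_model("runs:/<mlflow_run_id>/model")
#   # -> {"model": "gpt-3.5-turbo", "task": "chat.completions", "messages": [...]}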