Source code for mlflow.metrics.genai.genai_metric

import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from inspect import Parameter, Signature
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from mlflow.exceptions import MlflowException
from mlflow.metrics.base import MetricValue
from mlflow.metrics.genai import model_utils
from mlflow.metrics.genai.base import EvaluationExample
from mlflow.metrics.genai.utils import _get_default_model, _get_latest_metric_version
from mlflow.models import EvaluationMetric, make_metric
from mlflow.protos.databricks_pb2 import (
    BAD_REQUEST,
    INTERNAL_ERROR,
    INVALID_PARAMETER_VALUE,
    UNAUTHENTICATED,
    ErrorCode,
)
from mlflow.utils.annotations import experimental
from mlflow.utils.class_utils import _get_class_from_string

if TYPE_CHECKING:
    import pandas as pd

_logger = logging.getLogger(__name__)


def _format_args_string(grading_context_columns: Optional[List[str]], eval_values, indx) -> str:
    args_dict = {}
    for arg in grading_context_columns:
        if arg in eval_values:
            args_dict[arg] = eval_values[arg][indx]
        else:
            raise MlflowException(
                f"{arg} does not exist in the eval function {list(eval_values.keys())}."
            )

    if not args_dict:
        return ""

    return "Additional information used by the model:\n" + "\n".join(
        f"key: {arg}\nvalue:\n{arg_value}" for arg, arg_value in args_dict.items()
    )
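
# Illustrative sketch (not part of the original module) of what _format_args_string produces:
# with grading_context_columns=["targets"], eval_values={"targets": ["Paris is the capital."]}
# and indx=0, the returned string is:
#
#   Additional information used by the model:
#   key: targets
#   value:
#   Paris is the capital.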


# Function to extract Score and Justification
def _extract_score_and_justification(text):
    if text:
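        # Normalize the casing of the "score" / "justification" keys (e.g. "Score",
        # "JUSTIFICATION") to lowercase so that both the JSON parsing and the regex
        # fallback below can match them.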
        text = re.sub(r"score", "score", text, flags=re.IGNORECASE)
        text = re.sub(r"justification", "justification", text, flags=re.IGNORECASE)
        # Attempt to parse JSON
        try:
            data = json.loads(text)
            score = int(data.get("score"))
            justification = data.get("justification")
        except (json.JSONDecodeError, TypeError, ValueError):
            # If parsing fails, use regex
            if (match := re.search(r"score: (\d+),?\s*justification: (.+)", text)) or (
                match := re.search(r"\s*score:\s*(\d+)\s*justification:\s*(.+)", text, re.DOTALL)
            ):
                score = int(match.group(1))
                justification = match.group(2)
            else:
                score = None
                justification = f"Failed to extract score and justification. Raw output: {text}"

        if not isinstance(score, (int, float)) or not isinstance(justification, str):
            return None, f"Failed to extract score and justification. Raw output: {text}"

        return score, justification

    return None, None
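
# Illustrative sketch (not part of the original module): the parser accepts either a JSON
# payload or a loosely formatted "score ... justification ..." string, e.g.
#
#   _extract_score_and_justification('{"Score": 4, "Justification": "Mostly correct."}')
#   # -> (4, "Mostly correct.")
#   _extract_score_and_justification("score: 2, justification: Missing key details.")
#   # -> (2, "Missing key details.")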


@experimental
def make_genai_metric(
    name: str,
    definition: str,
    grading_prompt: str,
    examples: Optional[List[EvaluationExample]] = None,
    version: Optional[str] = _get_latest_metric_version(),
    model: Optional[str] = _get_default_model(),
    grading_context_columns: Optional[Union[str, List[str]]] = [],  # noqa: B006
    parameters: Optional[Dict[str, Any]] = None,
    aggregations: Optional[List[str]] = ["mean", "variance", "p90"],  # noqa: B006
    greater_is_better: bool = True,
    max_workers: int = 10,
) -> EvaluationMetric:
    """
    Create a genai metric used to evaluate LLM output using an LLM as a judge in MLflow. The
    full grading prompt is stored in the metric_details field of the ``EvaluationMetric`` object.

    :param name: Name of the metric.
    :param definition: Definition of the metric.
    :param grading_prompt: Grading criteria of the metric.
    :param examples: (Optional) Examples of the metric.
    :param version: (Optional) Version of the metric. Currently supported versions are: v1.
    :param model: (Optional) Model URI of an openai, gateway, or deployments judge model in the
        format of "openai:/gpt-4", "gateway:/my-route", "endpoints:/databricks-llama-2-70b-chat".
        Defaults to "openai:/gpt-4". If using Azure OpenAI, the ``OPENAI_DEPLOYMENT_NAME``
        environment variable will take precedence. Your use of a third party LLM service
        (e.g., OpenAI) for evaluation may be subject to and governed by the LLM service's
        terms of use.
    :param grading_context_columns: (Optional) The name of the grading context column, or a list
        of grading context column names, required to compute the metric. The
        ``grading_context_columns`` are used by the LLM as a judge as additional information to
        compute the metric. The columns are extracted from the input dataset or output
        predictions based on ``col_mapping`` in the ``evaluator_config`` passed to
        :py:func:`mlflow.evaluate()`.
    :param parameters: (Optional) Parameters for the LLM used to compute the metric. By default,
        we set the temperature to 0.0, max_tokens to 200, and top_p to 1.0. We recommend
        setting the temperature to 0.0 for the LLM used as a judge to ensure consistent results.
    :param aggregations: (Optional) The list of options to aggregate the scores. Currently
        supported options are: min, max, mean, median, variance, p90.
    :param greater_is_better: (Optional) Whether the metric is better when it is greater.
    :param max_workers: (Optional) The maximum number of workers to use for judge scoring.
        Defaults to 10 workers.

    :return: A metric object.

    .. testcode:: python
        :caption: Example for creating a genai metric

        from mlflow.metrics.genai import EvaluationExample, make_genai_metric

        example = EvaluationExample(
            input="What is MLflow?",
            output=(
                "MLflow is an open-source platform for managing machine "
                "learning workflows, including experiment tracking, model packaging, "
                "versioning, and deployment, simplifying the ML lifecycle."
            ),
            score=4,
            justification=(
                "The definition effectively explains what MLflow is, "
                "its purpose, and its developer. It could be more concise for a 5-score."
            ),
            grading_context={
                "targets": (
                    "MLflow is an open-source platform for managing "
                    "the end-to-end machine learning (ML) lifecycle. It was developed by "
                    "Databricks, a company that specializes in big data and machine learning "
                    "solutions. MLflow is designed to address the challenges that data "
                    "scientists and machine learning engineers face when developing, training, "
                    "and deploying machine learning models."
                )
            },
        )

        metric = make_genai_metric(
            name="answer_correctness",
            definition=(
                "Answer correctness is evaluated on the accuracy of the provided output based on "
                "the provided targets, which is the ground truth. Scores can be assigned based on "
                "the degree of semantic similarity and factual correctness of the provided output "
                "to the provided targets, where a higher score indicates higher degree of "
                "accuracy."
            ),
            grading_prompt=(
                "Answer correctness: Below are the details for different scores:"
                "- Score 1: The output is completely incorrect. It is completely different from "
                "or contradicts the provided targets."
                "- Score 2: The output demonstrates some degree of semantic similarity and "
                "includes partially correct information. However, the output still has "
                "significant discrepancies with the provided targets or inaccuracies."
                "- Score 3: The output addresses a couple of aspects of the input accurately, "
                "aligning with the provided targets. However, there are still omissions or minor "
                "inaccuracies."
                "- Score 4: The output is mostly correct. It provides mostly accurate "
                "information, but there may be one or more minor omissions or inaccuracies."
                "- Score 5: The output is correct. It demonstrates a high degree of accuracy and "
                "semantic similarity to the targets."
            ),
            examples=[example],
            version="v1",
            model="openai:/gpt-4",
            grading_context_columns=["targets"],
            parameters={"temperature": 0.0},
            aggregations=["mean", "variance", "p90"],
            greater_is_better=True,
        )
    """
    if not isinstance(grading_context_columns, list):
        grading_context_columns = [grading_context_columns]

    def process_example(example):
        if example.grading_context is None and len(grading_context_columns) == 0:
            grading_context = {}
        elif isinstance(example.grading_context, dict):
            grading_context = example.grading_context
        else:
            # The grading context is string-like. Assume that it corresponds to the first
            # grading context column and update the example accordingly
            grading_context = {grading_context_columns[0]: example.grading_context}
            example.grading_context = grading_context

        if set(grading_context.keys()) != set(grading_context_columns):
            raise MlflowException.invalid_parameter_value(
                f"Example grading context does not contain required columns.\n"
                f" Example grading context columns: {list(grading_context.keys())}\n"
                f" Required grading context columns: {grading_context_columns}\n"
            )

        return example

    if examples is not None:
        examples = [process_example(example) for example in examples]

    class_name = f"mlflow.metrics.genai.prompts.{version}.EvaluationModel"
    try:
        evaluation_model_class_module = _get_class_from_string(class_name)
    except ModuleNotFoundError:
        raise MlflowException(
            f"Failed to find evaluation model for version {version}."
            f" Please check the correctness of the version",
            error_code=INVALID_PARAMETER_VALUE,
        ) from None
    except Exception as e:
        raise MlflowException(
            f"Failed to construct evaluation model {version}. Error: {e!r}",
            error_code=INTERNAL_ERROR,
        ) from None

    evaluation_context = evaluation_model_class_module(
        name,
        definition,
        grading_prompt,
        examples,
        model,
        *(parameters,) if parameters is not None else (),
    ).to_dict()

    def eval_fn(
        predictions: "pd.Series",
        metrics: Dict[str, MetricValue],
        inputs: "pd.Series",
        *args,
    ) -> MetricValue:
        """
        This is the function that is called when the metric is evaluated.
        """
        eval_values = dict(zip(grading_context_columns, args))

        outputs = predictions.to_list()
        inputs = inputs.to_list()
        eval_model = evaluation_context["model"]
        eval_parameters = evaluation_context["parameters"]

        # TODO: Save the metric definition in a yaml file for model monitoring

        if not isinstance(eval_model, str):
            raise MlflowException(
                message="The model argument must be a string URI referring to an openai model "
                "(openai:/gpt-3.5-turbo) or gateway (gateway:/my-route), "
                f"passed {eval_model} instead",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # generate grading payloads
        grading_payloads = []
        for indx, (input, output) in enumerate(zip(inputs, outputs)):
            try:
                arg_string = _format_args_string(grading_context_columns, eval_values, indx)
            except Exception as e:
                raise MlflowException(
                    f"Values for grading_context_columns are malformed and cannot be "
                    f"formatted into a prompt for metric '{name}'.\n"
                    f"Required columns: {grading_context_columns}\n"
                    f"Values: {eval_values}\n"
                    f"Error: {e!r}\n"
                    f"Please check the following: \n"
                    "- predictions and targets (if required) are provided correctly\n"
                    "- grading_context_columns are mapped correctly using the evaluator_config "
                    "parameter\n"
                    "- input and output data are formatted correctly."
                )
            grading_payloads.append(
                evaluation_context["eval_prompt"].format(
                    input=input, output=output, grading_context_columns=arg_string
                )
            )

        def score_model_on_one_payload(
            payload,
            eval_model,
        ):
            try:
                raw_result = model_utils.score_model_on_payload(
                    eval_model, payload, eval_parameters
                )
                return _extract_score_and_justification(raw_result)
            except ImportError:
                raise
            except MlflowException as e:
                if e.error_code in [
                    ErrorCode.Name(BAD_REQUEST),
                    ErrorCode.Name(UNAUTHENTICATED),
                    ErrorCode.Name(INVALID_PARAMETER_VALUE),
                ]:
                    raise
                else:
                    return None, f"Failed to score model on payload. Error: {e!s}"
            except Exception as e:
                return None, f"Failed to score model on payload. Error: {e!s}"

        scores = [None] * len(inputs)
        justifications = [None] * len(inputs)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(
                    score_model_on_one_payload,
                    payload,
                    eval_model,
                ): indx
                for indx, payload in enumerate(grading_payloads)
            }

            as_comp = as_completed(futures)
            try:
                from tqdm.auto import tqdm

                as_comp = tqdm(as_comp, total=len(futures))
            except ImportError:
                pass

            for future in as_comp:
                indx = futures[future]
                score, justification = future.result()
                scores[indx] = score
                justifications[indx] = justification

        # loop over the aggregations and compute the aggregate results on the scores
        def aggregate_function(aggregate_option, scores):
            import numpy as np

            options = {
                "min": np.min,
                "max": np.max,
                "mean": np.mean,
                "median": np.median,
                "variance": np.var,
                "p90": lambda x: np.percentile(x, 90) if x else None,
            }

            if aggregate_option not in options:
                raise MlflowException(
                    message=f"Invalid aggregate option {aggregate_option}.",
                    error_code=INVALID_PARAMETER_VALUE,
                )

            return options[aggregate_option](scores)

        scores_for_aggregation = [score for score in scores if score is not None]

        aggregate_results = (
            {
                option: aggregate_function(option, scores_for_aggregation)
                for option in aggregations
            }
            if aggregations is not None
            else {}
        )

        return MetricValue(scores, justifications, aggregate_results)

    signature_parameters = [
        Parameter("predictions", Parameter.POSITIONAL_OR_KEYWORD, annotation="pd.Series"),
        Parameter("metrics", Parameter.POSITIONAL_OR_KEYWORD, annotation=Dict[str, MetricValue]),
        Parameter("inputs", Parameter.POSITIONAL_OR_KEYWORD, annotation="pd.Series"),
    ]
    # Add grading_context_columns to signature list
    for var in grading_context_columns:
        signature_parameters.append(Parameter(var, Parameter.POSITIONAL_OR_KEYWORD))

    eval_fn.__signature__ = Signature(signature_parameters)

    return make_metric(
        eval_fn=eval_fn,
        greater_is_better=greater_is_better,
        name=name,
        version=version,
        metric_details=str(evaluation_context["eval_prompt"]),
    )