Source code for mlflow.genai.optimize.optimizers.gepa_optimizer

import json
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any

import mlflow
from mlflow.exceptions import MlflowException
from mlflow.genai.optimize.optimizers.base import BasePromptOptimizer, _EvalFunc
from mlflow.genai.optimize.types import EvaluationResultRecord, PromptOptimizerOutput
from mlflow.utils.annotations import experimental

if TYPE_CHECKING:
    import gepa

_logger = logging.getLogger(__name__)

# Artifact path and file name constants
PROMPT_CANDIDATES_DIR = "prompt_candidates"
EVAL_RESULTS_FILE = "eval_results.json"
SCORES_FILE = "scores.json"


@experimental(version="3.5.0")
class GepaPromptOptimizer(BasePromptOptimizer):
    """
    A prompt adapter that uses the GEPA (Genetic-Pareto) optimization algorithm to
    optimize prompts.

    GEPA uses iterative mutation, reflection, and Pareto-aware candidate selection to
    improve text components like prompts. It leverages large language models to reflect
    on system behavior and propose improvements.

    Args:
        reflection_model: Name of the model to use for reflection and optimization.
            Format: "<provider>:/<model>"
            (e.g., "openai:/gpt-4o", "anthropic:/claude-3-5-sonnet-20241022").
        max_metric_calls: Maximum number of evaluation calls during optimization.
            Higher values may lead to better results but increase optimization time.
            Default: 100
        display_progress_bar: Whether to show a progress bar during optimization.
            Default: False
        gepa_kwargs: Additional keyword arguments to pass directly to
            `gepa.optimize <https://github.com/gepa-ai/gepa/blob/main/src/gepa/api.py>`_.
            Useful for accessing advanced GEPA features not directly exposed through
            MLflow's GEPA interface.
            Note: Parameters already handled by MLflow's GEPA class will be overridden
            by the direct parameters and should not be passed through gepa_kwargs.
            List of predefined params:

            - max_metric_calls
            - display_progress_bar
            - seed_candidate
            - trainset
            - adapter
            - reflection_lm
            - use_mlflow

    Example:

    .. code-block:: python

        import mlflow
        import openai

        from mlflow.genai.optimize.optimizers import GepaPromptOptimizer

        prompt = mlflow.genai.register_prompt(
            name="qa",
            template="Answer the following question: {{question}}",
        )


        def predict_fn(question: str) -> str:
            completion = openai.OpenAI().chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt.format(question=question)}],
            )
            return completion.choices[0].message.content


        dataset = [
            {"inputs": {"question": "What is the capital of France?"}, "outputs": "Paris"},
            {"inputs": {"question": "What is the capital of Germany?"}, "outputs": "Berlin"},
        ]

        result = mlflow.genai.optimize_prompts(
            predict_fn=predict_fn,
            train_data=dataset,
            prompt_uris=[prompt.uri],
            optimizer=GepaPromptOptimizer(
                reflection_model="openai:/gpt-4o",
                display_progress_bar=True,
            ),
        )
        print(result.optimized_prompts[0].template)
    """

    def __init__(
        self,
        reflection_model: str,
        max_metric_calls: int = 100,
        display_progress_bar: bool = False,
        gepa_kwargs: dict[str, Any] | None = None,
    ):
        self.reflection_model = reflection_model
        self.max_metric_calls = max_metric_calls
        self.display_progress_bar = display_progress_bar
        self.gepa_kwargs = gepa_kwargs or {}
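    # ``optimize`` below takes an ``eval_fn`` following the ``_EvalFunc`` contract:
    # it receives the candidate prompts (prompt template name -> template) and a batch
    # of records, and returns one ``EvaluationResultRecord`` per record. A minimal
    # sketch (illustrative only; ``my_eval_fn``, ``run_model`` and the exact-match
    # scoring are hypothetical, and the record fields shown are inferred from how they
    # are read in the adapter below):
    #
    #     def my_eval_fn(prompts: dict[str, str], data: list[dict[str, Any]]):
    #         records = []
    #         for row in data:
    #             answer = run_model(prompts["qa"], row["inputs"])  # hypothetical helper
    #             score = float(answer == row["outputs"])
    #             records.append(
    #                 EvaluationResultRecord(
    #                     inputs=row["inputs"],
    #                     outputs=answer,
    #                     expectations=row["outputs"],
    #                     score=score,
    #                     individual_scores={"exact_match": score},
    #                 )
    #             )
    #         return records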
    def optimize(
        self,
        eval_fn: _EvalFunc,
        train_data: list[dict[str, Any]],
        target_prompts: dict[str, str],
        enable_tracking: bool = True,
    ) -> PromptOptimizerOutput:
        """
        Optimize the target prompts using the GEPA algorithm.

        Args:
            eval_fn: The evaluation function that takes candidate prompts as a dict
                (prompt template name -> prompt template) and a dataset as a list of
                dicts, and returns a list of EvaluationResultRecord.
            train_data: The dataset to use for optimization. Each record should include
                the inputs and outputs fields with dict values.
            target_prompts: The target prompt templates to use. The key is the prompt
                template name and the value is the prompt template.
            enable_tracking: If True (default), automatically log optimization progress.

        Returns:
            The output of the prompt optimizer, which includes the optimized prompts as
            a dict (prompt template name -> prompt template).
        """
        from mlflow.metrics.genai.model_utils import _parse_model_uri

        if not train_data:
            raise MlflowException.invalid_parameter_value(
                "GEPA optimizer requires `train_data` to be provided."
            )

        try:
            import gepa
        except ImportError as e:
            raise ImportError(
                "GEPA >= 0.0.26 is required. Please install it with: `pip install 'gepa>=0.0.26'`"
            ) from e

        provider, model = _parse_model_uri(self.reflection_model)

        class MlflowGEPAAdapter(gepa.GEPAAdapter):
            """
            MLflow adapter for GEPA optimization.

            Args:
                eval_function: Function that evaluates candidate prompts on a dataset.
                prompts_dict: Dictionary mapping prompt names to their templates.
                tracking_enabled: Whether to log traces/metrics/params/artifacts
                    during optimization.
                full_dataset_size: Size of the full training dataset, used to distinguish
                    full validation passes from minibatch evaluations.
            """

            def __init__(self, eval_function, prompts_dict, tracking_enabled, full_dataset_size):
                self.eval_function = eval_function
                self.prompts_dict = prompts_dict
                self.prompt_names = list(prompts_dict.keys())
                self.tracking_enabled = tracking_enabled
                self.full_dataset_size = full_dataset_size
                self.validation_iteration = 0

            def evaluate(
                self,
                batch: list[dict[str, Any]],
                candidate: dict[str, str],
                capture_traces: bool = False,
            ) -> "gepa.EvaluationBatch":
                """
                Evaluate a candidate prompt using the MLflow eval function.

                Args:
                    batch: List of data instances to evaluate
                    candidate: Proposed text components (prompts)
                    capture_traces: Whether to capture execution traces

                Returns:
                    EvaluationBatch with outputs, scores, and optional trajectories
                """
                eval_results = self.eval_function(candidate, batch)

                outputs = [result.outputs for result in eval_results]
                scores = [result.score for result in eval_results]
                trajectories = eval_results if capture_traces else None
                objective_scores = [result.individual_scores for result in eval_results]

                # Track validation candidates only during full dataset validation
                # (not during minibatch evaluation in reflective mutation)
                is_full_validation = not capture_traces and len(batch) == self.full_dataset_size
                if is_full_validation and self.tracking_enabled:
                    self._log_validation_candidate(candidate, eval_results)

                return gepa.EvaluationBatch(
                    outputs=outputs,
                    scores=scores,
                    trajectories=trajectories,
                    objective_scores=objective_scores if any(objective_scores) else None,
                )

            def _log_validation_candidate(
                self,
                candidate: dict[str, str],
                eval_results: list[EvaluationResultRecord],
            ) -> None:
                """
                Log validation candidate prompts and scores as MLflow artifacts.

                Args:
                    candidate: The candidate prompts being validated
                    eval_results: Evaluation results containing scores
                """
                if not self.tracking_enabled:
                    return

                iteration = self.validation_iteration
                self.validation_iteration += 1

                # Compute aggregate score across all records
                aggregate_score = (
                    sum(r.score for r in eval_results) / len(eval_results) if eval_results else 0.0
                )

                # Collect all scorer names
                scorer_names = set()
                for result in eval_results:
                    scorer_names |= result.individual_scores.keys()

                # Build the evaluation results table and log to MLflow as a table artifact
                eval_results_table = {
                    "inputs": [r.inputs for r in eval_results],
                    "output": [r.outputs for r in eval_results],
                    "expectation": [r.expectations for r in eval_results],
                    "aggregate_score": [r.score for r in eval_results],
                }
                for scorer_name in scorer_names:
                    eval_results_table[scorer_name] = [
                        r.individual_scores.get(scorer_name) for r in eval_results
                    ]
                iteration_dir = f"{PROMPT_CANDIDATES_DIR}/iteration_{iteration}"
                mlflow.log_table(
                    data=eval_results_table,
                    artifact_file=f"{iteration_dir}/{EVAL_RESULTS_FILE}",
                )

                # Compute per-scorer average scores
                per_scorer_scores = {}
                for scorer_name in scorer_names:
                    scores = [
                        r.individual_scores[scorer_name]
                        for r in eval_results
                        if scorer_name in r.individual_scores
                    ]
                    if scores:
                        per_scorer_scores[scorer_name] = sum(scores) / len(scores)

                # Log per-scorer metrics for time progression visualization
                mlflow.log_metrics(
                    {"eval_score": aggregate_score}
                    | {f"eval_score.{name}": score for name, score in per_scorer_scores.items()},
                    step=iteration,
                )

                # Log scores summary as JSON artifact
                scores_data = {
                    "aggregate": aggregate_score,
                    "per_scorer": per_scorer_scores,
                }
                with tempfile.TemporaryDirectory() as tmp_dir:
                    tmp_path = Path(tmp_dir)
                    scores_path = tmp_path / SCORES_FILE
                    with open(scores_path, "w") as f:
                        json.dump(scores_data, f, indent=2)
                    mlflow.log_artifact(scores_path, artifact_path=iteration_dir)

                    # Write each prompt as a separate text file
                    for prompt_name, prompt_text in candidate.items():
                        prompt_path = tmp_path / f"{prompt_name}.txt"
                        with open(prompt_path, "w") as f:
                            f.write(prompt_text)
                        mlflow.log_artifact(prompt_path, artifact_path=iteration_dir)

            def make_reflective_dataset(
                self,
                candidate: dict[str, str],
                eval_batch: "gepa.EvaluationBatch[EvaluationResultRecord, Any]",
                components_to_update: list[str],
            ) -> dict[str, list[dict[str, Any]]]:
                """
                Build a reflective dataset for instruction refinement.

                Args:
                    candidate: The evaluated candidate
                    eval_batch: Result of evaluate with capture_traces=True
                    components_to_update: Component names to update

                Returns:
                    Dict of reflective dataset per component
                """
                reflective_datasets = {}

                for component_name in components_to_update:
                    component_data = []
                    trajectories = eval_batch.trajectories

                    for i, (trajectory, score) in enumerate(zip(trajectories, eval_batch.scores)):
                        trace = trajectory.trace
                        spans = []
                        if trace:
                            spans = [
                                {
                                    "name": span.name,
                                    "inputs": span.inputs,
                                    "outputs": span.outputs,
                                }
                                for span in trace.data.spans
                            ]

                        component_data.append(
                            {
                                "component_name": component_name,
                                "current_text": candidate.get(component_name, ""),
                                "trace": spans,
                                "score": score,
                                "inputs": trajectory.inputs,
                                "outputs": trajectory.outputs,
                                "expectations": trajectory.expectations,
                                "rationales": trajectory.rationales,
                                "index": i,
                            }
                        )

                    reflective_datasets[component_name] = component_data

                return reflective_datasets

        adapter = MlflowGEPAAdapter(
            eval_fn, target_prompts, enable_tracking, full_dataset_size=len(train_data)
        )

        kwargs = self.gepa_kwargs | {
            "seed_candidate": target_prompts,
            "trainset": train_data,
            "adapter": adapter,
            "reflection_lm": f"{provider}/{model}",
            "max_metric_calls": self.max_metric_calls,
            "display_progress_bar": self.display_progress_bar,
            "use_mlflow": enable_tracking,
        }

        gepa_result = gepa.optimize(**kwargs)

        optimized_prompts = gepa_result.best_candidate
        (
            initial_eval_score,
            final_eval_score,
            initial_eval_score_per_scorer,
            final_eval_score_per_scorer,
        ) = self._extract_eval_scores(gepa_result)

        return PromptOptimizerOutput(
            optimized_prompts=optimized_prompts,
            initial_eval_score=initial_eval_score,
            final_eval_score=final_eval_score,
            initial_eval_score_per_scorer=initial_eval_score_per_scorer,
            final_eval_score_per_scorer=final_eval_score_per_scorer,
        )
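    # For example (illustrative numbers only): if GEPA's validation passes produced
    # ``val_aggregate_scores == [0.55, 0.62, 0.71]``, the helper below reports an
    # initial score of 0.55 (the seed candidate's validation pass) and a final score
    # of 0.71 (the best validation pass); per-scorer scores are taken from
    # ``val_aggregate_subscores`` at the same positions when that attribute is present.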
    def _extract_eval_scores(
        self, result: "gepa.GEPAResult"
    ) -> tuple[float | None, float | None, dict[str, float], dict[str, float]]:
        """
        Extract initial and final evaluation scores from GEPA result.

        Args:
            result: GEPA optimization result

        Returns:
            Tuple of (initial_eval_score, final_eval_score,
            initial_eval_score_per_scorer, final_eval_score_per_scorer).
            Aggregated scores can be None if unavailable.
        """
        final_eval_score = None
        initial_eval_score = None
        initial_eval_score_per_scorer: dict[str, float] = {}
        final_eval_score_per_scorer: dict[str, float] = {}

        scores = result.val_aggregate_scores
        if scores and len(scores) > 0:
            # The first score is the initial baseline score
            initial_eval_score = scores[0]
            # The highest score is the final optimized score
            final_eval_score = max(scores)

        # Extract per-scorer scores from val_aggregate_subscores
        subscores = getattr(result, "val_aggregate_subscores", None)
        if subscores and len(subscores) > 0:
            # The first subscore dict is the initial baseline per-scorer scores
            initial_eval_score_per_scorer = subscores[0] or {}

            # Find the per-scorer scores corresponding to the best aggregate score
            if scores and len(scores) > 0:
                best_idx = scores.index(max(scores))
                if best_idx < len(subscores) and subscores[best_idx]:
                    final_eval_score_per_scorer = subscores[best_idx]

        return (
            initial_eval_score,
            final_eval_score,
            initial_eval_score_per_scorer,
            final_eval_score_per_scorer,
        )
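# A sketch of driving the optimizer directly (rather than through
# ``mlflow.genai.optimize_prompts``) and reading the returned ``PromptOptimizerOutput``;
# ``my_eval_fn`` and ``my_data`` are hypothetical and follow the ``_EvalFunc`` sketch
# near the top of the class:
#
#     optimizer = GepaPromptOptimizer(reflection_model="openai:/gpt-4o")
#     output = optimizer.optimize(
#         eval_fn=my_eval_fn,
#         train_data=my_data,
#         target_prompts={"qa": "Answer the following question: {{question}}"},
#     )
#     output.optimized_prompts            # dict: prompt template name -> optimized template
#     output.initial_eval_score           # baseline validation score (may be None)
#     output.final_eval_score             # best validation score (may be None)
#     output.final_eval_score_per_scorer  # per-scorer scores for the best candidate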