Source code for mlflow.genai.review_queues

"""Review queues for expert trace-review workflows.

A ``ReviewQueue`` is a named bundle of attached items, a set of
questions (label schemas), and a set of assigned users, scoped to an
experiment. Two flavors of the same entity:

- a **user queue** (``name`` = a user, exactly that one user, all of the
  experiment's schemas) is a reviewer's personal worklist; and
- a **custom queue** (arbitrary name, 0..N users, a chosen subset of
  schemas) is a curated, possibly collaborative review task.

Assigned users form a *pool*: an item is addressed when any one of them
acts on it, and the per-``(queue, item)`` :class:`ReviewStatus` records
who. Reviewers answer the queue's questions by writing ``Feedback``
assessments against the item (no new answer storage); the queue carries
only the review *workflow*.

This module exposes the entity types plus the fluent SDK for managing
queues against the MLflow tracking store.
"""

from typing import TYPE_CHECKING, Literal

from mlflow.exceptions import MlflowException
from mlflow.genai.review_queues.review_queues import (
    ReviewItemType,
    ReviewQueue,
    ReviewQueueItem,
    ReviewQueueType,
    ReviewStatus,
)
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracing.client import TracingClient
from mlflow.utils.annotations import experimental

if TYPE_CHECKING:
    from mlflow.store.entities.paged_list import PagedList

__all__ = [
    "ReviewItemType",
    "ReviewQueue",
    "ReviewQueueItem",
    "ReviewQueueType",
    "ReviewStatus",
    "add_items_to_review_queue",
    "create_review_queue",
    "delete_review_queue",
    "get_or_create_user_queue",
    "get_review_queue",
    "list_review_queue_items",
    "list_review_queues",
    "remove_items_from_review_queue",
    "set_review_queue_item_status",
    "update_review_queue",
]


def _resolve_experiment_id(experiment_id: str | None) -> str:
    if experiment_id is not None:
        return experiment_id
    from mlflow.tracking.fluent import _get_experiment_id

    return _get_experiment_id()


[docs]@experimental(version="3.14.0") def create_review_queue( name: str, *, queue_type: Literal["user", "custom"], users: list[str] | None = None, schema_ids: list[str] | None = None, experiment_id: str | None = None, ) -> ReviewQueue: """ Create a review queue scoped to an experiment. Args: name: Queue name, unique within the experiment. For a ``"user"`` queue this is the user identifier; ``"default"`` is reserved case-insensitively (the no-auth default user queue) and rejected for custom queues. queue_type: ``"user"`` (exactly one assigned user equal to ``name``, inherits all of the experiment's label schemas) or ``"custom"`` (0 to 10 users and an explicit subset of schemas). users: Assigned users (at most 10). Derived as ``[name]`` for a user queue when omitted; 0 to 10 for a custom queue. schema_ids: Attached label-schema ids. Must be empty for a user queue (it resolves to all of the experiment's schemas); the chosen subset for a custom queue. experiment_id: Parent experiment; defaults to the current experiment. Returns: The created :py:class:`ReviewQueue`. Its owner (``created_by``) is set by the server from the authenticated user, not by the caller. """ return TracingClient()._create_review_queue( _resolve_experiment_id(experiment_id), name=name, queue_type=queue_type, users=users, schema_ids=schema_ids, )
[docs]@experimental(version="3.14.0") def get_or_create_user_queue( user: str, *, experiment_id: str | None = None, ) -> ReviewQueue: """ Return a user's personal review queue, creating it if absent. Idempotent: the backbone of "assign these items to this person" — call this, then :func:`add_items_to_review_queue`. Args: user: The reviewer identifier (also the queue name). experiment_id: Parent experiment; defaults to the current experiment. Returns: The user's :py:class:`ReviewQueue` (owned by that user). """ return TracingClient()._get_or_create_user_queue( _resolve_experiment_id(experiment_id), user=user )
[docs]@experimental(version="3.14.0") def get_review_queue( queue_id: str | None = None, *, name: str | None = None, experiment_id: str | None = None, ) -> ReviewQueue: """ Fetch a review queue by ``queue_id`` or by ``(experiment_id, name)``. Provide exactly one of ``queue_id`` or ``name``. When ``name`` is given, ``experiment_id`` defaults to the current experiment. Returns: The matching :py:class:`ReviewQueue`. """ if (queue_id is None) == (name is None): raise MlflowException( "Provide exactly one of `queue_id` or `name`.", error_code=INVALID_PARAMETER_VALUE, ) client = TracingClient() if queue_id is not None: return client._get_review_queue(queue_id) return client._get_review_queue_by_name(_resolve_experiment_id(experiment_id), name)
[docs]@experimental(version="3.14.0") def list_review_queues( *, user: str | None = None, experiment_id: str | None = None, max_results: int | None = None, page_token: str | None = None, ) -> "PagedList[ReviewQueue]": """ List an experiment's review queues, newest first. Args: user: If set, return only queues this user is assigned to. experiment_id: Parent experiment; defaults to the current experiment. max_results: Page size. page_token: Continuation token from a previous call. Returns: A :py:class:`PagedList` of :py:class:`ReviewQueue`. """ return TracingClient()._list_review_queues( _resolve_experiment_id(experiment_id), user=user, max_results=max_results, page_token=page_token, )
[docs]@experimental(version="3.14.0") def update_review_queue( queue_id: str, *, name: str | None = None, new_owner: str | None = None, users: list[str] | None = None, schema_ids: list[str] | None = None, ) -> ReviewQueue: """Update a custom queue's name, owner, assigned users, and/or schemas. Pass only the fields you want to change; ``None`` leaves a field untouched (an empty ``users`` / ``schema_ids`` list clears that set). ``queue_type`` is immutable and user queues reject this. Reassigning the owner (``new_owner``) requires experiment MANAGE — enforced server-side — while the queue's owner may make the other edits with EDIT. A queue's ``schema_ids`` (its questions) are frozen once it has attached items; detach the items first to edit them. Returns: The updated :py:class:`ReviewQueue`. """ return TracingClient()._update_review_queue( queue_id, name=name, new_owner=new_owner, users=users, schema_ids=schema_ids )
[docs]@experimental(version="3.14.0") def delete_review_queue(queue_id: str) -> None: """ Delete a queue and its associations. No-op if it doesn't exist. Reviewer assessments on the queue's items are unaffected. """ TracingClient()._delete_review_queue(queue_id)
[docs]@experimental(version="3.14.0") def add_items_to_review_queue(queue_id: str, *, item_ids: list[str]) -> list[ReviewQueueItem]: """ Attach items to a queue, returning the resulting queue items. Idempotent per item (re-attaching preserves the existing status). The returned list covers every requested ``item_id``, in request order. """ return TracingClient()._add_items_to_review_queue(queue_id, item_ids=item_ids)
[docs]@experimental(version="3.14.0") def remove_items_from_review_queue(queue_id: str, *, item_ids: list[str]) -> None: """Detach items from a queue. No-op for items not attached.""" TracingClient()._remove_items_from_review_queue(queue_id, item_ids=item_ids)
[docs]@experimental(version="3.14.0") def list_review_queue_items( queue_id: str, *, status: Literal["pending", "complete", "declined"] | None = None, max_results: int | None = None, page_token: str | None = None, ) -> "PagedList[ReviewQueueItem]": """ List a queue's attached items, newest-attached first. Args: queue_id: The queue to list. status: Optional filter on shared-pool status. max_results: Page size. page_token: Continuation token from a previous call. Returns: A :py:class:`PagedList` of :py:class:`ReviewQueueItem`. """ return TracingClient()._list_review_queue_items( queue_id, status=status, max_results=max_results, page_token=page_token )
[docs]@experimental(version="3.14.0") def set_review_queue_item_status( queue_id: str, *, item_id: str, status: Literal["pending", "complete", "declined"], completed_by: str | None = None, ) -> ReviewQueueItem: """ Set the shared-pool status of an attached item. Moving to ``"complete"`` / ``"declined"`` records ``completed_by``; moving back to ``"pending"`` (reopen) clears it. ``completed_by`` is required for the terminal states and rejected for ``"pending"``. Returns: The updated :py:class:`ReviewQueueItem`. """ return TracingClient()._set_review_queue_item_status( queue_id, item_id=item_id, status=status, completed_by=completed_by )