from dataclasses import dataclass, field
from mlflow.exceptions import MlflowException
from mlflow.genai.utils.enum_utils import StrEnum
from mlflow.protos import review_queues_pb2 as _rq_pb
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.utils.annotations import experimental
@experimental(version="3.14.0")
class ReviewItemType(StrEnum):
    """Kind of object that a review-queue item refers to.

    Only ``trace`` exists in v1. The backing column is intentionally wide
    enough that ``session`` / ``span`` can be introduced later without a
    migration.
    """

    TRACE = "trace"

    def to_proto(self) -> int:
        """Return the protobuf enum value for this item type.

        ``TRACE`` is the only member, so the result is always
        ``_rq_pb.TRACE``.
        """
        return _rq_pb.TRACE

    @classmethod
    def from_proto(cls, proto: int) -> "ReviewItemType":
        """Convert a protobuf enum value into a :class:`ReviewItemType`.

        Args:
            proto: Protobuf enum value to convert.

        Returns:
            ``ReviewItemType.TRACE`` when ``proto`` equals ``_rq_pb.TRACE``.

        Raises:
            MlflowException: With ``INVALID_PARAMETER_VALUE`` for any other
                value.
        """
        # Guard clause: reject everything that is not the single known value.
        if proto != _rq_pb.TRACE:
            raise MlflowException(
                f"`item_type` must be TRACE; got proto enum value {proto}.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        return cls.TRACE
@experimental(version="3.14.0")
class ReviewQueueType(StrEnum):
    """Flavor of a review queue.

    ``USER`` — the queue's ``name`` equals a user identifier and exactly one
        user (that user) is assigned. It acts as the reviewer's personal
        worklist and inherits *all* of the experiment's label schemas as its
        questions (no chooser, resolved live at read time), so creating one
        amounts to "assign these traces to this person".

    ``CUSTOM`` — an arbitrary, non-reserved ``name`` with 0..N assigned
        users and an explicitly-attached subset of label schemas. The analog
        of a Databricks ``LabelingSession``.
    """

    USER = "user"
    CUSTOM = "custom"

    def to_proto(self) -> int:
        """Return the protobuf enum value for this queue type.

        ``USER`` maps to ``_rq_pb.USER``; anything else maps to
        ``_rq_pb.CUSTOM``.
        """
        return _rq_pb.USER if self is ReviewQueueType.USER else _rq_pb.CUSTOM

    @classmethod
    def from_proto(cls, proto: int) -> "ReviewQueueType":
        """Convert a protobuf enum value into a :class:`ReviewQueueType`.

        Args:
            proto: Protobuf enum value to convert.

        Returns:
            The member whose protobuf value equals ``proto``.

        Raises:
            MlflowException: With ``INVALID_PARAMETER_VALUE`` when ``proto``
                is neither ``_rq_pb.USER`` nor ``_rq_pb.CUSTOM``.
        """
        known_values = (
            (_rq_pb.USER, cls.USER),
            (_rq_pb.CUSTOM, cls.CUSTOM),
        )
        # Scan in declaration order and return on the first equal value.
        for proto_value, member in known_values:
            if proto == proto_value:
                return member
        raise MlflowException(
            f"`queue_type` must be one of USER or CUSTOM; got proto enum value {proto}.",
            error_code=INVALID_PARAMETER_VALUE,
        )
@experimental(version="3.14.0")
class ReviewStatus(StrEnum):
    """Shared-pool workflow status of one attached item.

    The status belongs to the ``(queue, item)`` pair, NOT to an individual
    user. The queue's assigned users form a *pool*: an item counts as
    addressed as soon as **any** assigned user acts on it, not once every
    user has. ``completed_by`` records which user that was.

    Every transition is an explicit reviewer action; writing an assessment
    against the item does NOT advance the status:

    - ``PENDING`` -> ``COMPLETE``: any user marks it done (sets
      ``completed_by`` + ``completed_time_ms``).
    - ``PENDING`` -> ``DECLINED``: an explicit "this item will not be
      reviewed in this queue" (out of scope / can't judge) — distinct from
      a temporary defer, and it also records who declined it.
    - ``COMPLETE`` / ``DECLINED`` -> ``PENDING``: reopen, which clears the
      ``completed_by`` / ``completed_time_ms`` attribution.

    There is no ``in_progress`` state and no auto-flip.
    """

    PENDING = "pending"
    COMPLETE = "complete"
    DECLINED = "declined"

    def to_proto(self) -> int:
        """Return the protobuf enum value that corresponds to this status."""
        proto_by_status = {
            ReviewStatus.PENDING: _rq_pb.PENDING,
            ReviewStatus.COMPLETE: _rq_pb.COMPLETE,
            ReviewStatus.DECLINED: _rq_pb.DECLINED,
        }
        return proto_by_status[self]

    @classmethod
    def from_proto(cls, proto: int) -> "ReviewStatus":
        """Convert a protobuf enum value into a :class:`ReviewStatus`.

        Args:
            proto: Protobuf enum value to convert.

        Returns:
            The member mapped to ``proto``.

        Raises:
            MlflowException: With ``INVALID_PARAMETER_VALUE`` when ``proto``
                is not one of the PENDING / COMPLETE / DECLINED values.
        """
        status_by_proto = {
            _rq_pb.PENDING: cls.PENDING,
            _rq_pb.COMPLETE: cls.COMPLETE,
            _rq_pb.DECLINED: cls.DECLINED,
        }
        # Table values are never None, so None reliably signals "unknown".
        status = status_by_proto.get(proto)
        if status is None:
            raise MlflowException(
                f"`status` must be one of PENDING, COMPLETE, or DECLINED; "
                f"got proto enum value {proto}.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        return status
@experimental(version="3.14.0")
@dataclass
class ReviewQueueItem:
    """A single item attached to a queue, with its shared-pool workflow status.

    Corresponds to one row of the ``review_queue_items`` table. When the same
    item is attached to two different queues, each queue holds its own
    :class:`ReviewQueueItem` (and so its own status) — intentional, because
    the review contexts differ.

    ``completed_by`` / ``completed_time_ms`` are populated only in the
    ``COMPLETE`` and ``DECLINED`` terminal states and are cleared on reopen.
    """

    queue_id: str
    item_type: ReviewItemType
    item_id: str
    status: ReviewStatus
    creation_time_ms: int
    last_update_time_ms: int
    # Attribution fields; None means "not set".
    completed_by: str | None = None
    completed_time_ms: int | None = None

    def to_proto(self) -> "_rq_pb.ReviewQueueItem":
        """Build the protobuf message for this item.

        The optional ``completed_by`` / ``completed_time_ms`` fields are only
        assigned on the message when their value is not ``None``.
        """
        message = _rq_pb.ReviewQueueItem(
            queue_id=self.queue_id,
            item_type=self.item_type.to_proto(),
            item_id=self.item_id,
            status=self.status.to_proto(),
            creation_time_ms=self.creation_time_ms,
            last_update_time_ms=self.last_update_time_ms,
        )
        # Copy each optional field only when a value is present.
        for optional_field in ("completed_by", "completed_time_ms"):
            value = getattr(self, optional_field)
            if value is not None:
                setattr(message, optional_field, value)
        return message

    @classmethod
    def from_proto(cls, proto: "_rq_pb.ReviewQueueItem") -> "ReviewQueueItem":
        """Create a :class:`ReviewQueueItem` from its protobuf message.

        Optional fields for which ``proto.HasField`` is false become
        ``None``. The enum conversions raise ``MlflowException`` on values
        they do not recognize.
        """
        item_type = ReviewItemType.from_proto(proto.item_type)
        status = ReviewStatus.from_proto(proto.status)

        completed_by = None
        if proto.HasField("completed_by"):
            completed_by = proto.completed_by

        completed_time_ms = None
        if proto.HasField("completed_time_ms"):
            completed_time_ms = proto.completed_time_ms

        return cls(
            queue_id=proto.queue_id,
            item_type=item_type,
            item_id=proto.item_id,
            status=status,
            creation_time_ms=proto.creation_time_ms,
            last_update_time_ms=proto.last_update_time_ms,
            completed_by=completed_by,
            completed_time_ms=completed_time_ms,
        )
@experimental(version="3.14.0")
@dataclass
class ReviewQueue:
    """A named bundle of attached items, questions, and assigned users.

    A queue is scoped to an experiment and keyed on
    ``(experiment_id, name)``. Attached items can be numerous, so they are
    paged separately via ``list_review_queue_items``; the small association
    sets — assigned ``users`` and attached label-schema ids — are hydrated
    inline on this object.

    ``schema_ids`` mirrors the literal ``review_queue_label_schemas`` rows:
    the chosen subset for a ``CUSTOM`` queue, and **empty** for a ``USER``
    queue (which resolves to all of the experiment's schemas live at read
    time — a concern of the read/handler layer, not storage).
    """

    queue_id: str
    experiment_id: str
    name: str
    queue_type: ReviewQueueType
    # None means "not set"; the field is then left off the protobuf message.
    created_by: str | None
    creation_time_ms: int
    last_update_time_ms: int
    users: list[str] = field(default_factory=list)
    schema_ids: list[str] = field(default_factory=list)

    def to_proto(self) -> "_rq_pb.ReviewQueue":
        """Build the protobuf message for this queue.

        ``created_by`` is only assigned on the message when it is not
        ``None``.
        """
        message = _rq_pb.ReviewQueue(
            queue_id=self.queue_id,
            experiment_id=self.experiment_id,
            name=self.name,
            queue_type=self.queue_type.to_proto(),
            creation_time_ms=self.creation_time_ms,
            last_update_time_ms=self.last_update_time_ms,
            users=self.users,
            schema_ids=self.schema_ids,
        )
        creator = self.created_by
        if creator is not None:
            message.created_by = creator
        return message

    @classmethod
    def from_proto(cls, proto: "_rq_pb.ReviewQueue") -> "ReviewQueue":
        """Create a :class:`ReviewQueue` from its protobuf message.

        ``created_by`` becomes ``None`` when ``proto.HasField("created_by")``
        is false. The repeated ``users`` / ``schema_ids`` fields are copied
        into plain lists. An unrecognized ``queue_type`` raises
        ``MlflowException``.
        """
        queue_type = ReviewQueueType.from_proto(proto.queue_type)

        created_by = None
        if proto.HasField("created_by"):
            created_by = proto.created_by

        return cls(
            queue_id=proto.queue_id,
            experiment_id=proto.experiment_id,
            name=proto.name,
            queue_type=queue_type,
            created_by=created_by,
            creation_time_ms=proto.creation_time_ms,
            last_update_time_ms=proto.last_update_time_ms,
            users=list(proto.users),
            schema_ids=list(proto.schema_ids),
        )