Source code for mlflow.server.auth.client

from mlflow.server.auth.entities import (
    GetUserPermissionResult,
    Role,
    RolePermission,
    User,
    UserRoleAssignment,
)
from mlflow.server.auth.routes import (
    ADD_ROLE_PERMISSION,
    ASSIGN_ROLE,
    CREATE_ROLE,
    CREATE_USER,
    DELETE_ROLE,
    DELETE_USER,
    GET_ROLE,
    GET_USER,
    GET_USER_PERMISSION,
    GRANT_USER_PERMISSION,
    LIST_ROLE_PERMISSIONS,
    LIST_ROLE_USERS,
    LIST_ROLES,
    LIST_USER_ROLES,
    REMOVE_ROLE_PERMISSION,
    REVOKE_USER_PERMISSION,
    UNASSIGN_ROLE,
    UPDATE_ROLE,
    UPDATE_ROLE_PERMISSION,
    UPDATE_USER_ADMIN,
    UPDATE_USER_PASSWORD,
)
from mlflow.utils.credentials import get_default_host_creds
from mlflow.utils.rest_utils import http_request, verify_rest_response


class AuthServiceClient:
    """
    Client for the auth REST API of an MLflow Tracking Server that runs with the
    default basic authentication plugin enabled.

    Prefer :py:func:`mlflow.server.get_app_client()` over constructing this class
    by hand. Background material: https://mlflow.org/docs/latest/auth.html
    """

    def __init__(self, tracking_uri: str):
        """
        Args:
            tracking_uri: Address of local or remote tracking server.
        """
        self.tracking_uri = tracking_uri

    def _request(self, endpoint, method, *, expected_status: int = 200, **kwargs):
        """
        Send one HTTP request to ``endpoint`` and return the decoded JSON body.

        Credentials are resolved from ``self.tracking_uri`` on every call, and the
        raw response is passed through ``verify_rest_response`` before decoding.

        Args:
            endpoint: Route to call.
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            expected_status: Status code handed to ``verify_rest_response``.
            kwargs: Forwarded unchanged to ``http_request`` (``json=``, ``params=``).

        Returns:
            The parsed JSON payload, or an empty dict when the response status is
            204 or the response carries no content.
        """
        creds = get_default_host_creds(self.tracking_uri)
        response = verify_rest_response(
            http_request(creds, endpoint, method, **kwargs),
            endpoint,
            expected_status=expected_status,
        )
        # Only decode when there is a body: a 204 or an empty payload has no JSON.
        has_body = response.status_code != 204 and response.content
        return response.json() if has_body else {}

    def create_user(self, username: str, password: str):
        """
        Register a new user on the server.

        Args:
            username: The username.
            password: The user's password. Must not be empty string.

        Raises:
            mlflow.exceptions.RestException: if the username is already taken.

        Returns:
            A single :py:class:`mlflow.server.auth.entities.User` object.

        .. code-block:: python
            :caption: Example

            from mlflow.server.auth.client import AuthServiceClient

            client = AuthServiceClient("tracking_uri")
            user = client.create_user("newuser", "newpassword")
            print(f"user_id: {user.id}")
            print(f"username: {user.username}")
            print(f"password_hash: {user.password_hash}")
            print(f"is_admin: {user.is_admin}")

        .. code-block:: text
            :caption: Output

            user_id: 3
            username: newuser
            password_hash: REDACTED
            is_admin: False
        """
        credentials = {"username": username, "password": password}
        created = self._request(CREATE_USER, "POST", json=credentials)
        return User.from_json(created["user"])

    def get_user(self, username: str):
        """
        Look up a user by username.

        Args:
            username: The username.

        Raises:
            mlflow.exceptions.RestException: if the user does not exist

        Returns:
            A single :py:class:`mlflow.server.auth.entities.User` object.

        .. code-block:: bash
            :caption: Example

            export MLFLOW_TRACKING_USERNAME=admin
            export MLFLOW_TRACKING_PASSWORD=password

        .. code-block:: python

            from mlflow.server.auth.client import AuthServiceClient

            client = AuthServiceClient("tracking_uri")
            client.create_user("newuser", "newpassword")
            user = client.get_user("newuser")
            print(f"user_id: {user.id}")
            print(f"username: {user.username}")
            print(f"password_hash: {user.password_hash}")
            print(f"is_admin: {user.is_admin}")

        .. code-block:: text
            :caption: Output

            user_id: 3
            username: newuser
            password_hash: REDACTED
            is_admin: False
        """
        query = {"username": username}
        found = self._request(GET_USER, "GET", params=query)
        return User.from_json(found["user"])

    def update_user_password(
        self, username: str, password: str, current_password: str | None = None
    ):
        """
        Change the password of a specific user.

        Args:
            username: The username.
            password: The new password.
            current_password: The user's current password. Required when a user is
                changing their own password (self-service); the server rejects the
                request otherwise. Admins changing someone else's password may omit
                this argument.

        Raises:
            mlflow.exceptions.RestException: if the user does not exist, or if
                ``current_password`` is required and missing or incorrect.

        .. code-block:: bash
            :caption: Example

            export MLFLOW_TRACKING_USERNAME=admin
            export MLFLOW_TRACKING_PASSWORD=password

        .. code-block:: python

            from mlflow.server.auth.client import AuthServiceClient

            client = AuthServiceClient("tracking_uri")
            client.create_user("newuser", "newpassword")

            # Admin path — no current_password needed.
            client.update_user_password("newuser", "anotherpassword")

            # Self-service path — current_password required.
            client.update_user_password(
                "newuser", "thirdpassword", current_password="anotherpassword"
            )
        """
        payload = {"username": username, "password": password}
        # The key is left out entirely (not sent as null) when no current password
        # was supplied.
        if current_password is not None:
            payload.update(current_password=current_password)
        self._request(UPDATE_USER_PASSWORD, "PATCH", json=payload)

    def update_user_admin(self, username: str, is_admin: bool):
        """
        Set the admin flag of a specific user.

        Args:
            username: The username.
            is_admin: The new admin status.

        Raises:
            mlflow.exceptions.RestException: if the user does not exist

        .. code-block:: bash
            :caption: Example

            export MLFLOW_TRACKING_USERNAME=admin
            export MLFLOW_TRACKING_PASSWORD=password

        .. code-block:: python

            from mlflow.server.auth.client import AuthServiceClient

            client = AuthServiceClient("tracking_uri")
            client.create_user("newuser", "newpassword")
            client.update_user_admin("newuser", True)
        """
        payload = {"username": username, "is_admin": is_admin}
        self._request(UPDATE_USER_ADMIN, "PATCH", json=payload)

    def delete_user(self, username: str):
        """
        Remove a specific user.

        Args:
            username: The username.

        Raises:
            mlflow.exceptions.RestException: if the user does not exist

        .. code-block:: bash
            :caption: Example

            export MLFLOW_TRACKING_USERNAME=admin
            export MLFLOW_TRACKING_PASSWORD=password

        .. code-block:: python

            from mlflow.server.auth.client import AuthServiceClient

            client = AuthServiceClient("tracking_uri")
            client.create_user("newuser", "newpassword")
            client.delete_user("newuser")
        """
        payload = {"username": username}
        self._request(DELETE_USER, "DELETE", json=payload)

    # ---- Role management (RBAC) ----

    def create_role(
        self,
        workspace: str,
        name: str,
        description: str | None = None,
    ) -> Role:
        """
        Create a role inside a workspace.

        Args:
            workspace: Workspace the role is created in.
            name: Name of the new role.
            description: Optional description; left out of the request when ``None``.

        Returns:
            The :py:class:`mlflow.server.auth.entities.Role` decoded from the
            ``"role"`` field of the response.
        """
        role_spec = {"workspace": workspace, "name": name}
        if description is not None:
            role_spec.update(description=description)
        created = self._request(CREATE_ROLE, "POST", json=role_spec)
        return Role.from_json(created["role"])

    def get_role(self, role_id: int) -> Role:
        """
        Fetch a single role by its id.

        Args:
            role_id: Id of the role; sent as a string query parameter.

        Returns:
            The :py:class:`mlflow.server.auth.entities.Role` decoded from the
            ``"role"`` field of the response.
        """
        query = {"role_id": str(role_id)}
        found = self._request(GET_ROLE, "GET", params=query)
        return Role.from_json(found["role"])

    def list_roles(self, workspace: str) -> list[Role]:
        """
        List the roles of one workspace.

        Args:
            workspace: Workspace whose roles are requested.

        Returns:
            One :py:class:`mlflow.server.auth.entities.Role` per entry in the
            ``"roles"`` field of the response.
        """
        listing = self._request(LIST_ROLES, "GET", params={"workspace": workspace})
        return [Role.from_json(entry) for entry in listing["roles"]]

    def update_role(
        self, role_id: int, name: str | None = None, description: str | None = None
    ) -> Role:
        """
        Update the name and/or description of a role.

        Args:
            role_id: Id of the role to update.
            name: New name; left out of the request when ``None``.
            description: New description; left out of the request when ``None``.

        Returns:
            The :py:class:`mlflow.server.auth.entities.Role` decoded from the
            ``"role"`` field of the response.
        """
        payload: dict[str, object] = {"role_id": role_id}
        # Only fields the caller actually supplied are sent to the server.
        optional_fields = {"name": name, "description": description}
        payload.update(
            {field: value for field, value in optional_fields.items() if value is not None}
        )
        updated = self._request(UPDATE_ROLE, "PATCH", json=payload)
        return Role.from_json(updated["role"])

    def delete_role(self, role_id: int) -> None:
        """
        Delete a role.

        Args:
            role_id: Id of the role to delete.
        """
        payload = {"role_id": role_id}
        self._request(DELETE_ROLE, "DELETE", json=payload)

    def add_role_permission(
        self, role_id: int, resource_type: str, resource_pattern: str, permission: str
    ) -> RolePermission:
        """
        Attach a permission to a role.

        Args:
            role_id: Id of the role receiving the permission.
            resource_type: Type of resource the permission applies to.
            resource_pattern: Pattern selecting the resources of that type.
            permission: Permission to grant.

        Returns:
            The :py:class:`mlflow.server.auth.entities.RolePermission` decoded from
            the ``"role_permission"`` field of the response.
        """
        grant = {
            "role_id": role_id,
            "resource_type": resource_type,
            "resource_pattern": resource_pattern,
            "permission": permission,
        }
        created = self._request(ADD_ROLE_PERMISSION, "POST", json=grant)
        return RolePermission.from_json(created["role_permission"])

    def remove_role_permission(self, role_permission_id: int) -> None:
        """
        Remove a permission entry from its role.

        Args:
            role_permission_id: Id of the role-permission entry to remove.
        """
        payload = {"role_permission_id": role_permission_id}
        self._request(REMOVE_ROLE_PERMISSION, "DELETE", json=payload)

    def list_role_permissions(self, role_id: int) -> list[RolePermission]:
        """
        List the permissions attached to a role.

        Args:
            role_id: Id of the role; sent as a string query parameter.

        Returns:
            One :py:class:`mlflow.server.auth.entities.RolePermission` per entry in
            the ``"role_permissions"`` field of the response.
        """
        query = {"role_id": str(role_id)}
        listing = self._request(LIST_ROLE_PERMISSIONS, "GET", params=query)
        return [RolePermission.from_json(entry) for entry in listing["role_permissions"]]

    def update_role_permission(self, role_permission_id: int, permission: str) -> RolePermission:
        """
        Change the permission of an existing role-permission entry.

        Args:
            role_permission_id: Id of the role-permission entry to update.
            permission: New permission value.

        Returns:
            The :py:class:`mlflow.server.auth.entities.RolePermission` decoded from
            the ``"role_permission"`` field of the response.
        """
        payload = {"role_permission_id": role_permission_id, "permission": permission}
        updated = self._request(UPDATE_ROLE_PERMISSION, "PATCH", json=payload)
        return RolePermission.from_json(updated["role_permission"])

    def assign_role(self, username: str, role_id: int) -> UserRoleAssignment:
        """
        Assign a role to a user.

        Args:
            username: The username.
            role_id: Id of the role to assign.

        Returns:
            The :py:class:`mlflow.server.auth.entities.UserRoleAssignment` decoded
            from the ``"assignment"`` field of the response.
        """
        payload = {"username": username, "role_id": role_id}
        created = self._request(ASSIGN_ROLE, "POST", json=payload)
        return UserRoleAssignment.from_json(created["assignment"])

    def unassign_role(self, username: str, role_id: int) -> None:
        """
        Remove a role assignment from a user.

        Args:
            username: The username.
            role_id: Id of the role to unassign.
        """
        payload = {"username": username, "role_id": role_id}
        self._request(UNASSIGN_ROLE, "DELETE", json=payload)

    def list_user_roles(self, username: str) -> list[Role]:
        """
        List the roles assigned to a user.

        Args:
            username: The username.

        Returns:
            One :py:class:`mlflow.server.auth.entities.Role` per entry in the
            ``"roles"`` field of the response.
        """
        listing = self._request(LIST_USER_ROLES, "GET", params={"username": username})
        return [Role.from_json(entry) for entry in listing["roles"]]

    def list_role_users(self, role_id: int) -> list[UserRoleAssignment]:
        """
        List the user assignments of a role.

        Args:
            role_id: Id of the role; sent as a string query parameter.

        Returns:
            One :py:class:`mlflow.server.auth.entities.UserRoleAssignment` per entry
            in the ``"assignments"`` field of the response.
        """
        query = {"role_id": str(role_id)}
        listing = self._request(LIST_ROLE_USERS, "GET", params=query)
        return [UserRoleAssignment.from_json(entry) for entry in listing["assignments"]]

    def list_all_roles(self) -> list[Role]:
        """
        List roles across all workspaces.

        Returns:
            One :py:class:`mlflow.server.auth.entities.Role` per entry in the
            ``"roles"`` field of the response.
        """
        # Same endpoint as list_roles; omitting the ``workspace`` param returns the
        # cross-workspace listing (admin-only, enforced server-side).
        listing = self._request(LIST_ROLES, "GET")
        return [Role.from_json(entry) for entry in listing["roles"]]

    # ---- Unified per-user permission convenience APIs ----
    # Grant / revoke / check one resource permission for a user. These preserve the
    # legacy per-resource MANAGE delegation (per-resource MANAGE gates writes)
    # via a uniform ``(resource_type, resource_id)`` shape.

    def grant_user_permission(
        self,
        username: str,
        resource_type: str,
        resource_id: str,
        permission: str,
    ) -> None:
        """
        Grant a user a permission on one resource.

        Args:
            username: The username.
            resource_type: Type of the target resource.
            resource_id: Id of the target resource.
            permission: Permission to grant.
        """
        grant = {
            "username": username,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "permission": permission,
        }
        self._request(GRANT_USER_PERMISSION, "POST", json=grant)

    def revoke_user_permission(self, username: str, resource_type: str, resource_id: str) -> None:
        """
        Revoke a user's permission on one resource.

        Args:
            username: The username.
            resource_type: Type of the target resource.
            resource_id: Id of the target resource.
        """
        target = {
            "username": username,
            "resource_type": resource_type,
            "resource_id": resource_id,
        }
        self._request(REVOKE_USER_PERMISSION, "POST", json=target)

    def get_user_permission(
        self,
        username: str,
        resource_type: str,
        resource_id: str,
    ) -> GetUserPermissionResult:
        """
        Check a user's permission on one resource.

        Args:
            username: The username.
            resource_type: Type of the target resource.
            resource_id: Id of the target resource.

        Returns:
            A :py:class:`mlflow.server.auth.entities.GetUserPermissionResult`
            decoded from the whole response body.
        """
        query = {
            "username": username,
            "resource_type": resource_type,
            "resource_id": resource_id,
        }
        result = self._request(GET_USER_PERMISSION, "GET", params=query)
        return GetUserPermissionResult.from_json(result)