import logging
import os
import signal
from abc import abstractmethod
from mlflow.entities import RunStatus
from mlflow.utils.annotations import developer_stable
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@developer_stable
class SubmittedRun:
    """
    Wrapper around an MLflow project run (e.g. a subprocess running an entry point
    command or a Databricks job run) that exposes methods for waiting on and cancelling the run.

    This class defines the interface that the MLflow project runner uses to manage the lifecycle
    of runs launched in different environments (e.g. runs launched locally or on Databricks).

    ``SubmittedRun`` is not thread-safe. That is, concurrent calls to wait() / cancel()
    from multiple threads may inadvertently kill resources (e.g. local processes) unrelated to the
    run.

    NOTE:
        Subclasses of ``SubmittedRun`` must expose a ``run_id`` member containing the
        run's MLflow run ID.
    """

    @abstractmethod
    def wait(self):
        """
        Wait for the run to finish, returning True if the run succeeded and False otherwise. Note
        that in some cases (e.g. remote execution on Databricks), we may wait until the remote job
        completes rather than until the MLflow run completes.
        """

    @abstractmethod
    def get_status(self):
        """
        Get status of the run.
        """

    @abstractmethod
    def cancel(self):
        """
        Cancel the run (interrupts the command subprocess, cancels the Databricks run, etc) and
        waits for it to terminate. The MLflow run status may not be set correctly
        upon run cancellation.
        """

    @property
    @abstractmethod
    def run_id(self):
        """
        The MLflow run ID of this run. Subclasses must override this property.
        """
class LocalSubmittedRun(SubmittedRun):
    """
    Instance of ``SubmittedRun`` corresponding to a subprocess launched to run an entry point
    command locally.

    Args:
        run_id: MLflow run ID associated with the subprocess; returned by ``run_id``.
        command_proc: Handle to the child process. This class relies on its ``pid``
            attribute and its ``poll()``, ``wait()`` and ``terminate()`` methods
            (presumably a ``subprocess.Popen`` -- confirm against the caller).
    """

    def __init__(self, run_id, command_proc):
        super().__init__()
        self._run_id = run_id
        self.command_proc = command_proc

    @property
    def run_id(self):
        """Return the MLflow run ID passed to the constructor."""
        return self._run_id

    def wait(self):
        """Block until the child process exits; return True iff its exit code is 0."""
        return self.command_proc.wait() == 0

    def cancel(self):
        """
        Terminate the child process if it is still running and wait for it to exit.

        If the child is the leader of its own process group, SIGTERM is sent to the whole
        group; otherwise only the child itself is terminated. On platforms that do not
        provide ``os.getpgid`` / ``os.killpg``, only the child is terminated.
        ``OSError`` raised while terminating is logged and otherwise ignored.
        """
        proc = self.command_proc
        # Nothing to interrupt if the child process has already exited
        if proc.poll() is not None:
            return

        # Process-group APIs are Unix-only; without this guard the attribute lookup itself
        # raises AttributeError (not OSError) and the child would never be terminated.
        has_process_groups = hasattr(os, "getpgid") and hasattr(os, "killpg")

        # Kill the process tree rooted at the child if it's the leader of its own process
        # group, otherwise just kill the child
        try:
            if has_process_groups and proc.pid == os.getpgid(proc.pid):
                os.killpg(proc.pid, signal.SIGTERM)
            else:
                proc.terminate()
        except OSError:
            # The child process may have exited before we attempted to terminate it, so we
            # ignore OSErrors raised during child process termination
            _logger.info(
                "Failed to terminate child process (PID %s) corresponding to MLflow "
                "run with ID %s. The process may have already exited.",
                proc.pid,
                self._run_id,
            )
        proc.wait()

    def _get_status(self):
        """
        Map the child's current exit state to a ``RunStatus`` value without blocking:
        RUNNING while ``poll()`` returns None, FINISHED for exit code 0, FAILED otherwise.
        """
        exit_code = self.command_proc.poll()
        if exit_code is None:
            return RunStatus.RUNNING
        if exit_code == 0:
            return RunStatus.FINISHED
        return RunStatus.FAILED

    def get_status(self):
        """Return the current run status converted with ``RunStatus.to_string``."""
        return RunStatus.to_string(self._get_status())