mlflow.spark

The mlflow.spark module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:

Spark MLlib (native) format
Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.
mlflow.pyfunc
Supports deployment outside of Spark by instantiating a SparkContext and reading input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark as a Spark UDF. Models with this flavor can be loaded as Python functions for performing inference. This flavor is always produced.
mlflow.mleap
Enables high-performance deployment outside of Spark by leveraging MLeap’s custom dataframe and pipeline representations. Models with this flavor cannot be loaded back as Python objects. Rather, they must be deserialized in Java using the mlflow/java package. This flavor is produced only if you specify MLeap-compatible arguments.
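As a rough sketch of the mlflow.pyfunc flavor described above, the following helper loads a model as a generic Python function and scores a pandas DataFrame with it. The helper name predict_with_pyfunc and its load_fn parameter are illustrative assumptions, not part of MLflow. load_fn exists only so the helper can be exercised without MLflow or Spark installed, and it falls back to mlflow.pyfunc.load_model when omitted.

```python
def predict_with_pyfunc(model_uri, pandas_df, load_fn=None):
    """Score ``pandas_df`` with the pyfunc flavor of the model at ``model_uri``.

    ``load_fn`` is a hypothetical hook for swapping in a different loader;
    by default the model is loaded with ``mlflow.pyfunc.load_model``.
    """
    if load_fn is None:
        # Imported lazily so that defining this helper does not require MLflow.
        import mlflow.pyfunc
        load_fn = mlflow.pyfunc.load_model
    pyfunc_model = load_fn(model_uri)
    # Every pyfunc model exposes ``predict``; for the Spark flavor the input
    # is expected to be a pandas DataFrame.
    return pyfunc_model.predict(pandas_df)


# Example call, assuming MLflow, PySpark, and pandas are installed and a model
# was previously saved at "spark-model":
#   import pandas as pd
#   predictions = predict_with_pyfunc(
#       "spark-model", pd.DataFrame({"id": [4], "text": ["spark i j k"]}))
```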
mlflow.spark.get_default_conda_env()
Returns: The default Conda environment for MLflow Models produced by calls to save_model() and log_model().
mlflow.spark.load_model(model_uri, dfs_tmpdir=None)

Load a Spark MLlib model from the location given by model_uri.

Parameters:
  • model_uri

    The location, in URI format, of the MLflow model, for example:

    • /Users/me/path/to/local/model
    • relative/path/to/local/model
    • s3://my_bucket/path/to/model
    • runs:/<mlflow_run_id>/run-relative/path/to/model

    For more information about supported URI schemes, see Referencing Artifacts.

  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to /tmp/mlflow.
Returns:

pyspark.ml.pipeline.PipelineModel

>>> import mlflow.spark
>>> # Assumes an active SparkSession is bound to the name ``spark``.
>>> model = mlflow.spark.load_model("spark-model")
>>> # Prepare test documents, which are unlabeled (id, text) tuples.
>>> test = spark.createDataFrame([
...   (4, "spark i j k"),
...   (5, "l m n"),
...   (6, "spark hadoop spark"),
...   (7, "apache hadoop")], ["id", "text"])
>>> # Make predictions on test documents.
>>> prediction = model.transform(test)
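The model_uri forms listed above differ mainly in their URI scheme. The sketch below uses only the Python standard library to show how such strings can be told apart and how a runs:/ URI is assembled from a run ID and a run-relative artifact path. The helper names uri_scheme and runs_uri are hypothetical and are not part of MLflow, and the run ID is made up.

```python
from urllib.parse import urlparse


def uri_scheme(model_uri):
    """Return the scheme of a model URI, or "" for a plain filesystem path."""
    return urlparse(model_uri).scheme


def runs_uri(run_id, artifact_path):
    """Build a run-relative model URI of the form runs:/<run_id>/<artifact_path>."""
    return "runs:/{}/{}".format(run_id, artifact_path.lstrip("/"))


examples = [
    "/Users/me/path/to/local/model",
    "relative/path/to/local/model",
    "s3://my_bucket/path/to/model",
    runs_uri("0123456789abcdef", "spark-model"),  # made-up run ID
]
for uri in examples:
    # Print the detected scheme next to each URI.
    print(repr(uri_scheme(uri)), uri)
```

This check is only a rough guide: a Windows path that starts with a drive letter may be reported as having a one-letter scheme.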
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None)

Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.

Parameters:
  • spark_model – Spark model to be saved. MLflow can only save descendants of pyspark.ml.Model that implement MLReadable and MLWritable.
  • artifact_path – Run-relative artifact path.
  • conda_env

    Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this describes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

    {
        'name': 'mlflow-env',
        'channels': ['defaults'],
        'dependencies': [
            'python=3.7.0',
            'pyspark=2.3.0'
        ]
    }
    
  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied into the model’s artifact directory. This is necessary because Spark ML models read from and write to DFS when running on a cluster. If this operation completes successfully, all temporary files created on the DFS are removed. Defaults to /tmp/mlflow.
  • sample_input – A sample input used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added.
  • registered_model_name – (Experimental: this argument may change or be removed in a future release without warning.) If given, create a model version under registered_model_name, also creating a registered model if one with the given name does not exist.
>>> import mlflow.spark
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> training = spark.createDataFrame([
...   (0, "a b c d e spark", 1.0),
...   (1, "b d", 0.0),
...   (2, "spark f g h", 1.0),
...   (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"])
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
>>> lr = LogisticRegression(maxIter=10, regParam=0.001)
>>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
>>> model = pipeline.fit(training)
>>> mlflow.spark.log_model(model, "spark-model")
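When a custom conda_env dictionary is passed, it should still list at least the packages in get_default_conda_env(). A minimal sketch of such a check follows. The helper missing_dependencies is hypothetical, and the required names python and pyspark are an assumption taken from the example dictionary above rather than read from MLflow.

```python
import re


def missing_dependencies(conda_env, required=("python", "pyspark")):
    """Return the names in ``required`` that ``conda_env`` does not list."""
    present = set()
    for dep in conda_env.get("dependencies", []):
        # Nested entries such as {"pip": [...]} are skipped in this sketch.
        if isinstance(dep, str):
            # Drop version pins, e.g. "pyspark=2.3.0" -> "pyspark".
            name = re.split(r"[=<>!~ ]", dep.strip(), maxsplit=1)[0]
            present.add(name.lower())
    return [name for name in required if name not in present]


conda_env = {
    "name": "mlflow-env",
    "channels": ["defaults"],
    "dependencies": ["python=3.7.0", "pyspark=2.3.0"],
}
# An empty list means every assumed requirement is present.
print(missing_dependencies(conda_env))
```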
mlflow.spark.save_model(spark_model, path, mlflow_model=<mlflow.models.Model object>, conda_env=None, dfs_tmpdir=None, sample_input=None)

Save a Spark MLlib Model to a local path.

By default, this function saves models using the Spark MLlib persistence mechanism. Additionally, if a sample input is specified using the sample_input parameter, the model is also serialized in MLeap format and the MLeap flavor is added.

Parameters:
  • spark_model – Spark model to be saved. MLflow can only save descendants of pyspark.ml.Model that implement MLReadable and MLWritable.
  • path – Local path where the model is to be saved.
  • mlflow_model – MLflow model config this flavor is being added to.
  • conda_env

    Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this describes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

    {
        'name': 'mlflow-env',
        'channels': ['defaults'],
        'dependencies': [
            'python=3.7.0',
            'pyspark=2.3.0'
        ]
    }
    
  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied to the requested local path. This is necessary because Spark ML models read from and write to DFS when running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow.
  • sample_input – A sample input that is used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added.
>>> import mlflow.spark
>>> from pyspark.ml.pipeline import PipelineModel
>>>
>>> # Your fitted pyspark.ml.pipeline.PipelineModel instance.
>>> model = ...
>>> mlflow.spark.save_model(model, "spark-model")
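The dfs_tmpdir behaviour described above (write to a temporary location, copy to the requested path, remove the temporary files on success) can be illustrated with the standard library alone. This is a sketch of the pattern on a local filesystem, not MLflow's implementation; stage_then_copy, write_fn, and write_dummy_model are hypothetical names.

```python
import os
import shutil
import tempfile


def stage_then_copy(write_fn, final_path, tmp_root):
    """Write into a scratch directory under ``tmp_root``, then copy to ``final_path``."""
    os.makedirs(tmp_root, exist_ok=True)
    scratch = tempfile.mkdtemp(dir=tmp_root)
    staged = os.path.join(scratch, "model")
    write_fn(staged)                     # stands in for the Spark writer
    shutil.copytree(staged, final_path)  # copy to the requested local path
    shutil.rmtree(scratch)               # reached only if the steps above succeeded
    return final_path


def write_dummy_model(path):
    """Stand-in writer that creates a directory containing a single file."""
    os.makedirs(path)
    with open(os.path.join(path, "metadata.txt"), "w") as handle:
        handle.write("placeholder")


with tempfile.TemporaryDirectory() as workdir:
    tmp_root = os.path.join(workdir, "tmp", "mlflow")
    saved = stage_then_copy(write_dummy_model, os.path.join(workdir, "spark-model"), tmp_root)
    saved_files = os.listdir(saved)   # contents of the final model directory
    leftovers = os.listdir(tmp_root)  # anything still left in the scratch area
```

If write_fn or the copy raises, the cleanup line is never reached, which mirrors the statement that temporary files are removed only when the operation completes successfully.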