mlflow.spark

MLflow integration for Spark MLlib models.

Spark MLlib models are saved and loaded using native Spark MLlib persistence. The models can be exported as pyfunc for out-of-Spark deployment, or they can be loaded back as Spark Transformers and scored in Spark. The pyfunc flavor instantiates a SparkContext internally and reads the input data as a Spark DataFrame prior to scoring.
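A minimal sketch of the two loading paths, assuming a Spark model was previously saved to the local directory spark-model (as in the save_model example below) and that test_pandas_df is a placeholder for a pandas DataFrame of input records:

>>> import mlflow.pyfunc
>>> import mlflow.spark
>>> # Load back as a Spark Transformer to score Spark DataFrames in Spark
>>> spark_model = mlflow.spark.load_model("spark-model")
>>> # Load the same saved model through the generic pyfunc interface
>>> # to score pandas DataFrames outside of Spark
>>> pyfunc_model = mlflow.pyfunc.load_pyfunc("spark-model")
>>> predictions = pyfunc_model.predict(test_pandas_df)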

mlflow.spark.load_model(path, run_id=None, dfs_tmpdir=None)

Load the Spark MLlib model from a local path or a run-relative artifact path.

Parameters:
  • path – Local filesystem path or run-relative artifact path to the model.
  • run_id – Run ID. If provided, it is combined with path to identify the model.
  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to /tmp/mlflow.
Returns: SparkML model.
Return type: pyspark.ml.pipeline.PipelineModel

>>> import mlflow.spark
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> model = mlflow.spark.load_model("spark-model")
>>> # Prepare test documents, which are unlabeled (id, text) tuples.
>>> test = spark.createDataFrame([
...   (4, "spark i j k"),
...   (5, "l m n"),
...   (6, "spark hadoop spark"),
...   (7, "apache hadoop")], ["id", "text"])
>>> # Make predictions on the test documents.
>>> prediction = model.transform(test)
mlflow.spark.load_pyfunc(path)

Load a Python Function model from a local file.

Parameters:
  • path – Local path.
Returns: The model as PyFunc.
>>> import mlflow.spark
>>> pyfunc_model = mlflow.spark.load_pyfunc("/tmp/pyfunc-spark-model")
>>> predictions = pyfunc_model.predict(test_pandas_df)
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, jars=None, dfs_tmpdir=None)

Log a Spark MLlib model as an MLflow artifact for the current run.

Parameters:
  • spark_model – PipelineModel to be saved.
  • artifact_path – Run-relative artifact path.
  • conda_env – Path to a Conda environment file. If provided, it defines the environment for the model. At minimum, it should specify python, pyspark, and mlflow with appropriate versions (see the sketch after the example below).
  • jars – List of JARs needed by the model.
  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied into the model’s artifact directory. This is necessary because Spark ML models read from and write to DFS when running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow.
>>> import mlflow.spark
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> training = spark.createDataFrame([
...   (0, "a b c d e spark", 1.0),
...   (1, "b d", 0.0),
...   (2, "spark f g h", 1.0),
...   (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
>>> lr = LogisticRegression(maxIter=10, regParam=0.001)
>>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
>>> model = pipeline.fit(training)
>>> mlflow.spark.log_model(model, "spark-model")
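The same call can also point at an explicit Conda environment file; a minimal sketch, assuming a conda.yaml listing python, pyspark, and mlflow with appropriate versions sits next to the training script (the file name is illustrative):

>>> # Log the fitted PipelineModel together with an explicit environment spec
>>> mlflow.spark.log_model(model, "spark-model", conda_env="conda.yaml")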
mlflow.spark.save_model(spark_model, path, mlflow_model=<mlflow.models.Model object>, conda_env=None, jars=None, dfs_tmpdir=None)

Save a Spark MLlib PipelineModel at the given local path.

Uses the Spark MLlib persistence mechanism.

Parameters:
  • spark_model – Spark PipelineModel to be saved. Can save only PipelineModels.
  • path – Local path where the model is to be saved.
  • mlflow_model – MLflow model config this flavor is being added to.
  • conda_env – Conda environment this model depends on.
  • jars – List of JARs needed by the model.
  • dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied to the requested local path. This is necessary because Spark ML models read from and write to DFS when running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow.
>>> import mlflow.spark
>>> from pyspark.ml.pipeline import PipelineModel
>>>
>>> # model is your pyspark.ml.pipeline.PipelineModel instance
>>> model = ...
>>> mlflow.spark.save_model(model, "spark-model")
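A model saved this way can later be reloaded from the same local path; a short sketch, where test stands for a Spark DataFrame of documents as in the load_model example above:

>>> reloaded_model = mlflow.spark.load_model("spark-model")
>>> # reloaded_model is a pyspark.ml.pipeline.PipelineModel and scores Spark DataFrames
>>> predictions = reloaded_model.transform(test)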