mlflow.spark
The mlflow.spark module provides an API for logging and loading Spark MLlib models. This module exports Spark MLlib models with the following flavors:
- Spark MLlib (native) format: Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.
- mlflow.pyfunc: Supports deployment outside of Spark by instantiating a SparkContext and reading input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark as a Spark UDF (see the sketch after this list). Models with this flavor can be loaded as Python functions for performing inference. This flavor is always produced.
- mlflow.mleap: Enables high-performance deployment outside of Spark by leveraging MLeap's custom dataframe and pipeline representations. Models with this flavor cannot be loaded back as Python objects. Rather, they must be deserialized in Java using the mlflow/java package. This flavor is produced only if you specify MLeap-compatible arguments.
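For illustration, here is a minimal sketch of pyfunc-flavor scoring as a Spark UDF. The run ID placeholder, the artifact path spark-model, and the input column are assumptions for this example, and the exact spark_udf signature can vary across MLflow versions:

>>> import mlflow.pyfunc
>>> model_uri = "runs:/<mlflow_run_id>/spark-model"
>>> # Build a Spark UDF from the pyfunc flavor and score a DataFrame column.
>>> pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri)
>>> df = spark.createDataFrame([(0, "spark i j k")], ["id", "text"])
>>> scored = df.withColumn("prediction", pyfunc_udf("text"))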
mlflow.spark.get_default_conda_env()
Returns: The default Conda environment for MLflow Models produced by calls to save_model() and log_model().
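As a small sketch, the returned dictionary can be inspected and extended before it is passed to log_model() or save_model(); the exact dependency pins depend on the installed MLflow and PySpark versions, and the appended package here is only an example:

>>> import mlflow.spark
>>> conda_env = mlflow.spark.get_default_conda_env()
>>> # Add an extra dependency before logging the model with this environment.
>>> conda_env['dependencies'].append('scikit-learn')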
mlflow.spark.load_model(model_uri, dfs_tmpdir=None)
Load the Spark MLlib model from the path.
Parameters:
- model_uri – The location, in URI format, of the MLflow model, for example:
  - /Users/me/path/to/local/model
  - relative/path/to/local/model
  - s3://my_bucket/path/to/model
  - runs:/<mlflow_run_id>/run-relative/path/to/model
  For more information about supported URI schemes, see Referencing Artifacts.
- dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to /tmp/mlflow.
Returns: pyspark.ml.pipeline.PipelineModel
>>> from mlflow import spark
>>> model = mlflow.spark.load_model("spark-model")
>>> # Prepare test documents, which are unlabeled (id, text) tuples.
>>> test = spark.createDataFrame([
...     (4, "spark i j k"),
...     (5, "l m n"),
...     (6, "spark hadoop spark"),
...     (7, "apache hadoop")], ["id", "text"])
>>> # Make predictions on test documents.
>>> prediction = model.transform(test)
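The same model can also be loaded from a tracking run with a runs:/ URI. This sketch reuses the run ID placeholder from the parameter description above, and the prediction column assumes the pipeline produces one:

>>> model = mlflow.spark.load_model("runs:/<mlflow_run_id>/spark-model")
>>> model.transform(test).select("id", "prediction").show()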
mlflow.spark.log_model(spark_model, artifact_path, conda_env=None, dfs_tmpdir=None, sample_input=None)
Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.
Parameters:
- spark_model – PipelineModel to be saved.
- artifact_path – Run-relative artifact path.
- conda_env – Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this describes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

  {
      'name': 'mlflow-env',
      'channels': ['defaults'],
      'dependencies': [
          'python=3.7.0',
          'pyspark=2.3.0'
      ]
  }
- dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied into the model's artifact directory. This is necessary as Spark ML models read from and write to DFS if running on a cluster. If this operation completes successfully, all temporary files created on the DFS are removed. Defaults to /tmp/mlflow.
- sample_input – A sample input used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added (see the sketch after the example below).
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.feature import HashingTF, Tokenizer
>>> training = spark.createDataFrame([
...     (0, "a b c d e spark", 1.0),
...     (1, "b d", 0.0),
...     (2, "spark f g h", 1.0),
...     (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
>>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
>>> hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
>>> lr = LogisticRegression(maxIter=10, regParam=0.001)
>>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
>>> model = pipeline.fit(training)
>>> mlflow.spark.log_model(model, "spark-model")
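Building on the pipeline above, here is a hedged sketch of logging with a customized environment and a sample input so that the MLeap flavor is added alongside the Spark flavor. The artifact path spark-model-mleap is an example name, and MLeap serialization additionally requires the mleap package to be installed:

>>> conda_env = mlflow.spark.get_default_conda_env()
>>> # sample_input triggers MLeap serialization in addition to MLlib persistence.
>>> mlflow.spark.log_model(model, "spark-model-mleap",
...                        conda_env=conda_env,
...                        sample_input=training)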
mlflow.spark.save_model(spark_model, path, mlflow_model=<mlflow.models.Model object>, conda_env=None, dfs_tmpdir=None, sample_input=None)
Save a Spark MLlib PipelineModel to a local path.
By default, this function saves models using the Spark MLlib persistence mechanism. Additionally, if a sample input is specified using the sample_input parameter, the model is also serialized in MLeap format and the MLeap flavor is added.
Parameters:
- spark_model – Spark PipelineModel to be saved. Can save only PipelineModels.
- path – Local path where the model is to be saved.
- mlflow_model – MLflow model config this flavor is being added to.
- conda_env – Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this describes the environment this model should be run in. At minimum, it should specify the dependencies contained in get_default_conda_env(). If None, the default get_default_conda_env() environment is added to the model. The following is an example dictionary representation of a Conda environment:

  {
      'name': 'mlflow-env',
      'channels': ['defaults'],
      'dependencies': [
          'python=3.7.0',
          'pyspark=2.3.0'
      ]
  }
- dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written to this destination and then copied to the requested local path. This is necessary as Spark ML models read from and write to DFS if running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to /tmp/mlflow.
- sample_input – A sample input that is used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If sample_input is None, the MLeap flavor is not added.
>>> from mlflow import spark
>>> from pyspark.ml.pipeline import PipelineModel
>>>
>>> # your pyspark.ml.pipeline.PipelineModel type
>>> model = ...
>>> mlflow.spark.save_model(model, "spark-model")
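As a hedged round-trip check, a model saved this way can be read back from the same local path with load_model; the path here matches the save_model call above:

>>> # Reload the model saved above; returns a pyspark.ml.pipeline.PipelineModel.
>>> reloaded = mlflow.spark.load_model("spark-model")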