# Using `MLlib` from `pyspark` to Fit Machine Learning Models

Start our `spark` session.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/08 17:02:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Now read in a data set using `pandas` and convert it to a `spark` SQL style data frame.

In [2]:
bike_data = pd.read_csv("bikeDetails.csv")
bike_data.head()

Unnamed: 0,name,selling_price,year,seller_type,owner,km_driven,ex_showroom_price
0,Royal Enfield Classic 350,175000,2019,Individual,1st owner,350,
1,Honda Dio,45000,2017,Individual,1st owner,5650,
2,Royal Enfield Classic Gunmetal Grey,150000,2018,Individual,1st owner,12000,148114.0
3,Yamaha Fazer FI V 2.0 [2016-2018],65000,2015,Individual,1st owner,23000,89643.0
4,Yamaha SZ [2013-2014],20000,2011,Individual,2nd owner,21000,


Convert to a spark SQL data frame.

In [3]:
bike = spark.createDataFrame(bike_data)
bike.show(5)

                                                                                

+--------------------+-------------+----+-----------+---------+---------+-----------------+
|                name|selling_price|year|seller_type|    owner|km_driven|ex_showroom_price|
+--------------------+-------------+----+-----------+---------+---------+-----------------+
|Royal Enfield Cla...|       175000|2019| Individual|1st owner|      350|              NaN|
|           Honda Dio|        45000|2017| Individual|1st owner|     5650|              NaN|
|Royal Enfield Cla...|       150000|2018| Individual|1st owner|    12000|         148114.0|
|Yamaha Fazer FI V...|        65000|2015| Individual|1st owner|    23000|          89643.0|
|Yamaha SZ [2013-2...|        20000|2011| Individual|2nd owner|    21000|              NaN|
+--------------------+-------------+----+-----------+---------+---------+-----------------+
only showing top 5 rows



We'll fit a linear regression model using log selling price as our response and log km driven, year, and a 1st owner indicator variable as our predictors.  This means we need to create some new columns and drop a bunch of others. 
- We also **need to rename the response** as 'label'.  
- These first steps can easily be done using the `SQLTransformer()`, `StringIndexer()`, and `Binarizer()` [functions from `pyspark.ml.feature`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)
- Using the `MLlib` functions will allow us to place these transformations into a `pipeline` (which we'll do shortly).  

In [4]:
from pyspark.ml.feature import SQLTransformer, StringIndexer, Binarizer, VectorAssembler

First let's try to create our dummy variable. This isn't as easy as you'd like (nothing in here really is!). We can first take the string to a numeric index. Then we can binary-ize that!

We'll use `StringIndexer()` first. An [example is given here](https://spark.apache.org/docs/latest/ml-features.html#stringindexer). This is an estimator that we can use the `.fit()` method on. Then we can use the `.transform()` method on that.

In [5]:
indexer = StringIndexer(inputCols = ["owner"], outputCols = ["owner_numeric"])
indexerTrans = indexer.fit(bike) #now indexerTrans will have a .transform() method
indexerTrans.transform(bike).show(30)

                                                                                

+--------------------+-------------+----+-----------+---------+---------+-----------------+-------------+
|                name|selling_price|year|seller_type|    owner|km_driven|ex_showroom_price|owner_numeric|
+--------------------+-------------+----+-----------+---------+---------+-----------------+-------------+
|Royal Enfield Cla...|       175000|2019| Individual|1st owner|      350|              NaN|          0.0|
|           Honda Dio|        45000|2017| Individual|1st owner|     5650|              NaN|          0.0|
|Royal Enfield Cla...|       150000|2018| Individual|1st owner|    12000|         148114.0|          0.0|
|Yamaha Fazer FI V...|        65000|2015| Individual|1st owner|    23000|          89643.0|          0.0|
|Yamaha SZ [2013-2...|        20000|2011| Individual|2nd owner|    21000|              NaN|          1.0|
|    Honda CB Twister|        18000|2010| Individual|1st owner|    60000|          53857.0|          0.0|
|Honda CB Hornet 160R|        78500|2018| Indi

Alright, now let's conver that to a 0/1 indicator with `Binarizer()`.

In [6]:
binaryTrans = Binarizer(threshold = 0.5, inputCol = "owner_numeric", outputCol = "owner_indicator")
binaryTrans.transform(
    indexerTrans.transform(bike)
).show()

+--------------------+-------------+----+-----------+---------+---------+-----------------+-------------+---------------+
|                name|selling_price|year|seller_type|    owner|km_driven|ex_showroom_price|owner_numeric|owner_indicator|
+--------------------+-------------+----+-----------+---------+---------+-----------------+-------------+---------------+
|Royal Enfield Cla...|       175000|2019| Individual|1st owner|      350|              NaN|          0.0|            0.0|
|           Honda Dio|        45000|2017| Individual|1st owner|     5650|              NaN|          0.0|            0.0|
|Royal Enfield Cla...|       150000|2018| Individual|1st owner|    12000|         148114.0|          0.0|            0.0|
|Yamaha Fazer FI V...|        65000|2015| Individual|1st owner|    23000|          89643.0|          0.0|            0.0|
|Yamaha SZ [2013-2...|        20000|2011| Individual|2nd owner|    21000|              NaN|          1.0|            1.0|
|    Honda CB Twister|  

Great! Now the easy part. We just want to create some log variables and select only certain columns. The `SQLTransformer()` allows for (only) basic SQL commands. Note we want our response to be called `label` so we'll rename it here.

In [7]:
sqlTrans = SQLTransformer(
    statement = """
                SELECT owner_indicator, year, log(km_driven) as log_km_driven, log(selling_price) as label FROM __THIS__
                """
)

In [8]:
sqlTrans.transform(
    binaryTrans.transform(
        indexerTrans.transform(bike)
    )
).show(30)

+---------------+----+------------------+------------------+
|owner_indicator|year|     log_km_driven|             label|
+---------------+----+------------------+------------------+
|            0.0|2019| 5.857933154483459|12.072541252905651|
|            0.0|2017| 8.639410824140487|10.714417768752456|
|            0.0|2018| 9.392661928770137|11.918390573078392|
|            0.0|2015|10.043249494911286|11.082142548877775|
|            1.0|2011|  9.95227771670556| 9.903487552536127|
|            0.0|2010|11.002099841204238| 9.798127036878302|
|            0.0|2018| 9.740968623038354|  11.2708539037705|
|            1.0|2008|10.571316925111784|12.100712129872347|
|            0.0|2010|10.373491181781864|10.308952660644293|
|            0.0|2016|10.645424897265505|10.819778284410283|
|            0.0|2015|10.373491181781864| 10.46310334047155|
|            1.0|2016| 9.210340371976184|10.239959789157341|
|            0.0|2018|   9.9607181859904|11.289781913656018|
|            0.0|2019| 7

We also **need to put the predictors into a single column** called 'features'.  
Placing multiple columns into one can be done via `VectorAssembler()` from `pyspark.ml.feature`.

In [9]:
assembler = VectorAssembler(inputCols = ["year", "log_km_driven", "owner_indicator"], outputCol = "features", handleInvalid = 'keep')

Notice that we are passing what would be the result columns from the previous SQL transform we did.  The `VectorAssembler()` also has a `.transform()` method.  Let's see how these would be used together to produce a new data set.

In [10]:
assembler.transform(
    sqlTrans.transform(
        binaryTrans.transform(
            indexerTrans.transform(bike)
        )
    )
).show(10)

+---------------+----+------------------+------------------+--------------------+
|owner_indicator|year|     log_km_driven|             label|            features|
+---------------+----+------------------+------------------+--------------------+
|            0.0|2019| 5.857933154483459|12.072541252905651|[2019.0,5.8579331...|
|            0.0|2017| 8.639410824140487|10.714417768752456|[2017.0,8.6394108...|
|            0.0|2018| 9.392661928770137|11.918390573078392|[2018.0,9.3926619...|
|            0.0|2015|10.043249494911286|11.082142548877775|[2015.0,10.043249...|
|            1.0|2011|  9.95227771670556| 9.903487552536127|[2011.0,9.9522777...|
|            0.0|2010|11.002099841204238| 9.798127036878302|[2010.0,11.002099...|
|            0.0|2018| 9.740968623038354|  11.2708539037705|[2018.0,9.7409686...|
|            1.0|2008|10.571316925111784|12.100712129872347|[2008.0,10.571316...|
|            0.0|2010|10.373491181781864|10.308952660644293|[2010.0,10.373491...|
|            0.0

Awesome!  Our data is now transformed to have the new variables we want and the data is in the format needed to fit model using `MLlib`.  Specifically:
- A column named `label` that is the response variable
- A column named `features` that is a column containing all the predictor values together

Next step, we'll fit a basic multiple linear regression model using the `LinearRegression()` function from `pyspark.ml.regression`.  This function does regularized regression (Elastic net, which includes LASSO and Ridge Regression as special cases) so there are a few set up parameters we can modify to just get the usual MLR fit.

In [11]:
from pyspark.ml.regression import LinearRegression

In [12]:
lr = LinearRegression(regParam = 0, elasticNetParam = 0)

Now we can use the `.fit()` method on the transformed data we made above.

In [13]:
lr_data = assembler.transform(
                sqlTrans.transform(
                    binaryTrans.transform(
                        indexerTrans.transform(bike)
                    )
                )
)
lr_data

DataFrame[owner_indicator: double, year: bigint, log_km_driven: double, label: double, features: vector]

In [14]:
lrModel = lr.fit(lr_data)

24/03/08 17:03:10 WARN Instrumentation: [1d9ded2d] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

We can inspect the model fit using attributes of our `lrModel` object.

In [15]:
print("Intercept: %s" % str(lrModel.intercept), "Coefficients: %s" % str(lrModel.coefficients))

Intercept: -151.65184885852284 Coefficients: [0.08175654813171404,-0.22825871252866586,0.10021253855848164]


Let's inspect the training set RMSE and other model fit metrics.

In [16]:
trainingSummary = lrModel.summary
trainingSummary.residuals.show(5)
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

+--------------------+
|           residuals|
+--------------------+
|-0.00495628658079...|
| -0.5646701626673813|
|  0.7294822208803797|
| 0.28700612130943703|
| -0.6856003220335012|
+--------------------+
only showing top 5 rows

RMSE: 0.509892
r2: 0.485191


Of course we will often want to do predictions as well.  This is easy to do!  The model itself actually becomes a transformer once it is fit!  We just use the `.transform()` method and pass it data similar in format to that on which the model was trained.  Here we'll just look at the prediction from the data used to fit the model.

In [17]:
preds = lrModel.transform(lr_data)

In [18]:
preds.show(5)

+---------------+----+------------------+------------------+--------------------+------------------+
|owner_indicator|year|     log_km_driven|             label|            features|        prediction|
+---------------+----+------------------+------------------+--------------------+------------------+
|            0.0|2019| 5.857933154483459|12.072541252905651|[2019.0,5.8579331...|12.077497539486444|
|            0.0|2017| 8.639410824140487|10.714417768752456|[2017.0,8.6394108...|11.279087931419838|
|            0.0|2018| 9.392661928770137|11.918390573078392|[2018.0,9.3926619...|11.188908352198013|
|            0.0|2015|10.043249494911286|11.082142548877775|[2015.0,10.043249...|10.795136427568337|
|            1.0|2011|  9.95227771670556| 9.903487552536127|[2011.0,9.9522777...|10.589087874569628|
+---------------+----+------------------+------------------+--------------------+------------------+
only showing top 5 rows



In [19]:
preds.select("label", "prediction").show(5)

+------------------+------------------+
|             label|        prediction|
+------------------+------------------+
|12.072541252905651|12.077497539486444|
|10.714417768752456|11.279087931419838|
|11.918390573078392|11.188908352198013|
|11.082142548877775|10.795136427568337|
| 9.903487552536127|10.589087874569628|
+------------------+------------------+
only showing top 5 rows



Just a sanity check to show we get the same RMSE if we do it with these values:

In [20]:
my_preds = preds.select("label", "prediction").toPandas()
import numpy as np
np.sqrt(np.mean((my_preds["label"]-my_preds["prediction"])**2))

0.5098915575316225

One more sanity check. Compare with `sklearn`.

Note: I had an error with the `scipy` module not being recognized. I opened a new terminal window (file --> New Launcher, choose Terminal at the bottom) and had to submit the code:
`pip install scipy --force`

In [23]:
from sklearn import linear_model
from sklearn.metrics import mean_squared_error
bike_data = pd.read_csv("https://www4.stat.ncsu.edu/~online/datasets/bikeDetails.csv")
bike_data['log_selling_price'] = np.log(bike_data['selling_price'])
bike_data['log_km_driven'] = np.log(bike_data['km_driven'])
bike_data['one_owner'] = pd.get_dummies(data = bike_data['owner'])['1st owner']
mlr_fit = linear_model.LinearRegression() #Create a reg object
mlr_fit.fit(bike_data[['year','log_km_driven','one_owner']], bike_data['log_selling_price'].values)
print(mlr_fit.intercept_, mlr_fit.coef_)
print(mean_squared_error(bike_data['log_selling_price'], mlr_fit.predict(bike_data[["year", "log_km_driven", "one_owner"]]), squared = False))

-151.55163631488233 [ 0.08175655 -0.22825871 -0.10021254]
0.5098915575316225




Cool!  We've fit an MLR model using the `pyspark` `MLlib` library!

# Using Cross-Validation to Select Our Model

Of course we know that what we care about is the quality of the prediction our model makes on **data it wasn't trained on**.  Generally, this means we need to either 
- split our data into a training and test (sometimes called validation) set
- use cross-validation

And of course, we've talked about the need for both when selecting from many types of models.

Let's focus on doing k-fold CV using `pyspark`.  We need to set up our grid of tuning parameters (if applicable) and then run our CV algorithm.  This can be done using the `ParamGridBuilder()` and `CrossValidator()` functions from `pyspark.ml.tuning`, respectively.

In [24]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

The only tuning parameter we'll worry about is the `regParam`. which is $\lambda$ in the penalty part of the loss function:
$$\alpha(\lambda||w||_1)+(1-\alpha)(\lambda/2||w||_2^2)$$
If we set the `elasticNetParam` to 1 ($\alpha$ above) then we are doing LASSO regression.

We can see some [details for each algorithm via the help files](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html#regression).  We can look at how well the model does when we use different amounts of LASSO regularization.  We use `ParamGridBuilder()` and `.addGrid()` to specify the tuning parameter values.  Then finally we use the `.build()` method to instruct it to build the grid.

In [25]:
lr = LinearRegression()
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.05, 0.1, 0.15]) \
    .addGrid(lr.elasticNetParam, [1]) \
    .build()

Next up, we can use the `CrossValidator()` function to run k-fold CV over the grid of tuning parameters we just set up.  

We need to tell `pyspark` what loss function to use when evaluating.  This is done via the `RegressionEvaluator()` function from `pyspark.ml.evaluation`.  To override the default metric used we can specify it explicitly using the `metricName=` argument when calling the function.  `rmse` is the default here.

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

In [27]:
crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(metricName='rmse'),
                          numFolds=5)

We've now set up the `crossval` object.  Just like with `sklearn` we now use the `.fit()` method to actually fit the models.

In [28]:
cvModel = crossval.fit(lr_data)

24/03/08 17:06:44 WARN Instrumentation: [e4f7ec65] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:06:49 WARN Instrumentation: [96732fed] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:06:52 WARN Instrumentation: [ccc91038] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:06:54 WARN Instrumentation: [04867e84] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:06:56 WARN Instrumentation: [e89224fc] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:06:58 WARN Instrumentation: [ca268c57] regParam is zero, which might cause numerical instability and overfitting.


By default, the only model returned is the **best** model as measured by our Loss function. 

To determine which model was returned we can look at the `.avgMetrics` attribute along with the `paramGrid` object we used to fit the models.

In [29]:
list(zip([0, 0.05, 0.1, 0.15], cvModel.avgMetrics))

[(0, 0.5112406194486974),
 (0.05, 0.5136269855905575),
 (0.1, 0.5235001947739562),
 (0.15, 0.5395464815213404)]

We can see the best model's parameter estimates as well.

In [30]:
print(cvModel.bestModel._java_obj.intercept(), cvModel.bestModel._java_obj.coefficients())

-151.6518488577565 [0.08175654813133452,-0.22825871252886246,0.10021253855757116]


As with the single linear model fit we did above, this new object, `cvModel`, is now a transformation as well.  This allows us to get prediction using the best fit model.  If we had a test set we could use it to get the test set prediction easily.  As we didn't split into a test set, let's just see how to use it to predict on the training data (i.e. return the fitted values for the model).

In [31]:
predsCV = cvModel.transform(lr_data)

In [32]:
predsCV.select("label", "prediction").show(5)

+------------------+------------------+
|             label|        prediction|
+------------------+------------------+
|12.072541252905651|12.077497539485364|
|10.714417768752456|11.279087931418985|
|11.918390573078392|11.188908352196648|
|11.082142548877775|10.795136427567968|
| 9.903487552536127|10.589087874569884|
+------------------+------------------+
only showing top 5 rows



As our model chosen was not regularized, we get the same predictions as previous!

In [33]:
preds.select("label", "prediction").show(5)

+------------------+------------------+
|             label|        prediction|
+------------------+------------------+
|12.072541252905651|12.077497539486444|
|10.714417768752456|11.279087931419838|
|11.918390573078392|11.188908352198013|
|11.082142548877775|10.795136427568337|
| 9.903487552536127|10.589087874569628|
+------------------+------------------+
only showing top 5 rows



We can find the training RMSE by passing these predictions to the `RegressionEvaluator().evaluate()` method.

In [34]:
RegressionEvaluator().evaluate(cvModel.transform(lr_data))

0.509891557531623

# Pipelines and the Training/Test Split

Of course we often want to have a training and test set so we can do all of our model fitting and tuning on the training set and then see how different model types compare on the test set.  We should always split our data into training and test sets first, before doing transformations.  Reason being:
- If we do transformations on the training set, we want to use the exact same transformations on the test set
- For instance, if we center some predictors (subtract the mean) in our training set, we want to subtract the mean of the **training** set to standardize the test set values prior to doing test set predictions!
- By splitting the data first, we can make sure we aren't using any test data (and the knowledge that comes with it) when training our models

By setting up a **pipeline** in `MLlib` we can easily (ha, that's kind of a joke) create the sequence of transformations/model fits and apply those same transformations on our test set!

Let's start with the training/test split.  This can be done using the `.randomSplit()` method on a spark SQL style data frame.

In [35]:
train, test = bike.randomSplit([0.8,0.2], seed = 1)
print(train.count(), test.count())

840 221


Now that we've split our data we can **set up** the transformations to do.  Just like with other spark stuff, things are set up as a DAG and done only at run time.  
We created the transformation plans previously so we'll just pull them down here for clarity.

In [36]:
#Sequence of transformations
#indexerTrans
#binaryTrans
#sqlTrans
#assembler
#(linear model or other model here!)

We are then ready to create our pipeline that includes these transformations and the model that we want to fit.  We use the `Pipeline()` function from the `pyspark.ml` module to set up our sequence of transformations/estimators.

In [37]:
from pyspark.ml import Pipeline

In [38]:
lr = LinearRegression()
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.01, 0.04, 0.06, 0.1]) \
    .addGrid(lr.elasticNetParam, [0, 0.5, 0.8, 0.9, 1]) \
    .build()
pipeline = Pipeline(stages = [indexerTrans, binaryTrans, sqlTrans, assembler, lr])

Our DAG is now set up and we can use this pipeline within our CV calculation (or basic model fitting).  What's nice is that since it contains all the information about the transformations done, we can easily apply this to a test set and not have to worry about how to do the transformations/prepping of the data on that set.  

Instead of using the model type as the `estimator` we'll pass the pipeline we've set up.

In [39]:
crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(),
                          numFolds=5)

With everything set up, we can now fit our models!

In [40]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

24/03/08 17:08:42 WARN Instrumentation: [45408c1a] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:43 WARN Instrumentation: [bdf949e2] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:43 WARN Instrumentation: [c9e03b2c] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:44 WARN Instrumentation: [cc03885d] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:45 WARN Instrumentation: [8f5c1468] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:56 WARN Instrumentation: [d5eaed65] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:57 WARN Instrumentation: [e1d0afb2] regParam is zero, which might cause numerical instability and overfitting.
24/03/08 17:08:58 WARN Instrumentation: [1e88fc13] regParam is zero, which might cause numerical instability and overf

Check which model is chosen as the best:

In [41]:
my_list = []
for i in range(len(paramGrid)):
    my_list.append([cvModel.avgMetrics[i], paramGrid[i].values()])
my_list

[[0.5124021026719168, dict_values([0.0, 0.0])],
 [0.5124021026718817, dict_values([0.0, 0.5])],
 [0.5124021026718577, dict_values([0.0, 0.8])],
 [0.5124021026718556, dict_values([0.0, 0.9])],
 [0.5124021026719084, dict_values([0.0, 1.0])],
 [0.5123205996805408, dict_values([0.01, 0.0])],
 [0.5123483858580572, dict_values([0.01, 0.5])],
 [0.5122975622688871, dict_values([0.01, 0.8])],
 [0.5122867285770079, dict_values([0.01, 0.9])],
 [0.5122589945611346, dict_values([0.01, 1.0])],
 [0.5123909761343102, dict_values([0.04, 0.0])],
 [0.5122102563576977, dict_values([0.04, 0.5])],
 [0.5120946331251414, dict_values([0.04, 0.8])],
 [0.5121238218400215, dict_values([0.04, 0.9])],
 [0.51224440770079, dict_values([0.04, 1.0])],
 [0.512663579012802, dict_values([0.06, 0.0])],
 [0.5126627597862455, dict_values([0.06, 0.5])],
 [0.5137013318120469, dict_values([0.06, 0.8])],
 [0.5142717497055167, dict_values([0.06, 0.9])],
 [0.5148905572778366, dict_values([0.06, 1.0])],
 [0.5136433512562688, dict_v

Use that best model to get test error on the test set.

In [42]:
cvModel.transform(test).show(5)

+---------------+----+------------------+------------------+--------------------+------------------+
|owner_indicator|year|     log_km_driven|             label|            features|        prediction|
+---------------+----+------------------+------------------+--------------------+------------------+
|            0.0|2019| 5.857933154483459|12.072541252905651|[2019.0,5.8579331...|11.953749958528732|
|            0.0|2020| 7.438383530044307|12.128111104060462|[2020.0,7.4383835...|11.705822488439821|
|            1.0|2013| 9.937695723865865|10.373491181781864|[2013.0,9.9376957...|10.679205674574945|
|            0.0|2018|10.571316925111784|11.695247021764184|[2018.0,10.571316...|10.919885599143953|
|            0.0|2017| 7.824046010856292|10.915088464214607|[2017.0,7.8240460...|11.405445806006924|
+---------------+----+------------------+------------------+--------------------+------------------+
only showing top 5 rows



In [43]:
test_error = RegressionEvaluator().evaluate(cvModel.transform(test))
print(test_error)

0.5114772577953631


Fantastic!  We can now set up a pipeline and use CV to fit our model.  Then we can take the best model and find test set predictions!  

Remember, once we have our final model we fit that model on the entire data set.  Then we use it for future predictions and what-not.

# `MLflow` Basics

Note: To run this code in our jupyterhub I had to do a few things first. I needed to force install a few modules. Open a new terminal window and do the following:

`pip install requests --force`  
`pip install pyyaml --force`  
`pip install entrypoints --force`


This section is modified from the [tutorial given here](https://github.com/mlflow/mlflow/blob/master/examples/sklearn_elasticnet_wine/train.ipynb). Although this uses `sklearn`, it can be modified to work with `MLlib`!

First, they create a function to train an elastic net model specifically on the wine data we've used before. This function takes in the $\alpha$ and $L_1$ ratio values from the `sklearn` `ElasticNet()` function. This function can then be passed different values of these and the given metrics will be found and logged.

In [1]:
# Wine Quality Sample
def train(in_alpha, in_l1_ratio):
    import os
    import warnings
    import sys

    import pandas as pd
    import numpy as np
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import ElasticNet

    import mlflow
    import mlflow.sklearn

    import logging

    logging.basicConfig(level=logging.WARN)
    logger = logging.getLogger(__name__)

    def eval_metrics(actual, pred):
        rmse = np.sqrt(mean_squared_error(actual, pred))
        mae = mean_absolute_error(actual, pred)
        r2 = r2_score(actual, pred)
        return rmse, mae, r2

    warnings.filterwarnings("ignore")
    np.random.seed(40)

    # Read the wine-quality csv file from the URL
    csv_url = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    )
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s", e
        )

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    # Set default values if no alpha is provided
    if float(in_alpha) is None:
        alpha = 0.5
    else:
        alpha = float(in_alpha)

    # Set default values if no l1_ratio is provided
    if float(in_l1_ratio) is None:
        l1_ratio = 0.5
    else:
        l1_ratio = float(in_l1_ratio)

    # Useful for multiple runs (only doing one run in this sample notebook)
    with mlflow.start_run():
        # Execute ElasticNet
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        # Evaluate Metrics
        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        # Print out metrics
        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)

        # Log parameter, metrics, and model to MLflow
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        mlflow.sklearn.log_model(lr, "model")

With this function set up, we now just call it with different values and it creates logs for them.

In [2]:
train(0.5, 0.5)



Elasticnet model (alpha=0.500000, l1_ratio=0.500000):
  RMSE: 0.7931640229276851
  MAE: 0.6271946374319586
  R2: 0.10862644997792614


In [3]:
train(0.5, 1)

Elasticnet model (alpha=0.500000, l1_ratio=1.000000):
  RMSE: 0.832819092896359
  MAE: 0.6681279771237894
  R2: 0.017268050734704055


In [4]:
train(0.1, 0.5)

Elasticnet model (alpha=0.100000, l1_ratio=0.500000):
  RMSE: 0.7308996187375898
  MAE: 0.5615486628017713
  R2: 0.2430813606733676


In [5]:
train(1, 0)

Elasticnet model (alpha=1.000000, l1_ratio=0.000000):
  RMSE: 0.7508731220796289
  MAE: 0.5811664801219333
  R2: 0.2011470433755671


If not in a docker container we would now be able to call the mlflow UI and look through things. I couldn't get that to work and gave up! Instead, we can just read in the data it uses in the UI and look at it within python.

In [6]:
import mlflow
#throws an issue but it returns the data appropriately
runs = mlflow.search_runs(experiment_ids=["0"])
runs.head()

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.r2,metrics.mae,metrics.rmse,params.l1_ratio,params.alpha,tags.mlflow.log-model.history,tags.mlflow.source.type,tags.mlflow.user,tags.mlflow.runName,tags.mlflow.source.name
0,65d548898f1e40558fef1624d3d78cc6,0,FINISHED,file:///home/jupyter-jbpost2%40ncsu.edu/mlruns...,2024-03-08 22:15:28.734000+00:00,2024-03-08 22:15:29.939000+00:00,0.201147,0.581166,0.750873,0.0,1.0,"[{""run_id"": ""65d548898f1e40558fef1624d3d78cc6""...",LOCAL,jupyter-jbpost2@ncsu.edu,bedecked-dolphin-527,/opt/tljh/user/envs/pySpark/lib/python3.9/site...
1,6f421ddea3b5467889fcbc030ff9c9df,0,FINISHED,file:///home/jupyter-jbpost2%40ncsu.edu/mlruns...,2024-03-08 22:15:26.864000+00:00,2024-03-08 22:15:28.101000+00:00,0.243081,0.561549,0.7309,0.5,0.1,"[{""run_id"": ""6f421ddea3b5467889fcbc030ff9c9df""...",LOCAL,jupyter-jbpost2@ncsu.edu,enthused-ape-110,/opt/tljh/user/envs/pySpark/lib/python3.9/site...
2,e79aa57b53084ea79b24950ecbb0284f,0,FINISHED,file:///home/jupyter-jbpost2%40ncsu.edu/mlruns...,2024-03-08 22:15:23.857000+00:00,2024-03-08 22:15:25.117000+00:00,0.017268,0.668128,0.832819,1.0,0.5,"[{""run_id"": ""e79aa57b53084ea79b24950ecbb0284f""...",LOCAL,jupyter-jbpost2@ncsu.edu,wise-worm-607,/opt/tljh/user/envs/pySpark/lib/python3.9/site...
3,c40bba6d7d8d43dc8cbce75cfb448b93,0,FINISHED,file:///home/jupyter-jbpost2%40ncsu.edu/mlruns...,2024-03-08 22:15:16.728000+00:00,2024-03-08 22:15:18.462000+00:00,0.108626,0.627195,0.793164,0.5,0.5,"[{""run_id"": ""c40bba6d7d8d43dc8cbce75cfb448b93""...",LOCAL,jupyter-jbpost2@ncsu.edu,welcoming-pug-473,/opt/tljh/user/envs/pySpark/lib/python3.9/site...


In [7]:
runs.columns

Index(['run_id', 'experiment_id', 'status', 'artifact_uri', 'start_time',
       'end_time', 'metrics.r2', 'metrics.mae', 'metrics.rmse',
       'params.l1_ratio', 'params.alpha', 'tags.mlflow.log-model.history',
       'tags.mlflow.source.type', 'tags.mlflow.user', 'tags.mlflow.runName',
       'tags.mlflow.source.name'],
      dtype='object')

In [8]:
runs[["metrics.rmse", "metrics.r2", "metrics.mae", "params.l1_ratio", "params.alpha"]].sort_values("metrics.rmse")

Unnamed: 0,metrics.rmse,metrics.r2,metrics.mae,params.l1_ratio,params.alpha
1,0.7309,0.243081,0.561549,0.5,0.1
0,0.750873,0.201147,0.581166,0.0,1.0
3,0.793164,0.108626,0.627195,0.5,0.5
2,0.832819,0.017268,0.668128,1.0,0.5
