layout: false
class: title-slide-section-red, middle

# `pyspark`: pandas-on-Spark

Justin Post

---

# Spark Recap

Spark

- Distributed processing software for big data workloads

    + Generally faster than Hadoop's MapReduce (and much more flexible)
    + DAGs make it fault tolerant and improve computational speed

Five major parts to (py)Spark

- Spark Core and RDDs as its foundation
- Spark SQL and DataFrames
- Pandas on Spark
- Spark Structured Streaming
- Spark Machine Learning (MLlib)

<img src="img/sparkPartsNew.png" width="500px" style="display: block; margin: auto;" />

---

# Data Object Used by pyspark

**DataFrame** APIs are commonly used in `pyspark`

- DataFrames (think of a usual relational database table) are created and implemented on top of RDDs
- DataFrames are stored across the cluster

    + When transformations are done, lazy evaluation is used
    + When actions are done, computation starts and results are returned

---

# Data Object Used by pyspark

**DataFrame** APIs are commonly used in `pyspark`

- DataFrames (think of a usual relational database table) are created and implemented on top of RDDs
- DataFrames are stored across the cluster

    + When transformations are done, lazy evaluation is used
    + When actions are done, computation starts and results are returned

Two major DataFrame APIs in `pyspark`

- [`pandas`-on-Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html) DataFrames through the `pyspark.pandas` module
- [Spark SQL](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html) DataFrames through the `pyspark.sql` module

---

# `pandas`-on-Spark

- The `pandas` API on Spark is super easy to use since we already know `pandas`!

- First we can import our modules

```python
import pandas as pd
import numpy as np
import pyspark.pandas as ps
```

---

# `pandas`-on-Spark

- Now you can create a `pandas-on-Spark` Series or a `pandas-on-Spark` DataFrame

- Note the `ps`, not `pd`!

```python
ps.Series([1, 3, 5, np.nan, 6, 8])
```

<pre>
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64
</pre>
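---

# `pandas`-on-Spark

- These objects live on the Spark cluster; you can collect one back into plain `pandas` with `.to_pandas()` (a rough sketch - `psser` is just an illustrative name, and collecting a truly big object this way pulls all of the data onto the driver)

```python
psser = ps.Series([1, 3, 5, np.nan, 6, 8])   # pandas-on-Spark Series (distributed)
psser.to_pandas()                            # ordinary pandas Series in driver memory
```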
---

# `pandas`-on-Spark

- Now you can create a `pandas-on-Spark` Series or a `pandas-on-Spark` DataFrame

- Note the `ps`, not `pd`!

```python
ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
```

<img src="img/psdf1.png" width="170px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- We can also convert from `pandas` to `pandas-on-Spark`

```python
pdf = pd.read_csv("https://www4.stat.ncsu.edu/~online/datasets/red-wine.csv", delimiter = ";")
psdf = ps.from_pandas(pdf)
psdf.head()
```

<img src="img/psdf2.PNG" width="800px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- We now have much of the same functionality from `pandas` available through `pandas-on-Spark` ([API reference guide](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html))

    + `.index`, `.columns`, `.shape`, `.info()`
    + `.head()`, `.tail()`
    + `[["column1", "column2"]]`, `.loc[]`
    + `.mean()`, `.sum()`, `.groupby()`, `.describe()`, `.value_counts()`

---

# pandas-on-Spark

- We now have much of the same functionality from `pandas` available through `pandas-on-Spark` ([API reference guide](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html))

    + `.index`, `.columns`, `.shape`, `.info()`
    + `.head()`, `.tail()`
    + `[["column1", "column2"]]`, `.loc[]`
    + `.mean()`, `.sum()`, `.groupby()`, `.describe()`, `.value_counts()`

```python
psdf.loc[psdf.quality > 5, ["alcohol", "quality"]].head()
```

<img src="img/psdf3.PNG" width="200px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- We now have much of the same functionality from `pandas` available through `pandas-on-Spark` ([API reference guide](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html))

    + `.index`, `.columns`, `.shape`, `.info()`
    + `.head()`, `.tail()`
    + `[["column1", "column2"]]`, `.loc[]`
    + `.mean()`, `.sum()`, `.groupby()`, `.describe()`, `.value_counts()`

```python
titanic_ps = ps.read_csv("titanic.csv")  # data uploaded to jhub in the data folder
titanic_ps["survived"].value_counts()
```

<img src="img/psdf6.png" width="400px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- We now have much of the same functionality from `pandas` available through `pandas-on-Spark` ([API reference guide](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html))

    + `.index`, `.columns`, `.shape`, `.info()`
    + `.head()`, `.tail()`
    + `[["column1", "column2"]]`, `.loc[]`
    + `.mean()`, `.sum()`, `.groupby()`, `.describe()`, `.value_counts()`

```python
titanic_ps.groupby("survived").mean()
```

<img src="img/psdf5.png" width="600px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- We now have much of the same functionality from `pandas` available through `pandas-on-Spark` ([API reference guide](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html))

    + `.index`, `.columns`, `.shape`, `.info()`
    + `.head()`, `.tail()`
    + `[["column1", "column2"]]`, `.loc[]`
    + `.mean()`, `.sum()`, `.groupby()`, `.describe()`, `.value_counts()`

```python
titanic_ps.describe()
```

<img src="img/psdf4.png" width="400px" style="display: block; margin: auto;" />
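---

# pandas-on-Spark

- These methods chain together just like in `pandas` (a rough sketch using only columns we have already seen; the `fare > 0` filter is an arbitrary illustrative choice, and since transformations are lazy, the chain is only computed when results are actually needed)

```python
# filter rows, keep a few columns, then aggregate - all built up as one lazy chain
(titanic_ps
    .loc[titanic_ps.fare > 0, ["age", "fare", "survived"]]
    .groupby("survived")
    .mean())
```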
---

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transform.html#pyspark.pandas.DataFrame.transform)

```python
def standardize(pser) -> ps.Series[np.float64]:
    # should always return a Series of the same length as the input
    return (pser - pser.mean()) / pser.std()
```

---
layout: false

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/3.3.1/api/python/user_guide/pandas_on_spark/transform_apply.html)

```python
def standardize(pser) -> ps.Series[np.float64]:
    # should always return a Series of the same length as the input
    return (pser - pser.mean()) / pser.std()
```

```python
std_res = titanic_ps[["age", "fare"]] \
    .rename(columns = {"age": "o_age", "fare": "o_fare"}) \
    .join(titanic_ps[["age", "fare"]]
          .transform(standardize))
```

<img src="img/psdf7.png" width="600px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/3.3.1/api/python/user_guide/pandas_on_spark/transform_apply.html)

```python
def standardize(pser) -> ps.Series[np.float64]:
    # should always return a Series of the same length as the input
    return (pser - pser.mean()) / pser.std()
```

```python
std_res = titanic_ps[["age", "fare"]] \
    .rename(columns = {"age": "o_age", "fare": "o_fare"}) \
    .join(titanic_ps[["age", "fare"]]
          .transform(standardize))
std_res.shape
```

`(1310, 4)`

---

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/3.3.1/api/python/user_guide/pandas_on_spark/transform_apply.html)

```python
def standardize_positives(pser) -> ps.Series[np.float64]:
    # can return something shorter than the input length
    return (pser[pser > 30] - pser[pser > 30].mean()) / pser[pser > 30].std()
```

---

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/3.3.1/api/python/user_guide/pandas_on_spark/transform_apply.html)

```python
def standardize_positives(pser) -> ps.Series[np.float64]:
    # can return something shorter than the input length
    return (pser[pser > 30] - pser[pser > 30].mean()) / pser[pser > 30].std()
```

```python
std_pos = titanic_ps[["age"]].apply(standardize_positives)
std_pos.head()
```

<img src="img/psdf8.png" width="150px" style="display: block; margin: auto;" />

---

# pandas-on-Spark

- `.transform()` and `.apply()` methods allow you to perform [operations on columns or rows](https://spark.apache.org/docs/3.3.1/api/python/user_guide/pandas_on_spark/transform_apply.html)

```python
def standardize_positives(pser) -> ps.Series[np.float64]:
    # can return something shorter than the input length
    return (pser[pser > 30] - pser[pser > 30].mean()) / pser[pser > 30].std()
```

```python
std_pos = titanic_ps[["age"]].apply(standardize_positives)
std_pos.shape
```

`(437, 1)`
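---

# pandas-on-Spark

- `.apply()` can also work row-wise with `axis = 1` (a rough sketch with a made-up helper function; without a return type hint, pandas-on-Spark first executes the function to infer the output type, which can be costly on large data)

```python
def fare_per_year_of_age(row):
    # with axis = 1, each row arrives as a pandas Series of that row's values
    return row["fare"] / row["age"]

titanic_ps[["age", "fare"]].apply(fare_per_year_of_age, axis = 1).head()
```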
---

# To Jupyterlab

- Let's more easily handle the counting of words in our Oliver Twist example!

---

# Recap

- **DataFrames** are the type of object (and name of the API) commonly used in `pyspark`

    + DataFrames are built on RDDs

- [pandas-on-Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html) DataFrames through the `pyspark.pandas` module

    + Most of the usual `pandas` functionality!

- Lazy evaluation lets you build up your transformations and then execute them only when an action is performed

[Important to know the **limitations on `pandas` functionality**](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/supported_pandas_api.html)
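---

# Recap

- A rough sketch of the lazy evaluation idea (assumes the `titanic_ps` DataFrame from earlier; `adults` is just an illustrative name, and `.spark.explain()` is the pandas-on-Spark accessor for viewing the underlying Spark query plan)

```python
# building up the chain only records transformations - no Spark job runs yet
adults = titanic_ps.loc[titanic_ps.age >= 18, ["age", "fare", "survived"]]

adults.spark.explain()   # show the (still unexecuted) Spark plan
adults.to_pandas()       # collecting results is what forces the computation to run
```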