# Notebook Corresponding to `pyspark` Notes

This notebook has the code and examples from the three sets of notes:
- `pyspark`: RDDs
- `pyspark`: pandas-on-Spark
- `pyspark`: Spark SQL

Each section can be run on its own, without running the cells from the other sections.

## RDDs

First, the code from the notes is given below. Make sure that the kernel chosen to run these notes is the `pyspark` kernel. See the top right of the notebook!

Note: We can still run Python code and load libraries as normal.

Now, we start by creating our Spark session.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('my_app').getOrCreate()

Let's populate a list with tuples and explicitly create an RDD.

In [None]:
# populate a list with (key, value) tuples
quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1,51)]
print(my_data[:3])  # print, since only the last expression in a cell is displayed
# create the RDD
my_rdd = spark.sparkContext.parallelize(my_data)
my_rdd

This is an object stored (likely) over multiple partitions.

In [None]:
my_rdd.getNumPartitions()
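
We didn't choose the number of partitions. When no `numSlices` argument is given, `parallelize()` uses the default parallelism, which for `local[*]` is the number of cores on the machine. As a rough mental model only (a simplification, not Spark's actual code), the sketch below cuts a list into contiguous, nearly equal chunks. The helper `slice_into_partitions` and the choice of 4 partitions are hypothetical.

```python
def slice_into_partitions(data, num_slices):
    """Hypothetical helper: split data into num_slices contiguous, nearly equal chunks."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1, 51)]

parts = slice_into_partitions(my_data, 4)  # 4 stands in for the number of cores
print([len(p) for p in parts])  # [12, 13, 12, 13]
```

In Spark itself, `my_rdd.glom().collect()` returns one list per partition, so it is one way to check how the data were actually split.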

Looking at `my_rdd` doesn't actually print out the data. This is because an RDD may hold a ton of data, so Spark doesn't show it by default. Instead, we can perform an action like `.take()` to have some data returned to us.

In [None]:
my_rdd.take(3)

When the elements of an RDD are tuples (pairs), the first value is treated as the `key` and the second as the associated `value`.

In [None]:
my_rdd.keys().take(3)

In [None]:
my_rdd.values().take(3)

This allows us to do operations by key if we'd like! Note that `.count()` and `.countByKey()` are actions, so they return their results locally rather than as a new RDD.

In [None]:
my_rdd.count()

In [None]:
my_rdd.countByKey()

If we instead wanted the result of this counting operation as a new RDD, we could use `.groupByKey()` followed by the `.mapValues()` method. This returns an RDD rather than a value, so we need to use `.collect()` to see the data.

In [None]:
my_rdd \
    .groupByKey() \
    .mapValues(len) \
    .collect()
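
To see what `.groupByKey()` followed by `.mapValues(len)` computes, here is a plain-Python sketch of the same two steps on `my_data`. No Spark is involved: it mimics the result, not how Spark distributes the work. The counts should agree with `.countByKey()` above.

```python
from collections import defaultdict

quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1, 51)]

# groupByKey: gather every value that shares a key into one list
grouped = defaultdict(list)
for key, value in my_data:
    grouped[key].append(value)

# mapValues(len): replace each list with its length, keeping the key
counts = [(key, len(values)) for key, values in grouped.items()]
print(counts)  # [('a', 19), ('b', 31)]
```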

With this, we could do further transformations on the resulting RDD, say using `.map()`, which applies a function to each element of our RDD. For instance, we can add a log-transformed count for each key.

In [None]:
from numpy import log
my_rdd \
    .groupByKey() \
    .mapValues(len) \
    .map(lambda x: (x[0], x[1], log(x[1]))) \
    .collect()

RDD functions can be hard to use, though! Suppose we want the total sum of the values for each key. We can use `.groupByKey()` and `.mapValues()` for this, but the Spark documentation recommends `.reduceByKey()` or `.aggregateByKey()` for aggregations like this, since they combine values within each partition before shuffling data. However, `.aggregateByKey()` requires some confusing arguments.

In [None]:
my_rdd \
    .groupByKey() \
    .mapValues(sum) \
    .collect()

In [None]:
my_rdd \
  .aggregateByKey(0,  # zeroValue: starting accumulator for each key in each partition
                  # seqFunc: how to fold a value into the accumulator within a partition
                  lambda acc, value: acc + value,
                  # combFunc: how to combine accumulators across partitions
                  lambda acc_1, acc_2: acc_1 + acc_2) \
  .collect()

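We could also use `.map()` in place of `.mapValues()`, as long as the function returns the key along with the new value, e.g. `.map(lambda x: (x[0], sum(x[1])))` after `.groupByKey()`.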
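
Why does `.aggregateByKey()` need three arguments? A plain-Python sketch of its semantics helps. The zero value starts an accumulator for each key in each partition. The first function folds values into that accumulator within a partition, and the second merges accumulators from different partitions. The helper `aggregate_by_key` and the two-partition split below are hypothetical. Because the accumulator can have a different type than the values, the same pattern also computes per-key means with a `(sum, count)` pair.

```python
def aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Hypothetical plain-Python sketch of RDD.aggregateByKey semantics."""
    # within each partition: fold each value into a per-key accumulator
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero), value)
        per_partition.append(acc)
    # across partitions: merge the accumulators that share a key
    result = {}
    for acc in per_partition:
        for key, partial in acc.items():
            result[key] = comb_func(result[key], partial) if key in result else partial
    return result

quick_cat = lambda x: "a" if x < 20 else "b"
my_data = [(quick_cat(x), x) for x in range(1, 51)]
partitions = [my_data[:25], my_data[25:]]  # pretend the RDD has two partitions

sums = aggregate_by_key(partitions, 0,
                        lambda acc, v: acc + v,   # seqFunc
                        lambda a1, a2: a1 + a2)   # combFunc
print(sums)  # {'a': 190, 'b': 1085}

# (sum, count) accumulator: a different type than the integer values
pairs = aggregate_by_key(partitions, (0, 0),
                         lambda acc, v: (acc[0] + v, acc[1] + 1),
                         lambda a1, a2: (a1[0] + a2[0], a1[1] + a2[1]))
means = {key: s / n for key, (s, n) in pairs.items()}
print(means)  # {'a': 10.0, 'b': 35.0}
```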

### MapReduce Example Done Explicitly Using RDDs

Recall that in the Hadoop section, we did a MapReduce algorithm to count the number of words in Oliver Twist. We can redo that example using Spark! Spark will automatically parallelize the work across the cores on our machine too!

In [None]:
from pyspark.sql import SparkSession
#create a spark session object that runs locally using all available cores
spark = SparkSession.builder.master('local[*]').appName('my_app').getOrCreate()

Now let's read the chapters into a list. (Oliver Twist has 53 chapters; the loop below reads only the first five.)

In [None]:
my_chap = []
for i in range(1, 6):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())

We want Spark to handle this using RDDs explicitly. We can do that via the `sparkContext.parallelize()` method, which tells Spark to take our list and distribute it across partitions, preparing it for parallel computations.

Let's create some RDDs! We don't care about the chapters themselves, we just want the final word counts. This means we can start with an RDD without keys.

In [None]:
my_chap_rdd = spark.sparkContext.parallelize(my_chap)
type(my_chap_rdd)

In [None]:
my_chap_rdd.take(1)

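Great! Now we want to take these values (each chapter is one value) and split the strings on spaces. `.flatMap()` applies the function to each element and then flattens the resulting lists, so each word becomes its own element.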

In [None]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .take(10)

Now we have an RDD whose elements are individual words (again, no keys). First, let's filter out the empty strings, which come from consecutive spaces in the text.

In [None]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .take(10)
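
Where do the empty strings come from? Splitting on a single space turns every double space (such as the one after `chapter i` in the text) into an empty string. Python's `str.split()` with no argument splits on runs of whitespace and never returns empty strings, so it is an alternative to the `.filter()` step. Note that it also splits on newlines and tabs, which may or may not be what you want. The excerpt below is a short hypothetical string.

```python
text = "chapter i  treats of the place"  # a short excerpt with a double space

print(text.split(" "))  # ['chapter', 'i', '', 'treats', 'of', 'the', 'place']
print(text.split())     # ['chapter', 'i', 'treats', 'of', 'the', 'place']

# filtering the single-space split gives the same words as split()
assert [w for w in text.split(" ") if w != ""] == text.split()
```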

Let's apply a transformation that makes each word a key and assigns it a value of 1.

In [None]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .take(5)

Nice! Now we just need to reduce this by key and we are done!

In [None]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .take(10)

Let's sort this by the counts (the values). To sort in descending order, we simply sort on the negative of the count.

In [None]:
my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: -x[1]) \
    .take(10)
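
Each step in this chain has a direct plain-Python analogue. The sketch below runs the same flatMap, filter, map, reduceByKey, and sortBy steps on two made-up "chapters" (hypothetical stand-in text). This can help when checking what each Spark transformation should produce.

```python
chapters = ["the cat  sat", "the dog sat on the mat"]  # hypothetical stand-in text

# flatMap(lambda x: x.split(" ")): split every chapter and flatten into one list
words = [w for chap in chapters for w in chap.split(" ")]
# filter(lambda x: x != ""): drop the empty strings left by double spaces
words = [w for w in words if w != ""]
# map(lambda word: (word, 1)): pair each word with a count of 1
pairs = [(word, 1) for word in words]
# reduceByKey(lambda a, b: a + b): add up the values that share a key
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one
# sortBy(lambda x: -x[1]): order by descending count
ranked = sorted(counts.items(), key=lambda x: -x[1])
print(ranked[:3])  # [('the', 3), ('sat', 2), ('cat', 1)]
```

In Spark, `.sortBy(lambda x: x[1], ascending=False)` gives the same descending order without negating the count. `.takeOrdered(10, key=lambda x: -x[1])` returns the top 10 without fully sorting the RDD first.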

Let's collect all of the data and turn it into a regular pandas data frame.

In [None]:
results = my_chap_rdd \
    .flatMap(lambda x: x.split(" ")) \
    .filter(lambda x: x != "") \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: -x[1]) \
    .collect()
import pandas as pd
df_results = pd.DataFrame(results, columns = ["word", "count"])
df_results

## pandas-on-Spark

Below is the code from the notes on pandas-on-Spark.

First let's import our modules.

In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps

Create a pandas-on-Spark series via `ps.Series()`

In [None]:
ps.Series([1, 3, 5, np.nan, 6, 8]) #ignore the warning

Create a pandas-on-Spark DataFrame via `ps.DataFrame()`

In [None]:
ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])

Converting from a pandas DataFrame to a pandas-on-Spark DataFrame is easy with `ps.from_pandas()`.

In [None]:
pdf = pd.read_csv("https://www4.stat.ncsu.edu/~online/datasets/red-wine.csv", delimiter = ";")
psdf = ps.from_pandas(pdf)
psdf.head()

We can subset the data using familiar tools like the `.loc[]` indexer.

In [None]:
psdf.loc[psdf.quality > 5, ["alcohol", "quality"]].head()

We can also read data directly into a pandas-on-Spark data frame using the `ps.read_csv()` function (it can't read from a URL, though).

In [None]:
titanic_ps = ps.read_csv("data/titanic.csv") #data uploaded to jhub in data folder
titanic_ps["survived"].value_counts()

We can now do our usual summaries using the `.groupby()` method along with a summarization method.

In [None]:
titanic_ps.groupby("survived").mean()

In [None]:
titanic_ps.describe()

We can also use the `.transform()` and `.apply()` methods (also used in regular pandas) to perform other common operations.

First we can transform the values in our columns (say center and scale them).

In [None]:
def standardize(pser) -> ps.Series[np.float64]:
    # center by subtracting the mean, then scale by the standard deviation;
    # .transform() requires the output to have the same length as the input
    return (pser - pser.mean()) / pser.std()
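
For reference, here is the centering and scaling computed on a small list with Python's `statistics` module. The list of ages and the helper name `standardize_list` are hypothetical. `statistics.stdev()` is the sample standard deviation (dividing by n - 1), which matches the pandas default `ddof=1`. The standardized values should therefore have mean 0 and standard deviation 1.

```python
import statistics

def standardize_list(values):
    """Hypothetical plain-Python analogue: subtract the mean, divide by the sample std."""
    mean = statistics.mean(values)
    sd = statistics.stdev(values)  # sample standard deviation (n - 1 in the denominator)
    return [(v - mean) / sd for v in values]

ages = [22.0, 38.0, 26.0, 35.0, 54.0]  # hypothetical ages
z = standardize_list(ages)
print([round(v, 3) for v in z])
```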

In [None]:
std_res = titanic_ps[["age", "fare"]] \
    .rename(columns = {"age": "o_age", "fare": "o_fare"}) \
    .join(titanic_ps[["age", "fare"]]
              .transform(standardize))
std_res.head()

In [None]:
std_res.shape

Unlike `.transform()`, `.apply()` can return something shorter than the original input.


In [None]:
def standardize_positives(pser) -> ps.Series[np.float64]:
    # keep only values above 30, then center and scale them;
    # the result can be shorter than the input, which .apply() allows
    sub = pser[pser > 30]
    return (sub - sub.mean()) / sub.std()

In [None]:
std_pos = titanic_ps[["age"]].apply(standardize_positives)
std_pos.head()

In [None]:
std_pos.shape

### MapReduce Example Done via pandas-on-Spark

Let's repeat our map reduce example but, you know, do it more easily :)

Recall the `my_chap` object.

In [1]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
#read in each chapter to a list element
my_chap = []
for i in range(1, 6):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())
        
my_chap[0]



'chapter i  treats of the place where oliver twist was born and of the circumstances attending his birth  among other public buildings in a certain town which for many reasons it will be prudent to refrain from mentioning and to which i will assign no fictitious name there is one anciently common to most towns great or small to wit a workhouse and in this workhouse was born on a day and date which i need not trouble myself to repeat inasmuch as it can be of no possible consequence to the reader in this stage of the business at all events the item of mortality whose name is prefixed to the head of this chapter  for a long time after it was ushered into this world of sorrow and trouble by the parish surgeon it remained a matter of considerable doubt whether the child would survive to bear any name at all in which case it is somewhat more than probable that these memoirs would never have appeared or if they had that being comprised within a couple of pages they would have possessed the in

Let's put this into a pandas-on-Spark series and manipulate from there!

In [2]:
#combine the list elements into one large string (with a space between chapters)
from functools import reduce
big_string = reduce(lambda x, y: x + " " + y, my_chap)

#create a series with the big string
chap_pss = ps.Series(big_string)
chap_pss


0    chapter i  treats of the place where oliver tw...
dtype: object
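
A note on how the chapters are glued together. Concatenating strings with `x + y` puts nothing between them, so if a chapter does not end in whitespace, its last word fuses with the first word of the next chapter. Whether that happens depends on the text files. The tiny example below uses hypothetical strings to show the difference. Adding a separator (or using `" ".join()`) avoids the issue.

```python
from functools import reduce

chapters = ["the end", "chapter ii"]  # hypothetical chapter texts without trailing spaces

glued = reduce(lambda x, y: x + y, chapters)         # no separator
spaced = reduce(lambda x, y: x + " " + y, chapters)  # space between chapters

print(glued.split())   # ['the', 'endchapter', 'ii']
print(spaced.split())  # ['the', 'end', 'chapter', 'ii']
assert spaced == " ".join(chapters)
```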

Now use the `.str.split()` method on a pandas-on-Spark series to create a series with a list of words.

In [None]:
chap_pss.str.split()

What we want is each word in its own row of a `word` column, with its count in another column. Indexing the series with `[0]` returns the list of words stored in its single row, and we convert that list to a data frame. Then we remove the empty strings.

In [None]:
word_df = ps.DataFrame(chap_pss.str.split(" ")[0], columns = ["word"])
word_df.head()

In [None]:
word_df = word_df.loc[word_df.word != ""]
word_df.head()

First, we assign each word a count of 1. Then we can use our usual `.groupby()` to get our desired result.

In [None]:
word_df["count"] = 1
word_df.head()

Awesome! Now just group by the word and sum it up!

In [None]:
word_df \
    .groupby("word") \
    .sum() \
    .head()

Sort it so we can compare to our previous work.

In [None]:
word_df \
    .groupby("word") \
    .sum() \
    .sort_values(by = "count", ascending = False) \
    .head()

Woot!

## Spark SQL

Below is the code from the Spark SQL notes.

Start by creating a Spark session.

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('my_app').getOrCreate()

25/03/03 08:19:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Now let's look at a few ways to create a Spark SQL Data Frame.

In [None]:
from pyspark.sql import Row
from datetime import datetime, date
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

In [None]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

In [None]:
df = spark.read.load("data/neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
df

In [None]:
import pandas as pd
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

Conveniently, we can go back and forth between Spark SQL style Data Frames and pandas-on-Spark style Data Frames.

In [None]:
sdf = spark.read.load("data/neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
type(sdf)

In [None]:
dfps = sdf.pandas_api()
type(dfps)

In [None]:
sdf2 = dfps.to_spark()
type(sdf2)

Schema and column names are important to know.

In [None]:
df = spark.read.load("data/neuralgia.csv",
                     format="csv", 
                     sep=",", 
                     inferSchema="true", 
                     header="true")
df.printSchema()
df.columns

We can look at the data using `.show()` (which prints rows), or return rows using `.take()` and `.collect()`.

In [None]:
df.show(3)

In [None]:
df.take(3)

Next, we'll look at common transformations, starting with column operations.

In [None]:
#select columns you want
df.select("Age")

In [None]:
df.Age

In [None]:
df.select("Age", "Pain").show(3)

`.withColumn()` can be used to create new columns.

In [None]:
df.withColumn("Current_Age", df.Age + 2).show(3)

We can also rename columns with `.withColumnRenamed()`.

In [None]:
from pyspark.sql.functions import col
df \
  .withColumnRenamed('Age', 'Former_Age') \
  .withColumn("Current_Age", col("Former_Age") + 2) \
  .show(3)

We can use conditional logic with `when()` from `pyspark.sql.functions`.

In [None]:
# note: the wildcard import shadows Python built-ins such as sum() and round()
from pyspark.sql.functions import *
df.withColumn("Age_cat", 
               when(df.Age>75, "75+")
              .when(df.Age>=70, "70-75")
              .otherwise("<70")) \
    .show(3)
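
The chained `when()` calls are checked in order, and the first matching condition wins, much like `if`/`elif`/`else` (or SQL's `CASE WHEN`). That is why an age of 76 lands in "75+" and never reaches the `>= 70` check. A plain-Python sketch of the same logic (the function name `age_cat` is hypothetical) makes the boundaries easy to check:

```python
def age_cat(age):
    """Hypothetical plain-Python mirror of the when()/otherwise() chain above."""
    if age > 75:
        return "75+"
    elif age >= 70:
        return "70-75"
    else:
        return "<70"

for age in [69, 70, 75, 76]:
    print(age, age_cat(age))  # 69 <70, 70 70-75, 75 70-75, 76 75+
```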

In [None]:
df.withColumn("Age_cat", 
               when(df.Age>75, "75+")
              .when(df.Age>=70, "70-75")
              .otherwise("<70")) \
   .withColumn("ln_Duration", log(df.Duration)) \
   .show(3)

We can also create our own user-defined functions (UDFs) with `udf()`. By default, a UDF returns a string column.

In [None]:
code_trt = udf(lambda x: "P Trt" if x == "P" else "Other")
df.withColumn('my_trt', code_trt('Treatment')).show(3)

We can do the common operations on rows as well.

In [None]:
df.sort(df.Duration).show(3)

In [None]:
df.sort(df.Duration, ascending = False).show(3)

In [None]:
df.filter(df.Age < 65).show(3)

We can do basic summaries including grouped summaries!

In [None]:
df.select("Age", "Pain").describe().show()

In [None]:
df \
    .select(["Duration", "Age", "Treatment"]) \
    .agg(sum("Duration"), avg("Age"), count("Treatment")) \
    .show()

In [None]:
df.select(["Duration", "Age", "Treatment"]) \
    .groupBy("Treatment") \
    .sum() \
    .withColumnRenamed("sum(Duration)", "sum_Duration") \
    .withColumnRenamed("sum(Age)", "sum_Age") \
    .show()

In [None]:
df.createOrReplaceTempView("df")  # createTempView() errors if the view already exists
spark.sql("SELECT sex, age FROM df LIMIT 4").show()

### MapReduce Example Done via Spark SQL 

Let's redo it with Spark SQL!

In [5]:
#read in each chapter to a list element
my_chap = []
for i in range(1, 6):
    with open('dickens/chap' + str(i) + '.txt', 'r') as f:
        my_chap.append(f.read())

from pyspark.sql.types import StringType
sql_text = spark.createDataFrame(my_chap, StringType())
sql_text

DataFrame[value: string]

In [6]:
sql_text.take(1)

[Row(value='chapter i  treats of the place where oliver twist was born and of the circumstances attending his birth  among other public buildings in a certain town which for many reasons it will be prudent to refrain from mentioning and to which i will assign no fictitious name there is one anciently common to most towns great or small to wit a workhouse and in this workhouse was born on a day and date which i need not trouble myself to repeat inasmuch as it can be of no possible consequence to the reader in this stage of the business at all events the item of mortality whose name is prefixed to the head of this chapter  for a long time after it was ushered into this world of sorrow and trouble by the parish surgeon it remained a matter of considerable doubt whether the child would survive to bear any name at all in which case it is somewhat more than probable that these memoirs would never have appeared or if they had that being comprised within a couple of pages they would have posse

Ok, first we need to split the words out within each *row*. The SQL functions in `pyspark.sql.functions` (imported with `*` below) include a `split()` function that will work for us!

Note that we use the function inside `.select()` rather than `.withColumn()`. This is a common way to apply these functions and get a new data frame without adding a column to the original one.

In [12]:
from pyspark.sql.functions import *

In [14]:
split(sql_text.value, " ").alias("words")

Column<'split(value,  , -1) AS words'>

In [15]:
sql_text.select(split(sql_text.value, " ").alias("words")).show(4)

+--------------------+
|               words|
+--------------------+
|[chapter, i, , tr...|
|[chapter, ii, , t...|
|[chapter, iii, , ...|
|[chapter, iv, , o...|
+--------------------+
only showing top 4 rows



Ok, now we have a data frame with one column where each entry is a list of the words! This is closer. 

What we need to do now is **explode** these lists. The **explode** function from `pyspark.sql.functions` splits these values up and creates a new row for each entry!

Notice how we call the function inside select again.

In [17]:
explode(split(sql_text.value, " ")).alias("word")

Column<'explode(split(value,  , -1)) AS word'>

In [18]:
sql_text.select(explode(split(sql_text.value, " ")).alias("word")).show(4)

+-------+
|   word|
+-------+
|chapter|
|      i|
|       |
| treats|
+-------+
only showing top 4 rows
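
In plain-Python terms, `split()` turns each row's string into a list, and `explode()` emits one output row per list element. The number of rows therefore grows from one per chapter to one per word, empty strings included, which is why a blank row shows up above. The rows below are hypothetical stand-ins for the chapter data. (In Spark, a row whose array is empty produces no output rows with `explode()`; `explode_outer()` keeps it as a null.)

```python
rows = [{"value": "chapter i  treats"}, {"value": "chapter ii"}]  # hypothetical rows

# split(value, " "): one list of words per row
split_rows = [{"words": row["value"].split(" ")} for row in rows]
# explode(...): one new row per element of each list
exploded = [{"word": w} for row in split_rows for w in row["words"]]

print(len(rows), "->", len(exploded))  # 2 -> 6
print([r["word"] for r in exploded])   # ['chapter', 'i', '', 'treats', 'chapter', 'ii']
```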



Woo, almost there. Now we can filter out the empty strings.

In [19]:
my_words = sql_text.select(explode(split(sql_text.value, " ")).alias("word"))
my_words

DataFrame[word: string]

In [20]:
my_words \
    .filter(my_words.word != "") \
    .show(4)

+-------+
|   word|
+-------+
|chapter|
|      i|
| treats|
|     of|
+-------+
only showing top 4 rows



Finally we group and count!

In [22]:
my_words \
    .filter(my_words.word != "") \
    .groupBy("word") \
    .count() \
    .show(5)



+----------+-----+
|      word|count|
+----------+-----+
|      some|   31|
|       few|    7|
|      hope|    5|
| overseers|    2|
|surrounded|    2|
+----------+-----+
only showing top 5 rows



                                                                                

Arrange it!

In [23]:
counts = my_words \
    .filter(my_words.word != "") \
    .groupBy("word") \
    .count()
counts.sort(counts["count"], ascending = False).show(10)

+----+-----+
|word|count|
+----+-----+
| the| 1005|
| and|  439|
|   a|  416|
|  of|  389|
|  to|  357|
|  in|  257|
| was|  222|
| his|  219|
|  he|  203|
|  mr|  157|
+----+-----+
only showing top 10 rows

