Let’s get familiar with some of the common tasks we need to do with streaming data!
Note: These types of webpages are built from Jupyter notebooks (.ipynb files). You can access your own versions of them by clicking here. It is highly recommended that you go through and run the notebooks yourself, modifying and rerunning things where you’d like!
Common Issues: Preprocessing, Detecting Trends, Counting, and Averages
When we work with streaming data, the observations almost always come with some kind of time stamp or date variable. That means we should first do a recap of how to deal with time data.
Then we’ll discuss the basics of processing data and sending an alert or writing to a log.
As we often need to combine two streams, we’ll recap the basic ideas of joining two data frames.
Finally, we take on basic statistical summaries of streaming data such as:
Counting events
Increasing and decreasing trends
Means and standard deviations
Summary stats over time windows
Air Quality Data
Let’s start with some data we can use to emulate streaming concepts.
…dataset contains 9358 instances of hourly averaged responses from an array of 5 metal oxide chemical sensors embedded in an Air Quality Chemical Multisensor Device
We’ll read this data in. Note that the data has a Date and a Time column.
import pandas as pd

air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv", sep=";", decimal=",")
air_data
            Date      Time  CO(GT)  PT08.S1(CO)  NMHC(GT)  C6H6(GT)  PT08.S2(NMHC)  NOx(GT)  PT08.S3(NOx)  NO2(GT)  PT08.S4(NO2)  PT08.S5(O3)     T    RH      AH  Unnamed: 15  Unnamed: 16
0     10/03/2004  18.00.00     2.6       1360.0     150.0     11.9         1046.0    166.0        1056.0    113.0        1692.0       1268.0  13.6  48.9  0.7578          NaN          NaN
1     10/03/2004  19.00.00     2.0       1292.0     112.0      9.4          955.0    103.0        1174.0     92.0        1559.0        972.0  13.3  47.7  0.7255          NaN          NaN
2     10/03/2004  20.00.00     2.2       1402.0      88.0      9.0          939.0    131.0        1140.0    114.0        1555.0       1074.0  11.9  54.0  0.7502          NaN          NaN
3     10/03/2004  21.00.00     2.2       1376.0      80.0      9.2          948.0    172.0        1092.0    122.0        1584.0       1203.0  11.0  60.0  0.7867          NaN          NaN
4     10/03/2004  22.00.00     1.6       1272.0      51.0      6.5          836.0    131.0        1205.0    116.0        1490.0       1110.0  11.2  59.6  0.7888          NaN          NaN
...          ...       ...     ...          ...       ...      ...            ...      ...           ...      ...           ...          ...   ...   ...     ...          ...          ...
9466         NaN       NaN     NaN          NaN       NaN      NaN            NaN      NaN           NaN      NaN           NaN          NaN   NaN   NaN     NaN          NaN          NaN
9467         NaN       NaN     NaN          NaN       NaN      NaN            NaN      NaN           NaN      NaN           NaN          NaN   NaN   NaN     NaN          NaN          NaN
9468         NaN       NaN     NaN          NaN       NaN      NaN            NaN      NaN           NaN      NaN           NaN          NaN   NaN   NaN     NaN          NaN          NaN
9469         NaN       NaN     NaN          NaN       NaN      NaN            NaN      NaN           NaN      NaN           NaN          NaN   NaN   NaN     NaN          NaN          NaN
9470         NaN       NaN     NaN          NaN       NaN      NaN            NaN      NaN           NaN      NaN           NaN          NaN   NaN   NaN     NaN          NaN          NaN

9471 rows × 17 columns
As read in, the Date and Time columns are just strings, so they can’t be handled easily. For instance, there is no way to pull out the ‘day’ directly.
air_data.Date.day  # no attribute for day!

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-76fe55ca24a5> in <cell line: 0>()
----> 1 air_data.Date.day  # no attribute for day!

/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py in __getattr__(self, name)
   6297         ):
   6298             return self[name]
-> 6299         return object.__getattribute__(self, name)
   6300
   6301     @final

AttributeError: 'Series' object has no attribute 'day'
Dates and Times in Python
Most standard date/time operations can be handled via the datetime module. This module includes the following data types:
date: attributes of year, month, day
time: attributes of hour, minute, second, microsecond, and tzinfo
datetime: attributes of both date and time
timedelta: difference between two date, time or datetime instances
With this functionality we can add and subtract dates/times to get meaningful info while keeping the data in a more readable format (rather than, say, looking at the data as days since Jan 1, 1960)!
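As a quick illustration (using made-up dates rather than our air quality data), here is the kind of arithmetic these types support:

from datetime import datetime, timedelta

start = datetime(2022, 4, 1, 10, 0)           # April 1st, 2022 at 10:00
later = start + timedelta(days=3, hours=2)    # add 3 days and 2 hours
print(later)                                  # 2022-04-04 12:00:00
print(later - start)                          # 3 days, 2:00:00 (a timedelta)
print((later - start).total_seconds())        # 266400.0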
Dates and Times in pandas
Rather than using the datetime module directly, we’ll focus on how pandas handles date data (which is built on NumPy’s functionality).
pandas uses NumPy’s datetime64 and timedelta64 dtypes
We saw that our columns that were supposed to be dates were read in as strings. We can coerce a string to a date/time variable.
Date time variables carry information on both the date and the time of day, such as "04-01-2022 10:00". Of course, we need to carefully specify whether this is April 1st or January 4th, since different countries order dates differently!
a = pd.to_datetime(["04-01-2022 10:00"], dayfirst=True)
a
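The cell that re-reads the air quality data isn’t shown here, but based on the warnings below it likely combined the Date and Time columns via parse_dates. A sketch of what that call might have looked like (producing the Date_Time column referenced later):

air_data = pd.read_csv(
    "https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
    sep=";",
    decimal=",",
    # combine the Date and Time columns into a single Date_Time column
    parse_dates=[["Date", "Time"]]
)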
FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
UserWarning: Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.
air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
This read those two columns in as a single column of the appropriate type! We now have all the nice functionality to deal with dates.
Lastly, let’s rename the CO(GT) column, since we’ll use that one in the upcoming tasks, and remove the block of missing rows at the end of the data frame.
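The original cleanup cell isn’t shown; a minimal sketch of what it might look like, assuming the column is renamed to co_gt (the name used below) and the fully missing rows are dropped:

# rename the column of interest to something easier to reference
air_data = air_data.rename(columns={"CO(GT)": "co_gt"})
# drop the trailing rows where every value is missing
air_data = air_data.dropna(how="all")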
If some condition is not met, we then want to print an alert, write the event to a log, send an email, etc.
Later we’ll jump to pyspark and talk about how to handle actual streaming data. For now, let’s emulate having data streaming in using a simple for loop. We can think of each iteration of the loop as the next data value coming in.
Using our air_data object, let’s focus on the co_gt variable (true hourly averaged CO concentration (mg/m^3))
‘Take data in over time’ (via a loop over the rows)
If the data exceeds 8 we print a message
for i in range(air_data.shape[0]):
    if air_data.iloc[i].co_gt > 8:
        print("High CO Concentration at " + str(air_data.Date_Time[i]) + ", loop", str(i))
High CO Concentration at 15/03/2004 09.00.00, loop 111
High CO Concentration at 22/10/2004 18.00.00, loop 5424
High CO Concentration at 25/10/2004 18.00.00, loop 5496
High CO Concentration at 26/10/2004 17.00.00, loop 5519
High CO Concentration at 26/10/2004 18.00.00, loop 5520
High CO Concentration at 02/11/2004 20.00.00, loop 5690
High CO Concentration at 04/11/2004 18.00.00, loop 5736
High CO Concentration at 05/11/2004 17.00.00, loop 5759
High CO Concentration at 17/11/2004 18.00.00, loop 6048
High CO Concentration at 19/11/2004 19.00.00, loop 6097
High CO Concentration at 19/11/2004 20.00.00, loop 6098
High CO Concentration at 23/11/2004 18.00.00, loop 6192
High CO Concentration at 23/11/2004 19.00.00, loop 6193
High CO Concentration at 23/11/2004 20.00.00, loop 6194
High CO Concentration at 23/11/2004 21.00.00, loop 6195
High CO Concentration at 24/11/2004 20.00.00, loop 6218
High CO Concentration at 26/11/2004 18.00.00, loop 6264
High CO Concentration at 26/11/2004 21.00.00, loop 6267
High CO Concentration at 02/12/2004 19.00.00, loop 6409
High CO Concentration at 13/12/2004 18.00.00, loop 6672
High CO Concentration at 14/12/2004 18.00.00, loop 6696
High CO Concentration at 16/12/2004 19.00.00, loop 6745
High CO Concentration at 16/12/2004 20.00.00, loop 6746
High CO Concentration at 16/12/2004 21.00.00, loop 6747
High CO Concentration at 23/12/2004 18.00.00, loop 6912
High CO Concentration at 23/12/2004 19.00.00, loop 6913
High CO Concentration at 23/12/2004 20.00.00, loop 6914
High CO Concentration at 17/01/2005 18.00.00, loop 7512
High CO Concentration at 17/01/2005 19.00.00, loop 7513
High CO Concentration at 10/02/2005 20.00.00, loop 8090
Reasonably straightforward! Of course, we might have some other tasks that we’d check for. We can use if/else logic for that.
‘Take data in over time’ (via a loop over the rows)
If the data exceeds 8 we flag a high reading
If the data is less than 0 we flag an invalid reading (-200 represents missing here)
Write either occurrence to a log file (or perhaps a database) rather than printing to the console. Using the menus on the left, create a directory called logs before running the code below.
for i in range(air_data.shape[0]):
    temp = air_data.iloc[i]
    dt = temp.Date_Time
    value = temp.co_gt
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
We may also want to check whether something is ‘down’. This can mean a lot of things, but if something is down, we probably want to be notified. We can use Python to send an email!
Unfortunately, it seems that sending email through Gmail this way isn’t doable at this point.
Let’s set things up to send the email. I’ve deleted the password here, so you’ll need to create your own account (and update the details below) to run this code!
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Set up email details (password removed -- use your own credentials)
sender_email = "MS_uW2WFe@trial-eqvygm0z9k8l0p7w.mlsender.net"
password = "removed"
smtp_server = "smtp.mailersend.net"
smtp_port = "587"

# Define the receiver email
receiver_email = "st554testemail@gmail.com"
subject = "Sensor down!"
# we'll set up the body of the email in the loop
Now we’ll simulate data coming in via our loop. I’m going to restrict the number of rows to cycle through to 1000 as I only want a couple of emails sent (there is a limited number of free ones on the account).
If we observe six missing observations in a row then we’ll send an email as a warning that something seems to be down. To do so:
We’ll initialize a missing variable at 0
If the value is -200 then we’ll add one to missing
If the value is not -200 then we reset missing to 0
If the value of missing becomes 6 then we send an email
# initialize the value
missing = 0

# loop through the first 1000 rows
for i in range(1000):
    # grab the row of interest
    temp = air_data.iloc[i]
    # get the date time and co values
    dt = temp.Date_Time
    value = temp.co_gt
    # Check if the value is large, add to log, reset missing
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        missing = 0
    # else, check if it is missing (<0), add to log, increase missing
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        if value == -200:
            missing += 1
        # if missing is 6, send the email!
        if missing == 6:
            body = "We have an issue at " + str(dt) + " to resolve!"
            # Create the email message
            message = MIMEMultipart()
            message["From"] = sender_email
            message["To"] = receiver_email
            message["Subject"] = subject
            # Attach the email body to the message
            message.attach(MIMEText(body, "plain"))
            # Establish a connection to the SMTP server and send the email
            try:
                # Connect to the SMTP server
                with smtplib.SMTP(smtp_server, smtp_port) as server:
                    server.starttls()  # Secure the connection
                    server.login(sender_email, password)  # Log in to the SMTP server
                    server.sendmail(sender_email, receiver_email, message.as_string())  # Send the email
                print("Email sent successfully")
            except Exception as e:
                print(f"Error: {e}")
    # if a reasonable value, set missing to 0
    else:
        missing = 0
Email sent successfully
Email sent successfully
Email sent successfully
Checking my email - it worked!
Combining Data Streams
Often have multiple data streams that need to be combined
Usually combined via a shared key or time stamps
Once combined we can then preprocess/summarize/etc.
To understand this example, we need to define two terms:
Impression - an ad seen by a user
Click - an ad clicked on by a user
We may get data each time we get an impression and each time we get a click. There will be more impressions than clicks of course.
Usually there is a userID of some kind that we would want to join this data on. But essentially, once we have a unique ‘key’ or at least criteria for combining the streams the ideas are the same as SQL type joins.
Just as an example, let’s simulate some fake impressions and clicks data with unique userIDs (there would likely be other information in each data stream that would come along as well).
import pandas as pd
import numpy as np

np.random.seed(10)
impressions = pd.DataFrame({
    'userId': range(500),
    'impressionTime': (pd.to_datetime('2022-01-01') + pd.to_timedelta(np.random.rand(500), unit="D")).sort_values()
})
impressions.head()
   userId                 impressionTime
0       0  2022-01-01 00:02:32.033682620
1       1  2022-01-01 00:05:41.130210730
2       2  2022-01-01 00:12:28.161050454
3       3  2022-01-01 00:13:16.703184279
4       4  2022-01-01 00:14:04.126023142
clicks = impressions.iloc[np.random.randint(size=30, low=0, high=499)].sort_index()
# add a slightly positive time delta to get the clickTime
clicks['clickTime'] = clicks.impressionTime + pd.to_timedelta(np.random.rand(30)/100, unit="D")
clicks.drop(columns='impressionTime', axis=1, inplace=True)
clicks.head()
    userId                      clickTime
35      35  2022-01-01 01:30:34.800044013
38      38  2022-01-01 01:37:35.836843295
58      58  2022-01-01 02:42:47.536707642
90      90  2022-01-01 04:10:54.616200590
94      94  2022-01-01 04:13:45.397812575
We can “join” these two with the usual SQL join idea. Here we just use pd.merge(). We’ll use clicks as our right table and do a ‘right’ join to only return rows that match the clicks rows.
combined = pd.merge(left=impressions, right=clicks, on="userId", how='right')
combined.head()
   userId                 impressionTime                      clickTime
0      35  2022-01-01 01:24:48.442588485  2022-01-01 01:30:34.800044013
1      38  2022-01-01 01:33:27.963279930  2022-01-01 01:37:35.836843295
2      58  2022-01-01 02:30:39.145926778  2022-01-01 02:42:47.536707642
3      90  2022-01-01 04:02:01.489706816  2022-01-01 04:10:54.616200590
4      94  2022-01-01 04:11:43.920512629  2022-01-01 04:13:45.397812575
Now we could find the time it took someone to click (of those that clicked).
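For example, subtracting the two timestamp columns gives a timedelta for each user who clicked (timeToClick is just a column name we’re making up here):

# time between seeing the ad and clicking on it
combined["timeToClick"] = combined.clickTime - combined.impressionTime
combined.head()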
We can’t start looping through the data without some special cases. Here we’ll initialize some things and start the loop after a few observations.
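The update_mean() and update_var() helper functions used below aren’t defined in the code shown here; a sketch of the standard online-update formulas they presumably implement, assuming count is the running sample size including the newest value:

def update_mean(new_value, old_mean, count):
    # shift the old mean toward the new value by 1/count
    return old_mean + (new_value - old_mean) / count

def update_var(new_value, old_var, old_mean, count):
    # Welford-style update of the sample variance
    return (count - 2) / (count - 1) * old_var + (new_value - old_mean) ** 2 / count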
# set up things to start the loop
n = 3
# lists to capture the updated means and variances
means = [air_data.co_gt[0], air_data.co_gt[0:2].mean()]
variances = [np.nan, air_data.co_gt[0:2].var()]
o_mean = means[1]
o_var = variances[1]
# loop through the observations
for i in range(2, air_data.shape[0]):
    means.append(update_mean(air_data.co_gt[i], old_mean=o_mean, count=n))
    variances.append(update_var(air_data.co_gt[i], old_var=o_var, old_mean=o_mean, count=n))
    o_mean = means[i]
    o_var = variances[i]
    n += 1
Now we’ll find the standard deviation as well and zip() it together with the rolling summary stats!
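That code cell isn’t shown here; a minimal sketch of one way to do it, assuming the “rolling summary stats” are pandas’ rolling means over a 3-observation window:

import numpy as np

# standard deviations from the running variances we just computed
std_devs = np.sqrt(variances)

# pandas' built-in rolling mean over a 3-observation window
rolling_means = air_data.co_gt.rolling(3).mean()

# pair the rolling means with our standard deviations for a quick look
list(zip(rolling_means, std_devs))[:5]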
Similar functionality exists for rolling standard deviations.
air_data.rolling(3).co_gt.std().head()
      co_gt
0       NaN
1       NaN
2  0.305505
3  0.115470
4  0.346410
Recap
Often need to check/validate the data
Basic checks to create an issue log file or entries in an issues database
Might send email as a notification
Could of course do basic ETL type operations as the data comes in too!
Filter observations
Quick transformations
Combine data streams
etc.
We’ll see how to do all of this in Spark next!
If you are on the course website, use the table of contents on the left or the arrows at the bottom of this page to navigate to the next learning material!
If you are on Google Colab, head back to our course website for our next lesson!