Let’s get familiar with some of the common tasks we need to do with streaming data!
Note: These types of webpages are built from Jupyter notebooks (.ipynb files). You can access your own versions of them by clicking here. It is highly recommended that you go through and run the notebooks yourself, modifying and rerunning things where you’d like!
Common Issues: Preprocessing, Detecting Trends, Counting, and Averages
When we work with streaming data, the data pretty much always comes with some kind of time stamp or date-type data. That means we should first do a recap of how to deal with time data.
Then we’ll discuss the basics of processing data and sending an alert or writing to a log.
As we often need to combine two streams, we’ll recap the basic ideas of joining two data frames.
Finally, we take on basic statistical summaries of streaming data such as
- Counting events
- Increasing and decreasing trends
- Means and standard deviations
- Summary stats over time windows
Air Quality Data
Let’s start with some data we can use to emulate some streaming concepts
…dataset contains 9358 instances of hourly averaged responses from an array of 5 metal oxide chemical sensors embedded in an Air Quality Chemical Multisensor Device
We’ll read this data in. Note that the data has a Date and a Time column.
```python
import pandas as pd

air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv", sep=";", decimal=",")
#remove rows at the end that are all NaN values
air_data = air_data.dropna(how="all")
#drop last two columns that are all NaN
air_data = air_data.drop(['Unnamed: 15', 'Unnamed: 16'], axis=1)
air_data
```
|  | Date | Time | CO(GT) | PT08.S1(CO) | NMHC(GT) | C6H6(GT) | PT08.S2(NMHC) | NOx(GT) | PT08.S3(NOx) | NO2(GT) | PT08.S4(NO2) | PT08.S5(O3) | T | RH | AH |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10/03/2004 | 18.00.00 | 2.6 | 1360.0 | 150.0 | 11.9 | 1046.0 | 166.0 | 1056.0 | 113.0 | 1692.0 | 1268.0 | 13.6 | 48.9 | 0.7578 |
| 1 | 10/03/2004 | 19.00.00 | 2.0 | 1292.0 | 112.0 | 9.4 | 955.0 | 103.0 | 1174.0 | 92.0 | 1559.0 | 972.0 | 13.3 | 47.7 | 0.7255 |
| 2 | 10/03/2004 | 20.00.00 | 2.2 | 1402.0 | 88.0 | 9.0 | 939.0 | 131.0 | 1140.0 | 114.0 | 1555.0 | 1074.0 | 11.9 | 54.0 | 0.7502 |
| 3 | 10/03/2004 | 21.00.00 | 2.2 | 1376.0 | 80.0 | 9.2 | 948.0 | 172.0 | 1092.0 | 122.0 | 1584.0 | 1203.0 | 11.0 | 60.0 | 0.7867 |
| 4 | 10/03/2004 | 22.00.00 | 1.6 | 1272.0 | 51.0 | 6.5 | 836.0 | 131.0 | 1205.0 | 116.0 | 1490.0 | 1110.0 | 11.2 | 59.6 | 0.7888 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 9352 | 04/04/2005 | 10.00.00 | 3.1 | 1314.0 | -200.0 | 13.5 | 1101.0 | 472.0 | 539.0 | 190.0 | 1374.0 | 1729.0 | 21.9 | 29.3 | 0.7568 |
| 9353 | 04/04/2005 | 11.00.00 | 2.4 | 1163.0 | -200.0 | 11.4 | 1027.0 | 353.0 | 604.0 | 179.0 | 1264.0 | 1269.0 | 24.3 | 23.7 | 0.7119 |
| 9354 | 04/04/2005 | 12.00.00 | 2.4 | 1142.0 | -200.0 | 12.4 | 1063.0 | 293.0 | 603.0 | 175.0 | 1241.0 | 1092.0 | 26.9 | 18.3 | 0.6406 |
| 9355 | 04/04/2005 | 13.00.00 | 2.1 | 1003.0 | -200.0 | 9.5 | 961.0 | 235.0 | 702.0 | 156.0 | 1041.0 | 770.0 | 28.3 | 13.5 | 0.5139 |
| 9356 | 04/04/2005 | 14.00.00 | 2.2 | 1071.0 | -200.0 | 11.9 | 1047.0 | 265.0 | 654.0 | 168.0 | 1129.0 | 816.0 | 28.5 | 13.1 | 0.5028 |

9357 rows × 15 columns
The Date and Time columns can’t be handled easily. For instance, there is no way to determine the ‘day’ easily.
```python
air_data.Date.day #no attribute for day!
```

```
AttributeError: 'Series' object has no attribute 'day'
```
Dates and Times in Python
Most standard date/time operations can be handled via the datetime module. This module includes the following data types:
date: attributes of year, month, day
time: attributes of hour, minute, second, microsecond, and tzinfo
datetime: attributes of both date and time
timedelta: difference between two date, time or datetime instances
With this functionality we can add and subtract dates/times to get meaningful info while keeping the data in a more readable format (rather than say looking at the data as days since Jan 1, 1960)!
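As a quick illustration of that arithmetic (the particular dates below are just made up for the example):

```python
from datetime import datetime, timedelta

# a specific moment in time
start = datetime(2022, 1, 4, 10, 0)
# shift it forward by a day and three hours
later = start + timedelta(days=1, hours=3)
# subtracting two datetimes gives a timedelta
gap = later - start
gap.total_seconds()  # 97200.0 seconds (27 hours)
```
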
Dates and Times in pandas
Rather than using the datetime module directly, we’ll focus on how pandas handles date data via NumPy’s functionality.
pandas uses NumPy’s datetime64 and timedelta64 dtypes
We saw that our columns that were supposed to be dates were read in as strings. We can coerce a string to a date/time variable.
Date time variables have information on both the date and the time of day such as "04-01-2022 10:00". Of course we need to carefully specify whether this is April or January as different countries do this differently!
```python
a = pd.to_datetime(["04-01-2022 10:00"], dayfirst=True)
a
```
Let’s convert our Date_Time column into the right data type (there is an option to do this as you read in the data with pd.read_csv(), but that has been deprecated and it is recommended to deal with this kind of thing after reading in the data).
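The conversion cell itself isn’t shown here, but the idea can be sketched on a tiny hypothetical version of the data (the Date_Time and co_gt names below are assumptions chosen to match the columns used in the later code):

```python
import pandas as pd

# hypothetical mini-version of air_data's string columns
demo = pd.DataFrame({"Date": ["10/03/2004", "10/03/2004"],
                     "Time": ["18.00.00", "19.00.00"],
                     "CO(GT)": [2.6, 2.0]})

# combine the two strings and parse them (day comes first in this data)
demo["Date_Time"] = pd.to_datetime(demo["Date"] + " " + demo["Time"],
                                   format="%d/%m/%Y %H.%M.%S")
# a simpler name for CO(GT), matching the later code
demo = demo.rename(columns={"CO(GT)": "co_gt"})
demo.Date_Time.dt.day  # the .dt accessor now gives day, hour, etc.
```
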
If some condition of interest is (or is not) met, we then want to print an alert, write the event to a file, send an email, etc.
Later we’ll jump to pyspark and talk about how to handle actual streaming data. For now, let’s emulate having data streaming in using a simple for loop. We can think of each iteration of the loop as the next data value coming in.
Using our air_data object, let’s focus on the co_gt variable (true hourly averaged CO concentration (mg/m^3))
‘Take data in over time’ (via a loop over the rows)
If the data exceeds 8 we print a message
```python
for i in range(air_data.shape[0]):
    if air_data.iloc[i].co_gt > 8:
        print("High CO Concentration at " + str(air_data.Date_Time[i]) + ", loop", str(i))
```
High CO Concentration at 2004-03-15 09:00:00, loop 111
High CO Concentration at 2004-10-22 18:00:00, loop 5424
High CO Concentration at 2004-10-25 18:00:00, loop 5496
High CO Concentration at 2004-10-26 17:00:00, loop 5519
High CO Concentration at 2004-10-26 18:00:00, loop 5520
High CO Concentration at 2004-11-02 20:00:00, loop 5690
High CO Concentration at 2004-11-04 18:00:00, loop 5736
High CO Concentration at 2004-11-05 17:00:00, loop 5759
High CO Concentration at 2004-11-17 18:00:00, loop 6048
High CO Concentration at 2004-11-19 19:00:00, loop 6097
High CO Concentration at 2004-11-19 20:00:00, loop 6098
High CO Concentration at 2004-11-23 18:00:00, loop 6192
High CO Concentration at 2004-11-23 19:00:00, loop 6193
High CO Concentration at 2004-11-23 20:00:00, loop 6194
High CO Concentration at 2004-11-23 21:00:00, loop 6195
High CO Concentration at 2004-11-24 20:00:00, loop 6218
High CO Concentration at 2004-11-26 18:00:00, loop 6264
High CO Concentration at 2004-11-26 21:00:00, loop 6267
High CO Concentration at 2004-12-02 19:00:00, loop 6409
High CO Concentration at 2004-12-13 18:00:00, loop 6672
High CO Concentration at 2004-12-14 18:00:00, loop 6696
High CO Concentration at 2004-12-16 19:00:00, loop 6745
High CO Concentration at 2004-12-16 20:00:00, loop 6746
High CO Concentration at 2004-12-16 21:00:00, loop 6747
High CO Concentration at 2004-12-23 18:00:00, loop 6912
High CO Concentration at 2004-12-23 19:00:00, loop 6913
High CO Concentration at 2004-12-23 20:00:00, loop 6914
High CO Concentration at 2005-01-17 18:00:00, loop 7512
High CO Concentration at 2005-01-17 19:00:00, loop 7513
High CO Concentration at 2005-02-10 20:00:00, loop 8090
Reasonably straightforward! Of course we might have some other tasks that we’d check for. We can use if/else logic for that.
‘Take data in over time’ (via a loop over the rows)
If the data exceeds 8 we print a message
If the data is less than 0 we print a message (-200 represents missing here)
Write either occurrence to a log file (or perhaps a database) rather than printing to the console. Using the menus on the left, create a directory called logs to run the code below.
```python
for i in range(air_data.shape[0]):
    temp = air_data.iloc[i]
    dt = temp.Date_Time
    value = temp.co_gt
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
```
We may also want to check if something is ‘down’. This can mean a lot of things but if something is down, we might want to be notified. We can use python to send an email!
Unfortunately, it seems that doing this with gmail isn’t doable at this point.
Let’s set up things to send the email. I’ve deleted the password here so you’ll need to create your own account and what-not to run this code!
```python
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Set up email details
sender_email = "MS_uW2WFe@trial-eqvygm0z9k8l0p7w.mlsender.net"
password = "removed"
smtp_server = "smtp.mailersend.net"
smtp_port = "587"

# Define the receiver email
receiver_email = "st554testemail@gmail.com"
subject = "Sensor down!"
#we'll set up the body of the email in the loop
```
Now we’ll simulate data coming in via our loop. I’m going to restrict the number of rows to cycle through to 1000 as I only want a couple of emails sent (there is a limited number of free ones on the account).
If we observe six missing observations in a row then we’ll send an email as a warning that something seems to be down. To do so:
We’ll initiate a missing variable at 0
If the value is -200 then we’ll add one to missing
If the value is not -200 then we reset missing to 0
If the value of missing becomes 6 then we send an email
```python
#initiate the value
missing = 0
#loop through the first 1000 rows
for i in range(1000):
    #grab the row of interest
    temp = air_data.iloc[i]
    #get the date time and co values
    dt = temp.Date_Time
    value = temp.co_gt
    #Check if the value is large, add to log, reset missing
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        missing = 0
    #else, check if it is missing (<0), add to log, increase missing
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        if value == -200:
            missing += 1
        #if missing is 6, send the email!
        if missing == 6:
            body = "We have an issue at " + str(dt) + " to resolve!"
            # Create the email message
            message = MIMEMultipart()
            message["From"] = sender_email
            message["To"] = receiver_email
            message["Subject"] = subject
            # Attach the email body to the message
            message.attach(MIMEText(body, "plain"))
            # Establish a connection to the SMTP server and send the email
            try:
                # Connect to the SMTP server
                with smtplib.SMTP(smtp_server, smtp_port) as server:
                    server.starttls()  # Secure the connection
                    server.login(sender_email, password)  # Log in to the SMTP server
                    server.sendmail(sender_email, receiver_email, message.as_string())  # Send the email
                print("Email sent successfully")
            except Exception as e:
                print(f"Error: {e}")
    #if a reasonable value, set missing to 0
    else:
        missing = 0
```
Email sent successfully
Email sent successfully
Email sent successfully
Checking my email - it worked!
Combining Data Streams
Often have multiple data streams that need to be combined
Usually combined via a shared key or time stamps
Once combined we can then preprocess/summarize/etc.
To understand the important things for this example we need to define two terms:
Impression - an ad seen by a user
Click - an ad clicked on by a user
We may get data each time we get an impression and each time we get a click. There will be more impressions than clicks of course.
Usually there is a userID of some kind that we would want to join this data on. But essentially, once we have a unique ‘key’ or at least criteria for combining the streams the ideas are the same as SQL type joins.
Just as an example, let’s simulate some fake impressions and clicks data with unique userIDs (there would likely be other information in each data stream that would come along as well).
```python
import pandas as pd
import numpy as np

np.random.seed(10)
impressions = pd.DataFrame({
    'userId': range(500),
    'impressionTime': (pd.to_datetime('2022-01-01') +
                       pd.to_timedelta(np.random.rand(500), unit="D")).sort_values()})
impressions.head()
```
|  | userId | impressionTime |
|---|---|---|
| 0 | 0 | 2022-01-01 00:02:32.033682620 |
| 1 | 1 | 2022-01-01 00:05:41.130210730 |
| 2 | 2 | 2022-01-01 00:12:28.161050454 |
| 3 | 3 | 2022-01-01 00:13:16.703184279 |
| 4 | 4 | 2022-01-01 00:14:04.126023142 |
```python
clicks = impressions.iloc[np.random.randint(size=30, low=0, high=499)].sort_index()
#add a slightly positive time delta to get the clickTime
clicks['clickTime'] = clicks.impressionTime + pd.to_timedelta(np.random.rand(30)/100, unit="D")
clicks.drop(columns='impressionTime', axis=1, inplace=True)
clicks.head()
```
|  | userId | clickTime |
|---|---|---|
| 35 | 35 | 2022-01-01 01:30:34.800044013 |
| 38 | 38 | 2022-01-01 01:37:35.836843295 |
| 58 | 58 | 2022-01-01 02:42:47.536707642 |
| 90 | 90 | 2022-01-01 04:10:54.616200590 |
| 94 | 94 | 2022-01-01 04:13:45.397812575 |
We can “join” these two with the usual SQL join idea. Here we just use pd.merge(). We’ll use clicks as our right table and do a ‘right’ join to only return rows that match the clicks rows.
```python
combined = pd.merge(left=impressions, right=clicks, on="userId", how='right')
combined.head()
```
|  | userId | impressionTime | clickTime |
|---|---|---|---|
| 0 | 35 | 2022-01-01 01:24:48.442588485 | 2022-01-01 01:30:34.800044013 |
| 1 | 38 | 2022-01-01 01:33:27.963279930 | 2022-01-01 01:37:35.836843295 |
| 2 | 58 | 2022-01-01 02:30:39.145926778 | 2022-01-01 02:42:47.536707642 |
| 3 | 90 | 2022-01-01 04:02:01.489706816 | 2022-01-01 04:10:54.616200590 |
| 4 | 94 | 2022-01-01 04:11:43.920512629 | 2022-01-01 04:13:45.397812575 |
Now we could find the time it took someone to click (of those that clicked).
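A sketch of that calculation on a small hypothetical version of the data (subtracting two datetime columns yields a timedelta column):

```python
import pandas as pd

# hypothetical mini-versions of the impressions and clicks data
impressions = pd.DataFrame({
    "userId": [35, 38, 58],
    "impressionTime": pd.to_datetime(["2022-01-01 01:24:48",
                                      "2022-01-01 01:33:27",
                                      "2022-01-01 02:30:39"])})
clicks = pd.DataFrame({
    "userId": [35, 58],
    "clickTime": pd.to_datetime(["2022-01-01 01:30:34",
                                 "2022-01-01 02:42:47"])})

# right join keeps only the users that clicked
combined = pd.merge(left=impressions, right=clicks, on="userId", how="right")
# time it took each clicker to click, as a timedelta
combined["timeToClick"] = combined.clickTime - combined.impressionTime
combined.timeToClick.dt.total_seconds()
```
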
We can’t start looping through the data at \(n=1\) without some special cases. Here we’ll initialize some things and start the loop after a few observations.
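The loop relies on update_mean() and update_var() helpers; their exact definitions are not shown here, but a sketch using the standard one-pass (Welford-style) update formulas would look like the following, where count is assumed to be the sample size *including* the new value:

```python
def update_mean(new_value, old_mean, count):
    # running mean: shift the old mean toward the new value
    return old_mean + (new_value - old_mean) / count

def update_var(new_value, old_var, old_mean, count):
    # running sample variance (Welford-style one-pass update)
    new_mean = update_mean(new_value, old_mean, count)
    return ((count - 2) * old_var
            + (new_value - old_mean) * (new_value - new_mean)) / (count - 1)
```
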
```python
#set up things to start the loop
n = 3
#lists to capture the updated means and variances
means = [air_data.co_gt[0], air_data.co_gt[0:2].mean()]
variances = [np.nan, air_data.co_gt[0:2].var()]
o_mean = means[1]
o_var = variances[1]
#loop through the observations
for i in range(2, air_data.shape[0]):
    means.append(update_mean(air_data.co_gt[i], old_mean=o_mean, count=n))
    variances.append(update_var(air_data.co_gt[i], old_var=o_var, old_mean=o_mean, count=n))
    o_mean = means[i]
    o_var = variances[i]
    n += 1
```
Now we’ll find the standard deviation as well and zip() it together with the rolling summary stats!
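A sketch of that step, using a few hypothetical running means and variances like those the loop would produce:

```python
import numpy as np

# hypothetical running means/variances from a loop like the one above
means = [2.6, 2.3, 2.2666666666666666]
variances = [float("nan"), 0.18, 0.09333333333333334]

# standard deviations are the square roots of the running variances
stds = np.sqrt(variances)

# zip() pairs each running mean with its standard deviation
summary = list(zip(means, stds))
```
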
Similar functionality exists for rolling standard deviations.
```python
air_data.rolling(3).co_gt.std().head()
```

|  | co_gt |
|---|---|
| 0 | NaN |
| 1 | NaN |
| 2 | 0.305505 |
| 3 | 0.115470 |
| 4 | 0.346410 |
Recap
Often need to check/validate the data
Basic checks to create an issue log file or entries in an issues database
Might send email as a notification
Could of course do basic ETL type operations as the data comes in too!
Filter observations
Quick transformations
Combine data streams
etc.
We’ll see how to do this all in spark next!
If you are on the course website, use the table of contents on the left or the arrows at the bottom of this page to navigate to the next learning material!
If you are on Google Colab, head back to our course website for our next lesson!