Published

2025-03-31


Common Streaming Tasks

Justin Post

Let’s get familiar with some of the common tasks we need to do with streaming data!

Note: These types of webpages are built from Jupyter notebooks (.ipynb files). You can access your own versions of them by clicking here. It is highly recommended that you go through and run the notebooks yourself, modifying and rerunning things where you’d like!

Air Quality Data

Let’s start with some data we can use to emulate streaming concepts.

From: https://archive.ics.uci.edu/ml/datasets/Air+quality

…dataset contains 9358 instances of hourly averaged responses from an array of 5 metal oxide chemical sensors embedded in an Air Quality Chemical Multisensor Device

We’ll read this data in. Note that the data has separate Date and Time columns.

import pandas as pd
air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv", sep = ";", decimal = ",")
air_data
Date Time CO(GT) PT08.S1(CO) NMHC(GT) C6H6(GT) PT08.S2(NMHC) NOx(GT) PT08.S3(NOx) NO2(GT) PT08.S4(NO2) PT08.S5(O3) T RH AH Unnamed: 15 Unnamed: 16
0 10/03/2004 18.00.00 2.6 1360.0 150.0 11.9 1046.0 166.0 1056.0 113.0 1692.0 1268.0 13.6 48.9 0.7578 NaN NaN
1 10/03/2004 19.00.00 2.0 1292.0 112.0 9.4 955.0 103.0 1174.0 92.0 1559.0 972.0 13.3 47.7 0.7255 NaN NaN
2 10/03/2004 20.00.00 2.2 1402.0 88.0 9.0 939.0 131.0 1140.0 114.0 1555.0 1074.0 11.9 54.0 0.7502 NaN NaN
3 10/03/2004 21.00.00 2.2 1376.0 80.0 9.2 948.0 172.0 1092.0 122.0 1584.0 1203.0 11.0 60.0 0.7867 NaN NaN
4 10/03/2004 22.00.00 1.6 1272.0 51.0 6.5 836.0 131.0 1205.0 116.0 1490.0 1110.0 11.2 59.6 0.7888 NaN NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
9466 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
9467 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
9468 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
9469 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
9470 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN

9471 rows × 17 columns

The Date and Time columns were read in as strings, so they can’t be handled easily. For instance, there is no simple way to determine the ‘day’.

air_data.Date.day #no attribute for day!
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-76fe55ca24a5> in <cell line: 0>()
----> 1 air_data.Date.day #no attribute for day!

/usr/local/lib/python3.11/dist-packages/pandas/core/generic.py in __getattr__(self, name)
   6297         ):
   6298             return self[name]
-> 6299         return object.__getattribute__(self, name)
   6300 
   6301     @final

AttributeError: 'Series' object has no attribute 'day'

Dates and Times in Python

Most standard date/time operations can be handled via the datetime module. This module includes the following data types:

  • date: attributes of year, month, day
  • time: attributes of hour, minute, second, microsecond, and tzinfo
  • datetime: attributes of both date and time
  • timedelta: difference between two date, time or datetime instances

With this functionality we can add and subtract dates/times to get meaningful info while keeping the data in a readable format (rather than, say, looking at the data as days since Jan 1, 1960)!
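To make this concrete, here’s a minimal sketch (with made-up values) of the datetime module in action:

from datetime import date, datetime, timedelta

d = datetime(2022, 1, 4, 10, 0)        #a datetime has both date and time attributes
print(d.year, d.month, d.day, d.hour)  #2022 1 4 10
print(d + timedelta(hours = 26))       #2022-01-05 12:00:00
print(date(2022, 1, 5) - d.date())     #a timedelta: 1 day, 0:00:00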

Dates and Times in pandas

Rather than using the datetime module directly, we’ll focus on how pandas handles date data via NumPy’s functionality.

  • pandas uses NumPy’s datetime64 and timedelta64 dtypes

These have very similar functionality for doing useful things with dates!

We saw that our columns that were supposed to be dates were read in as strings. We can coerce a string to a date/time variable.

Date-time variables contain information on both the date and the time of day, such as "04-01-2022 10:00". Of course, we need to carefully specify whether this is April or January, as different countries order these differently!

a = pd.to_datetime(["04-01-2022 10:00"], dayfirst=True)
a
DatetimeIndex(['2022-01-04 10:00:00'], dtype='datetime64[ns]', freq=None)

As mentioned, date-time type variables have useful attributes we can pull from them.

a.day
Index([4], dtype='int32')
a.hour
Index([10], dtype='int32')

There are also useful methods to get descriptive names.

a.day_name()
Index(['Tuesday'], dtype='object')
a.month_name()
Index(['January'], dtype='object')

As mentioned, now that we have a date we can actually subtract two dates to get useful information (while still seeing the date in a nice format).

b = pd.to_datetime(["04-01-2022 11:00"])
a-b
TimedeltaIndex(['-88 days +23:00:00'], dtype='timedelta64[ns]', freq=None)

With our data, we can try to read in the columns appropriately with options on the pd.read_csv() function.

air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
                       sep = ";",
                       decimal = ",",
                       parse_dates = [["Date", "Time"]])
air_data.info()
FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9471 entries, 0 to 9470
Data columns (total 16 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   Date_Time      9471 non-null   object 
 1   CO(GT)         9357 non-null   float64
 2   PT08.S1(CO)    9357 non-null   float64
 3   NMHC(GT)       9357 non-null   float64
 4   C6H6(GT)       9357 non-null   float64
 5   PT08.S2(NMHC)  9357 non-null   float64
 6   NOx(GT)        9357 non-null   float64
 7   PT08.S3(NOx)   9357 non-null   float64
 8   NO2(GT)        9357 non-null   float64
 9   PT08.S4(NO2)   9357 non-null   float64
 10  PT08.S5(O3)    9357 non-null   float64
 11  T              9357 non-null   float64
 12  RH             9357 non-null   float64
 13  AH             9357 non-null   float64
 14  Unnamed: 15    0 non-null      float64
 15  Unnamed: 16    0 non-null      float64
dtypes: float64(15), object(1)
memory usage: 1.2+ MB
UserWarning: Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.
  air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",

This read those two columns in as a single Date_Time column. Notice from the info() output, though, that Date_Time still has dtype object: pandas could not infer the day-first, ‘.’-separated format (see the warnings), so we don’t actually have the nice date functionality yet.
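As the FutureWarning recommends, we can instead combine the columns ourselves and hand pd.to_datetime() an explicit format. A sketch, assuming the day/month/year dates and ‘.’-separated times shown above hold throughout the file:

air_data = pd.read_csv("https://www4.stat.ncsu.edu/online/datasets/AirQualityUCI.csv",
                       sep = ";",
                       decimal = ",")
#combine the two string columns and parse with an explicit day-first format
air_data["Date_Time"] = pd.to_datetime(air_data["Date"] + " " + air_data["Time"],
                                       format = "%d/%m/%Y %H.%M.%S")
air_data = air_data.drop(columns = ["Date", "Time"])

With an explicit format the column comes in as datetime64, and we have all the nice functionality to deal with dates.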

Lastly, let’s rename the CO(GT) column, since we’ll use that one in the upcoming tasks, and remove the block of missing rows at the end of the data frame.

air_data = air_data.rename(columns = {'CO(GT)': 'co_gt'})
air_data.dropna(subset=['co_gt'], inplace = True)

Preprocessing & Sending alerts

As data comes in we often need to:

  • Check if the data is missing
  • Check if data is in an appropriate range
  • etc.

If a condition is not met, we then want to print an alert, write the event to a file, send an email, etc.

Later we’ll jump to pyspark and talk about how to handle actual streaming data. For now, let’s emulate having data streaming in using a simple for loop. We can think of each iteration of the loop as the next data value coming in.

Using our air_data object, let’s focus on the co_gt variable (true hourly averaged CO concentration, in mg/m^3). We will:

  • ‘Take data in over time’ (via a loop over the rows)
  • If the data exceeds 8 we print a message
for i in range(air_data.shape[0]):
    if air_data.iloc[i].co_gt > 8:
        print("High CO Concentration at " + str(air_data.Date_Time[i]) + ", loop", str(i))
High CO Concentration at 15/03/2004 09.00.00, loop 111
High CO Concentration at 22/10/2004 18.00.00, loop 5424
High CO Concentration at 25/10/2004 18.00.00, loop 5496
High CO Concentration at 26/10/2004 17.00.00, loop 5519
High CO Concentration at 26/10/2004 18.00.00, loop 5520
High CO Concentration at 02/11/2004 20.00.00, loop 5690
High CO Concentration at 04/11/2004 18.00.00, loop 5736
High CO Concentration at 05/11/2004 17.00.00, loop 5759
High CO Concentration at 17/11/2004 18.00.00, loop 6048
High CO Concentration at 19/11/2004 19.00.00, loop 6097
High CO Concentration at 19/11/2004 20.00.00, loop 6098
High CO Concentration at 23/11/2004 18.00.00, loop 6192
High CO Concentration at 23/11/2004 19.00.00, loop 6193
High CO Concentration at 23/11/2004 20.00.00, loop 6194
High CO Concentration at 23/11/2004 21.00.00, loop 6195
High CO Concentration at 24/11/2004 20.00.00, loop 6218
High CO Concentration at 26/11/2004 18.00.00, loop 6264
High CO Concentration at 26/11/2004 21.00.00, loop 6267
High CO Concentration at 02/12/2004 19.00.00, loop 6409
High CO Concentration at 13/12/2004 18.00.00, loop 6672
High CO Concentration at 14/12/2004 18.00.00, loop 6696
High CO Concentration at 16/12/2004 19.00.00, loop 6745
High CO Concentration at 16/12/2004 20.00.00, loop 6746
High CO Concentration at 16/12/2004 21.00.00, loop 6747
High CO Concentration at 23/12/2004 18.00.00, loop 6912
High CO Concentration at 23/12/2004 19.00.00, loop 6913
High CO Concentration at 23/12/2004 20.00.00, loop 6914
High CO Concentration at 17/01/2005 18.00.00, loop 7512
High CO Concentration at 17/01/2005 19.00.00, loop 7513
High CO Concentration at 10/02/2005 20.00.00, loop 8090

Reasonably straightforward! Of course, we might have some other tasks we’d check for. We can use if/else logic for that.

  • ‘Take data in over time’ (via a loop over the rows)
  • If the data exceeds 8 we print a message
  • If the data is less than 0 we print a message (-200 represents missing here)

We’ll write either occurrence to a log file (or perhaps a database) rather than printing to the console. Using the menus on the left, create a directory called logs before running the code below (or create it programmatically, as shown next).
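Alternatively, we can create the directory with the standard os module:

import os
os.makedirs("logs", exist_ok = True)  #no error if the directory already exists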

for i in range(air_data.shape[0]):
    temp = air_data.iloc[i]
    dt = temp.Date_Time
    value = temp.co_gt
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")

We may also want to check if something is ‘down’. This can mean a lot of things, but if something is down, we might want to be notified. We can use Python to send an email!

  • Unfortunately, it seems that doing this with Gmail isn’t doable at this point.
  • I’ll follow this article!

First we need to have the dotenv package. I’ll install this via pip.

!pip install dotenv
Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting python-dotenv (from dotenv)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Downloading dotenv-0.9.9-py2.py3-none-any.whl (1.9 kB)
Downloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv, dotenv
Successfully installed dotenv-0.9.9 python-dotenv-1.1.0

Now let’s import the modules we’ll need.

import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from dotenv import load_dotenv

Let’s set things up to send the email. I’ve deleted the password here, so you’ll need to create your own account and what-not to run this code!

# Load environment variables from .env file
load_dotenv()

# Set up email details from environment variables
sender_email = "MS_uW2WFe@trial-eqvygm0z9k8l0p7w.mlsender.net"
password = "removed"
smtp_server = "smtp.mailersend.net"
smtp_port = "587"

# Define the receiver email
receiver_email = "st554testemail@gmail.com"

subject = "Sensor down!"
#we'll set up the body of the email in the loop

Now we’ll simulate data coming in via our loop. I’m going to restrict the number of rows to cycle through to 1000 as I only want a couple of emails sent (there is a limited number of free ones on the account).

If we observe six missing observations in a row then we’ll send an email as a warning that something seems to be down. To do so:

  • We’ll initiate a missing variable at 0
  • If the value is -200 then we’ll add one to missing
  • If the value is not -200 then we reset missing to 0
  • If the value of missing becomes 6 then we send an email
#initiate the value
missing = 0

#loop through the first 1000 rows
for i in range(1000):
    #grab the row of interest
    temp = air_data.iloc[i]
    #get the date time and co values
    dt = temp.Date_Time
    value = temp.co_gt

    #Check if the value is large, add to log, reset missing
    if value > 8:
        with open('logs/COHigh.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        missing = 0
    #else, check if it is missing (<0), add to log, increase missing
    elif value < 0:
        with open('logs/COInvalid.txt', 'a') as f:
            f.write(str(dt) + ", " + str(value) + "\n")
        if value == -200:
            missing += 1
        #if missing is 6, send the email!
        if missing == 6:
            body = "We have an issue at " + str(dt) + " to resolve!"
            # Create the email message
            message = MIMEMultipart()
            message["From"] = sender_email
            message["To"] = receiver_email
            message["Subject"] = subject
            # Attach the email body to the message
            message.attach(MIMEText(body, "plain"))
            # Establish a connection to the SMTP server and send the email
            try:
                # Connect to the SMTP server
                with smtplib.SMTP(smtp_server, smtp_port) as server:
                    server.starttls()  # Secure the connection
                    server.login(sender_email, password)  # Log in to the SMTP server
                    server.sendmail(
                        sender_email, receiver_email, message.as_string()
                    )  # Send the email
                print("Email sent successfully")
            except Exception as e:
                print(f"Error: {e}")
    #if a reasonable value set missing to 0
    else:
        missing = 0
Email sent successfully
Email sent successfully
Email sent successfully

Checking my email - it worked!
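If we end up sending alerts from several places, the email logic is worth pulling into a helper function. A sketch (send_alert is my own name; it reuses the settings defined above):

def send_alert(body):
    """Build and send a plain-text alert email via the SMTP settings above."""
    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = subject
    message.attach(MIMEText(body, "plain"))
    try:
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()  #secure the connection
            server.login(sender_email, password)
            server.sendmail(sender_email, receiver_email, message.as_string())
        print("Email sent successfully")
    except Exception as e:
        print(f"Error: {e}")

The email block in the loop above would then reduce to send_alert("We have an issue at " + str(dt) + " to resolve!").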

Combining Data Streams

  • Often have multiple data streams that need to be combined

    • Usually combined via a shared key or time stamps
  • Once combined we can then preprocess/summarize/etc.

As an example, consider streams of Google Ads-type data.

To understand the important things for this example we need to define two terms:

  • Impression - an ad is seen by a user
  • Click - an ad is clicked on by a user

We may get data each time we get an impression and each time we get a click. There will be more impressions than clicks of course.

Usually there is a userID of some kind that we would want to join this data on. Essentially, once we have a unique ‘key’, or at least criteria for combining the streams, the ideas are the same as SQL-type joins.

Just as an example, let’s simulate some fake impressions and clicks data with unique userIDs (there would likely be other information in each data stream that would come along as well).

import pandas as pd
import numpy as np
np.random.seed(10)
impressions = pd.DataFrame({
  'userId': range(500),
  'impressionTime': (pd.to_datetime('2022-01-01') + pd.to_timedelta(np.random.rand(500), unit = "D")).sort_values()
})
impressions.head()
userId impressionTime
0 0 2022-01-01 00:02:32.033682620
1 1 2022-01-01 00:05:41.130210730
2 2 2022-01-01 00:12:28.161050454
3 3 2022-01-01 00:13:16.703184279
4 4 2022-01-01 00:14:04.126023142
clicks = impressions.iloc[np.random.randint(size = 30, low = 0, high = 499)].sort_index()
#add a slightly positive time delta to get the clicktime
clicks['clickTime'] = clicks.impressionTime + pd.to_timedelta(np.random.rand(30)/100, unit = "D")
clicks.drop(columns = 'impressionTime', axis = 1, inplace = True)
clicks.head()
userId clickTime
35 35 2022-01-01 01:30:34.800044013
38 38 2022-01-01 01:37:35.836843295
58 58 2022-01-01 02:42:47.536707642
90 90 2022-01-01 04:10:54.616200590
94 94 2022-01-01 04:13:45.397812575

We can “join” these two with the usual SQL join idea. Here we just use pd.merge(). We’ll use clicks as our right table and do a ‘right’ join to only return rows that match the clicks rows.

combined = pd.merge(left = impressions, right = clicks, on = "userId", how = 'right')
combined.head()
userId impressionTime clickTime
0 35 2022-01-01 01:24:48.442588485 2022-01-01 01:30:34.800044013
1 38 2022-01-01 01:33:27.963279930 2022-01-01 01:37:35.836843295
2 58 2022-01-01 02:30:39.145926778 2022-01-01 02:42:47.536707642
3 90 2022-01-01 04:02:01.489706816 2022-01-01 04:10:54.616200590
4 94 2022-01-01 04:11:43.920512629 2022-01-01 04:13:45.397812575

Now we could find the time it took someone to click (of those that clicked).

(combined.clickTime-combined.impressionTime).head()
0
0 0 days 00:05:46.357455528
1 0 days 00:04:07.873563365
2 0 days 00:12:08.390780864
3 0 days 00:08:53.126493774
4 0 days 00:02:01.477299946
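If we wanted to keep this difference around, we could store it as a column and summarize it (a quick sketch; clickDelay is a name I’m making up):

combined["clickDelay"] = combined.clickTime - combined.impressionTime
combined.clickDelay.mean()  #average time-to-click among those that clicked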

The idea is easy :) It will be harder with actual data streams!

Finding Averages & Standard Deviations

We all know how to find averages and standard deviations. Given data $y_1, \ldots, y_n$:

$$\bar{y} = \frac{\sum_{i=1}^{n} y_i}{n} \qquad s = \sqrt{\frac{1}{n-1}\sum_{i=1}^{n}\left(y_i - \bar{y}\right)^2} = \sqrt{\frac{1}{n-1}\left(\sum_{i=1}^{n} y_i^2 - \frac{\left(\sum_{i=1}^{n} y_i\right)^2}{n}\right)}$$

When dealing with streaming data we don’t have all the data to start with!

  • For the mean, we could just store the sum and the count
  • Update each as new data comes in to find the new mean
  • Don’t want to store sums as they can get very large over time!

Updating an Average

We can modify the formula for the sample mean to update it for a new observation:

$$\bar{y}_n = \bar{y}_{n-1} + \frac{y_n - \bar{y}_{n-1}}{n}$$

  • Here $\bar{y}_{n-1}$ is the sample mean we have with our first $n-1$ observations

Let’s write a quick function to do this in python!

def update_mean(new_data, old_mean, count):
    return old_mean + (1/count)*(new_data-old_mean)
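As a quick sanity check on a tiny made-up stream, the update reproduces the usual sample mean:

import numpy as np

y = [2.6, 2.0, 2.2]
m = y[0]  #the mean of one observation is just that value
for k in range(1, len(y)):
    m = update_mean(y[k], old_mean = m, count = k + 1)
print(m, np.mean(y))  #both 2.2666...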

Let’s apply it to our air_data.

  • First we’ll remove missing values.
  • Then we’ll initiate a mean value of 0.
  • Now we’ll loop through the rows, again acting as though they are streaming in.
air_data = air_data.loc[air_data.co_gt != -200].reset_index()

#initialize the mean
o_mean = 0
n = 1
means = []
for i in range(air_data.shape[0]):
    means.append(update_mean(air_data.co_gt[i], old_mean = o_mean, count = n))
    o_mean = means[i]
    n += 1

pd.DataFrame(zip(air_data.co_gt, means), columns = ["Data", "Means"])
Data Means
0 2.6 2.600000
1 2.0 2.300000
2 2.2 2.266667
3 2.2 2.250000
4 1.6 2.120000
... ... ...
7669 3.1 2.152686
7670 2.4 2.152718
7671 2.4 2.152750
7672 2.1 2.152743
7673 2.2 2.152750

7674 rows × 2 columns

Updating a Standard Deviation

Updating the standard deviation is a bit more complicated but can be done!

$$s_n^2 = \frac{n-2}{n-1}\,s_{n-1}^2 + \frac{\left(y_n - \bar{y}_{n-1}\right)^2}{n}, \quad n > 1$$

  • Here we deal with the sample variance as we can just take the square root to then find the sample standard deviation
  • $s_{n-1}^2$ represents the sample variance using just the first $n-1$ observations.

Let’s create a function in python to update our sample variance.

def update_var(new_data, old_var, old_mean, count):
    return ((count-2)/(count-1))*old_var + (new_data - old_mean)**2/count
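Again, a quick sanity check on a tiny made-up sample shows the update agrees with the usual formula:

import numpy as np

y = np.array([2.6, 2.0, 2.2, 2.2])
m3 = y[:3].mean()         #sample mean of the first three values
v3 = y[:3].var(ddof = 1)  #sample variance of the first three values
v4 = update_var(y[3], old_var = v3, old_mean = m3, count = 4)
print(v4, y.var(ddof = 1))  #both 0.06333...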

We can’t start looping through the data at n=1 without some special cases. Here we’ll initialize some things and start the loop after a few observations.

#set up things to start the loop
n = 3
#lists to capture the updated means and variances
means = [air_data.co_gt[0], air_data.co_gt[0:2].mean()]
variances = [np.nan, air_data.co_gt[0:2].var()]

o_mean = means[1]
o_var = variances[1]

#loop through the observations
for i in range(2, air_data.shape[0]):
    means.append(update_mean(air_data.co_gt[i], old_mean = o_mean, count = n))
    variances.append(update_var(air_data.co_gt[i], old_var = o_var, old_mean = o_mean, count = n))
    o_mean = means[i]
    o_var = variances[i]
    n += 1

Now we’ll find the standard deviation as well and zip() it together with the running summary stats!

pd.DataFrame(zip(air_data.co_gt, means, np.sqrt(np.array(variances)), np.array(variances)),
  columns = ["Data", "Means", "SDs", "Vars"])
Data Means SDs Vars
0 2.6 2.600000 NaN NaN
1 2.0 2.300000 0.424264 0.180000
2 2.2 2.266667 0.305505 0.093333
3 2.2 2.250000 0.251661 0.063333
4 1.6 2.120000 0.363318 0.132000
... ... ... ... ...
7669 3.1 2.152686 1.453625 2.113026
7670 2.4 2.152718 1.453533 2.112759
7671 2.4 2.152750 1.453441 2.112491
7672 2.1 2.152743 1.453347 2.112216
7673 2.2 2.152750 1.453252 2.111941

7674 rows × 4 columns

Finding Averages & Standard Deviations in Selected Windows

Luckily, if we want to find sample means and standard deviations in windows things aren’t too complicated.

Let’s just worry about the sample mean.

  • We need to choose our time window, say the most recent 3 values
  • Store those most recent 3 values and find the average
  • When new data comes in, pop out the first value (oldest) and add the newest value
  • Repeat!
my_values = []
rolling_means = [np.nan, np.nan]
window = 3
for i in range(air_data.shape[0]):
    my_values.append(air_data.co_gt[i])
    if i < window-1:
        continue
    rolling_means.append(np.mean(my_values))
    my_values = my_values[1:]
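As an aside, the standard library’s collections.deque with a maxlen handles the ‘pop out the oldest value’ step automatically. A sketch of the same computation (deque_means is my own name):

from collections import deque

window = 3
buffer = deque(maxlen = window)  #appending to a full deque discards the oldest value
deque_means = []
for i in range(air_data.shape[0]):
    buffer.append(air_data.co_gt[i])
    deque_means.append(np.mean(buffer) if len(buffer) == window else np.nan)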

We can compare what we calculated to the pandas functionality:

pd.DataFrame(zip(air_data.rolling(3).co_gt.mean(), rolling_means), columns = ["pandas", "us"]).head()
pandas us
0 NaN NaN
1 NaN NaN
2 2.266667 2.266667
3 2.133333 2.133333
4 2.000000 2.000000

Similar functionality exists for rolling standard deviations.

air_data.rolling(3).co_gt.std().head()
co_gt
0 NaN
1 NaN
2 0.305505
3 0.115470
4 0.346410
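If we wanted to compute this ourselves, the same buffering idea works; using np.std() with ddof = 1 matches pandas’ sample standard deviation. A sketch:

from collections import deque

window = 3
buffer = deque(maxlen = window)
rolling_sds = [np.nan, np.nan]
for i in range(air_data.shape[0]):
    buffer.append(air_data.co_gt[i])
    if len(buffer) == window:
        #sample standard deviation over the current window
        rolling_sds.append(np.std(buffer, ddof = 1))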

Recap

Often need to check/validate the data

  • Basic checks to create an issue log file or entries in an issues database

  • Might send an email as a notification

  • Could of course do basic ETL-type operations as the data comes in too!

    • Filter observations
    • Quick transformations
    • Combine data streams
    • etc.

We’ll see how to do all of this in Spark next!

If you are on the course website, use the table of contents on the left or the arrows at the bottom of this page to navigate to the next learning material!

If you are on Google Colab, head back to our course website for our next lesson!