Extracting Sessions from Interaction Data in Snowflake
Processing user interaction data into something useful can be challenging. On the Pluralsight Search team, we can see every interaction a learner has with our search engine, but these events are much more meaningful when considered in context. Learners don’t think about what they are trying to accomplish as isolated API calls, so if we want to understand their motivation we should look at sets of interactions bundled together into sessions.
However, that bundling task can be challenging. There are a variety of ways we might construct sessions, such as:
- Explicitly (create a session ID in the learner's browser, and annotate each search event with that ID)
- Based on sessions elsewhere (correlate search events with session times reported by some other bounded context)
- Based on an arbitrary timeout (an event is part of the current session if it happened within `session_timeout` seconds of the last event)
There are pros and cons to all of these. Explicitly giving sessions an ID and passing that around to link events can give very deep and specific knowledge about how a sequence of events is related, but involves a lot of up-front design. It also often isn't flexible enough to reuse historical data to look at a new problem, because the new problem may require a different session structure. In contrast, time-based sessions offer much more flexibility to work with historical data, but you can't always depend on having an external source of truth for session data.
Time based sessions can also be reconstructed purely from event data, but in the past sessionization of this form required stream processing because each new output depends on past outputs.
With the help of Python UDTFs in Snowflake, we can generate sessions from historical data on demand, on any stream of timestamped events.
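The rule itself is simple to state in pure Python. Here is a minimal sketch (the names are ours, not part of any API) that makes the statefulness explicit: each event's session assignment depends on the event before it.
Code:
# minimal sketch of timeout-based sessionization: `events` is a list of
# epoch-second timestamps, and a gap longer than SESSION_TIMEOUT starts
# a new session
SESSION_TIMEOUT = 15 * 60  # seconds

def naive_sessionize(events):
    session_id, last = 0, None
    ids = []
    for t in sorted(events):
        if last is None or t > last + SESSION_TIMEOUT:
            session_id += 1  # gap too large: start a new session
        ids.append(session_id)
        last = t
    return ids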
In this article, we're going to create a new Python UDTF called `timed_session` that lets us add a column to an event stream table. This new column is a session id that groups the events into time-based sessions.
How Do I Use This?
Once we have a session id column, we can group events into sessions with normal SQL code, and analyze what changed for a learner over the course of a session.
- How much did the queries the learner made change over the course of a session?
- How long do sessions tend to be?
- How often do learners come back to search after having clicked on a result?
In addition to simple exploratory questions, sessionized data can be used to drive machine learning models, like Search’s click model.
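Once the session id column exists, a question like "how long do sessions tend to be?" becomes a plain aggregate. Here's a sketch, assuming a hypothetical `sessionized_interactions` table that already carries the `session_id` column we'll build below:
Code:
-- hypothetical: assumes we've already materialized a sessionized_interactions
-- table with id, time, and session_id columns
SELECT
    id,
    session_id,
    datediff('second', min(time), max(time)) AS session_seconds
FROM sessionized_interactions
GROUP BY id, session_id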
What is a UDTF?
UDTF stands for “User Defined Table Function”. Snowflake allows us to define functions that can run against Snowflake tables in a variety of languages. Currently, the options are:
- Java
- Javascript
- Python
- SQL
SQL functions are more or less like subqueries, but these other languages allow us to write imperative code that can directly interact with Snowflake tables. Table functions go a step further, allowing us to maintain state within our code. That statefulness is what we’ll lean on to generate sessions.
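As a toy illustration of that statefulness (not the handler we'll actually build), a Python UDTF handler is just a class whose `process` method Snowflake calls once per input row; instance attributes survive between calls.
Code:
class RunningCount:
    '''toy UDTF handler: emits how many rows the current partition has seen'''
    def __init__(self):
        # instance state persists across process() calls within a partition
        self.count = 0

    def process(self, value):
        self.count += 1
        # handlers return an iterable of output row tuples
        return [(self.count,)]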
Code:
# things we'll need along the way
from notebook_utilities.snowflake_conn import conn
import numpy as np
import polars as pl
from uuid import uuid4
from datetime import datetime, timedelta
import random
from typing import List
Simulating User Interactions
To start with, we need to generate some synthetic interaction data.
We want to be able to validate our approach against our synthetic data, so our generation approach needs to be able to take in `n` (the number of sessions we expect) and generate a sequence of interaction timestamps that should break into exactly that number of sessions.
We can do that by generating our sessions sequentially.
For each session we want the user to have, we pick a start time more than `session_timeout` seconds after the last session ended and then generate a sequence of interactions, each within `session_timeout` seconds of the previous interaction.
Code:
# timestamps are in seconds
nowish = datetime.utcnow().timestamp()
# 90 days
recently = nowish - 60 ** 2 * 24 * 90
session_timeout = 15 * 60
def gen_sessions(n: int):
    '''simulate `n` user interaction sessions.'''
    # pick a random start time between recently and now
    last = recently + np.random.random() * (nowish - recently)
    interactions = []
    for session in range(n):
        for interaction in range(np.random.randint(1, 15)):
            # add an interaction within session_timeout of the last one
            bump = np.random.randint(1, session_timeout - 1)
            last += bump
            interactions.append(last)
        # make sure the next interaction is in a different session
        last += session_timeout + np.random.randint(
            60 ** 2,          # at least an hour later
            60 ** 2 * 24 * 7  # at most a week later
        )
    # simulate receiving out of order events
    random.shuffle(interactions)
    return interactions
Then we can use our data generation function to produce a data set that contains session data for many users, whose interactions are overlapping in time. This data should simulate real interaction data from any number of sources: user queries, search result clicks, content started events, etc.
Code:
users = pl.DataFrame({
    "id": [str(uuid4()) for _ in range(100)],
    "n_sessions": np.random.randint(1, 10, 100)
})
user_interactions = users.select([
    "id",
    # generate `n_sessions` for each user
    pl.col("n_sessions").apply(gen_sessions).alias("interaction_time")
    # explode takes a list column and generates a row for each entry, duplicating other column entries.
]).explode("interaction_time")
user_interactions.head(3)
Output:
> shape: (3,2)
| id | interaction_time |
| ------------------------------------ | ---------------- |
| str | f64 |
| 694b501d-6593-470a-9cc4-a560cbeffee9 | 6.0716e8 |
| 694b501d-6593-470a-9cc4-a560cbeffee9 | 6.0763e8 |
| 694b501d-6593-470a-9cc4-a560cbeffee9 | 6.0857e8 |
Writing a Table Function
Now that we have test data to work with, we want to write our Snowflake UDTF.
Our UDTF is a Python class with a `process` method. The `process` method takes in data from the rows of whatever we pass to the function on the Snowflake side, and returns rows of the resulting table. Attributes of the class set up in the `__init__` method can be accessed and mutated from `process`, giving us mutable state that we can refer to.
Our class is simple. It remembers the `last_time` there was an interaction and the `current_id` of the active session.
- If the next interaction is within `session_timeout` of the `last_time`, return the `current_id`.
- If the next interaction is more than `session_timeout` seconds after `last_time`, make a new `current_id` and return that.
Code:
%%writefile sessionize.py
from datetime import timedelta


class Sessionize:
    '''
    UDTF handler for generating timed sessions from interaction data.
    Can be joined to an `interactions` stream to add session ids to each interaction.

    ```
    SELECT
        interactions.*,
        session.session_id
    FROM
        interactions,
        table(
            timed_session(interactions.time)
            over (partition by interactions.id order by interactions.time)
        ) session
    ```
    '''
    session_timeout: timedelta = timedelta(milliseconds=15 * 60 * 1000)

    def __init__(self):
        self.current_id: int = 0
        self.last_time = None

    def process(self, time):
        # start a new session on the first event, or whenever the gap
        # since the last event exceeds the timeout
        if not self.last_time or (time > (self.last_time + Sessionize.session_timeout)):
            self.current_id += 1
        self.last_time = time
        # process has to return tuples
        return [(self.current_id,)]
The `%%writefile` line tells Jupyter notebooks to write this code to a file instead of executing it. Because we wrote our class to a file, we can import it below to do some testing. An easy check is to confirm that the data generated from `gen_sessions(n)` has `n` sessions according to our aggregating class. We need a helper function to handle applying our UDTF class to Python data, but it's fairly straightforward.
Code:
from sessionize import Sessionize

def apply_sessionize(interactions: List[float]):
    '''
    Helper function for applying the `Sessionize` handler to a list of interactions locally.
    '''
    sessionize = Sessionize()
    return [
        # gen_sessions returns epoch seconds, but in Snowflake we get timestamps
        sessionize.process(datetime.fromtimestamp(interaction))[0][0]
        for interaction in sorted(interactions)
    ]
# how many distinct session ids do we get when we generate one session? (hopefully just one)
len(set(apply_sessionize(gen_sessions(1))))
Output:
> 1
A more interesting test would be to confirm that we get the right number of sessions for every user in the `users` testing data we generated. We can check this by applying our helper function across the entire `user_interactions` dataframe and joining back to `users`. Our expectation would be that every row of the result has the same number in `n_sessions` as we calculate with our UDTF. In that case, if we filter on rows where the `n_sessions` we asked for is different from the `n_sessions` we observed, we should get no results.
Code:
(user_interactions
    .groupby("id")
    # format the data for our helper function
    .agg([pl.col("interaction_time").sort().list()])
    .select([
        "id",
        (pl.col("interaction_time")
            .apply(apply_sessionize)
            # only keep the count of distinct sessions for testing
            .apply(lambda sessions: len(set(sessions)))
            .alias("n_sessions"))
    ])
    .join(users, on="id", suffix="_observed")
    # only show us users where we got the number of sessions wrong
    .filter(pl.col("n_sessions") != pl.col("n_sessions_observed"))
)
Output:
> shape: (0, 3)
| id  | n_sessions | n_sessions_observed |
| --- | ---------- | ------------------- |
| str | i64        | i64                 |
Working with Snowflake
Now that we’ve confirmed that our UDTF class works the way we’d expect locally, we want to test it as a table function in Snowflake. First, we’ll dump our simulated data into a temporary table for testing.
Code:
cursor = conn.cursor()
list(cursor.execute("""
CREATE OR REPLACE TEMPORARY TABLE simulated_interactions (id text, time timestamptz);
""").execute("""
INSERT INTO simulated_interactions (id, time) VALUES
""" + ",".join(
f"('{row['id']}', TO_TIMESTAMP({row['interaction_time']}))"
for row in user_interactions.to_dicts()
)))
Output:
[(3794,)]
Then we can write our function to Snowflake, directly referencing the class we wrote above.
Code:
# read in our class as plain text to send to Snowflake
sessionize_text = open("sessionize.py").read()
list(cursor.execute(f"""
create or replace function timed_session(time timestamptz)
returns table (session_id int)
language python
runtime_version=3.8
handler='Sessionize'
as $$
{sessionize_text}
$$
"""))
Output:
[('Function TIMED_SESSION successfully created.',)]
Let’s briefly talk through the important lines here.
create or replace function timed_session(time timestamptz)
This names the function `timed_session` and says that it accepts a single column of type `timestamptz`.
returns table (session_id int)
This states that we're writing a table function, and that the returned table will have a single column of type `int` (our session ids).
handler='Sessionize'
The `handler` specifies the name of the class we want to use for our table function. In our case we only have one class, but for more complex functions you might import other classes or define helper classes within the uploaded script.
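For instance, a hypothetical refactor of our handler might pull the timeout logic into a helper class defined in the same script; only the class named by `handler` needs to follow the UDTF interface.
Code:
from datetime import timedelta

class TimeoutPolicy:
    '''helper class: decides whether the gap between events ends the session'''
    def __init__(self, timeout: timedelta):
        self.timeout = timeout

    def expired(self, last, now) -> bool:
        return last is None or now > last + self.timeout

class Sessionize:
    '''the class named by handler='Sessionize'; delegates to the helper'''
    def __init__(self):
        self.policy = TimeoutPolicy(timedelta(minutes=15))
        self.current_id = 0
        self.last_time = None

    def process(self, time):
        if self.policy.expired(self.last_time, time):
            self.current_id += 1
        self.last_time = time
        return [(self.current_id,)]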
$$...$$
Finally, we write the code for the class between `$$` delimiters.
You might notice that the indentation here is a little inconsistent.
That’s because Python is whitespace delimited.
I find these SQL commands easier to read when they are indented blocks within Python code.
However, when there is Python code in our SQL statements, indentation becomes much more finicky.
Using the Function
Now that the function exists in Snowflake, we can write a query that actually uses it on our simulated interaction data. Table functions can be a little awkward to use compared to normal window functions, but the basic structure is the same. The main differences are:
- the function call needs to be wrapped in the `table` constructor
- the function call needs to be in the `FROM` list
However, Snowflake is clever enough to understand that a table from a table function shouldn't be cross-joined against the other items in the `FROM` list. Instead, it behaves like an implicit inner join to each source row.
To test our function, we want to run it over the simulated interaction data and see how many sessions it identifies for each user.
Then we can look at the result locally and compare it to our `users` dataframe, very much like how we tested the class on the Python side.
Code:
sessionized = list(cursor.execute("""
SELECT
si.id,
count(distinct session.session_id)
FROM
simulated_interactions si,
table(timed_session(si.time) over (partition by si.id order by si.time)) session
GROUP BY si.id
"""))
sessionized[:5]
Output:
[('ba20cbab-b87d-443e-b6e6-2bebe2782e1c', 4),
('24175f96-c0f9-4e80-943f-9b000ce9ad80', 6),
('e3d07788-6e3c-4849-a92d-50d3da289033', 5),
('98112c51-ee52-4271-b51c-37c79498c5e4', 7),
('040a7bac-358b-44f9-be43-6128e073d62b', 3)]
Code:
ids, n_sessions = zip(*sessionized)
sessionized_df = pl.DataFrame({
    "id": ids,
    "n_sessions": n_sessions
})
joint = users.join(sessionized_df, on="id", suffix="_snowflake")
# again, there should be no output rows if we got all the users right
joint.filter(pl.col("n_sessions") != pl.col("n_sessions_snowflake"))
Output:
> shape: (0, 3)
| id  | n_sessions | n_sessions_snowflake |
| --- | ---------- | -------------------- |
| str | i64        | i64                  |
Code:
# Snowflake can be expensive. Always drop temporary tables when you're done with them.
list(cursor.execute("DROP TABLE simulated_interactions"))
Output:
[('SIMULATED_INTERACTIONS successfully dropped.',)]
Performance Concerns
Snowflake is usually very fast.
However, part of how it achieves that performance is by optimizing the queries it runs before actually executing them.
Putting stateful code in Snowflake limits its ability to do that, because the correctness of table functions depends on `process` being called sequentially over each row of a given partition.
That means that very large partitions (like the entire query history of a given user) could present problems.
Keep this in mind when constructing queries, and try to make sure that your partitions are relatively small.
In the context of search data, this might mean requiring that sessions always end at midnight even if that would technically split one session into two, or considering which page a user is on as part of the scope of their session.
Often the errors that such simplifications introduce into your data are extremely small for the performance gains that they allow.
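For example, ending sessions at the day boundary only takes adding the date to the partition key. Here's a sketch of the modified over clause (a variant we didn't run above; `to_date` truncates a timestamp to its calendar day):
Code:
-- hypothetical variant: partitioning on the day as well as the user caps
-- partition size, at the cost of splitting sessions that straddle midnight
table(
    timed_session(si.time)
    over (partition by si.id, to_date(si.time) order by si.time)
) session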
In addition to the limits on parallelism, Python code in general is quite slow compared to native code.
Snowflake allows us to optimize our code to operate on batches so we can leverage things like `pandas` to speed up our code, but that doesn't work for UDTFs because we need mutable state.
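For contrast, a stateless scalar UDF can opt into batching. Below is a minimal sketch based on Snowflake's vectorized Python UDF interface (the `_snowflake` module only exists inside Snowflake's Python runtime, so this won't run locally); nothing like it is available for the stateful handler above.
Code:
import pandas
from _snowflake import vectorized

@vectorized(input=pandas.DataFrame)
def add_one(df: pandas.DataFrame) -> pandas.Series:
    # each call receives a batch of rows as a DataFrame (one column per
    # argument), so pandas does the work instead of a per-row Python loop
    return df[0] + 1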
Conclusion
Snowflake has done a great job making data more accessible to the people who need it. Rather than building complex data pipelines or carefully designing new database indices every time an analyst has a new question, analysts can write queries themselves and have confidence that Snowflake will run them fast, even over massive datasets. Table functions in Python take that accessibility story even further, by allowing analysts and data scientists to ask questions that, without Snowflake, would have needed online stream processing platforms or custom instrumentation of application code. I'm excited to see how they move that story forward next.