Relational Operators
Palimpzest supports a variety of traditional relational operators that can be used in conjunction with semantic operators to perform data processing tasks. These operators include map(), flat_map(), filter(), join(), groupby(), project(), limit(), distinct(), and aggregations (e.g. count(), average(), min(), and max()).
For each operator, we will use the following dataset of customer orders as a running example:
orders = [
{"order_id": 1, "customer_id": 101, "status": "shipped", "items": ["shoes", "t-shirt"], "item_amounts": [200.0, 50.0]},
{"order_id": 2, "customer_id": 102, "status": "pending", "items": ["necklace", "shoes", "phone charger"], "item_amounts": [100.0, 250.0, 25.0]},
{"order_id": 3, "customer_id": 101, "status": "delivered", "items": ["espresso machine"], "item_amounts": [300.0]},
{"order_id": 4, "customer_id": 103, "status": "shipped", "items": ["dog food", "t-shirt"], "item_amounts": [60.0, 50.0]},
{"order_id": 5, "customer_id": 104, "status": "delivered", "items": ["book", "headphones"], "item_amounts": [20.0, 150.0]},
]
Below, we give an overview of how to use each relational operator.
Relational Map and Flat Map
map() and flat_map()
- Both operators take a pz.Dataset as input and produce a new dataset with the same number of rows (for map()) or potentially more rows (for flat_map()).
- Both operators require a user-defined function (UDF) that specifies how to transform each row in the input dataset.
- The output schema of the new dataset can be specified via the cols parameter, which is a list of column specifications (dictionaries with name, type, and desc keys).
- The user-defined function will be provided with a row from the input dataset as a dictionary of keys and values, and should return a dictionary of keys and values corresponding to the output schema (i.e. cols).
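Before the Palimpzest examples, the UDF contract can be sketched in plain Python: each row arrives as a dictionary, and the UDF returns a dictionary whose keys match the cols specification. (The function name compute_total below is illustrative, not part of the Palimpzest API.)

```python
# A map() UDF is an ordinary Python function (or lambda): one row comes in
# as a dict, and a dict with the new field(s) comes out.
def compute_total(row: dict) -> dict:
    return {"total_amount": sum(row["item_amounts"])}

row = {"order_id": 1, "customer_id": 101, "status": "shipped",
       "items": ["shoes", "t-shirt"], "item_amounts": [200.0, 50.0]}
print(compute_total(row))  # {'total_amount': 250.0}
```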
Suppose we wish to compute the total amount for each order in the orders dataset. We can use the map() operator to achieve this as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use map to compute total amount for each order
ds = ds.map(
udf=lambda row: {"total_amount": sum(row["item_amounts"])},
cols=[
{"name": "total_amount", "type": float, "desc": "The total amount for the order"},
],
)
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with the total_amount field computed for each order in the input dataset:
order_id customer_id status items item_amounts total_amount
0 2 102 pending [necklace, shoes, phone charger] [100.0, 250.0, 25.0] 375.0
1 5 104 delivered [book, headphones] [20.0, 150.0] 170.0
2 3 101 delivered [espresso machine] [300.0] 300.0
3 4 103 shipped [dog food, t-shirt] [60.0, 50.0] 110.0
4 1 101 shipped [shoes, t-shirt] [200.0, 50.0] 250.0
As another example, suppose we wish to create a new dataset with one record for each item in each order. We can use the flat_map() operator to achieve this as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use flat_map to create a new dataset with one record for each item in each order
ds = ds.flat_map(
udf=lambda row: [
{"item": item, "amount": amount}
for item, amount in zip(row["items"], row["item_amounts"])
],
cols=[
{"name": "item", "type": str, "desc": "An item in an order"},
{"name": "amount", "type": float, "desc": "The amount for the item"},
],
)
ds = ds.project(["order_id", "customer_id", "status", "item", "amount"])
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with one record for each item in each order:
order_id customer_id status item amount
0 2 102 pending phone charger 25.0
1 5 104 delivered headphones 150.0
2 1 101 shipped t-shirt 50.0
3 2 102 pending shoes 250.0
4 3 101 delivered espresso machine 300.0
5 2 102 pending necklace 100.0
6 1 101 shipped shoes 200.0
7 4 103 shipped dog food 60.0
8 4 103 shipped t-shirt 50.0
9 5 104 delivered book 20.0
Note that we need to use project() in order to drop the list fields items and item_amounts, which would be duplicated across rows with the same order_id. In the future we may provide convenience parameters to flat_map() to automatically drop such fields.
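To see why those list fields get duplicated, here is a plain-Python sketch (not the Palimpzest implementation) of what flat_map() conceptually does for a single order: each dictionary returned by the UDF is combined with the parent row's fields, so items and item_amounts would reappear on every expanded row unless projected away.

```python
order = {"order_id": 1, "customer_id": 101, "status": "shipped",
         "items": ["shoes", "t-shirt"], "item_amounts": [200.0, 50.0]}

# the same UDF as in the flat_map() example, applied to a single row
expanded = [
    {"item": item, "amount": amount}
    for item, amount in zip(order["items"], order["item_amounts"])
]

# conceptually, each expanded dict is combined with the parent row's fields,
# so the list fields repeat on every output row unless projected away
merged = [{**order, **out} for out in expanded]
print(len(merged))           # 2 rows for order 1
print("items" in merged[0])  # True -- hence the project() call above
```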
Relational Filter
filter()
- The filter operator takes a pz.Dataset as input and produces a new dataset containing only the rows that satisfy the filter predicate.
- The operator requires a user-defined function (UDF) that applies the filter predicate to each row in the input dataset.
- The user-defined function will be provided with a row from the input dataset as a dictionary of keys and values, and should return True or False indicating whether or not the row satisfies the predicate.
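The predicate itself is plain Python, so we can apply it by hand (outside of Palimpzest) to see which rows survive. The trimmed rows below keep only the fields needed for the check:

```python
# trimmed orders (only the fields needed to evaluate the predicate)
orders = [
    {"order_id": 1, "status": "shipped"},
    {"order_id": 2, "status": "pending"},
    {"order_id": 3, "status": "delivered"},
    {"order_id": 4, "status": "shipped"},
    {"order_id": 5, "status": "delivered"},
]

# the same predicate passed to filter(), applied row by row
shipped = [row for row in orders if row["status"] == "shipped"]
print([row["order_id"] for row in shipped])  # [1, 4]
```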
Suppose we wish to filter the orders dataset to only include orders that have been shipped. We can use the filter() operator to achieve this as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use filter to retain only orders that have been shipped
ds = ds.filter(lambda row: row["status"] == "shipped")
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with only the records for orders that have been shipped:
order_id customer_id status items item_amounts
0 1 101 shipped [shoes, t-shirt] [200.0, 50.0]
1 4 103 shipped [dog food, t-shirt] [60.0, 50.0]
Relational Join
join()
- The join operator takes a left and a right pz.Dataset as input and produces a new dataset with rows that satisfy the join condition.
- The type of join can be specified via the how parameter, which can take on the values "inner" (default), "left", "right", or "outer".
- If the two datasets have overlapping field names, the duplicate fields from the right dataset will be suffixed with _right in the output dataset to avoid naming conflicts.
- The operator only supports equi-joins on the (list of) field(s) specified in the on parameter.
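As a plain-Python illustration (not the Palimpzest implementation) of the equi-join and the _right renaming rule described above, consider two tiny datasets that both carry a note field:

```python
left = [{"order_id": 1, "customer_id": 101, "note": "gift"}]
right = [{"customer_id": 101, "note": "VIP"}]  # "note" also exists on the left

joined = []
for l in left:
    for r in right:
        if l["customer_id"] == r["customer_id"]:  # equi-join on customer_id
            row = dict(l)
            for k, v in r.items():
                if k == "customer_id":
                    continue  # the join key appears only once in the output
                # overlapping field names from the right get a _right suffix
                key = k + "_right" if k in l else k
                row[key] = v
            joined.append(row)

print(joined)
# [{'order_id': 1, 'customer_id': 101, 'note': 'gift', 'note_right': 'VIP'}]
```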
Suppose we have a second dataset containing customer information:
customers = [
{"customer_id": 101, "name": "Alice", "email": "alice123@gmail.com"},
{"customer_id": 102, "name": "Bob", "email": "bob456@gmail.com"},
{"customer_id": 103, "name": "Charlie", "email": "charlie789@gmail.com"},
{"customer_id": 104, "name": "David", "email": "david10@gmail.com"},
{"customer_id": 105, "name": "Eve", "email": "eve11@gmail.com"},
]
We can write the following PZ program to join the orders and customers datasets on the customer_id field:
import palimpzest as pz
# create datasets from list of orders and customers
orders_ds = pz.MemoryDataset(id="orders", vals=orders)
customers_ds = pz.MemoryDataset(id="customers", vals=customers)
# use join to combine the datasets on the customer_id field
ds = orders_ds.join(customers_ds, on="customer_id", how="inner")
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with the joined records from the two input datasets:
order_id customer_id status items item_amounts name email
0 3 101 delivered [espresso machine] [300.0] Alice alice123@gmail.com
1 5 104 delivered [book, headphones] [20.0, 150.0] David david10@gmail.com
2 1 101 shipped [shoes, t-shirt] [200.0, 50.0] Alice alice123@gmail.com
3 4 103 shipped [dog food, t-shirt] [60.0, 50.0] Charlie charlie789@gmail.com
4 2 102 pending [necklace, shoes, phone charger] [100.0, 250.0, 25.0] Bob bob456@gmail.com
We could also execute an outer join to retain customers without orders as follows:
ds = orders_ds.join(customers_ds, on="customer_id", how="outer")
Which would produce the following output:
order_id customer_id status items item_amounts name email
0 3.0 101 delivered [espresso machine] [300.0] Alice alice123@gmail.com
1 1.0 101 shipped [shoes, t-shirt] [200.0, 50.0] Alice alice123@gmail.com
2 4.0 103 shipped [dog food, t-shirt] [60.0, 50.0] Charlie charlie789@gmail.com
3 2.0 102 pending [necklace, shoes, phone charger] [100.0, 250.0, 25.0] Bob bob456@gmail.com
4 5.0 104 delivered [book, headphones] [20.0, 150.0] David david10@gmail.com
5 NaN 105 None None None Eve eve11@gmail.com
Group By
groupby()
- The operator takes a pz.Dataset as input and produces a new dataset with one row per group.
- The groupby is specified via a pz.GroupBySig object that defines the fields to group by, the aggregation functions to apply, and the fields to aggregate.
- The aggregation functions can be any of the following: "count", "sum", "average", "min", "max", "list", or "set".
We can use the groupby() operator to group the orders dataset by the customer_id field and compute the total amount spent by each customer as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use groupby to compute total amount spent by each customer
gby = pz.GroupBySig(group_by_fields=["customer_id"], agg_funcs=["sum"], agg_fields=["item_amounts"])
ds = ds.groupby(gby)
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with one record for each customer and the total amount spent by that customer:
customer_id sum(item_amounts)
0 102 375.0
1 103 110.0
2 101 550.0
3 104 170.0
Project
The project() operator allows users to select a subset of fields from a dataset. This is useful for dropping unnecessary fields and reducing the size of the dataset. For example, we can use the project operator to drop the items and item_amounts fields from the orders dataset as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use project to drop the items and item_amounts fields
ds = ds.project(["order_id", "customer_id", "status"])
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with only the order_id, customer_id, and status fields:
order_id customer_id status
0 2 102 pending
1 5 104 delivered
2 1 101 shipped
3 4 103 shipped
4 3 101 delivered
Limit
The limit() operator allows users to restrict the number of rows in a dataset. For example, we can use the limit operator to retain only the first 2 records from the orders dataset as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use limit to retain only the first 2 records
ds = ds.limit(2)
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with only the first 2 records:
order_id customer_id status items item_amounts
0 1 101 shipped [shoes, t-shirt] [200.0, 50.0]
1 2 102 pending [necklace, shoes, phone charger] [100.0, 250.0, 25.0]
Distinct
The distinct() operator allows users to remove duplicate rows from a dataset, optionally based on a subset of fields. For example, we can use the distinct operator to compute the distinct customer IDs in the orders dataset as follows:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use distinct to compute the distinct customer IDs
ds = ds.project(["customer_id"])
ds = ds.distinct(["customer_id"])
# execute the program
output = ds.run(max_quality=True)
print(output.to_df())
This produces a new dataset with only the distinct customer IDs:
customer_id
0 104
1 102
2 103
3 101
Aggregations
Palimpzest supports the following aggregation functions:
- count(): Counts the number of rows in a dataset.
- sum(): Computes the sum of values in a dataset.
- average(): Computes the average of values in a dataset.
- min(): Computes the minimum value in a dataset.
- max(): Computes the maximum value in a dataset.
At the moment, each of sum(), average(), min(), and max() only supports aggregating over a single field of type int or float.
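In plain Python terms, these aggregations correspond to the built-ins applied over a single numeric column. Over the flattened item amounts from the orders dataset:

```python
# all item amounts across the five orders, flattened into one column
amounts = [200.0, 50.0, 100.0, 250.0, 25.0, 300.0, 60.0, 50.0, 20.0, 150.0]

print(len(amounts))                 # count   -> 10
print(sum(amounts))                 # sum     -> 1205.0
print(sum(amounts) / len(amounts))  # average -> 120.5
print(min(amounts), max(amounts))   # min/max -> 20.0 300.0
```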
We provide an example of using each aggregation function below:
import palimpzest as pz
# create dataset from list of orders
ds = pz.MemoryDataset(id="orders", vals=orders)
# use count to compute the number of orders
count_ds = ds.count()
# compute the min / max order_id
min_ds = ds.project(["order_id"]).min()
max_ds = ds.project(["order_id"]).max()
# compute the sum / average of all item amounts
item_ds = ds.project(["item_amounts"]).flat_map(
udf=lambda row: [{"item_amount": amount} for amount in row["item_amounts"]],
cols=[{"name": "item_amount", "type": float, "desc": "An item amount"}],
)
sum_ds = item_ds.sum()
avg_ds = item_ds.average()
# execute the programs
count_output = count_ds.run(max_quality=True)
...