5 Different Checks for Different Datasets
5.1 Load Packages
library(googlesheets4)
library(tidyverse)
library(here)
library(janitor)
library(pointblank)
library(glue)
library(gt)
library(knitr)
library(lubridate)
5.2 Let’s Be Real, Different Datasets Will Require Different Checks
While there are certain checks we’ll want to do in a vast majority of datasets,1 we’ll need different automated checks for different datasets.
This might feel like a huge wrench in our plan for automated checks, but it’s not! Let’s walk through how we can keep our pipeline moving.
5.3 Let’s Read in All Our Raw Data
# Write way less per path
file_names <- c("palmer_penguins_improved",
                "wellbeing",
                "telemetry")

all_datasets_different <- map(file_names, ~{

  # Create file name
  path_for_read <- glue("data_from_gsheets/raw/{.x}.csv")

  # Read in the csv and clean the column names
  read_csv(path_for_read) %>%
    clean_names()

})
5.4 Creating ‘Flags’ For Different Kinds of Checks
Let’s start with the example we’ve seen before: checking for duplicate IDs. Let’s say we’re not concerned about duplicate IDs in the telemetry data, but we still want to do that check in the palmer_penguins and wellbeing datasets.
We can create a “flag” for the duplicate ID check. “Flag” is just a somewhat fancy way of saying a variable that returns TRUE when we want to perform the check and FALSE when we don’t.
Here’s what that looks like in code. Don’t worry if there’s a lot here you don’t understand yet, we’ll go through a version step by step later!
# Need to input the datasets and the name of the id variable for each
id_var_names <- c("individual_id", "participant_id", "id")

check_dupe_ids <- c(rep(TRUE, 2), rep(FALSE, 1))

all_agents_conditional <- pmap(list(all_datasets_different,
                                    id_var_names,
                                    check_dupe_ids), ~{

  if (..3 == TRUE) {
    ..1 %>%
      create_agent(
        label = "Check for unique ids",
        actions = action_levels(warn_at = 1)
      ) %>%
      rows_distinct(
        columns = vars(!!!..2)
      ) %>%
      interrogate()
  } else {
    data.frame(no_check = TRUE) %>%
      gt() %>%
      tab_header("No Check for Duplicate IDs")
  }
})
all_agents_conditional[[3]]

No Check for Duplicate IDs

| no_check |
|---|
| TRUE |
We have to introduce a new extension of the map() function: pmap(). Remember how we had map2() when we had two variables we wanted to iterate over at the same time? Rather than have map3() for 3 inputs, map4() for 4 inputs, etc., we can use pmap() with however many inputs we want.
The biggest difference when using pmap() is that instead of .x and .y we use ..1 for the first input, ..2 for the second input, etc.
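To make that concrete, here’s a tiny sketch with made-up vectors (not from our data) showing pmap() walking over three inputs in parallel:

# Each input vector is visited element-by-element; ..1, ..2, ..3 refer to the
# first, second, and third inputs in order
library(purrr)

pmap_chr(list(c("a", "b"), c(1, 2), c(TRUE, FALSE)),
         ~ paste(..1, ..2, ..3, sep = "-"))
#> [1] "a-1-TRUE"  "b-2-FALSE"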
We then use an if else statement to only perform the check for duplicate IDs when our flag equals TRUE. If it’s FALSE we create a table output explicitly saying the check wasn’t performed.
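Here’s a hedged toy version of that same logic, with a made-up flag and mtcars standing in for one of our datasets:

# Run the "check" only when the flag is TRUE, otherwise return a small
# placeholder table that says no check happened
library(dplyr)

check_dupe_ids_toy <- FALSE

if (check_dupe_ids_toy == TRUE) {
  nrow(mtcars) == nrow(distinct(mtcars))   # the "check"
} else {
  data.frame(no_check = TRUE)              # explicit "no check" output
}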
5.5 Creating More Scalable, Automated Checks
This is a great start, but I could imagine this getting unwieldy with even one more kind of check. If we wanted to add a variety of checks that do and don’t apply to different datasets, this code would become an impossible mess.
Let’s add another check and create a more scalable version of this code.
First let’s create the values we’ll need for our checks. We’ll need to grab the names of the columns we’re checking.
# Create necessary input for id variable names
id_var_names <- c("individual_id", "participant_id", "id")

# Get the names of variables whose values we want to check
vars_value_check <- list(all_datasets_different[[1]] %>%
                           select(starts_with("culmen")) %>%
                           names(),
                         all_datasets_different[[2]] %>%
                           select(starts_with("wellbeing")) %>%
                           names(),
                         all_datasets_different[[3]] %>%
                           select(starts_with("accelerometer")) %>%
                           names())
But we know how to spot non-scalable copy + paste coding now! Let’s create a couple functions and map over them to get those column names more efficiently.
id_var_names <- map(all_datasets_different, ~{

  .x %>%
    select(ends_with("id")) %>%
    names()

})

var_start_with <- list("culmen", "wellbeing", "accelerometer")

vars_value_check <- map2(all_datasets_different, var_start_with, ~{

  .x %>%
    select(starts_with(.y)) %>%
    names()

})
Notice how the ends_with("id") call is only possible because all our ID variables end with the characters “id”. Naming things well and consistently is just as much a part of data engineering as writing code, part 87!
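Here’s a tiny made-up tibble (my own, not one of our datasets) showing the payoff of that consistency:

# Consistent "_id" suffixes let tidyselect helpers grab every ID column at once
library(dplyr)

demo <- tibble(participant_id = 1:3,
               site_id = c("a", "b", "c"),
               score = c(10, 12, 9))

demo %>%
  select(ends_with("id")) %>%
  names()
#> [1] "participant_id" "site_id"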
Anyway, let’s create the flags for our checks. We’ll use the rep() function to reduce keystrokes and make our flags more scalable.
# Create check flags
check_dupe_ids <- c(rep(TRUE, 2), rep(FALSE, 1))

check_var_range <- c(rep(FALSE, 1), rep(TRUE, 2))
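For reference, rep() just repeats a value, so the first flag vector above is the same as spelling the logicals out by hand:

c(rep(TRUE, 2), rep(FALSE, 1))
#> [1]  TRUE  TRUE FALSE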
And now we get to pmap(). First we’ll create a function that uses ..1, ..2, ..3, etc. to refer to each input.
# Create more scalable pmap
all_agents_conditional <- pmap(.l = list(all_datasets_different,
                                         id_var_names,
                                         check_dupe_ids,
                                         check_var_range,
                                         vars_value_check), ~{

  ..1 %>%
    create_agent(
      label = "Conduct automated checks",
      actions = action_levels(warn_at = 1)
    ) %>%
    {if (..3 == TRUE)
      rows_distinct(.,
                    columns = vars(!!!..2)
      )
      else .
    } %>%
    {if (..4 == TRUE)
      pointblank::col_vals_between(.,
                                   columns = ..5,
                                   left = -4,
                                   right = 4,
                                   na_pass = TRUE
      )
      else .
    } %>%
    interrogate()
})
This code will technically run, give us the right output, and use functions to make it more scalable. But it’s also really hard to figure out which variable is which and what is happening! Let’s create a version where we name all the variables we’re passing into pmap() instead.
all_agents_conditional <- pmap(.l = list(dataset_for_check = all_datasets_different,
                                         id = id_var_names,
                                         check_dupe_ids = check_dupe_ids,
                                         check_var_range = check_var_range,
                                         vars_value_check = vars_value_check),
                               .f = function(dataset_for_check,
                                             id,
                                             check_dupe_ids,
                                             check_var_range,
                                             vars_value_check) {

  dataset_for_check %>%
    create_agent(
      label = "Conduct automated tests",
      actions = action_levels(warn_at = 1)
    ) %>%
    {if (check_dupe_ids == TRUE)
      rows_distinct(.,
                    columns = vars(!!!id)
      )
      else .
    } %>%
    {if (check_var_range == TRUE)
      pointblank::col_vals_between(.,
                                   columns = vars_value_check,
                                   left = -4,
                                   right = 4,
                                   na_pass = TRUE
      )
      else .
    } %>%
    interrogate()
})
5.6 A Deep Dive Into pmap()
Still probably intimidating if you’ve never seen a pmap() function before.2 Let’s break down each part of this function using plain language.
This part of the function lets us name our inputs. We can either give them shorter names (e.g., “id” instead of “id_var_names”) or keep them the same (“check_var_range”).
# .l = list(dataset_for_check = all_datasets_different,
# id = id_var_names,
# check_dupe_ids = check_dupe_ids,
# check_var_range = check_var_range,
# vars_value_check = vars_value_check)
This part of the function lets pmap() know we’re going to execute a function using the parameters we just created instead of ..1, ..2, etc. Naming these parameters here lets us use those names in the function itself, which is way easier to understand than ..1 or ..2.
# .f = function(dataset_for_check,
# id,
# check_dupe_ids,
# check_var_range,
# vars_value_check
# )
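As a quick, self-contained sketch (made-up numbers, separate from our pipeline), this is all that name-matching amounts to: the names in .l line up with the argument names in .f.

# Each .l entry is matched to the .f argument with the same name
library(purrr)

pmap_dbl(.l = list(base = c(2, 3), exponent = c(3, 2)),
         .f = function(base, exponent) base ^ exponent)
#> [1] 8 9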
In this chunk of the code we take a dataset from our list of raw datasets and create an agent using the {pointblank} package. We also tell the process to warn us if there’s even one failure.3
# dataset_for_check %>%
# create_agent(
# label = "Conduct automated tests",
# actions = action_levels(warn_at = 1)
# )
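One aside of my own (not something this chapter relies on): action_levels() also accepts fractions, so we could instead warn once a share of the test units fails rather than after a single failing unit.

# Warn once 10% of the units in a validation step fail (a fraction, not a count)
action_levels(warn_at = 0.1)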
This looks a bit different than anything we’ve seen so far. What this code allows us to do is say “Hey, if we said we wanted to check if there were duplicate IDs4 then check if there are duplicate IDs5. Otherwise, just pass the dataframe through the pipeline as is without checking for duplicate IDs [else .].”
Another note: the !!!id looks a bit wild, and you don’t have to fully understand what’s happening there right now. All you need to know is that this is one way to “unquote” variables that are passed in as character values6 but for coding purposes need to not have quotes around them.7 For a more in-depth discussion of this topic you can check out this resource.
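If you want a feel for unquote-splicing outside of {pointblank}, here’s a hedged illustration with {dplyr} and {rlang} (an analogous pattern, not the exact call our agent uses):

# rlang::syms() turns character names into bare symbols, and !!! splices them
# into count() as if we had typed cyl, gear directly
library(dplyr)
library(rlang)

cols_to_count <- c("cyl", "gear")

mtcars %>%
  count(!!!syms(cols_to_count))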
# {if (check_dupe_ids == TRUE)
# rows_distinct(.,
# columns = vars(!!!id)
# )
# else .
# }
This chunk follows the same logic as the previous chunk, just for a different kind of check. “Hey, if we said we wanted to check whether some columns only had values inside a certain range8 then check the columns we specify for values within that range9. Otherwise, just pass the dataframe through the pipeline as is without checking whether those columns only had values inside a certain range [else .].”
# {if (check_var_range == TRUE)
# pointblank::col_vals_between(.,
# columns = vars_value_check,
# left = -4,
# right = 4,
# na_pass = TRUE
# )
# else .
# }
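If you want to try col_vals_between() on its own, here’s a small standalone example of my own (not from this pipeline); used directly on a table instead of an agent, it simply passes the data through when every value sits inside the range.

# Toy tibble; every value (and the NA, thanks to na_pass = TRUE) satisfies the range
library(pointblank)
library(tibble)

tibble(x = c(-2, 0, 3, NA)) %>%
  col_vals_between(columns = vars(x), left = -4, right = 4, na_pass = TRUE)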
And this final bit produces the reports for each dataset!
#interrogate()
5.7 Looking at Our Reports
Let’s look at those reports now.
# Breaking my own rule about not copy/pasting code here since if I try
# to print list of agents all at once the formatting looks terrible
# in the book
all_agents_conditional[[1]]

all_agents_conditional[[2]]

Pointblank Validation — Conduct automated tests (tibble; WARN at 1, STOP —, NOTIFY —)

| STEP | CHECK | EVAL | UNITS | PASS | FAIL | W | S | N |
|---|---|---|---|---|---|---|---|---|
| 1 | rows_distinct() | ✓ | 238 | 234 (0.98) | 4 (0.02) | ● | — | — |
| 2 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 3 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 4 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 5 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 6 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 7 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 8 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 9 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 10 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |
| 11 | col_vals_between() | ✓ | 238 | 238 (1.00) | 0 (0.00) | ○ | — | — |

2023-09-27 10:40:32 MDT | < 1 s | 2023-09-27 10:40:33 MDT
all_agents_conditional[[3]]

Pointblank Validation — Conduct automated tests (tibble; WARN at 1, STOP —, NOTIFY —)

| STEP | CHECK | EVAL | UNITS | PASS | FAIL | W | S | N |
|---|---|---|---|---|---|---|---|---|
| 1 | col_vals_between() | ✓ | 1K | 1K (1.00) | 0 (0.00) | ○ | — | — |
| 2 | col_vals_between() | ✓ | 1K | 1K (0.99) | 1 (0.01) | ● | — | — |
| 3 | col_vals_between() | ✓ | 1K | 1K (0.99) | 2 (0.01) | ● | — | — |

2023-09-27 10:40:33 MDT | < 1 s | 2023-09-27 10:40:33 MDT
We now have automated, different checks across multiple datasets!

We didn’t check whether values fell within a certain range in the palmer_penguins data, we didn’t check for duplicate IDs in our telemetry data, and we performed both of those checks in the wellbeing data.

We also caught duplicate IDs in both datasets where we checked, along with a couple of columns with values way outside of our specified range in telemetry.
5.8 Preprocessing Based on Tests
Let’s apply the necessary preprocessing to the datasets, also using pmap() with preprocessing flags.

This applies the same logic as our initial checks, but this time we’ll do a preprocessing step (or not) based on the flag. For example, we won’t run distinct() on the telemetry dataset because we aren’t concerned about duplicate IDs in that data.
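Before the full chunk, here’s a hedged mini-example (on a made-up tibble, not our data) of the clamping logic the range fix below uses: values outside [-4, 4] get pulled back to the nearest bound.

# Toy tibble just to show the case_when() clamping pattern used in the chunk below
library(dplyr)

tibble(x = c(-6, 0, 2.5, 7)) %>%
  mutate(x_clamped = case_when(
    x > 4  ~ 4.0,
    x < -4 ~ -4.0,
    TRUE   ~ x
  ))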
all_datasets_processed <- pmap(.l = list(dataset_for_clean = all_datasets_different,
                                         id = id_var_names,
                                         clean_dupe_ids = check_dupe_ids,
                                         clean_var_range = check_var_range,
                                         vars_value_check = vars_value_check),
                               .f = function(dataset_for_clean,
                                             id,
                                             clean_dupe_ids,
                                             clean_var_range,
                                             vars_value_check) {

  dataset_for_clean %>%
    {if (clean_dupe_ids == TRUE)
      distinct(.,
               pick(contains(id)), .keep_all = TRUE)
      else .
    } %>%
    {if (clean_var_range == TRUE)
      mutate(.,
             across(
               .cols = c(!!!vars_value_check),
               .fns = ~case_when(
                 .x > 4 ~ 4.0,
                 .x < -4 ~ -4.0,
                 TRUE ~ .x
               )
             )
      )
      else .
    }
})
And finally, let’s re-run our tests.
all_agents_processed <- pmap(.l = list(dataset_for_check = all_datasets_processed,
                                       id = id_var_names,
                                       check_dupe_ids = check_dupe_ids,
                                       check_var_range = check_var_range,
                                       vars_value_check = vars_value_check),
                             .f = function(dataset_for_check,
                                           id,
                                           check_dupe_ids,
                                           check_var_range,
                                           vars_value_check) {

  dataset_for_check %>%
    create_agent(
      label = "Conduct automated tests",
      actions = action_levels(warn_at = 1)
    ) %>%
    {if (check_dupe_ids == TRUE)
      rows_distinct(.,
                    columns = vars(!!!id)
      )
      else .
    } %>%
    {if (check_var_range == TRUE)
      pointblank::col_vals_between(.,
                                   columns = vars_value_check,
                                   left = -4,
                                   right = 4,
                                   na_pass = TRUE
      )
      else .
    } %>%
    interrogate()
})
all_agents_processed[[1]]

Pointblank Validation — Conduct automated tests (tibble; WARN at 1, STOP —, NOTIFY —)

| STEP | CHECK | EVAL | UNITS | PASS | FAIL | W | S | N |
|---|---|---|---|---|---|---|---|---|
| 1 | rows_distinct() | ✓ | 190 | 190 (1.00) | 0 (0.00) | ○ | — | — |

2023-09-27 10:40:35 MDT | < 1 s | 2023-09-27 10:40:35 MDT
all_agents_processed[[2]]

Pointblank Validation — Conduct automated tests (tibble; WARN at 1, STOP —, NOTIFY —)

| STEP | CHECK | EVAL | UNITS | PASS | FAIL | W | S | N |
|---|---|---|---|---|---|---|---|---|
| 1 | rows_distinct() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 2 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 3 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 4 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 5 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 6 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 7 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 8 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 9 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 10 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |
| 11 | col_vals_between() | ✓ | 236 | 236 (1.00) | 0 (0.00) | ○ | — | — |

2023-09-27 10:40:35 MDT | < 1 s | 2023-09-27 10:40:36 MDT
all_agents_processed[[3]]

Pointblank Validation — Conduct automated tests (tibble; WARN at 1, STOP —, NOTIFY —)

| STEP | CHECK | EVAL | UNITS | PASS | FAIL | W | S | N |
|---|---|---|---|---|---|---|---|---|
| 1 | col_vals_between() | ✓ | 1K | 1K (1.00) | 0 (0.00) | ○ | — | — |
| 2 | col_vals_between() | ✓ | 1K | 1K (1.00) | 0 (0.00) | ○ | — | — |
| 3 | col_vals_between() | ✓ | 1K | 1K (1.00) | 0 (0.00) | ○ | — | — |

2023-09-27 10:40:36 MDT | < 1 s | 2023-09-27 10:40:36 MDT
5.9 Finishing the Pipeline by Writing the New Datasets to Processed Data…
# Write way less per path
file_names <- c("palmer_penguins_improved",
                "wellbeing",
                "telemetry")

# Start with an empty list of file paths
output_paths_processed <- list()

# Create the for loop that will create a populated list of file paths
for (file_name in file_names) {

  # Create the string needed for the file path using `glue`
  path_init <- glue("data_from_gsheets/processed/{file_name}.csv")

  # Create the path itself using `here`
  # Note, we could do this all at once but it's less readable
  path <- here::here(path_init)

  # Append (or add to the end) the current path to the formerly empty list
  output_paths_processed <- append(output_paths_processed, path)

}

map2(all_datasets_processed, output_paths_processed, ~write_csv(.x, .y))
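As a side note, the same list of paths could also be built with map() instead of a for loop. Here’s a hedged sketch; the name output_paths_alternative is my own and isn’t used elsewhere in the pipeline.

# Build the processed-data paths by mapping over the file names directly
output_paths_alternative <- map(file_names,
                                ~ here::here(glue("data_from_gsheets/processed/{.x}.csv")))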
5.10 And Writing Automated Reports to Their Folder
# Start with an empty list of file paths
report_paths_processed <- list()

# Create the for loop that will create a populated list of file paths
for (file_name in file_names) {

  # Create the string needed for the file path using `glue`
  path_init <- glue("reports/processed/{now()}_{file_name}_report.html")

  # Create the path itself using `here`
  # Note, we could do this all at once but it's less readable
  path <- here::here(path_init)

  # Append (or add to the end) the current path to the formerly empty list
  report_paths_processed <- append(report_paths_processed, path)

}

# Commented out here so we don't create a new version of the report during every reload
# map2(all_agents_processed, report_paths_processed, ~export_report(.x, .y))
5.11 Conclusion
We now have all the building blocks necessary to create a scalable, reproducible pipeline with automated checks across all datasets from a single type of source. {pointblank} has a dizzying number of kinds of tests you can perform on your data, and you can learn more from its docs.

Many organizations will understandably stop here. They get their data from one kind of source10 and will plan to run this pipeline manually whenever a lot of new data comes in. They’ll also add new checks as they go.
However, there are still three more advanced topics this resource might cover:

- Automated preprocessing based on failed checks + retesting after that preprocessing
- Getting data from multiple kinds of sources before testing + preprocessing
- Automatically running your data pipeline on a schedule.

If there’s massive demand and I have more time than I anticipate, I might:

- Add a case study for data engineering in R
- Create a companion version of this textbook using Python + SQL to help people bridge their data engineering skills to the far more commonly used tools at larger scales.
1. Like check for duplicate IDs↩︎
2. Or even if you have!↩︎
3. Though we don’t stop the process at all because we want all the checks to run even if an early one “fails”↩︎
4. if (check_dupe_ids == TRUE)↩︎
5. rows_distinct(., columns = vars(!!!id))↩︎
6. like “individual_id”↩︎
7. individual_id↩︎
8. if (check_var_range == TRUE)↩︎
9. pointblank::col_vals_between(., columns = vars_value_check, left = -4, right = 4, na_pass = TRUE)↩︎
10. like Qualtrics↩︎