ETL, and calling it

etl
analysis
functions
Author

Henrik Vitus Bering Laursen

Published

March 25, 2026

1 Purpose

It’s been a while!

I took some time to really focus on job searching, but I don’t want to lose my programming skills, so I’m going to do some more posts and set aside at least an hour each day to work on this.

So apparently there is a name for a more structured version of a lot of the work with data that I have taught myself (with a little help from colleagues) since 2019: Extract, Transform, Load (ETL).

So I thought that this post should be a process of me learning from the aggregate experiences of R coders everywhere, through the power of AI, how to do this, by asking AI to generate an example of ETL for me, and then me learning about it and figuring out how it works.

1.1 So, what is ETL?

ETL is taking data and doing the following:

  • put them in a place where they can easily be retrieved to measure their problems
  • clean them up and standardise the contents according to predefined criteria (which can be updated as outliers of the criteria are identified)
  • prove they are cleaned up
  • store them in a place where they can be used
  • provide a description of the problems in the data

I have most frequently worked with the data types below, so I want to recreate some of it in a messy state here, and go through the ETL process.

  • Hospital admissions & costs
  • Diagnosis
  • Laboratory results
  • Demographic and socioeconomic
  • Causes of death

2 The process

The process that the AI spat out in a very short time needed some adjustments, and I needed some time to wrap my head around it, but the general idea is the following:

  1. Create simulated data
  2. Create a modular set of scripts that contain functions that fit together
  3. Have a single script run those modules
  4. Output reports and logs with a run_id suffix
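
As a rough illustration of steps 2 to 4, here is a minimal sketch with two tiny throwaway modules written to a temporary folder. The file names (01_extract.R, 02_transform.R) and the functions extract_demo() and transform_demo() are made up for this example and are not part of the actual pipeline in this post.

```r
# Hypothetical mini-pipeline: module files and function names are invented
# for illustration only.
module_dir <- file.path(tempdir(), "etl_modules")
dir.create(module_dir, showWarnings = FALSE, recursive = TRUE)

# Step 2: each module is a script that only defines functions
writeLines(
  "extract_demo <- function() data.frame(id = c('a', 'b', NA))",
  file.path(module_dir, "01_extract.R")
)
writeLines(
  "transform_demo <- function(df) df[!is.na(df$id), , drop = FALSE]",
  file.path(module_dir, "02_transform.R")
)

# Step 3: a single runner script sources the modules in order ...
module_files <- sort(
  list.files(module_dir, pattern = "\\.R$", full.names = TRUE)
)
for (f in module_files) {
  source(f)
}

# ... and then calls the functions they define
run_id <- format(Sys.time(), "%Y%m%d_%H%M%S")
result <- transform_demo(extract_demo())

# Step 4: outputs get the run_id as a suffix
out_path <- file.path(tempdir(), paste0("demo_", run_id, ".csv"))
write.csv(result, out_path, row.names = FALSE)
```

The point of the split is that the modules only define functions, so sourcing them has no side effects, and all the actual work happens in the one runner script.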

And since this is very code-heavy, I made sure to enable code-folding, to make this post readable.

2.1 Packages and generating some data

First I need to make sure I have the necessary packages to do this, and to generate some messy data to work with.

Show the code
# List the packages
packages <- c(
  "readr",
  "dplyr",
  "stringr",
  "lubridate",
  "janitor",
  "arrow",
  "digest",
  "logger",
  "jsonlite",
  "purrr",
  "tibble",
  "here"
)

# Load the packages
lapply(packages, function(pkg) {
  if (!require(pkg, character.only = TRUE)) {
    install.packages(pkg)
    library(pkg, character.only = TRUE)
  }
})

Now let’s generate the following data for patients:

  • Characteristics
  • Visit
  • Lab results

These will be placed in the raw folder of this post. We also need a staging folder, for the data once it has been prepped and stored in Parquet format (which I haven’t worked with yet and am very excited to try), and a warehouse folder, for the cleaned, ready-to-export data.

Show the code
set.seed(1)

# set the path for data
raw_dir <- here::here("posts", "2026-03-25-etl", "data", "raw")
staging_dir <- here::here("posts", "2026-03-25-etl", "data", "staging")
warehouse_dir <- here::here("posts", "2026-03-25-etl", "data", "warehouse")

# create directory for the data
dir.create(raw_dir, recursive = TRUE, showWarnings = FALSE)
dir.create(staging_dir, recursive = TRUE, showWarnings = FALSE)
dir.create(warehouse_dir, recursive = TRUE, showWarnings = FALSE)

patients <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P10", "P10 "), 800, TRUE),
  sex = sample(c("M", "F", "male", "female", "Unknown", ""), 800, TRUE),
  dob = sample(c("1980-01-01", "01/02/1975", "1970-13-01", "", NA), 800, TRUE),
  region = sample(
    c(
      "Nordjylland",
      "Midtjylland",
      "Syddanmark",
      "Sjælland",
      "Hovedstaden",
      "??"
    ),
    800,
    TRUE
  )
)

visits <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P999"), 2000, TRUE),
  visit_date = sample(
    c("2025-01-01", "01/02/2025", "2025-02-30", "", NA),
    2000,
    TRUE
  ),
  clinic_code = sample(c("AAH", "AUH", "OUH", "RH", "UNK", ""), 2000, TRUE),
  visit_type = sample(
    c("outpatient", "inpatient", "ER", "OPD", "IPD"),
    2000,
    TRUE
  )
)

labs <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P888"), 4000, TRUE),
  sample_date = sample(c("2025-01-05", "05/01/2025", "", NA), 4000, TRUE),
  test = sample(c("hba1c", "HbA1c", "creatinine", "eGFR"), 4000, TRUE),
  value = rnorm(4000, 50, 20),
  unit = sample(c("mmol/mol", "%", "umol/L", "mL/min/1.73m2", ""), 4000, TRUE)
)

write_csv(patients, file.path(raw_dir, "patients.csv"))
write_csv(visits, file.path(raw_dir, "visits.csv"))
write_csv(labs, file.path(raw_dir, "labs.csv"))

2.2 Helpers

Some helper functions will be beneficial to make the data cleaning easier. These can be adjusted if for example it is found that some date values have other formats than anticipated.

Show the code
# set up logging to both the console and a log file named after the run id
init_logger <- function(run_id, log_dir = "logs") {
  dir.create(log_dir, showWarnings = FALSE, recursive = TRUE)
  log_appender(appender_tee(file.path(log_dir, paste0("etl_", run_id, ".log"))))
  log_layout(layout_simple)
  log_threshold(INFO)
}

# transforming patient id into a unique value
hash_id <- function(x) {
  vapply(as.character(x), digest, FUN.VALUE = character(1), algo = "xxhash64")
}

# parsing some examples of wrong dates
# this function can be updated as one finds more types of wrong dates that need to be parsed
parse_date_safe <- function(x) {
  suppressWarnings(
    as.Date(parse_date_time(x, orders = c("Y-m-d", "d/m/Y", "d-m-Y")))
  )
}

# basic function that stops with an error message if a condition is not met
assert <- function(condition, msg) {
  if (!isTRUE(condition)) stop(msg, call. = FALSE)
}

# write the items in report_list to a JSON file at path
write_quality_report <- function(path, report_list) {
  dir.create(dirname(path), showWarnings = FALSE, recursive = TRUE)
  writeLines(toJSON(report_list, pretty = TRUE, auto_unbox = TRUE), path)
}

# write a reason and a stage for a quarantine when working with a dataset
make_quarantine <- function(df, reason, stage, run_id) {
  df |>
    mutate(
      quarantine_reason = reason,
      quarantine_stage = stage,
      run_id = run_id,
      quarantine_time_utc = format(Sys.time(), tz = "UTC")
    )
}

# binds rows of quarantined reports
bind_quarantine <- function(...) {
  qs <- list(...)
  qs <- qs[lengths(qs) > 0]
  if (length(qs) == 0) {
    tibble()
  } else {
    bind_rows(qs)
  }
}
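
To get a feel for two of these helpers, here is a small sketch that can be run on its own. parse_date_flexible() and hash_demo() are names invented for this example: the first is parse_date_safe() with the accepted formats exposed as an argument (my assumption about how one might make it easier to adjust), and the second repeats the logic of hash_id().

```r
library(lubridate)
library(digest)

# Same logic as parse_date_safe(), but the accepted formats are an argument
# (hypothetical variation, for illustration)
parse_date_flexible <- function(x, orders = c("Y-m-d", "d/m/Y", "d-m-Y")) {
  suppressWarnings(as.Date(parse_date_time(x, orders = orders)))
}

dates <- c("2025-01-05", "05/01/2025", "2025-02-30", "", NA)
parsed <- parse_date_flexible(dates)

# Anything that cannot be parsed, including impossible dates, becomes NA
data.frame(raw = dates, parsed = parsed, failed = is.na(parsed))

# Same hashing as hash_id()
hash_demo <- function(x) {
  vapply(as.character(x), digest, FUN.VALUE = character(1), algo = "xxhash64")
}

ids <- c("P10", "P10 ", "p10")
raw_keys <- unname(hash_demo(ids))
clean_keys <- unname(hash_demo(toupper(trimws(ids))))

length(unique(raw_keys))   # untrimmed ids hash to different keys
length(unique(clean_keys)) # trimmed and upper-cased ids share one key
```

The hashing part shows why the ids have to be trimmed and upper-cased before the surrogate key is made: the hash only sees the exact string, so "P10" and "P10 " would otherwise become two different patients.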

2.3 Loading and transforming the data

2.3.1 Extract

First, let’s extract the raw CSV files and turn them into Parquet files.

Show the code
extract_raw <- function() {
  log_info("Extract: reading raw files from {raw_dir}")

  patients <- read_csv(
    file.path(raw_dir, "patients.csv"),
    show_col_types = FALSE
  ) |>
    clean_names()

  visits <- read_csv(
    file.path(raw_dir, "visits.csv"),
    show_col_types = FALSE
  ) |>
    clean_names()

  labs <- read_csv(file.path(raw_dir, "labs.csv"), show_col_types = FALSE) |>
    clean_names()

  log_info("Extract: writing staging Parquet")
  write_parquet(patients, file.path(staging_dir, "patients.parquet"))
  write_parquet(visits, file.path(staging_dir, "visits.parquet"))
  write_parquet(labs, file.path(staging_dir, "labs.parquet"))

  list(
    patients_n = nrow(patients),
    visits_n = nrow(visits),
    labs_n = nrow(labs)
  )
}
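
One reason to stage data as Parquet rather than CSV is that Parquet stores the column types together with the data. The sketch below uses a made-up two-row data frame and base R’s read.csv() for the comparison; it is only meant to show the round trip, not to reproduce the staging step above.

```r
library(arrow)

# Toy data, invented for this example
toy <- data.frame(
  patient_id = c("P1", "P2"),
  visit_date = as.Date(c("2025-01-01", "2025-02-01")),
  stringsAsFactors = FALSE
)

csv_path <- tempfile(fileext = ".csv")
parquet_path <- tempfile(fileext = ".parquet")

write.csv(toy, csv_path, row.names = FALSE)
write_parquet(toy, parquet_path)

from_csv <- read.csv(csv_path, stringsAsFactors = FALSE)
from_parquet <- read_parquet(parquet_path)

class(from_csv$visit_date)     # base read.csv() hands the dates back as text
class(from_parquet$visit_date) # Parquet keeps the Date type
```

In the staging step the columns are still mostly messy text, so the benefit is small there. It matters more for the warehouse tables, where dob, visit_date, and sample_date have been parsed into proper dates.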

2.3.2 Validate

Then we have a function that takes that raw data and checks its contents for errors, i.e. validating that it has the overall contents we expect.

Show the code
validate_raw <- function() {
  patients <- read_parquet(file.path(staging_dir, "patients.parquet"))
  visits <- read_parquet(file.path(staging_dir, "visits.parquet"))
  labs <- read_parquet(file.path(staging_dir, "labs.parquet"))

  report <- list()

  # Basic presence checks
  assert(
    all(c("patient_id") %in% names(patients)),
    "patients missing patient_id"
  )
  assert(
    all(c("patient_id", "visit_date") %in% names(visits)),
    "visits missing required columns"
  )
  assert(
    all(c("patient_id", "sample_date", "test", "value") %in% names(labs)),
    "labs missing required columns"
  )

  # Raw metrics
  report$patients <- list(
    n = nrow(patients),
    missing_patient_id = sum(
      is.na(patients$patient_id) | str_trim(patients$patient_id) == ""
    ),
    duplicate_patient_id_rows = sum(duplicated(str_trim(patients$patient_id)))
  )

  report$visits <- list(
    n = nrow(visits),
    missing_patient_id = sum(
      is.na(visits$patient_id) | str_trim(visits$patient_id) == ""
    ),
    missing_visit_date = sum(
      is.na(visits$visit_date) | str_trim(visits$visit_date) == ""
    ),
    unknown_clinic_code = sum(
      !(visits$clinic_code %in% c("AAH", "AUH", "OUH", "RH"))
    )
  )

  report$labs <- list(
    n = nrow(labs),
    missing_patient_id = sum(
      is.na(labs$patient_id) | str_trim(labs$patient_id) == ""
    ),
    missing_sample_date = sum(
      is.na(labs$sample_date) | str_trim(labs$sample_date) == ""
    ),
    weird_units = sum(
      !(labs$unit %in% c("mmol/mol", "%", "umol/L", "mL/min/1.73m2", ""))
    )
  )

  log_info("Validate raw: {toJSON(report, auto_unbox = TRUE)}")
  #log_info("Validate raw: {unlist(report)}")
  report
}
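
A side note on the weird_units check: read_csv() reads empty cells as NA by default, so by the time the data reaches validate_raw() a blank unit is NA and no longer "". Listing "" among the allowed units therefore does not cover it, which is probably why the blank units end up in the weird_units count. The toy CSV below is made up to show the effect, along with one possible way (my suggestion, not part of the original code) to count missing and unexpected units separately.

```r
library(readr)

# Toy CSV with one blank unit; I() marks the string as literal data
toy <- read_csv(
  I("id,unit\n1,mmol/mol\n2,\n3,%\n"),
  show_col_types = FALSE
)

allowed <- c("mmol/mol", "%", "umol/L", "mL/min/1.73m2")

# The blank cell was read as NA, so allowing "" does not catch it
weird_as_in_validate_raw <- sum(!(toy$unit %in% c(allowed, "")))

# Counting missing and unexpected units separately
missing_unit <- sum(is.na(toy$unit))
unexpected_unit <- sum(!is.na(toy$unit) & !(toy$unit %in% allowed))
```

With the two counts kept apart, the report can distinguish "no unit recorded" from "a unit we have never seen before", which are different problems.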

2.3.3 Transform

Next, we have the function that takes the datasets we have extracted and named patients_raw, visits_raw, and labs_raw, and standardises them with the helper functions and specific applications of mutate.

Show the code
transform_to_warehouse <- function(run_id) {
  patients_raw <- read_parquet(file.path(staging_dir, "patients.parquet"))
  visits_raw <- read_parquet(file.path(staging_dir, "visits.parquet"))
  labs_raw <- read_parquet(file.path(staging_dir, "labs.parquet"))

  # PATIENTS --------
  patients_std <- patients_raw |>
    mutate(
      patient_id_raw = patient_id,
      sex_raw = sex,
      dob_raw = dob,
      region_raw = region,
      patient_id = str_to_upper(str_trim(patient_id)),
      sex = str_to_lower(str_trim(sex)),
      sex = case_when(
        sex %in% c("m", "male") ~ "M",
        sex %in% c("f", "female") ~ "F",
        TRUE ~ NA_character_
      ),
      dob = parse_date_safe(dob),
      region = str_trim(region),
      region = case_when(
        region %in%
          c(
            "Nordjylland",
            "Midtjylland",
            "Syddanmark",
            "Sjælland",
            "Hovedstaden"
          ) ~ region,
        TRUE ~ NA_character_
      )
    )

  q_pat_missing_id <- patients_std |>
    filter(is.na(patient_id) | patient_id == "") |>
    make_quarantine(
      reason = "Missing or blank patient_id",
      stage = "transform_patients",
      run_id = run_id
    )

  patients_ok <- patients_std |>
    filter(!(is.na(patient_id) | patient_id == ""))

  # pick one canonical row per patient_id: the most complete row wins, with
  # ties broken by dob, sex, region, and finally the original row order, so
  # the choice is predictable; all other rows for that patient are quarantined
  patients_ranked <- patients_ok |>
    mutate(
      original_row = row_number(),
      completeness = rowSums(!is.na(across(c(sex, dob, region)))),
      has_dob = !is.na(dob),
      has_sex = !is.na(sex),
      has_region = !is.na(region)
    ) |>
    group_by(patient_id) |>
    arrange(
      desc(completeness),
      desc(has_dob),
      desc(has_sex),
      desc(has_region),
      original_row,
      .by_group = TRUE
    ) |>
    mutate(row_rank_within_patient = row_number()) |>
    ungroup()

  q_pat_duplicate <- patients_ranked |>
    filter(row_rank_within_patient > 1) |>
    make_quarantine(
      reason = "Duplicate patient row not chosen as canonical record",
      stage = "transform_patients",
      run_id = run_id
    )

  dim_patient <- patients_ranked |>
    filter(row_rank_within_patient == 1) |>
    select(patient_id, sex, dob, region) |>
    mutate(patient_sk = hash_id(patient_id)) |>
    relocate(patient_sk, .before = patient_id)

  quarantine_patient <- bind_quarantine(q_pat_missing_id, q_pat_duplicate)

  # VISITS --------
  visits_std <- visits_raw |>
    mutate(
      patient_id_raw = patient_id,
      visit_date_raw = visit_date,
      clinic_code_raw = clinic_code,
      visit_type_raw = visit_type,
      patient_id = str_to_upper(str_trim(patient_id)),
      visit_date = parse_date_safe(visit_date),
      clinic_code = str_to_upper(str_trim(clinic_code)),
      clinic_code = if_else(
        clinic_code %in% c("AAH", "AUH", "OUH", "RH"),
        clinic_code,
        NA_character_
      ),
      visit_type = str_to_lower(str_trim(visit_type)),
      visit_type = case_when(
        visit_type %in% c("outpatient", "opd") ~ "outpatient",
        visit_type %in% c("inpatient", "ipd") ~ "inpatient",
        visit_type %in% c("er") ~ "er",
        TRUE ~ NA_character_
      )
    )

  q_visit_missing_id <- visits_std |>
    filter(is.na(patient_id) | patient_id == "") |>
    make_quarantine(
      reason = "Missing or blank patient_id",
      stage = "transform_visits",
      run_id = run_id
    )

  q_visit_missing_date <- visits_std |>
    filter(!(is.na(patient_id) | patient_id == "")) |>
    filter(is.na(visit_date)) |>
    make_quarantine(
      reason = "Missing or invalid visit_date",
      stage = "transform_visits",
      run_id = run_id
    )

  visits_ok <- visits_std |>
    filter(!(is.na(patient_id) | patient_id == "")) |>
    filter(!is.na(visit_date)) |>
    mutate(patient_sk = hash_id(patient_id)) |>
    select(patient_sk, patient_id, visit_date, clinic_code, visit_type)

  quarantine_visit <- bind_quarantine(q_visit_missing_id, q_visit_missing_date)

  # LABS --------
  labs_std <- labs_raw |>
    mutate(
      patient_id_raw = patient_id,
      sample_date_raw = sample_date,
      test_raw = test,
      unit_raw = unit,
      patient_id = str_to_upper(str_trim(patient_id)),
      sample_date = parse_date_safe(sample_date),
      test = str_to_lower(str_trim(test)),
      test = case_when(
        test %in% c("hba1c", "hb a1c", "hb_a1c") ~ "hba1c",
        test %in% c("creatinine") ~ "creatinine",
        test %in% c("egfr") ~ "egfr",
        TRUE ~ NA_character_
      ),
      unit = str_trim(unit)
    )

  q_lab_missing_id <- labs_std |>
    filter(is.na(patient_id) | patient_id == "") |>
    make_quarantine(
      reason = "Missing or blank patient_id",
      stage = "transform_labs",
      run_id = run_id
    )

  q_lab_missing_date <- labs_std |>
    filter(!(is.na(patient_id) | patient_id == "")) |>
    filter(is.na(sample_date)) |>
    make_quarantine(
      reason = "Missing or invalid sample_date",
      stage = "transform_labs",
      run_id = run_id
    )

  q_lab_unknown_test <- labs_std |>
    filter(!(is.na(patient_id) | patient_id == "")) |>
    filter(!is.na(sample_date)) |>
    filter(is.na(test)) |>
    make_quarantine(
      reason = "Unknown or unmapped lab test",
      stage = "transform_labs",
      run_id = run_id
    )

  labs_ok <- labs_std |>
    filter(!(is.na(patient_id) | patient_id == "")) |>
    filter(!is.na(sample_date)) |>
    filter(!is.na(test)) |>
    mutate(patient_sk = hash_id(patient_id)) |>
    select(patient_sk, patient_id, sample_date, test, value, unit)

  quarantine_lab <- bind_quarantine(
    q_lab_missing_id,
    q_lab_missing_date,
    q_lab_unknown_test
  )

  list(
    dim_patient = dim_patient,
    fact_visit = visits_ok,
    fact_lab = labs_ok,
    quarantine_patient = quarantine_patient,
    quarantine_visit = quarantine_visit,
    quarantine_lab = quarantine_lab
  )
}
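
The part of the transform I had to look at twice is how one canonical row is picked per patient. Here is the same idea on a made-up four-row table, simplified to two columns (sex and dob) and with only the original row order as tie-breaker.

```r
library(dplyr)

# Toy data, invented for this example: P1 appears three times
toy <- tibble(
  patient_id = c("P1", "P1", "P2", "P1"),
  sex = c(NA, "M", "F", "M"),
  dob = as.Date(c("1980-01-01", NA, NA, "1980-01-01"))
)

ranked <- toy |>
  mutate(
    original_row = row_number(),
    # number of non-missing fields per row
    completeness = rowSums(!is.na(across(c(sex, dob))))
  ) |>
  group_by(patient_id) |>
  # most complete row first, ties broken by original order
  arrange(desc(completeness), original_row, .by_group = TRUE) |>
  mutate(row_rank_within_patient = row_number()) |>
  ungroup()

canonical <- filter(ranked, row_rank_within_patient == 1)
duplicates <- filter(ranked, row_rank_within_patient > 1)
```

For P1 the fourth row is the only one with both sex and dob filled in, so it becomes the canonical row and the other two P1 rows go to duplicates. Because the final tie-breaker is the original row order, running this twice on the same input gives the same choice.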

2.3.4 Further validate

Here we have a function that makes sure that:

  • the patient id and the patient surrogate key are both unique in the patient dimension
  • visit and lab rows that point to a patient missing from the patient dimension, and HbA1c results outside a plausible range, are quarantined

And then make a report, stored in the object report.

Show the code
validate_curated <- function(dim_patient, fact_visit, fact_lab, run_id) {
  report <- list()

  report$dim_patient <- list(
    n = nrow(dim_patient),
    patient_id_unique = n_distinct(dim_patient$patient_id),
    patient_sk_unique = n_distinct(dim_patient$patient_sk),
    dup_patient_id = sum(duplicated(dim_patient$patient_id)),
    dup_patient_sk = sum(duplicated(dim_patient$patient_sk))
  )

  assert(
    report$dim_patient$dup_patient_id == 0,
    "dim_patient has duplicate patient_id"
  )
  assert(
    report$dim_patient$dup_patient_sk == 0,
    "dim_patient has duplicate patient_sk"
  )

  bad_visits_fk <- anti_join(fact_visit, dim_patient, by = "patient_sk") |>
    make_quarantine(
      reason = "Visit row has patient_sk not found in dim_patient",
      stage = "validate_curated",
      run_id = run_id
    )

  bad_labs_fk <- anti_join(fact_lab, dim_patient, by = "patient_sk") |>
    make_quarantine(
      reason = "Lab row has patient_sk not found in dim_patient",
      stage = "validate_curated",
      run_id = run_id
    )

  fact_visit_ok <- semi_join(fact_visit, dim_patient, by = "patient_sk")
  fact_lab_ok <- semi_join(fact_lab, dim_patient, by = "patient_sk")

  hba1c_bad <- fact_lab_ok |>
    filter(test == "hba1c") |>
    filter(
      (unit == "mmol/mol" & (value < 10 | value > 300)) |
        (unit == "%" & (value < 2 | value > 30))
    ) |>
    make_quarantine(
      reason = "HbA1c outside plausible range for stated unit",
      stage = "validate_curated",
      run_id = run_id
    )

  fact_lab_ok <- fact_lab_ok |>
    anti_join(
      hba1c_bad |>
        select(patient_sk, patient_id, sample_date, test, value, unit),
      by = c("patient_sk", "patient_id", "sample_date", "test", "value", "unit")
    )

  report$referential_integrity <- list(
    visits_missing_dim = nrow(bad_visits_fk),
    labs_missing_dim = nrow(bad_labs_fk)
  )

  report$plausibility <- list(
    hba1c_out_of_range = nrow(hba1c_bad)
  )

  list(
    report = report,
    fact_visit = fact_visit_ok,
    fact_lab = fact_lab_ok,
    quarantine_visit_curated = bad_visits_fk,
    quarantine_lab_curated = bind_quarantine(bad_labs_fk, hba1c_bad)
  )
}
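
The referential integrity check boils down to a pair of filtering joins. A minimal sketch on made-up tables: anti_join() returns the fact rows with no matching patient, semi_join() returns the ones that do have a match, and together they account for every row.

```r
library(dplyr)

# Toy tables, invented for this example
dim_toy <- tibble(patient_sk = c("a", "b"))
fact_toy <- tibble(
  patient_sk = c("a", "c", "b", "c"),
  value = c(1, 2, 3, 4)
)

# Rows whose key is not in the dimension table: these would be quarantined
orphans <- anti_join(fact_toy, dim_toy, by = "patient_sk")

# Rows whose key is in the dimension table: these are kept
kept <- semi_join(fact_toy, dim_toy, by = "patient_sk")

# Nothing is lost or duplicated by the split
nrow(orphans) + nrow(kept) == nrow(fact_toy)
```

In the simulated data this check has something to catch by construction: the visits include the id P999 and the labs include P888, neither of which can ever appear in patients.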

2.3.5 Loading function

This function writes the data to the warehouse, and writes some metadata for the run.

Show the code
load_warehouse <- function(
  dim_patient,
  fact_visit,
  fact_lab,
  quarantine_patient,
  quarantine_visit,
  quarantine_lab,
  run_id
) {
  dir.create(warehouse_dir, recursive = TRUE, showWarnings = FALSE)

  write_parquet(dim_patient, file.path(warehouse_dir, "dim_patient.parquet"))
  write_parquet(fact_visit, file.path(warehouse_dir, "fact_visit.parquet"))
  write_parquet(fact_lab, file.path(warehouse_dir, "fact_lab.parquet"))

  write_parquet(
    quarantine_patient,
    file.path(warehouse_dir, "quarantine_patient.parquet")
  )
  write_parquet(
    quarantine_visit,
    file.path(warehouse_dir, "quarantine_visit.parquet")
  )
  write_parquet(
    quarantine_lab,
    file.path(warehouse_dir, "quarantine_lab.parquet")
  )

  run_log_path <- file.path(warehouse_dir, "etl_run_log.csv")

  run_row <- tibble(
    run_id = run_id,
    run_time_utc = format(Sys.time(), tz = "UTC"),
    dim_patient_n = nrow(dim_patient),
    fact_visit_n = nrow(fact_visit),
    fact_lab_n = nrow(fact_lab),
    quarantine_patient_n = nrow(quarantine_patient),
    quarantine_visit_n = nrow(quarantine_visit),
    quarantine_lab_n = nrow(quarantine_lab)
  )

  if (!file.exists(run_log_path)) {
    write_csv(run_row, run_log_path)
  } else {
    old_log <- read_csv(
      run_log_path,
      show_col_types = FALSE,
      col_types = cols(
        run_id = col_character(),
        run_time_utc = col_character(),
        dim_patient_n = col_double(),
        fact_visit_n = col_double(),
        fact_lab_n = col_double(),
        quarantine_patient_n = col_double(),
        quarantine_visit_n = col_double(),
        quarantine_lab_n = col_double()
      )
    )

    write_csv(bind_rows(old_log, run_row), run_log_path)
  }

  log_info("Load complete: wrote warehouse and quarantine tables")
}
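
The run log is read back in full and rewritten on every run. A possible shorter alternative, sketched here with a made-up append_run() helper and a temporary file, is to use the append argument of write_csv(). I have not swapped it into the pipeline, so treat it as an option rather than as what the code above does.

```r
library(readr)
library(tibble)

log_path <- tempfile(fileext = ".csv")

# Hypothetical helper, for illustration only
append_run <- function(run_row, path) {
  # first call: file does not exist, append = FALSE writes the header
  # later calls: append = TRUE adds the row without repeating the header
  write_csv(run_row, path, append = file.exists(path))
}

append_run(tibble(run_id = "20260101_000001", dim_patient_n = 10), log_path)
append_run(tibble(run_id = "20260101_000002", dim_patient_n = 12), log_path)

run_log <- read_csv(
  log_path,
  show_col_types = FALSE,
  col_types = cols(run_id = col_character(), dim_patient_n = col_double())
)
```

The trade-off is that appending does not check that the new row has the same columns in the same order as the file, whereas the read-and-bind_rows version above matches columns by name.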

2.3.6 Running all the functions

Now that we have built up a great amount of different functions to help us in the ETL process, it is time to put them to work.

Below, the following happens:

  • a run id is specified, which will be pasted into log names and values in data frames and lists
  • the raw data is extracted and written to Parquet, validated, standardised and transformed into curated tables, validated again, and finally saved to the warehouse together with the quarantine tables and a quality report
Show the code
run_id <- format(Sys.time(), "%Y%m%d_%H%M%S")
init_logger(run_id)

log_info("ETL start, run_id={run_id}")

extract_stats <- extract_raw()
raw_report <- validate_raw()

curated <- transform_to_warehouse(run_id = run_id)

curated_checked <- validate_curated(
  dim_patient = curated$dim_patient,
  fact_visit = curated$fact_visit,
  fact_lab = curated$fact_lab,
  run_id = run_id
)

quarantine_visit_all <- bind_quarantine(
  curated$quarantine_visit,
  curated_checked$quarantine_visit_curated
)

quarantine_lab_all <- bind_quarantine(
  curated$quarantine_lab,
  curated_checked$quarantine_lab_curated
)

write_quality_report(
  path = file.path(warehouse_dir, paste0("quality_report_", run_id, ".json")),
  report_list = list(
    extract = extract_stats,
    raw = raw_report,
    curated = curated_checked$report,
    quarantine_counts = list(
      patient = nrow(curated$quarantine_patient),
      visit = nrow(quarantine_visit_all),
      lab = nrow(quarantine_lab_all)
    )
  )
)

load_warehouse(
  dim_patient = curated$dim_patient,
  fact_visit = curated_checked$fact_visit,
  fact_lab = curated_checked$fact_lab,
  quarantine_patient = curated$quarantine_patient,
  quarantine_visit = quarantine_visit_all,
  quarantine_lab = quarantine_lab_all,
  run_id = run_id
)

log_info("ETL done, run_id={run_id}")

2.4 Summary

I like what the AI gave me - in spite of the numerous corrections I needed to add (none of them particularly substantial).

It is:

  • Modular
    • The AI assumed it was for a single script, so it broke up each part above and “sourced” them for the ETL run
    • My previous “ETLs” were all self-taught, so they usually started as one big file in Stata or R
      • Only creating modules to source in broad categories like “wrangle”, “tables”, “plots”, “logs”
  • Fairly complicated
    • Yet perfectly readable and understandable logic
  • Creates things my previous code does not
    • a quarantine dataset with reasons
    • log files

And I learned a lot in terms of setting up logging and quality reports.

However, I am not going to run through and check everything, just checking the quality report and some of the curated data.

2.4.1 Checking the data

Below is an example of the JSON-formatted quality report, with basic numbers for the datasets, such as the number of rows in the raw data and in the validated and checked datasets.

Show the code
read_lines(file.path(warehouse_dir, "quality_report_20260327_134503.json")) |>
  prettify()
{
    "extract": {
        "patients_n": 800,
        "visits_n": 2000,
        "labs_n": 4000
    },
    "raw": {
        "patients": {
            "n": 800,
            "missing_patient_id": 0,
            "duplicate_patient_id_rows": 393
        },
        "visits": {
            "n": 2000,
            "missing_patient_id": 0,
            "missing_visit_date": 828,
            "unknown_clinic_code": 678
        },
        "labs": {
            "n": 4000,
            "missing_patient_id": 0,
            "missing_sample_date": 2069,
            "weird_units": 802
        }
    },
    "curated": {
        "dim_patient": {
            "n": 407,
            "patient_id_unique": 407,
            "patient_sk_unique": 407,
            "dup_patient_id": 0,
            "dup_patient_sk": 0
        },
        "referential_integrity": {
            "visits_missing_dim": 143,
            "labs_missing_dim": 354
        },
        "plausibility": {
            "hba1c_out_of_range": 127
        }
    },
    "quarantine_counts": {
        "patient": 393,
        "visit": 1344,
        "lab": 2550
    }
}
 

So let’s look at these curated datasets. First start with curated$fact_lab, as there were plenty of variables and values there.

Show the code
curated$fact_lab
# A tibble: 1,931 × 6
   patient_sk       patient_id sample_date test  value unit         
   <chr>            <chr>      <date>      <chr> <dbl> <chr>        
 1 f9010b6e3b64d80a P23        2025-01-05  hba1c  37.2 mmol/mol     
 2 5aa3d854d3b0ad79 P284       2025-01-05  hba1c  41.8 umol/L       
 3 71f68bc43f3c3c6c P388       2025-01-05  hba1c  70.9 %            
 4 c805a8d0f1b528f5 P473       2025-01-05  hba1c  70.2 <NA>         
 5 b2c65428392feb61 P169       2025-01-05  hba1c  47.8 mL/min/1.73m2
 6 82359724201204d7 P252       2025-01-05  hba1c  26.3 <NA>         
 7 747845b89f5b80cf P401       2025-01-05  hba1c  36.4 %            
 8 2a8543dc29836683 P69        2025-01-05  hba1c  53.9 mL/min/1.73m2
 9 d5b646f40b624348 P435       2025-01-05  hba1c  29.1 mmol/mol     
10 fab7a7342c5691b5 P461       2025-01-05  hba1c  16.9 umol/L       
# ℹ 1,921 more rows

Now, as far as I remember, HbA1c is measured in either mmol/mol or % [1, 2], not “mL/min/1.73m2”, which to the experienced reader sounds like an eGFR measurement. Creatinine is measured in a bunch of different ways [3], among them µmol/L.

The data-generating chunk in section 2.1 generated several lab measurements:

  • HbA1c
  • eGFR
  • Creatinine

How many of these are still present in the curated?

Show the code
unique(curated$fact_lab$test)
[1] "hba1c"      "creatinine" "egfr"      

But as we saw in the curated$fact_lab output above, the values of the unit variable don’t match the values of the test variable.

This means that somewhere along the way, something went wrong.

Let’s find out where.

Now, this is actually quite silly, as real data wouldn’t contain this many errors.

Patient 169 seems to have some dubious values, let’s see what went wrong.

Show the code
filter(labs, patient_id == "P169")
# A tibble: 10 × 5
   patient_id sample_date  test       value unit           
   <chr>      <chr>        <chr>      <dbl> <chr>          
 1 P169       "05/01/2025" HbA1c       47.8 "mL/min/1.73m2"
 2 P169       ""           hba1c       63.0 "mmol/mol"     
 3 P169       ""           hba1c       59.3 "%"            
 4 P169       "05/01/2025" eGFR        52.5 ""             
 5 P169        <NA>        hba1c       58.2 "umol/L"       
 6 P169       ""           HbA1c       39.4 "%"            
 7 P169        <NA>        eGFR        54.7 "mL/min/1.73m2"
 8 P169       ""           creatinine  50.9 ""             
 9 P169       "2025-01-05" creatinine  42.7 ""             
10 P169       "05/01/2025" HbA1c       58.4 "umol/L"       
Show the code
filter(curated$fact_lab, patient_id == "P23")
# A tibble: 2 × 6
  patient_sk       patient_id sample_date test       value unit    
  <chr>            <chr>      <date>      <chr>      <dbl> <chr>   
1 f9010b6e3b64d80a P23        2025-01-05  hba1c       37.2 mmol/mol
2 f9010b6e3b64d80a P23        2025-01-05  creatinine  37.7 umol/L  

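Of the 10 observations for patient P169 in labs, six lack a sample date, so they end up in quarantine. Patient P23, shown here from the curated data, has just 2 observations left, and there test and unit correspond well. Let’s take another patient.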

Show the code
print(arrange(curated$fact_lab, patient_id), n = 40)
# A tibble: 1,931 × 6
   patient_sk       patient_id sample_date test       value unit         
   <chr>            <chr>      <date>      <chr>      <dbl> <chr>        
 1 e46464c232f4e578 P1         2025-01-05  hba1c       57.4 mL/min/1.73m2
 2 e46464c232f4e578 P1         2025-01-05  egfr        69.5 mL/min/1.73m2
 3 e46464c232f4e578 P1         2025-01-05  hba1c       61.2 mmol/mol     
 4 e46464c232f4e578 P1         2025-01-05  creatinine  35.2 umol/L       
 5 e46464c232f4e578 P1         2025-01-05  creatinine  60.8 <NA>         
 6 e46464c232f4e578 P1         2025-01-05  hba1c       48.4 mL/min/1.73m2
 7 6a2eda0f5a5b70fe P10        2025-01-05  creatinine  32.6 umol/L       
 8 6a2eda0f5a5b70fe P10        2025-01-05  hba1c       50.4 %            
 9 6a2eda0f5a5b70fe P10        2025-01-05  hba1c       41.7 mmol/mol     
10 6a2eda0f5a5b70fe P10        2025-01-05  egfr        64.6 <NA>         
11 6a2eda0f5a5b70fe P10        2025-01-05  creatinine  21.9 <NA>         
12 6a2eda0f5a5b70fe P10        2025-01-05  creatinine  46.2 <NA>         
13 970cfc29bd01f855 P100       2025-01-05  egfr        31.7 umol/L       
14 970cfc29bd01f855 P100       2025-01-05  egfr        44.0 mL/min/1.73m2
15 970cfc29bd01f855 P100       2025-01-05  hba1c       31.3 umol/L       
16 970cfc29bd01f855 P100       2025-01-05  creatinine  20.2 %            
17 970cfc29bd01f855 P100       2025-01-05  egfr        57.9 mL/min/1.73m2
18 970cfc29bd01f855 P100       2025-01-05  hba1c       12.5 mL/min/1.73m2
19 90f9cdb4541da3f6 P101       2025-01-05  hba1c       37.3 mL/min/1.73m2
20 90f9cdb4541da3f6 P101       2025-01-05  creatinine  31.7 mmol/mol     
21 90f9cdb4541da3f6 P101       2025-01-05  hba1c       21.6 umol/L       
22 90f9cdb4541da3f6 P101       2025-01-05  egfr        77.9 %            
23 90f9cdb4541da3f6 P101       2025-01-05  hba1c       22.3 umol/L       
24 90f9cdb4541da3f6 P101       2025-01-05  creatinine  50.1 mL/min/1.73m2
25 463c73b74acae2f3 P102       2025-01-05  hba1c       46.4 mmol/mol     
26 463c73b74acae2f3 P102       2025-01-05  hba1c       46.6 <NA>         
27 463c73b74acae2f3 P102       2025-01-05  egfr        70.8 umol/L       
28 463c73b74acae2f3 P102       2025-01-05  egfr        54.0 umol/L       
29 463c73b74acae2f3 P102       2025-01-05  hba1c       53.2 umol/L       
30 463c73b74acae2f3 P102       2025-01-05  creatinine  69.6 umol/L       
31 85eafa5b499d346a P103       2025-01-05  hba1c       12.0 %            
32 85eafa5b499d346a P103       2025-01-05  hba1c       54.4 umol/L       
33 e51f3fa73a18b8b1 P104       2025-01-05  hba1c       62.6 mmol/mol     
34 e51f3fa73a18b8b1 P104       2025-01-05  creatinine  55.3 <NA>         
35 e51f3fa73a18b8b1 P104       2025-01-05  hba1c       47.5 mL/min/1.73m2
36 cd213580ea71c5ac P105       2025-01-05  hba1c       40.4 %            
37 cd213580ea71c5ac P105       2025-01-05  hba1c       19.9 <NA>         
38 cd213580ea71c5ac P105       2025-01-05  creatinine  63.7 umol/L       
39 cd213580ea71c5ac P105       2025-01-05  creatinine  52.3 umol/L       
40 5cd074966cab62d0 P106       2025-01-05  hba1c       67.2 <NA>         
# ℹ 1,891 more rows

Here we find patient 100, who has:

  • eGFR measured in common creatinine units
  • hba1c measured in common creatinine units
  • creatinine measured in hba1c units (%)
  • hba1c measured in eGFR units

The astounding number of errors is of course due to the quickly simulated nature of the data.

But this can also be used to demonstrate that it may be necessary to include checks for whether the test, value, and unit variables actually correspond to each other.

For now, I will just try to include checks for whether test and unit correspond to each other, assuming that test is the variable most likely to be “true”. If unit has a missing value, it should be possible to infer the unit from the test variable. This of course also assumes that the values in the value column are true; otherwise one would have to test the values against the range they are likely to fall in, given the test performed.
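
One way to express such a check, sketched here on made-up rows, is a small lookup table of which units are expected for which test, combined with filtering joins. The expected_units table and the toy values are my assumptions for illustration, based on the units mentioned above.

```r
library(dplyr)

# Assumed lookup table: which units are acceptable for which test
expected_units <- tibble(
  test = c("hba1c", "hba1c", "egfr", "creatinine"),
  unit = c("mmol/mol", "%", "mL/min/1.73m2", "umol/L")
)

# Toy lab rows, invented for this example
labs_toy <- tibble(
  test = c("hba1c", "hba1c", "egfr", "creatinine", "creatinine"),
  value = c(48, 52, 70, 80, 75),
  unit = c("mmol/mol", "umol/L", "mL/min/1.73m2", "%", NA)
)

known_unit <- filter(labs_toy, !is.na(unit))

# Rows where the test and unit pair is in the lookup table
unit_ok <- semi_join(known_unit, expected_units, by = c("test", "unit"))

# Rows where the unit does not belong to the test
unit_mismatch <- anti_join(known_unit, expected_units, by = c("test", "unit"))

# A missing unit can only be inferred for tests with exactly one expected unit
single_unit <- expected_units |>
  group_by(test) |>
  filter(n() == 1) |>
  ungroup() |>
  rename(inferred_unit = unit)

unit_inferred <- labs_toy |>
  filter(is.na(unit)) |>
  left_join(single_unit, by = "test") |>
  mutate(unit = inferred_unit) |>
  select(-inferred_unit)
```

Note the limitation in the last step: a missing unit can be filled in for creatinine and eGFR, but not for HbA1c, because both mmol/mol and % are valid there and the test name alone cannot tell them apart.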

It can quickly become very complicated, and artificially so, with simulated data.

I will just make an updated version of the code chunk that performs the operations that turn the raw data into the curated data, below. That should be the “LABS” part of transform_to_warehouse() in section 2.3.3.

Let’s first look at what it does.

Show the code
labs_raw <- read_parquet(file.path(staging_dir, "labs.parquet"))
labs_std <- labs_raw |>
  mutate(
    patient_id_raw = patient_id,
    sample_date_raw = sample_date,
    test_raw = test,
    unit_raw = unit,
    patient_id = str_to_upper(str_trim(patient_id)),
    sample_date = parse_date_safe(sample_date),
    test = str_to_lower(str_trim(test)),
    test = case_when(
      test %in% c("hba1c", "hb a1c", "hb_a1c") ~ "hba1c",
      test %in% c("creatinine") ~ "creatinine",
      test %in% c("egfr") ~ "egfr",
      TRUE ~ NA_character_
    ),
    unit = str_trim(unit)
  )

q_lab_missing_id <- labs_std |>
  filter(is.na(patient_id) | patient_id == "") |>
  make_quarantine(
    reason = "Missing or blank patient_id",
    stage = "transform_labs",
    run_id = run_id
  )

q_lab_missing_date <- labs_std |>
  filter(!(is.na(patient_id) | patient_id == "")) |>
  filter(is.na(sample_date)) |>
  make_quarantine(
    reason = "Missing or invalid sample_date",
    stage = "transform_labs",
    run_id = run_id
  )

q_lab_unknown_test <- labs_std |>
  filter(!(is.na(patient_id) | patient_id == "")) |>
  filter(!is.na(sample_date)) |>
  filter(is.na(test)) |>
  make_quarantine(
    reason = "Unknown or unmapped lab test",
    stage = "transform_labs",
    run_id = run_id
  )

labs_ok <- labs_std |>
  filter(!(is.na(patient_id) | patient_id == "")) |>
  filter(!is.na(sample_date)) |>
  filter(!is.na(test)) |>
  mutate(patient_sk = hash_id(patient_id)) |>
  select(patient_sk, patient_id, sample_date, test, value, unit)

quarantine_lab <- bind_quarantine(
  q_lab_missing_id,
  q_lab_missing_date,
  q_lab_unknown_test
)

The chunk above standardises the contents and quarantines (and thereby later removes) observations with missing values in critical variables like patient_id, sample_date, and test. The case_when branches cover only a few spellings, and some are more verbose than needed, like test %in% c("egfr") ~ "egfr", which could simply be test == "egfr" ~ "egfr".
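One way to make the mapping easier to extend would be a small lookup table instead of a growing case_when. This is only a sketch: the table `test_lookup`, the column `test_raw_clean`, and the toy data are made up for illustration and are not used elsewhere in this post.

```r
library(dplyr)
library(tibble)

# Hypothetical synonym table: every accepted spelling maps to one canonical
# test name. New spellings are added here, not in the pipeline code.
test_lookup <- tribble(
  ~test_raw_clean, ~test,
  "hba1c",         "hba1c",
  "hb a1c",        "hba1c",
  "hb_a1c",        "hba1c",
  "creatinine",    "creatinine",
  "egfr",          "egfr"
)

labs_demo <- tibble(test_raw = c("HbA1c ", "hb a1c", "eGFR", "troponin"))

labs_mapped <- labs_demo |>
  mutate(test_raw_clean = tolower(trimws(test_raw))) |>
  # spellings without a match get test = NA and can be quarantined as before
  left_join(test_lookup, by = "test_raw_clean")

labs_mapped
```

The unmatched "troponin" row ends up with a missing test, which is the same signal the case_when version gives through TRUE ~ NA_character_.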

What also belongs in this validation stage is a check of whether the units and tests correspond to each other.

The curated lab output should be correct after adding the following, since it is based on labs_ok:

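# Quarantine rows with a known test whose unit does NOT match that test.
# %in% is used instead of == so that a missing unit counts as a mismatch
# rather than giving NA, which filter() would silently drop.
q_lab_wrong_unit <- labs_std |>
  filter(!(is.na(patient_id) | patient_id == "")) |>
  filter(!is.na(sample_date)) |>
  filter(!is.na(test)) |>
  filter(
    !((test == "hba1c" & unit %in% c("%", "mmol/mol")) |
      (test == "egfr" & unit %in% "mL/min/1.73m2") |
      (test == "creatinine" & unit %in% "umol/L"))
  ) |>
  make_quarantine(
    reason = "Wrong unit for the test",
    stage = "transform_labs",
    run_id = run_id
  )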

labs_ok <- labs_std |>
  filter(!(is.na(patient_id) | patient_id == "")) |>
  filter(!is.na(sample_date)) |>
  filter(!is.na(test)) |>
  filter(
    (test == "hba1c" & unit %in% c("%", "mmol/mol")) |
      (test == "egfr" & unit == "mL/min/1.73m2") |
      (test == "creatinine" & unit == "umol/L")
  ) |>
  mutate(patient_sk = hash_id(patient_id)) |>
  select(patient_sk, patient_id, sample_date, test, value, unit)

quarantine_lab <- bind_quarantine(
  q_lab_missing_id,
  q_lab_missing_date,
  q_lab_unknown_test,
  q_lab_wrong_unit
)
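Because each quarantine filter excludes the reasons before it, every row should end up in exactly one bucket. A minimal sketch of that accounting check on toy data follows; `unit_ok()` and `labs_demo` are hypothetical names used only here. It also shows why a missing unit needs care: comparing with == gives NA, and filter() drops NA rows from both sides.

```r
library(dplyr)
library(tibble)

labs_demo <- tibble(
  test = c("hba1c", "egfr", "egfr", "creatinine"),
  unit = c("%", "umol/L", NA, "umol/L")
)

# %in% returns FALSE for a missing unit, so the result is never NA
unit_ok <- function(test, unit) {
  (test == "hba1c" & unit %in% c("%", "mmol/mol")) |
    (test == "egfr" & unit %in% "mL/min/1.73m2") |
    (test == "creatinine" & unit %in% "umol/L")
}

kept <- filter(labs_demo, unit_ok(test, unit))
quarantined <- filter(labs_demo, !unit_ok(test, unit))

# Accounting check: no row is lost and no row is counted twice
stopifnot(nrow(kept) + nrow(quarantined) == nrow(labs_demo))
```

The same kind of check could be run on the real tables, comparing the number of staged rows with the curated rows plus the quarantined rows.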

In validate_curated(), values that are deemed unacceptable are defined, and the rows containing them are quarantined and subsequently removed later in the process.

In other words, dividing it up into stages can make it more digestible to read through and check.

In the past, I would have done all of it in one go, but I can definitely see the idea in dividing it up. It all depends on who needs to read the code. These days, one could ask AI to refactor all of it into the shortest possible code that is still readable. But that might not be what the people who are supposed to read and validate it want.

With the code change above, I think I am satisfied with the fix.

After trying those fixes out (see the refactored section at the bottom) and running all the scripts again, here are the results:

# quality report
read_lines(file.path(warehouse_dir, "quality_report_20260327_134503.json")) |>
  prettify()
{
    "extract": {
        "patients_n": 800,
        "visits_n": 2000,
        "labs_n": 4000
    },
    "raw": {
        "patients": {
            "n": 800,
            "missing_patient_id": 0,
            "duplicate_patient_id_rows": 393
        },
        "visits": {
            "n": 2000,
            "missing_patient_id": 0,
            "missing_visit_date": 828,
            "unknown_clinic_code": 678
        },
        "labs": {
            "n": 4000,
            "missing_patient_id": 0,
            "missing_sample_date": 2069,
            "weird_units": 802
        }
    },
    "curated": {
        "dim_patient": {
            "n": 407,
            "patient_id_unique": 407,
            "patient_sk_unique": 407,
            "dup_patient_id": 0,
            "dup_patient_sk": 0
        },
        "referential_integrity": {
            "visits_missing_dim": 143,
            "labs_missing_dim": 354
        },
        "plausibility": {
            "hba1c_out_of_range": 127
        }
    },
    "quarantine_counts": {
        "patient": 393,
        "visit": 1344,
        "lab": 2550
    }
}
 
# new data without wrong tests and units
print(curated_checked$fact_lab, n = 15)
# A tibble: 1,450 × 6
   patient_sk       patient_id sample_date test       value unit         
   <chr>            <chr>      <date>      <chr>      <dbl> <chr>        
 1 5aa3d854d3b0ad79 P284       2025-01-05  hba1c       41.8 umol/L       
 2 c805a8d0f1b528f5 P473       2025-01-05  hba1c       70.2 <NA>         
 3 b2c65428392feb61 P169       2025-01-05  hba1c       47.8 mL/min/1.73m2
 4 82359724201204d7 P252       2025-01-05  hba1c       26.3 <NA>         
 5 2a8543dc29836683 P69        2025-01-05  hba1c       53.9 mL/min/1.73m2
 6 d5b646f40b624348 P435       2025-01-05  hba1c       29.1 mmol/mol     
 7 85beec9f7f35d310 P98        2025-01-05  creatinine  54.2 umol/L       
 8 451a54ccfaefa84d P490       2025-01-05  hba1c       79.3 umol/L       
 9 1144e6f94e7dc635 P463       2025-01-05  hba1c       43.3 <NA>         
10 0668a7dc33f720f2 P2         2025-01-05  hba1c       44.3 mL/min/1.73m2
11 9f6fe7dbd000a9b9 P212       2025-01-05  egfr        44.6 umol/L       
12 8737bd33bb435976 P250       2025-01-05  creatinine  48.0 %            
13 40909080760d048d P225       2025-01-05  creatinine  72.0 umol/L       
14 54dee76ea0568c55 P222       2025-01-05  egfr        58.7 mmol/mol     
15 04f1e5129f34264c P70        2025-01-05  egfr        54.8 <NA>         
# ℹ 1,435 more rows

Validated data

So, after the data has been curated and checked, we end up with the following changes in row counts:

  • patients: from 800 to 407
  • labs: from 4000 to 1931
  • visits: from 2000 to 799

Ranges of the lab values

I haven’t checked whether the ranges themselves make sense. Again, it feels forced to check whether randomly simulated ranges are OK. But if this were a real study I was prepping data for, I could:

  • check with the protocol or the clinicians I am working with whether these ranges make sense
  • set up a function that checks whether the value is within the expected range for the test and unit it has
  • go through them manually with a list of expected ranges
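The second option could be sketched roughly as below. The table `lab_ranges` and the function `flag_out_of_range()` are hypothetical. The HbA1c limits are the ones already used in validate_curated(), while the creatinine and eGFR limits are placeholders I made up and would need to be replaced with values agreed with the protocol or clinicians.

```r
library(dplyr)
library(tibble)

# Expected range per test and unit.
# HbA1c limits: taken from validate_curated() in this post.
# Creatinine and eGFR limits: PLACEHOLDERS, not clinically validated.
lab_ranges <- tribble(
  ~test,        ~unit,           ~lower, ~upper,
  "hba1c",      "mmol/mol",      10,     200,
  "hba1c",      "%",             2,      16,
  "creatinine", "umol/L",        10,     2000,
  "egfr",       "mL/min/1.73m2", 1,      200
)

flag_out_of_range <- function(labs, ranges = lab_ranges) {
  labs |>
    left_join(ranges, by = c("test", "unit")) |>
    mutate(
      # no matching test/unit pair means the value cannot be checked
      range_known = !is.na(lower),
      out_of_range = range_known & (value < lower | value > upper)
    )
}

labs_demo <- tibble(
  test = c("hba1c", "hba1c", "creatinine", "egfr"),
  unit = c("%", "mmol/mol", "umol/L", "umol/L"),
  value = c(19.9, 48, 63.7, 55)
)

labs_flagged <- flag_out_of_range(labs_demo)
labs_flagged
```

Keeping the ranges in a table rather than inside filter() calls means they can be reviewed and updated without touching the pipeline code.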

3 Calling it quits

Another thing I have learned when making this blog, which I didn’t learn during my PhD, is when to call it. Sometimes, you have to know when to quit.

I learned more about how to set up and semi-automate a lot of ETL, but in truth, a lot of this is overkill, at least compared to what I usually did for research. The exceptions are the concepts of making it modular and using helper functions that all play together, adding a run_id, quarantining (especially if the studied cohort is small), and making quality reports for each run.

Below are additional reasons that I am not motivated to finish this blog post.

  • These are not real data, for a real job, no stakes
  • This hasn’t been produced as part of a greater whole, with a team, a project or trial

These blog posts should be more modular, just like the code above, and serve a clear, distinct purpose: some specific skill I would like to learn, or code I want to understand.

Working with simulated data alone like this is just getting frustrating.

So I will end the post with the following: Learn WHEN to quit, and move on to something else. With that of course also comes learning when NOT to quit.

And a plot for the thumbnail:

library(ggplot2)
library(tidyr)

counts <- tibble(
  dataset = c("patients", "visits", "labs"),
  before = c(
    nrow(read_csv(file.path(raw_dir, "patients.csv"), show_col_types = FALSE)),
    nrow(read_csv(file.path(raw_dir, "visits.csv"), show_col_types = FALSE)),
    nrow(read_csv(file.path(raw_dir, "labs.csv"), show_col_types = FALSE))
  ),
  after = c(
    nrow(read_parquet(file.path(warehouse_dir, "dim_patient.parquet"))),
    nrow(read_parquet(file.path(warehouse_dir, "fact_visit.parquet"))),
    nrow(read_parquet(file.path(warehouse_dir, "fact_lab.parquet")))
  )
)

plot_df <- counts |>
  pivot_longer(
    cols = c(before, after),
    names_to = "stage",
    values_to = "n"
  ) |>
  mutate(
    stage = factor(stage, levels = c("before", "after")),
    dataset = factor(dataset, levels = c("patients", "visits", "labs"))
  )

ggplot(plot_df, aes(x = stage, y = n, group = dataset)) +
  geom_line() +
  geom_point(size = 3) +
  geom_text(
    data = subset(plot_df, stage == "after"),
    aes(label = dataset),
    hjust = -0.4,
    size = 5,
    show.legend = FALSE
  ) +
  expand_limits(x = 2.2) +
  labs(
    title = "ETL row counts: before and after curation",
    x = NULL,
    y = "Number of rows"
  ) +
  theme_minimal(base_size = 16)

ggsave("thumbnail.jpg", plot = last_plot())
Saving 8 x 6 in image

4 Refactored

# chunk-generate-data ----------------------------------------------------

packages <- c(
  "readr",
  "dplyr",
  "stringr",
  "lubridate",
  "janitor",
  "arrow",
  "digest",
  "logger",
  "jsonlite",
  "purrr",
  "tibble",
  "here"
)

invisible(lapply(packages, \(pkg) {
  if (!require(pkg, character.only = TRUE)) {
    install.packages(pkg)
  }
  library(pkg, character.only = TRUE)
}))

set.seed(1)

base_dir <- here::here("posts", "2026-03-25-etl")
raw_dir <- file.path(base_dir, "data", "raw")
staging_dir <- file.path(base_dir, "data", "staging")
warehouse_dir <- file.path(base_dir, "data", "warehouse")

walk(
  c(raw_dir, staging_dir, warehouse_dir),
  dir.create,
  recursive = TRUE,
  showWarnings = FALSE
)

patients <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P10", "P10 "), 800, TRUE),
  sex = sample(c("M", "F", "male", "female", "Unknown", ""), 800, TRUE),
  dob = sample(c("1980-01-01", "01/02/1975", "1970-13-01", "", NA), 800, TRUE),
  region = sample(
    c(
      "Nordjylland",
      "Midtjylland",
      "Syddanmark",
      "Sjælland",
      "Hovedstaden",
      "??"
    ),
    800,
    TRUE
  )
)

visits <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P999"), 2000, TRUE),
  visit_date = sample(
    c("2025-01-01", "01/02/2025", "2025-02-30", "", NA),
    2000,
    TRUE
  ),
  clinic_code = sample(c("AAH", "AUH", "OUH", "RH", "UNK", ""), 2000, TRUE),
  visit_type = sample(
    c("outpatient", "inpatient", "ER", "OPD", "IPD"),
    2000,
    TRUE
  )
)

labs <- tibble(
  patient_id = sample(c(paste0("P", 1:500), "P888"), 4000, TRUE),
  sample_date = sample(c("2025-01-05", "05/01/2025", "", NA), 4000, TRUE),
  test = sample(c("hba1c", "HbA1c", "creatinine", "eGFR"), 4000, TRUE),
  value = rnorm(4000, 50, 20),
  unit = sample(c("mmol/mol", "%", "umol/L", "mL/min/1.73m2", ""), 4000, TRUE)
)

write_csv(patients, file.path(raw_dir, "patients.csv"))
write_csv(visits, file.path(raw_dir, "visits.csv"))
write_csv(labs, file.path(raw_dir, "labs.csv"))

# chunk-helpers ----------------------------------------------------------

valid_regions <- c(
  "Nordjylland",
  "Midtjylland",
  "Syddanmark",
  "Sjælland",
  "Hovedstaden"
)
valid_clinics <- c("AAH", "AUH", "OUH", "RH")

missing_id <- function(x) is.na(x) | x == ""

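valid_lab_unit <- function(test, unit) {
  # %in% instead of == so a missing unit gives FALSE, not NA;
  # an NA here would make filter() drop the row from both the
  # curated table and the quarantine table.
  case_when(
    test == "hba1c" ~ unit %in% c("%", "mmol/mol"),
    test == "egfr" ~ unit %in% "mL/min/1.73m2",
    test == "creatinine" ~ unit %in% "umol/L",
    TRUE ~ FALSE
  )
}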

init_logger <- function(run_id, log_dir = "logs") {
  dir.create(log_dir, showWarnings = FALSE, recursive = TRUE)
  log_appender(appender_tee(file.path(log_dir, paste0("etl_", run_id, ".log"))))
  log_layout(layout_simple)
  log_threshold(INFO)
}

hash_id <- function(x) {
  vapply(as.character(x), digest, character(1), algo = "xxhash64")
}

parse_date_safe <- function(x) {
  suppressWarnings(as.Date(parse_date_time(
    x,
    orders = c("Y-m-d", "d/m/Y", "d-m-Y")
  )))
}

assert <- function(condition, msg) {
  if (!isTRUE(condition)) stop(msg, call. = FALSE)
}

write_quality_report <- function(path, report_list) {
  dir.create(dirname(path), showWarnings = FALSE, recursive = TRUE)
  writeLines(toJSON(report_list, pretty = TRUE, auto_unbox = TRUE), path)
}

make_quarantine <- function(df, reason, stage, run_id) {
  df |>
    mutate(
      quarantine_reason = reason,
      quarantine_stage = stage,
      run_id = run_id,
      quarantine_time_utc = format(Sys.time(), tz = "UTC")
    )
}

bind_quarantine <- function(...) {
  qs <- list(...)
  qs <- qs[lengths(qs) > 0]
  if (length(qs) == 0) tibble() else bind_rows(qs)
}

# chunk-extract-raw ------------------------------------------------------

extract_raw <- function() {
  log_info("Extract: reading raw files from {raw_dir}")

  patients <- read_csv(
    file.path(raw_dir, "patients.csv"),
    show_col_types = FALSE
  ) |>
    clean_names()
  visits <- read_csv(
    file.path(raw_dir, "visits.csv"),
    show_col_types = FALSE
  ) |>
    clean_names()
  labs <- read_csv(file.path(raw_dir, "labs.csv"), show_col_types = FALSE) |>
    clean_names()

  log_info("Extract: writing staging Parquet")
  write_parquet(patients, file.path(staging_dir, "patients.parquet"))
  write_parquet(visits, file.path(staging_dir, "visits.parquet"))
  write_parquet(labs, file.path(staging_dir, "labs.parquet"))

  list(
    patients_n = nrow(patients),
    visits_n = nrow(visits),
    labs_n = nrow(labs)
  )
}

# chunk-validate ---------------------------------------------------------

validate_raw <- function() {
  patients <- read_parquet(file.path(staging_dir, "patients.parquet"))
  visits <- read_parquet(file.path(staging_dir, "visits.parquet"))
  labs <- read_parquet(file.path(staging_dir, "labs.parquet"))

  assert("patient_id" %in% names(patients), "patients missing patient_id")
  assert(
    all(c("patient_id", "visit_date") %in% names(visits)),
    "visits missing required columns"
  )
  assert(
    all(c("patient_id", "sample_date", "test", "value") %in% names(labs)),
    "labs missing required columns"
  )

  report <- list(
    patients = list(
      n = nrow(patients),
      missing_patient_id = sum(missing_id(str_trim(patients$patient_id))),
      duplicate_patient_id_rows = sum(duplicated(str_trim(patients$patient_id)))
    ),
    visits = list(
      n = nrow(visits),
      missing_patient_id = sum(missing_id(str_trim(visits$patient_id))),
      missing_visit_date = sum(missing_id(str_trim(visits$visit_date))),
      unknown_clinic_code = sum(!(visits$clinic_code %in% valid_clinics))
    ),
    labs = list(
      n = nrow(labs),
      missing_patient_id = sum(missing_id(str_trim(labs$patient_id))),
      missing_sample_date = sum(missing_id(str_trim(labs$sample_date))),
      weird_units = sum(
        !(labs$unit %in% c("mmol/mol", "%", "umol/L", "mL/min/1.73m2", ""))
      )
    )
  )

  log_info("Validate raw: {toJSON(report, auto_unbox = TRUE)}")
  report
}

# chunk-transform_to_warehouse -------------------------------------------

transform_to_warehouse <- function(run_id) {
  patients_raw <- read_parquet(file.path(staging_dir, "patients.parquet"))
  visits_raw <- read_parquet(file.path(staging_dir, "visits.parquet"))
  labs_raw <- read_parquet(file.path(staging_dir, "labs.parquet"))

  # PATIENTS
  patients_std <- patients_raw |>
    mutate(
      across(c(patient_id, sex, region), str_trim),
      patient_id_raw = patient_id,
      sex_raw = sex,
      dob_raw = dob,
      region_raw = region,
      patient_id = str_to_upper(patient_id),
      sex = case_when(
        str_to_lower(sex) %in% c("m", "male") ~ "M",
        str_to_lower(sex) %in% c("f", "female") ~ "F",
        TRUE ~ NA_character_
      ),
      dob = parse_date_safe(dob),
      region = if_else(region %in% valid_regions, region, NA_character_)
    )

  q_pat_missing_id <- patients_std |>
    filter(missing_id(patient_id)) |>
    make_quarantine("Missing or blank patient_id", "transform_patients", run_id)

  patients_ranked <- patients_std |>
    filter(!missing_id(patient_id)) |>
    mutate(
      original_row = row_number(),
      completeness = rowSums(!is.na(across(c(sex, dob, region)))),
      has_dob = !is.na(dob),
      has_sex = !is.na(sex),
      has_region = !is.na(region)
    ) |>
    group_by(patient_id) |>
    arrange(
      desc(completeness),
      desc(has_dob),
      desc(has_sex),
      desc(has_region),
      original_row,
      .by_group = TRUE
    ) |>
    mutate(row_rank_within_patient = row_number()) |>
    ungroup()

  q_pat_duplicate <- patients_ranked |>
    filter(row_rank_within_patient > 1) |>
    make_quarantine(
      "Duplicate patient row not chosen as canonical record",
      "transform_patients",
      run_id
    )

  dim_patient <- patients_ranked |>
    filter(row_rank_within_patient == 1) |>
    select(patient_id, sex, dob, region) |>
    mutate(patient_sk = hash_id(patient_id), .before = patient_id)

  quarantine_patient <- bind_quarantine(q_pat_missing_id, q_pat_duplicate)

  # VISITS
  visits_std <- visits_raw |>
    mutate(
      across(c(patient_id, visit_date, clinic_code, visit_type), str_trim),
      patient_id_raw = patient_id,
      visit_date_raw = visit_date,
      clinic_code_raw = clinic_code,
      visit_type_raw = visit_type,
      patient_id = str_to_upper(patient_id),
      visit_date = parse_date_safe(visit_date),
      clinic_code = if_else(
        str_to_upper(clinic_code) %in% valid_clinics,
        str_to_upper(clinic_code),
        NA_character_
      ),
      visit_type = case_when(
        str_to_lower(visit_type) %in% c("outpatient", "opd") ~ "outpatient",
        str_to_lower(visit_type) %in% c("inpatient", "ipd") ~ "inpatient",
        str_to_lower(visit_type) == "er" ~ "er",
        TRUE ~ NA_character_
      )
    )

  q_visit_missing_id <- visits_std |>
    filter(missing_id(patient_id)) |>
    make_quarantine("Missing or blank patient_id", "transform_visits", run_id)

  q_visit_missing_date <- visits_std |>
    filter(!missing_id(patient_id), is.na(visit_date)) |>
    make_quarantine("Missing or invalid visit_date", "transform_visits", run_id)

  visits_ok <- visits_std |>
    filter(!missing_id(patient_id), !is.na(visit_date)) |>
    transmute(
      patient_sk = hash_id(patient_id),
      patient_id,
      visit_date,
      clinic_code,
      visit_type
    )

  quarantine_visit <- bind_quarantine(q_visit_missing_id, q_visit_missing_date)

  # LABS
  labs_std <- labs_raw |>
    mutate(
      across(c(patient_id, sample_date, test, unit), str_trim),
      patient_id_raw = patient_id,
      sample_date_raw = sample_date,
      test_raw = test,
      unit_raw = unit,
      patient_id = str_to_upper(patient_id),
      sample_date = parse_date_safe(sample_date),
      test = case_when(
        str_to_lower(test) %in% c("hba1c", "hb a1c", "hb_a1c") ~ "hba1c",
        str_to_lower(test) == "creatinine" ~ "creatinine",
        str_to_lower(test) == "egfr" ~ "egfr",
        TRUE ~ NA_character_
      )
    )

  q_lab_missing_id <- labs_std |>
    filter(missing_id(patient_id)) |>
    make_quarantine("Missing or blank patient_id", "transform_labs", run_id)

  q_lab_missing_date <- labs_std |>
    filter(!missing_id(patient_id), is.na(sample_date)) |>
    make_quarantine("Missing or invalid sample_date", "transform_labs", run_id)

  q_lab_unknown_test <- labs_std |>
    filter(!missing_id(patient_id), !is.na(sample_date), is.na(test)) |>
    make_quarantine("Unknown or unmapped lab test", "transform_labs", run_id)

  q_lab_wrong_unit <- labs_std |>
    filter(
      !missing_id(patient_id),
      !is.na(sample_date),
      !is.na(test),
      !valid_lab_unit(test, unit)
    ) |>
    make_quarantine("Wrong unit for the test", "transform_labs", run_id)

  labs_ok <- labs_std |>
    filter(
      !missing_id(patient_id),
      !is.na(sample_date),
      !is.na(test),
      valid_lab_unit(test, unit)
    ) |>
    transmute(
      patient_sk = hash_id(patient_id),
      patient_id,
      sample_date,
      test,
      value,
      unit
    )

  quarantine_lab <- bind_quarantine(
    q_lab_missing_id,
    q_lab_missing_date,
    q_lab_unknown_test,
    q_lab_wrong_unit
  )

  list(
    dim_patient = dim_patient,
    fact_visit = visits_ok,
    fact_lab = labs_ok,
    quarantine_patient = quarantine_patient,
    quarantine_visit = quarantine_visit,
    quarantine_lab = quarantine_lab
  )
}

# chunk-validate-curated -------------------------------------------------

validate_curated <- function(dim_patient, fact_visit, fact_lab, run_id) {
  report <- list(
    dim_patient = list(
      n = nrow(dim_patient),
      patient_id_unique = n_distinct(dim_patient$patient_id),
      patient_sk_unique = n_distinct(dim_patient$patient_sk),
      dup_patient_id = sum(duplicated(dim_patient$patient_id)),
      dup_patient_sk = sum(duplicated(dim_patient$patient_sk))
    )
  )

  assert(
    report$dim_patient$dup_patient_id == 0,
    "dim_patient has duplicate patient_id"
  )
  assert(
    report$dim_patient$dup_patient_sk == 0,
    "dim_patient has duplicate patient_sk"
  )

  bad_visits_fk <- anti_join(fact_visit, dim_patient, by = "patient_sk") |>
    make_quarantine(
      "Visit row has patient_sk not found in dim_patient",
      "validate_curated",
      run_id
    )

  bad_labs_fk <- anti_join(fact_lab, dim_patient, by = "patient_sk") |>
    make_quarantine(
      "Lab row has patient_sk not found in dim_patient",
      "validate_curated",
      run_id
    )

  fact_visit_ok <- semi_join(fact_visit, dim_patient, by = "patient_sk")
  fact_lab_ok <- semi_join(fact_lab, dim_patient, by = "patient_sk") |>
    mutate(lab_row_id = row_number(), unit = str_trim(unit))

  hba1c_bad <- fact_lab_ok |>
    filter(
      test == "hba1c",
      (unit == "mmol/mol" & (value < 10 | value > 200)) |
        (unit == "%" & (value < 2 | value > 16)) |
        is.na(unit)
    ) |>
    make_quarantine(
      "HbA1c outside plausible range for stated unit",
      "validate_curated",
      run_id
    )

  fact_lab_ok <- fact_lab_ok |>
    anti_join(select(hba1c_bad, lab_row_id), by = "lab_row_id")

  report$referential_integrity <- list(
    visits_missing_dim = nrow(bad_visits_fk),
    labs_missing_dim = nrow(bad_labs_fk)
  )

  report$plausibility <- list(
    hba1c_out_of_range = nrow(hba1c_bad)
  )

  list(
    report = report,
    fact_visit = fact_visit_ok,
    fact_lab = fact_lab_ok,
    quarantine_visit_curated = bad_visits_fk,
    quarantine_lab_curated = bind_quarantine(bad_labs_fk, hba1c_bad)
  )
}

# chunk-load -------------------------------------------------------------

load_warehouse <- function(
  dim_patient,
  fact_visit,
  fact_lab,
  quarantine_patient,
  quarantine_visit,
  quarantine_lab,
  run_id
) {
  walk2(
    list(
      dim_patient,
      fact_visit,
      fact_lab,
      quarantine_patient,
      quarantine_visit,
      quarantine_lab
    ),
    file.path(
      warehouse_dir,
      c(
        "dim_patient.parquet",
        "fact_visit.parquet",
        "fact_lab.parquet",
        "quarantine_patient.parquet",
        "quarantine_visit.parquet",
        "quarantine_lab.parquet"
      )
    ),
    write_parquet
  )

  run_log_path <- file.path(warehouse_dir, "etl_run_log.csv")

  run_row <- tibble(
    run_id = run_id,
    run_time_utc = format(Sys.time(), tz = "UTC"),
    dim_patient_n = nrow(dim_patient),
    fact_visit_n = nrow(fact_visit),
    fact_lab_n = nrow(fact_lab),
    quarantine_patient_n = nrow(quarantine_patient),
    quarantine_visit_n = nrow(quarantine_visit),
    quarantine_lab_n = nrow(quarantine_lab)
  )

  if (!file.exists(run_log_path)) {
    write_csv(run_row, run_log_path)
  } else {
    old_log <- read_csv(
      run_log_path,
      show_col_types = FALSE,
      col_types = cols(
        run_id = col_character(),
        run_time_utc = col_character(),
        dim_patient_n = col_double(),
        fact_visit_n = col_double(),
        fact_lab_n = col_double(),
        quarantine_patient_n = col_double(),
        quarantine_visit_n = col_double(),
        quarantine_lab_n = col_double()
      )
    )
    write_csv(bind_rows(old_log, run_row), run_log_path)
  }

  log_info("Load complete: wrote warehouse and quarantine tables")
}

# chunk-run --------------------------------------------------------------

run_id <- format(Sys.time(), "%Y%m%d_%H%M%S")
init_logger(run_id)

log_info("ETL start, run_id={run_id}")

extract_stats <- extract_raw()
raw_report <- validate_raw()
curated <- transform_to_warehouse(run_id)

curated_checked <- validate_curated(
  dim_patient = curated$dim_patient,
  fact_visit = curated$fact_visit,
  fact_lab = curated$fact_lab,
  run_id = run_id
)

quarantine_visit_all <- bind_quarantine(
  curated$quarantine_visit,
  curated_checked$quarantine_visit_curated
)
quarantine_lab_all <- bind_quarantine(
  curated$quarantine_lab,
  curated_checked$quarantine_lab_curated
)

write_quality_report(
  path = file.path(warehouse_dir, paste0("quality_report_", run_id, ".json")),
  report_list = list(
    extract = extract_stats,
    raw = raw_report,
    curated = curated_checked$report,
    quarantine_counts = list(
      patient = nrow(curated$quarantine_patient),
      visit = nrow(quarantine_visit_all),
      lab = nrow(quarantine_lab_all)
    )
  )
)

load_warehouse(
  dim_patient = curated$dim_patient,
  fact_visit = curated_checked$fact_visit,
  fact_lab = curated_checked$fact_lab,
  quarantine_patient = curated$quarantine_patient,
  quarantine_visit = quarantine_visit_all,
  quarantine_lab = quarantine_lab_all,
  run_id = run_id
)

log_info("ETL done, run_id={run_id}")

Footnotes

  1. % of haemoglobin A1c in the red blood cells that has glucose attached

  2. https://www.hba1cnet.com/hba1c-calculator/

  3. https://unitslab.com/node/44