pkgdown/mathjax-config.html

Skip to contents

Introduction

R holds data.frames entirely in memory. A 10 GB CSV loaded with read.csv() becomes a 10 GB R object, sometimes larger: R’s internal representation of character vectors, factor levels, and ALTREP metadata can inflate the footprint to 2 to 3x the raw data size. On a machine with 16 GB of RAM, that 10 GB file will either fail to load or push the system into swap, grinding everything to a halt. “Larger-than-RAM” is not a theoretical concern. It is the normal state of affairs for anyone working with multi-year survey data, remote sensing extracts, or event logs from sensor networks.

vectra sidesteps this by never loading the full dataset. When we call tbl() on a .vtr file or tbl_csv() on a CSV, nothing is read yet. The call to collect(), write_vtr(), or any other terminal verb pulls data through the plan tree one row group at a time. Each row group is a self-contained slice of the dataset: a few thousand to a few hundred thousand rows with all their columns. The engine reads one group, applies every operation in the pipeline, emits the result, then discards the group and moves to the next. The key distinction from R’s default approach: data moves through the pipeline in fixed-size pieces, and each piece is freed before the next one arrives.

This batch-at-a-time design means that most vectra operations use constant memory per batch, regardless of how many total rows exist. A filter node reads a batch, applies a predicate, passes matching rows downstream, and forgets about them. A mutate node reads a batch, computes new columns, passes the result on. A select node simply drops columns from each batch. None of these ever accumulate the full dataset in memory.

A few operations do need to see all the data before they can produce output. Sorting requires a global ordering. Hash-based grouping needs to track every distinct group key. Joins need a hash table of the build side. These “materializing” operations have bounded memory budgets or known scaling behaviour that we will discuss in detail.

The central implication is this: if we can express our workload as a chain of streaming operations terminated by a streaming sink, the full dataset never needs to fit in RAM. A 200 GB CSV can flow through tbl_csv() |> filter() |> mutate() |> write_vtr() on a laptop with 8 GB of memory. The pipeline processes each batch in isolation and writes the result directly to disk.

collect() is the one terminal verb that materializes everything into an R data.frame. It breaks the streaming model: the entire result set crosses from C into R and lives in R’s heap until we remove it. For large data, we avoid it for the final result and instead use write_vtr(), write_csv(), or write_sqlite() as our streaming sinks. These functions pull batches through the pipeline and write each one directly to disk, so the C engine’s memory is recycled on every iteration. The R process never holds more than one batch. We still use collect() freely for small intermediate checks, filtered subsets, and aggregation results that we know will fit in memory. The question to ask before calling collect() is always: how many rows will this return? If the answer is “roughly the same as the input,” we want a streaming sink instead.

This vignette walks through the patterns and tools vectra provides for working with datasets that exceed available RAM. We will build our examples around a synthetic multi-year ecological survey dataset: species observations recorded across monitoring sites over several years.

library(vectra)
#> 
#> Attaching package: 'vectra'
#> The following object is masked from 'package:stats':
#> 
#>     filter

set.seed(42)

n <- 50000
species_pool <- c(
  "Quercus robur", "Fagus sylvatica", "Pinus sylvestris",
  "Betula pendula", "Acer pseudoplatanus", "Fraxinus excelsior",
  "Picea abies", "Tilia cordata", "Carpinus betulus",
  "Alnus glutinosa", "Sorbus aucuparia", "Ulmus glabra"
)
sites <- paste0("SITE_", sprintf("%03d", 1:50))
years <- 2015:2024

obs <- data.frame(
  obs_id    = seq_len(n),
  site      = sample(sites, n, replace = TRUE),
  species   = sample(species_pool, n, replace = TRUE),
  year      = sample(years, n, replace = TRUE),
  abundance = rpois(n, lambda = 15),
  cover_pct = round(runif(n, 0.1, 95.0), 1),
  quality   = sample(c("good", "moderate", "poor"), n,
                     replace = TRUE, prob = c(0.6, 0.3, 0.1)),
  stringsAsFactors = FALSE
)

We will write this as both CSV and .vtr to demonstrate different starting points. In a real workflow the CSV might be a raw export from a database or field data logger.

csv_path <- tempfile(fileext = ".csv")
write.csv(obs, csv_path, row.names = FALSE)

vtr_path <- tempfile(fileext = ".vtr")
write_vtr(obs, vtr_path)

Streaming pipelines

The core larger-than-RAM pattern in vectra is source, verbs, sink. We open a lazy reference to the data, chain operations, and terminate with a write function. No intermediate data.frame is ever created.

Suppose we want to extract only high-quality observations, compute a derived column, and store the result as a new .vtr file. The entire pipeline streams:

clean_path <- tempfile(fileext = ".vtr")

tbl_csv(csv_path) |>
  filter(quality == "good") |>
  mutate(log_abundance = log(abundance + 1)) |>
  write_vtr(clean_path)

At no point did R hold 50,000 rows (or 50 million, if this were a larger dataset) in memory. tbl_csv() created a lazy scan node. filter() and mutate() each wrapped it in another lazy node. write_vtr() triggered execution: the CSV scanner read a batch of rows, the filter discarded non-matching rows, the mutate computed the log column, and the writer serialized the result to disk. Then the next batch.

At the C level, each node in the pipeline exposes a next_batch() function. The terminal node (the writer) calls next_batch() on its upstream node, which in turn calls next_batch() on its own upstream, and so on down to the scan node that actually reads from disk. Each VecBatch struct flows upward through the chain, gets transformed at each stage, and arrives at the writer ready to be serialized. Once the writer finishes with a batch, the memory is freed. The pipeline then pulls the next batch. This pull-based design means memory consumption stays flat regardless of how many millions of rows pass through. A pipeline that processes 200 GB of CSV data uses the same peak memory as one that processes 200 KB.

We can verify the result by reading back a small slice:

tbl(clean_path) |>
  select(obs_id, site, species, abundance, log_abundance) |>
  slice_head(n = 5) |>
  collect()
#>   obs_id     site            species abundance log_abundance
#> 1      3 SITE_001    Alnus glutinosa        12      2.564949
#> 2      7 SITE_018 Fraxinus excelsior        17      2.890372
#> 3      8 SITE_049     Betula pendula        17      2.890372
#> 4      9 SITE_047 Fraxinus excelsior        16      2.833213
#> 5     11 SITE_007        Picea abies        14      2.708050

The same pattern works for any source-sink combination. CSV to SQLite:

db_path <- tempfile(fileext = ".sqlite")

tbl_csv(csv_path) |>
  filter(year >= 2020) |>
  write_sqlite(db_path, "recent_obs")

Or .vtr to CSV, useful for handing data to colleagues who expect plain text:

export_csv <- tempfile(fileext = ".csv")

tbl(vtr_path) |>
  filter(species == "Fagus sylvatica") |>
  select(obs_id, site, year, abundance) |>
  write_csv(export_csv)

Each pipeline reads only what it needs and writes directly to the target format. The intermediate verbs never buffer more than one batch. This is what makes vectra practical for datasets larger than RAM: the memory footprint of a streaming pipeline is determined by the batch size, not the dataset size.

When the result of a pipeline is small enough for R (aggregation results, filtered subsets, summary statistics), collect() is still the right choice. Aggregations in particular tend to produce far fewer rows than the input:

tbl(vtr_path) |>
  group_by(species, year) |>
  summarise(
    n_obs     = n(),
    mean_abun = mean(abundance),
    mean_cov  = mean(cover_pct)
  ) |>
  arrange(species, year) |>
  collect() |>
  head(10)
#>                species year n_obs mean_abun mean_cov
#> 1  Acer pseudoplatanus 2015   407  14.70516 49.73317
#> 2  Acer pseudoplatanus 2016   412  15.16019 47.83883
#> 3  Acer pseudoplatanus 2017   394  15.06345 49.33426
#> 4  Acer pseudoplatanus 2018   431  15.32483 49.17193
#> 5  Acer pseudoplatanus 2019   426  15.15023 46.93474
#> 6  Acer pseudoplatanus 2020   415  15.02892 46.37253
#> 7  Acer pseudoplatanus 2021   416  15.18750 44.88101
#> 8  Acer pseudoplatanus 2022   415  15.31325 48.43277
#> 9  Acer pseudoplatanus 2023   433  14.80831 48.84665
#> 10 Acer pseudoplatanus 2024   375  14.77600 47.40613

The rule of thumb: if the output might be large, write to disk. If it fits in a data.frame, collect.

Batch sizing

A .vtr file is organised into row groups. Each row group is a self-contained unit: it has its own column data, validity bitmaps, dictionary tables, and zone-map min/max statistics. When vectra scans a file, it reads one row group at a time. The number of rows per group is the batch size.

Batch size affects two things: memory per batch and zone-map pruning granularity. Larger batches mean fewer row groups and less per-group overhead, but each batch consumes more memory during processing. Smaller batches mean the zone-map statistics cover a narrower range of values, so filter predicates can skip more groups entirely.

The default batch size is 65,536 rows. This is a reasonable middle ground for most workloads: large enough that per-batch overhead (reading the row group header, allocating column arrays, setting up validity bitmaps) is amortized across many rows, but small enough that each batch stays in the low megabytes for typical column counts. When choosing a different batch size, the decision depends on the query pattern. Analytical workloads that scan most of the file (aggregations, full-table joins) benefit from large batches because they reduce the number of next_batch() calls and the associated per-group bookkeeping. Point queries that target a few matching rows benefit from small batches, especially when combined with indexes or sorted columns, because the engine can skip entire row groups whose zone-map ranges do not overlap the predicate.

For analytical workloads with selective filters, smaller row groups often pay off. If each row group spans a narrow range of year values, a filter like year == 2023 can skip most groups without reading them. With one giant row group, the filter must scan every row.

We control batch size when writing:

small_groups <- tempfile(fileext = ".vtr")
large_groups <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  arrange(year) |>
  write_vtr(small_groups, batch_size = 5000)

tbl(vtr_path) |>
  arrange(year) |>
  write_vtr(large_groups, batch_size = 50000)

We sorted by year before writing so that row groups have tight year ranges. The small-batch file has about 10 row groups; the large-batch file has one. When we filter on year, the small-batch file can prune most groups using zone maps:

tbl(small_groups) |>
  filter(year == 2023) |>
  explain()
#> vectra execution plan
#> 
#> FilterNode [streaming] 
#>   ScanNode [streaming, 7 cols, predicate pushdown, v3 stats] 
#> 
#> Output columns (7):
#>   obs_id <int64>
#>   site <string>
#>   species <string>
#>   year <int64>
#>   abundance <int64>
#>   cover_pct <double>
#>   quality <string>

CSV and SQLite sources also have a batch_size parameter, but it controls how many rows the scanner reads per pull rather than how the file is structured on disk. The default of 65,536 works well for most cases. Reducing it lowers peak memory; increasing it reduces per-batch overhead for simple pass-through pipelines.

tbl_csv(csv_path, batch_size = 10000) |>
  filter(quality == "good") |>
  slice_head(n = 3) |>
  collect()
#>   obs_id     site            species year abundance cover_pct quality
#> 1      3 SITE_001    Alnus glutinosa 2015        12      75.9    good
#> 2      7 SITE_018 Fraxinus excelsior 2016        17      13.6    good
#> 3      8 SITE_049     Betula pendula 2020        17      29.3    good

As a starting point: 50,000 to 100,000 rows per group works well for datasets with a few dozen columns. If the data is sorted on a frequently-filtered column, lean towards smaller groups (10,000 to 50,000) for better pruning. If the workload is dominated by full scans or aggregations, larger groups (100,000+) reduce overhead.

Append workflows

Real datasets rarely arrive all at once. Monitoring data comes in daily, monthly, or seasonally. append_vtr() adds new rows to an existing .vtr file as a new row group, without touching existing data. The file header is updated to reflect the additional group.

Here is a typical pattern: we start with one year of data and append subsequent years as they arrive.

archive <- tempfile(fileext = ".vtr")
first_year <- obs[obs$year == 2015, ]
write_vtr(first_year, archive)

nrow(tbl(archive) |> collect())
#> [1] 5012

Now we simulate receiving 2016 data:

year_2016 <- obs[obs$year == 2016, ]
append_vtr(year_2016, archive)

nrow(tbl(archive) |> collect())
#> [1] 10025

append_vtr() also accepts a vectra_node, so we can stream data directly from a CSV into an existing archive without loading it into R:

csv_2017 <- tempfile(fileext = ".csv")
write.csv(obs[obs$year == 2017, ], csv_2017, row.names = FALSE)

tbl_csv(csv_2017) |> append_vtr(archive)

nrow(tbl(archive) |> collect())
#> [1] 15023

Each call to append_vtr() creates one new row group. Over time, the file accumulates many groups of varying sizes. This is fine for append-heavy workloads. If the number of groups becomes unwieldy (hundreds of small appends, each adding a few rows), we can compact the file by rewriting it:

compacted <- tempfile(fileext = ".vtr")
tbl(archive) |> write_vtr(compacted, batch_size = 50000)

This reads every row group from the archive, streams through the writer with a fresh batch size, and produces a clean file. The old file can then be replaced.

The schema of appended data must exactly match the target file: same column names, same types, same order. If the schema drifts over time (a new column added in 2018, say), we need to align it before appending. One approach is to add the missing column with a default value in a mutate() step:

new_data_node <- tbl_csv(csv_2017) |>
  mutate(observer = NA_character_)

This produces a node whose schema matches a hypothetical archive that includes an observer column. The key point: because append_vtr() accepts nodes, we can chain any transformation needed to match the target schema.

Delete and tombstones

Sometimes we need to remove rows from an existing .vtr file. Perhaps a batch of observations was flagged as erroneous, or privacy regulations require deletion of certain records. delete_vtr() handles this without rewriting the file.

Deletion works through a tombstone sidecar file. When we call delete_vtr(path, row_ids), vectra writes the specified row indices to a .del file next to the .vtr. The original data file is never modified. On the next tbl() call, the scan node reads the tombstone file and skips those rows.

Row IDs are 0-based physical positions across the entire file. The first row of the first row group is 0, the first row of the second group continues from where the first left off.

del_demo <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], del_demo)

# Delete rows 0, 1, and 99 (first two and last)
delete_vtr(del_demo, c(0, 1, 99))

tbl(del_demo) |> collect() |> nrow()
#> [1] 97

Tombstone files are cumulative. Calling delete_vtr() again merges the new indices with existing ones:

delete_vtr(del_demo, c(10, 11, 12))

tbl(del_demo) |> collect() |> nrow()
#> [1] 94

To undo all deletions, remove the .del file:

unlink(paste0(del_demo, ".del"))
tbl(del_demo) |> collect() |> nrow()
#> [1] 100

Tombstones are lightweight for sparse deletions. If we delete 1% of rows, the overhead is negligible. But if deletions accumulate to a substantial fraction of the file, scan performance degrades because the engine still reads and discards the tombstoned rows. At that point, compaction makes sense:

delete_vtr(del_demo, 0:49)
compacted_del <- tempfile(fileext = ".vtr")
tbl(del_demo) |> write_vtr(compacted_del)

nrow(tbl(compacted_del) |> collect())
#> [1] 50

The compacted file contains only the surviving rows, with no tombstone file.

Diff between snapshots

Data pipelines often work with periodic snapshots. We receive yesterday’s extract and today’s extract, and we need to know what changed. diff_vtr() computes a key-based logical diff between two .vtr files.

The function takes the paths to the old and new files plus the name of a key column. It streams both files and returns a list with two elements: $deleted contains the key values that were present in the old file but absent in the new one. $added is a lazy vectra_node of rows present in the new file but not in the old.

snap_v1 <- tempfile(fileext = ".vtr")
snap_v2 <- tempfile(fileext = ".vtr")

# Version 1: observations 1-100
write_vtr(obs[1:100, ], snap_v1)

# Version 2: observations 51-150 (rows 1-50 removed, 101-150 added)
write_vtr(obs[51:150, ], snap_v2)
changes <- diff_vtr(snap_v1, snap_v2, "obs_id")

# Keys that disappeared
head(changes$deleted)
#> [1] 1 2 3 4 5 6
length(changes$deleted)
#> [1] 50

The deleted keys are the obs_id values 1 through 50. Meanwhile, the $added element contains a lazy node pointing to observations with obs_id 101 through 150, which we can collect or pipe into further processing:

collect(changes$added) |> head()
#>   obs_id     site          species year abundance cover_pct  quality
#> 1    101 SITE_042      Picea abies 2022        18      81.7     good
#> 2    102 SITE_018    Quercus robur 2018        16      52.4     good
#> 3    103 SITE_027 Carpinus betulus 2019        13      15.5     poor
#> 4    104 SITE_013      Picea abies 2022        17      14.5     good
#> 5    105 SITE_019    Quercus robur 2019        11      16.8 moderate
#> 6    106 SITE_032    Tilia cordata 2018        20      80.1     good

Because $added is a lazy node, we can pipe it directly into further processing. For instance, we might want to append only the truly new rows to an archive:

archive_diff <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], archive_diff)

changes$added |> append_vtr(archive_diff)

nrow(tbl(archive_diff) |> collect())
#> [1] 100

This is a common incremental-load pattern: diff today’s snapshot against yesterday’s, then append only the additions. Combined with delete_vtr() for removals, we can maintain an up-to-date archive without full rewrites.

Note that diff_vtr() performs a set-level diff on the key column. If a row exists in both snapshots with the same key but different values in other columns, it will not appear in either $deleted or $added. To detect modifications, we would need to compare the full row content after identifying shared keys.

External sort

Sorting a dataset that does not fit in memory requires an external merge sort. vectra’s arrange() handles this automatically. The engine maintains a 1 GB memory budget. As data flows through, it accumulates rows in memory. When the budget is reached, the accumulated rows are sorted and written to a temporary .vtr file on disk (a “sorted run”). After all input has been consumed, a k-way merge reads from all sorted runs simultaneously using a min-heap, producing the final sorted output one batch at a time.

From the user’s perspective, none of this is visible. We just call arrange():

sorted_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  arrange(species, desc(abundance)) |>
  write_vtr(sorted_path)

tbl(sorted_path) |>
  select(species, abundance, site) |>
  slice_head(n = 8) |>
  collect()
#>               species abundance     site
#> 1 Acer pseudoplatanus        33 SITE_050
#> 2 Acer pseudoplatanus        29 SITE_023
#> 3 Acer pseudoplatanus        29 SITE_038
#> 4 Acer pseudoplatanus        28 SITE_040
#> 5 Acer pseudoplatanus        28 SITE_030
#> 6 Acer pseudoplatanus        28 SITE_006
#> 7 Acer pseudoplatanus        28 SITE_031
#> 8 Acer pseudoplatanus        28 SITE_010

The sorted output streams to disk. Peak memory stays bounded at the spill budget (1 GB) plus overhead for the merge heap, regardless of input size. For our 50,000-row example the data fits entirely in memory and no spill occurs. But the same code would work on a 500 million row file; the sort would produce multiple temporary runs and merge them transparently.

The 1 GB budget governs how much unsorted data the engine accumulates before flushing to a temporary file. Each flush produces one sorted run. A 10 GB dataset with a 1 GB budget produces roughly 10 runs. The final merge opens all runs simultaneously and maintains a min-heap with one entry per run. At each step, the smallest element across all runs is popped from the heap and emitted as the next output row; the run that contributed it advances by one row and re-inserts into the heap. Because the heap has only as many entries as there are runs (not as many as there are rows), the merge phase uses very little memory.

Data that is already partially sorted produces fewer runs. If the input is sorted on a prefix of the sort key, long stretches of rows will already be in order, so the engine can absorb more rows before hitting the budget and flushing. In the best case (fully sorted input), no spill occurs at all and the sort reduces to a streaming pass-through. There is nothing to tune here. The engine detects the budget internally and manages spill files in the system temp directory. They are cleaned up after the merge completes.

Writing the sorted data to a new .vtr file also improves query performance. Once the file is sorted on species, zone-map statistics on that column become tight, and filters on species can skip most row groups. Sorting on a frequently-queried column is one of the most effective optimizations available.

Streaming joins

vectra’s join engine uses a build-right, probe-left strategy. The right-side table is fully materialized into a hash table in memory. Then the left-side table streams through, probing the hash table for matches one batch at a time. This means the left side can be arbitrarily large, because only the right side needs to fit in memory.

The natural pattern for large-data joins is: huge fact table on the left, small dimension table on the right.

# Small reference table: site metadata
site_meta <- data.frame(
  site      = sites,
  region    = sample(c("North", "South", "East", "West"),
                     length(sites), replace = TRUE),
  elevation = round(runif(length(sites), 100, 2500)),
  stringsAsFactors = FALSE
)
site_path <- tempfile(fileext = ".vtr")
write_vtr(site_meta, site_path)
enriched <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  left_join(tbl(site_path), by = "site") |>
  write_vtr(enriched)

tbl(enriched) |>
  select(obs_id, site, region, elevation, species) |>
  slice_head(n = 5) |>
  collect()
#>   obs_id     site region elevation            species
#> 1      1 SITE_049   East       998   Pinus sylvestris
#> 2      2 SITE_037   East      1914 Fraxinus excelsior
#> 3      3 SITE_001  South      2374    Alnus glutinosa
#> 4      4 SITE_025   West      1165   Pinus sylvestris
#> 5      5 SITE_010   East      1976   Carpinus betulus

The 50 site metadata rows become the hash table (right side). The observation table streams through on the left, each batch probing the hash table for its site key. Memory usage is proportional to the site metadata, not the observation count.

What happens when both sides are large? The right side still must fit in memory, so we need to reduce it. The most direct approach is to filter the right side before the join:

joined_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  filter(year >= 2022) |>
  inner_join(
    tbl(site_path) |> filter(region == "North"),
    by = "site"
  ) |>
  write_vtr(joined_path)

nrow(tbl(joined_path) |> collect())
#> [1] 2944

The filtered right side is much smaller, so the hash table stays small. The left-side filter also reduces the number of rows that flow through the probe, but that affects throughput rather than memory.

For self-joins or joins between two truly large tables, the options are more constrained. The core problem is that one side must fit in memory as a hash table. When neither side is small, we need a strategy to make one of them small.

The most general approach is split-apply-combine: partition the left side by a key (say, year), and for each partition, join against only the matching rows from the right side. If both tables span 10 years, each partition pair is roughly 1/10th the size of the full join. We write each partial result to disk and consolidate with bind_rows() afterward. This trades a single large join for many small joins, each of which fits in memory.

Pre-aggregation is another powerful option. If we only need summary statistics from the right table, group_by() |> summarise() can reduce millions of rows to a handful of groups before the join. A right-side table with 50 million observations across 10,000 sites collapses to 10,000 rows after a site-level aggregation. That fits comfortably in a hash table.

# Right side: per-site-year summary from a second dataset
summary_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  group_by(site, year) |>
  summarise(site_year_avg = mean(abundance)) |>
  write_vtr(summary_path)

# Join the summary back to the detail table
tbl(vtr_path) |>
  left_join(tbl(summary_path), by = c("site", "year")) |>
  select(obs_id, site, year, abundance, site_year_avg) |>
  slice_head(n = 5) |>
  collect()
#>   obs_id     site year abundance site_year_avg
#> 1      1 SITE_049 2023        13      15.86957
#> 2      2 SITE_037 2024        11      14.44330
#> 3      3 SITE_001 2015        12      14.48718
#> 4      4 SITE_025 2018        17      15.00000
#> 5      5 SITE_010 2023         9      14.81579

Multi-file workflows

Partitioned data is common. Monthly exports, regional splits, sensor-specific files. The partitioning dimension matters for query performance. Partitioning by date works well when most queries filter on a time range, because we can skip entire files that fall outside the range. Partitioning by category (region, species, sensor type) works when queries target a specific category. The wrong partitioning dimension forces us to read every file for every query, negating the benefit.

bind_rows() combines multiple vectra nodes into a single streaming pipeline. When all inputs share the same schema (column names and types), vectra creates a C-level ConcatNode that reads from each child sequentially. No data is copied; each source’s batches flow through in order.

monthly_paths <- character(6)
for (m in 1:6) {
  monthly_paths[m] <- tempfile(fileext = ".vtr")
  idx <- which(obs$year == 2024 & ((obs$obs_id %% 6) + 1) == m)
  month_data <- obs[idx[seq_len(min(200, length(idx)))], ]
  write_vtr(month_data, monthly_paths[m])
}

We can combine all monthly files and aggregate without loading them all:

nodes <- lapply(monthly_paths, tbl)
combined <- do.call(bind_rows, nodes)

combined |>
  group_by(species) |>
  summarise(
    total_obs  = n(),
    mean_abun  = mean(abundance)
  ) |>
  arrange(desc(total_obs)) |>
  slice_head(n = 5) |>
  collect()
#>            species total_obs mean_abun
#> 1    Quercus robur       110  14.77273
#> 2     Ulmus glabra       110  15.44545
#> 3 Carpinus betulus       108  14.41667
#> 4    Tilia cordata       106  15.39623
#> 5  Fagus sylvatica       103  14.68932

The bind_rows() call does not read any data. It returns a new node that, when pulled, first exhausts all batches from the first child, then the second, and so on. Downstream operations (the grouping, aggregation, sort) see a single continuous stream.

We can also consolidate the partitioned files into one:

consolidated <- tempfile(fileext = ".vtr")
nodes2 <- lapply(monthly_paths, tbl)
do.call(bind_rows, nodes2) |> write_vtr(consolidated)

nrow(tbl(consolidated) |> collect())
#> [1] 1200

For partitioned files that arrive on a schedule, a common workflow combines bind_rows() with append_vtr(). We process the latest batch of files and append the result to a running archive:

running_archive <- tempfile(fileext = ".vtr")
initial_nodes <- lapply(monthly_paths[1:3], tbl)
do.call(bind_rows, initial_nodes) |> write_vtr(running_archive)

# Next month's files arrive
new_nodes <- lapply(monthly_paths[4:6], tbl)
do.call(bind_rows, new_nodes) |> append_vtr(running_archive)

nrow(tbl(running_archive) |> collect())
#> [1] 1200

This gives us a single growing file that can be queried as a unit, while each monthly append is a lightweight operation.

At some point we may want to consolidate partitioned files into a single sorted .vtr for better query performance. bind_rows() piped into arrange() and then write_vtr() does this in one streaming pass: the concat node reads from each partition, the sort accumulates and spills as needed, and the writer produces a single file with tight zone-map statistics. The original partitions can then be archived or deleted.

Format conversion ETL

A one-time conversion from CSV to .vtr pays off every time we query the data afterwards. The .vtr format supports dictionary encoding, delta encoding, zstd compression, and zone-map statistics. Repeated queries on a .vtr file are faster because the engine can skip row groups and decompress only the columns it needs. CSV requires a full parse every time.

A minimal conversion:

vtr_archive <- tempfile(fileext = ".vtr")
tbl_csv(csv_path) |> write_vtr(vtr_archive)

That single line streams the entire CSV through the vectra writer. The output file is typed, compressed, and has zone-map statistics. For repeated queries, the difference is substantial.

In practice, we rarely want a raw copy. The ingest step is a natural place to clean, filter, and reshape:

clean_archive <- tempfile(fileext = ".vtr")

tbl_csv(csv_path) |>
  filter(quality != "poor") |>
  mutate(
    abundance_log = log(abundance + 1),
    cover_frac    = cover_pct / 100
  ) |>
  select(-quality) |>
  arrange(site, year) |>
  write_vtr(clean_archive, batch_size = 10000)

We filtered out poor-quality records, computed derived columns, dropped the raw quality flag, sorted by site and year for better zone-map pruning, and chose a 10,000-row batch size. The CSV was read once, streaming, and the clean result was written directly to disk.

The inverse direction is also useful. We might need to export a subset to a colleague who uses SQLite:

sqlite_export <- tempfile(fileext = ".sqlite")

tbl(clean_archive) |>
  filter(year >= 2020) |>
  write_sqlite(sqlite_export, "observations")

Or produce a CSV of summary statistics for a report:

summary_csv <- tempfile(fileext = ".csv")

tbl(clean_archive) |>
  group_by(site, year) |>
  summarise(
    n_species = n_distinct(species),
    total_abundance = sum(abundance)
  ) |>
  write_csv(summary_csv)

read.csv(summary_csv) |> head()
#>       site year n_species total_abundance
#> 1 SITE_001 2015        12            1509
#> 2 SITE_001 2016        12            1509
#> 3 SITE_001 2017        12            1249
#> 4 SITE_001 2018        12            1311
#> 5 SITE_001 2019        12            1252
#> 6 SITE_001 2020        12            1266

Multi-source ETL pipelines combine several inputs. Suppose we have regional CSV files that we want to merge and convert:

csv_north <- tempfile(fileext = ".csv")
csv_south <- tempfile(fileext = ".csv")
write.csv(obs[1:25000, ], csv_north, row.names = FALSE)
write.csv(obs[25001:50000, ], csv_south, row.names = FALSE)

merged_vtr <- tempfile(fileext = ".vtr")

bind_rows(
  tbl_csv(csv_north),
  tbl_csv(csv_south)
) |>
  filter(abundance > 0) |>
  write_vtr(merged_vtr, batch_size = 25000)

nrow(tbl(merged_vtr) |> collect())
#> [1] 50000

Both CSVs streamed through the concat node, the filter, and the writer. Neither was fully loaded into R at any point.

Memory budget planning

Knowing which operations consume memory and how much lets us design pipelines that stay within our system’s limits. The central principle is “stream early, materialize late.” Every row that we can filter out before it reaches a materializing operation (a sort, a join build, a grouping hash table) is a row that never occupies memory. Pushing filters upstream is not just a performance optimization; for larger-than-RAM data, it can be the difference between a pipeline that completes and one that exhausts memory.

Here is a breakdown by operation type.

Constant-memory (streaming) operations. filter, select, mutate, limit, slice_head, and concat (bind_rows) all process one batch at a time. Their memory cost is proportional to the batch size and the number of columns, not the total row count. For a typical batch of 50,000 rows with 10 columns, each batch might occupy a few megabytes. These operations are safe for any dataset size.

External sort (arrange). vectra’s sort node accumulates data in memory up to a 1 GB budget. When the budget is exceeded, it flushes a sorted run to a temporary file and continues. The final merge reads from all runs simultaneously, using a heap that holds one row per run. Peak memory is bounded at 1 GB plus the merge overhead. For datasets smaller than 1 GB the sort completes entirely in memory.

Hash aggregation (group_by + summarise). The group_agg node maintains one accumulator per distinct group key, so memory scales with the number of distinct groups rather than the number of input rows. If we group by species (12 values), the hash table is tiny. If we group by obs_id (50,000 distinct values), it is larger. Grouping by a high-cardinality column on a billion-row dataset could create millions of accumulators, so it is worth checking the expected group count.

Hash join (build side). The right-side table is fully materialized in a hash table. Memory cost equals the right-side data size. A 1 million row reference table with 5 columns might consume 50 to 100 MB. A 100 million row table would require several gigabytes. The left side streams and adds no persistent memory. String columns on the build side cost more than numeric columns because each string value has variable length and requires its own allocation. A build side with 1 million rows and a 200-character text column will consume substantially more memory than one with only integer and double columns.

Window functions. Window operations (row_number, lag, lead, cumsum, etc.) operate on the current batch. Memory scales with batch size. Partitioned windows (via group_by) hold data for the current partition. If partitions are balanced and moderately sized, memory stays bounded.

To estimate memory for a pipeline, we identify the materializing operations and estimate their footprints:

# Example pipeline:
# tbl(huge.vtr) |>
#   filter(year == 2023) |>        -> streaming, ~5 MB per batch
#   left_join(sites, by = "site")  -> build side = 50 sites, ~1 KB
#   group_by(species) |>           -> 12 groups, ~1 KB
#   summarise(total = sum(abun))   -> 12 accumulators, ~1 KB
#   arrange(desc(total))           -> 12 rows, in-memory
#
# Total peak: ~5 MB (one batch) + negligible join/agg overhead

# Compare to:
# tbl(huge.vtr) |>
#   arrange(species)               -> external sort, up to 1 GB
#   left_join(big_ref, by = "id")  -> build side = big_ref size
#
# Total peak: 1 GB (sort) + big_ref size

The general strategy: stream everything we can, materialize only what we must. Push filters as early as possible to reduce the volume flowing through materializing operations. Put the smaller table on the right side of joins. Pre-aggregate before joining when we only need summaries from the right side.

To profile actual memory use, explain() prints the query plan tree before execution. We can read the plan to count the materializing nodes and estimate their size. For the build side of a join, we can collect() the right-side node in isolation and check object.size() on the resulting data.frame. That gives us the R-level footprint, which is a reasonable upper bound on the C-level hash table size. For sort operations, the question is simpler: if the total input is under 1 GB, the sort completes in memory; otherwise, it spills.

A practical example that puts these principles together:

final_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  filter(year >= 2020, quality != "poor") |>
  left_join(tbl(site_path), by = "site") |>
  group_by(region, species) |>
  summarise(
    n_obs     = n(),
    mean_cov  = mean(cover_pct)
  ) |>
  arrange(region, desc(n_obs)) |>
  write_vtr(final_path)

tbl(final_path) |> collect() |> head(10)
#>    region             species n_obs mean_cov
#> 1    East       Quercus robur   763 47.77667
#> 2    East      Betula pendula   762 47.67244
#> 3    East     Fagus sylvatica   747 47.37965
#> 4    East    Carpinus betulus   737 48.03609
#> 5    East    Sorbus aucuparia   731 48.28468
#> 6    East Acer pseudoplatanus   727 47.93122
#> 7    East    Pinus sylvestris   717 47.46304
#> 8    East        Ulmus glabra   715 48.95790
#> 9    East     Alnus glutinosa   712 45.88975
#> 10   East  Fraxinus excelsior   707 48.80877

The filter runs first, reducing the data volume before the join. The join’s build side is 50 rows of site metadata. The aggregation produces at most 4 regions times 12 species = 48 groups. The sort handles 48 rows in memory. Total peak memory: one batch from the scan plus a few kilobytes. This pipeline would work unchanged on a table with billions of rows.

Cleanup

all_files <- c(
  csv_path, vtr_path, clean_path, db_path, export_csv,
  del_demo, paste0(del_demo, ".del"),
  snap_v1, snap_v2,
  sorted_path, site_path, enriched, joined_path,
  summary_path, monthly_paths, consolidated, running_archive,
  vtr_archive, clean_archive, sqlite_export, summary_csv,
  csv_north, csv_south, merged_vtr,
  small_groups, large_groups,
  archive, csv_2017, archive_diff, compacted, compacted_del,
  final_path
)
unlink(all_files[file.exists(all_files)])