Introduction
R holds data.frames entirely in memory. A 10 GB CSV loaded with
read.csv() becomes a 10 GB R object, sometimes larger: R’s
internal representation of character vectors, factor levels, and ALTREP
metadata can inflate the footprint to 2 to 3x the raw data size. On a
machine with 16 GB of RAM, that 10 GB file will either fail to load or
push the system into swap, grinding everything to a halt.
“Larger-than-RAM” is not a theoretical concern. It is the normal state
of affairs for anyone working with multi-year survey data, remote
sensing extracts, or event logs from sensor networks.
vectra sidesteps this by never loading the full dataset. When we call
tbl() on a .vtr file or tbl_csv()
on a CSV, nothing is read yet. The call to collect(),
write_vtr(), or any other terminal verb pulls data through
the plan tree one row group at a time. Each row group is a
self-contained slice of the dataset: a few thousand to a few hundred
thousand rows with all their columns. The engine reads one group,
applies every operation in the pipeline, emits the result, then discards
the group and moves to the next. The key distinction from R’s default
approach: data moves through the pipeline in fixed-size pieces, and each
piece is freed before the next one arrives.
This batch-at-a-time design means that most vectra operations use
constant memory per batch, regardless of how many total rows exist. A
filter node reads a batch, applies a predicate, passes
matching rows downstream, and forgets about them. A mutate
node reads a batch, computes new columns, passes the result on. A
select node simply drops columns from each batch. None of
these ever accumulate the full dataset in memory.
A few operations do need to see all the data before they can produce output. Sorting requires a global ordering. Hash-based grouping needs to track every distinct group key. Joins need a hash table of the build side. These “materializing” operations have bounded memory budgets or known scaling behaviour that we will discuss in detail.
The central implication is this: if we can express our workload as a
chain of streaming operations terminated by a streaming sink, the full
dataset never needs to fit in RAM. A 200 GB CSV can flow through
tbl_csv() |> filter() |> mutate() |> write_vtr()
on a laptop with 8 GB of memory. The pipeline processes each batch in
isolation and writes the result directly to disk.
collect() is the one terminal verb that materializes
everything into an R data.frame. It breaks the streaming model: the
entire result set crosses from C into R and lives in R’s heap until we
remove it. For large data, we avoid it for the final result and instead
use write_vtr(), write_csv(), or
write_sqlite() as our streaming sinks. These functions pull
batches through the pipeline and write each one directly to disk, so the
C engine’s memory is recycled on every iteration. The R process never
holds more than one batch. We still use collect() freely
for small intermediate checks, filtered subsets, and aggregation results
that we know will fit in memory. The question to ask before calling
collect() is always: how many rows will this return? If the
answer is “roughly the same as the input,” we want a streaming sink
instead.
This vignette walks through the patterns and tools vectra provides for working with datasets that exceed available RAM. We will build our examples around a synthetic multi-year ecological survey dataset: species observations recorded across monitoring sites over several years.
library(vectra)
#>
#> Attaching package: 'vectra'
#> The following object is masked from 'package:stats':
#>
#> filter
set.seed(42)
n <- 50000
species_pool <- c(
"Quercus robur", "Fagus sylvatica", "Pinus sylvestris",
"Betula pendula", "Acer pseudoplatanus", "Fraxinus excelsior",
"Picea abies", "Tilia cordata", "Carpinus betulus",
"Alnus glutinosa", "Sorbus aucuparia", "Ulmus glabra"
)
sites <- paste0("SITE_", sprintf("%03d", 1:50))
years <- 2015:2024
obs <- data.frame(
obs_id = seq_len(n),
site = sample(sites, n, replace = TRUE),
species = sample(species_pool, n, replace = TRUE),
year = sample(years, n, replace = TRUE),
abundance = rpois(n, lambda = 15),
cover_pct = round(runif(n, 0.1, 95.0), 1),
quality = sample(c("good", "moderate", "poor"), n,
replace = TRUE, prob = c(0.6, 0.3, 0.1)),
stringsAsFactors = FALSE
)
We will write this as both CSV and .vtr to demonstrate
different starting points. In a real workflow the CSV might be a raw
export from a database or field data logger.
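Both files can be produced from the obs data.frame built above; a minimal setup along these lines creates the csv_path and vtr_path used throughout the rest of this vignette (temp-file paths are illustrative):
csv_path <- tempfile(fileext = ".csv")
write.csv(obs, csv_path, row.names = FALSE)
vtr_path <- tempfile(fileext = ".vtr")
write_vtr(obs, vtr_path)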
Streaming pipelines
The core larger-than-RAM pattern in vectra is source, verbs, sink. We open a lazy reference to the data, chain operations, and terminate with a write function. No intermediate data.frame is ever created.
Suppose we want to extract only high-quality observations, compute a
derived column, and store the result as a new .vtr file.
The entire pipeline streams:
clean_path <- tempfile(fileext = ".vtr")
tbl_csv(csv_path) |>
filter(quality == "good") |>
mutate(log_abundance = log(abundance + 1)) |>
write_vtr(clean_path)
At no point did R hold 50,000 rows (or 50 million, if this were a
larger dataset) in memory. tbl_csv() created a lazy scan
node. filter() and mutate() each wrapped it in
another lazy node. write_vtr() triggered execution: the CSV
scanner read a batch of rows, the filter discarded non-matching rows,
the mutate computed the log column, and the writer serialized the result
to disk. Then the next batch.
At the C level, each node in the pipeline exposes a
next_batch() function. The terminal node (the writer) calls
next_batch() on its upstream node, which in turn calls
next_batch() on its own upstream, and so on down to the
scan node that actually reads from disk. Each VecBatch
struct flows upward through the chain, gets transformed at each stage,
and arrives at the writer ready to be serialized. Once the writer
finishes with a batch, the memory is freed. The pipeline then pulls the
next batch. This pull-based design means memory consumption stays flat
regardless of how many millions of rows pass through. A pipeline that
processes 200 GB of CSV data uses the same peak memory as one that
processes 200 KB.
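The pull protocol is easy to mimic in plain R with closures. This toy sketch (not vectra's actual C implementation; all names are illustrative) shows how a filter node only ever touches one batch at a time:
make_scan <- function(df, batch_size) {
  pos <- 0
  function() { # next_batch(): returns NULL when the stream is exhausted
    if (pos >= nrow(df)) return(NULL)
    end <- min(pos + batch_size, nrow(df))
    batch <- df[(pos + 1):end, , drop = FALSE]
    pos <<- end
    batch
  }
}
make_filter <- function(upstream, pred) {
  function() { # pull one batch, transform it, pass it downstream
    batch <- upstream()
    if (is.null(batch)) return(NULL)
    batch[pred(batch), , drop = FALSE]
  }
}
node <- make_filter(make_scan(obs, 10000), function(b) b$quality == "good")
# A sink would loop: while (!is.null(b <- node())) { ...write b... }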
We can verify the result by reading back a small slice:
tbl(clean_path) |>
select(obs_id, site, species, abundance, log_abundance) |>
slice_head(n = 5) |>
collect()
#> obs_id site species abundance log_abundance
#> 1 3 SITE_001 Alnus glutinosa 12 2.564949
#> 2 7 SITE_018 Fraxinus excelsior 17 2.890372
#> 3 8 SITE_049 Betula pendula 17 2.890372
#> 4 9 SITE_047 Fraxinus excelsior 16 2.833213
#> 5 11 SITE_007 Picea abies 14 2.708050
The same pattern works for any source-sink combination. CSV to SQLite:
db_path <- tempfile(fileext = ".sqlite")
tbl_csv(csv_path) |>
filter(year >= 2020) |>
write_sqlite(db_path, "recent_obs")
Or .vtr to CSV, useful for handing data to colleagues
who expect plain text:
export_csv <- tempfile(fileext = ".csv")
tbl(vtr_path) |>
filter(species == "Fagus sylvatica") |>
select(obs_id, site, year, abundance) |>
write_csv(export_csv)
Each pipeline reads only what it needs and writes directly to the target format. The intermediate verbs never buffer more than one batch. This is what makes vectra practical for datasets larger than RAM: the memory footprint of a streaming pipeline is determined by the batch size, not the dataset size.
When the result of a pipeline is small enough for R
(aggregation results, filtered subsets, summary statistics),
collect() is still the right choice. Aggregations in
particular tend to produce far fewer rows than the input:
tbl(vtr_path) |>
group_by(species, year) |>
summarise(
n_obs = n(),
mean_abun = mean(abundance),
mean_cov = mean(cover_pct)
) |>
arrange(species, year) |>
collect() |>
head(10)
#> species year n_obs mean_abun mean_cov
#> 1 Acer pseudoplatanus 2015 407 14.70516 49.73317
#> 2 Acer pseudoplatanus 2016 412 15.16019 47.83883
#> 3 Acer pseudoplatanus 2017 394 15.06345 49.33426
#> 4 Acer pseudoplatanus 2018 431 15.32483 49.17193
#> 5 Acer pseudoplatanus 2019 426 15.15023 46.93474
#> 6 Acer pseudoplatanus 2020 415 15.02892 46.37253
#> 7 Acer pseudoplatanus 2021 416 15.18750 44.88101
#> 8 Acer pseudoplatanus 2022 415 15.31325 48.43277
#> 9 Acer pseudoplatanus 2023 433 14.80831 48.84665
#> 10 Acer pseudoplatanus 2024 375 14.77600 47.40613
The rule of thumb: if the output might be large, write to disk. If it fits in a data.frame, collect.
Batch sizing
A .vtr file is organised into row groups. Each row group
is a self-contained unit: it has its own column data, validity bitmaps,
dictionary tables, and zone-map min/max statistics. When vectra scans a
file, it reads one row group at a time. The number of rows per group is
the batch size.
Batch size affects two things: memory per batch and zone-map pruning granularity. Larger batches mean fewer row groups and less per-group overhead, but each batch consumes more memory during processing. Smaller batches mean the zone-map statistics cover a narrower range of values, so filter predicates can skip more groups entirely.
The default batch size is 65,536 rows. This is a reasonable middle
ground for most workloads: large enough that per-batch overhead (reading
the row group header, allocating column arrays, setting up validity
bitmaps) is amortized across many rows, but small enough that each batch
stays in the low megabytes for typical column counts. When choosing a
different batch size, the decision depends on the query pattern.
Analytical workloads that scan most of the file (aggregations,
full-table joins) benefit from large batches because they reduce the
number of next_batch() calls and the associated per-group
bookkeeping. Point queries that target a few matching rows benefit from
small batches, especially when combined with indexes or sorted columns,
because the engine can skip entire row groups whose zone-map ranges do
not overlap the predicate.
For analytical workloads with selective filters, smaller row groups
often pay off. If each row group spans a narrow range of
year values, a filter like year == 2023 can
skip most groups without reading them. With one giant row group, the
filter must scan every row.
We control batch size when writing:
small_groups <- tempfile(fileext = ".vtr")
large_groups <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
arrange(year) |>
write_vtr(small_groups, batch_size = 5000)
tbl(vtr_path) |>
arrange(year) |>
write_vtr(large_groups, batch_size = 50000)
We sorted by year before writing so that row groups have
tight year ranges. The small-batch file has about 10 row groups; the
large-batch file has one. When we filter on year, the
small-batch file can prune most groups using zone maps:
tbl(small_groups) |>
filter(year == 2023) |>
explain()
#> vectra execution plan
#>
#> FilterNode [streaming]
#> ScanNode [streaming, 7 cols, predicate pushdown, v3 stats]
#>
#> Output columns (7):
#> obs_id <int64>
#> site <string>
#> species <string>
#> year <int64>
#> abundance <int64>
#> cover_pct <double>
#> quality <string>
CSV and SQLite sources also have a batch_size parameter,
but it controls how many rows the scanner reads per pull rather than how
the file is structured on disk. The default of 65,536 works well for
most cases. Reducing it lowers peak memory; increasing it reduces
per-batch overhead for simple pass-through pipelines.
tbl_csv(csv_path, batch_size = 10000) |>
filter(quality == "good") |>
slice_head(n = 3) |>
collect()
#> obs_id site species year abundance cover_pct quality
#> 1 3 SITE_001 Alnus glutinosa 2015 12 75.9 good
#> 2 7 SITE_018 Fraxinus excelsior 2016 17 13.6 good
#> 3 8 SITE_049 Betula pendula 2020 17 29.3 good
As a starting point: 50,000 to 100,000 rows per group works well for datasets with a few dozen columns. If the data is sorted on a frequently-filtered column, lean towards smaller groups (10,000 to 50,000) for better pruning. If the workload is dominated by full scans or aggregations, larger groups (100,000+) reduce overhead.
Append workflows
Real datasets rarely arrive all at once. Monitoring data comes in
daily, monthly, or seasonally. append_vtr() adds new rows
to an existing .vtr file as a new row group, without
touching existing data. The file header is updated to reflect the
additional group.
Here is a typical pattern: we start with one year of data and append subsequent years as they arrive.
archive <- tempfile(fileext = ".vtr")
first_year <- obs[obs$year == 2015, ]
write_vtr(first_year, archive)
nrow(tbl(archive) |> collect())
#> [1] 5012
Now we simulate receiving 2016 data:
year_2016 <- obs[obs$year == 2016, ]
append_vtr(year_2016, archive)
nrow(tbl(archive) |> collect())
#> [1] 10025
append_vtr() also accepts a vectra_node, so
we can stream data directly from a CSV into an existing archive without
loading it into R:
csv_2017 <- tempfile(fileext = ".csv")
write.csv(obs[obs$year == 2017, ], csv_2017, row.names = FALSE)
tbl_csv(csv_2017) |> append_vtr(archive)
nrow(tbl(archive) |> collect())
#> [1] 15023
Each call to append_vtr() creates one new row group.
Over time, the file accumulates many groups of varying sizes. This is
fine for append-heavy workloads. If the number of groups becomes
unwieldy (hundreds of small appends, each adding a few rows), we can
compact the file by rewriting it:
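A sketch of such a rewrite (the replacement path and batch size are illustrative):
compacted <- tempfile(fileext = ".vtr")
tbl(archive) |> write_vtr(compacted, batch_size = 65536)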
This reads every row group from the archive, streams through the writer with a fresh batch size, and produces a clean file. The old file can then be replaced.
The schema of appended data must exactly match the target file: same
column names, same types, same order. If the schema drifts over time (a
new column added in 2018, say), we need to align it before appending.
One approach is to add the missing column with a default value in a
mutate() step:
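For instance, assuming the archive gained an observer column, older extracts can be padded with a typed missing value:
aligned <- tbl_csv(csv_2017) |>
  mutate(observer = NA_character_)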
This produces a node whose schema matches a hypothetical archive that
includes an observer column. The key point: because
append_vtr() accepts nodes, we can chain any transformation
needed to match the target schema.
Delete and tombstones
Sometimes we need to remove rows from an existing .vtr
file. Perhaps a batch of observations was flagged as erroneous, or
privacy regulations require deletion of certain records.
delete_vtr() handles this without rewriting the file.
Deletion works through a tombstone sidecar file. When we call
delete_vtr(path, row_ids), vectra writes the specified row
indices to a .del file next to the .vtr. The
original data file is never modified. On the next tbl()
call, the scan node reads the tombstone file and skips those rows.
Row IDs are 0-based physical positions across the entire file. The first row of the first row group is 0, the first row of the second group continues from where the first left off.
del_demo <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], del_demo)
# Delete rows 0, 1, and 99 (first two and last)
delete_vtr(del_demo, c(0, 1, 99))
tbl(del_demo) |> collect() |> nrow()
#> [1] 97
Tombstone files are cumulative. Calling delete_vtr()
again merges the new indices with existing ones:
delete_vtr(del_demo, c(10, 11, 12))
tbl(del_demo) |> collect() |> nrow()
#> [1] 94
To undo all deletions, remove the .del file:
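For instance (this assumes the sidecar is named by appending .del to the data file's path; the exact naming convention is an implementation detail):
unlink(paste0(del_demo, ".del"))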
Tombstones are lightweight for sparse deletions. If we delete 1% of rows, the overhead is negligible. But if deletions accumulate to a substantial fraction of the file, scan performance degrades because the engine still reads and discards the tombstoned rows. At that point, compaction makes sense:
delete_vtr(del_demo, 0:49)
compacted_del <- tempfile(fileext = ".vtr")
tbl(del_demo) |> write_vtr(compacted_del)
nrow(tbl(compacted_del) |> collect())
#> [1] 49
The compacted file contains only the surviving rows, with no tombstone file.
Diff between snapshots
Data pipelines often work with periodic snapshots. We receive
yesterday’s extract and today’s extract, and we need to know what
changed. diff_vtr() computes a key-based logical diff
between two .vtr files.
The function takes the paths to the old and new files plus the name
of a key column. It streams both files and returns a list with two
elements: $deleted contains the key values that were
present in the old file but absent in the new one. $added
is a lazy vectra_node of rows present in the new file but
not in the old.
snap_v1 <- tempfile(fileext = ".vtr")
snap_v2 <- tempfile(fileext = ".vtr")
# Version 1: observations 1-100
write_vtr(obs[1:100, ], snap_v1)
# Version 2: observations 51-150 (rows 1-50 removed, 101-150 added)
write_vtr(obs[51:150, ], snap_v2)
changes <- diff_vtr(snap_v1, snap_v2, "obs_id")
# Keys that disappeared
head(changes$deleted)
#> [1] 1 2 3 4 5 6
length(changes$deleted)
#> [1] 50
The deleted keys are the obs_id values 1 through 50.
Meanwhile, the $added element contains a lazy node pointing
to observations with obs_id 101 through 150, which we can
collect or pipe into further processing:
collect(changes$added) |> head()
#> obs_id site species year abundance cover_pct quality
#> 1 101 SITE_042 Picea abies 2022 18 81.7 good
#> 2 102 SITE_018 Quercus robur 2018 16 52.4 good
#> 3 103 SITE_027 Carpinus betulus 2019 13 15.5 poor
#> 4 104 SITE_013 Picea abies 2022 17 14.5 good
#> 5 105 SITE_019 Quercus robur 2019 11 16.8 moderate
#> 6 106 SITE_032 Tilia cordata 2018 20 80.1 good
Because $added is a lazy node, we can pipe it directly
into further processing. For instance, we might want to append only the
truly new rows to an archive:
archive_diff <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], archive_diff)
changes$added |> append_vtr(archive_diff)
nrow(tbl(archive_diff) |> collect())
#> [1] 150
This is a common incremental-load pattern: diff today’s snapshot
against yesterday’s, then append only the additions. Combined with
delete_vtr() for removals, we can maintain an up-to-date
archive without full rewrites.
Note that diff_vtr() performs a set-level diff on the
key column. If a row exists in both snapshots with the same key but
different values in other columns, it will not appear in either
$deleted or $added. To detect modifications,
we would need to compare the full row content after identifying shared
keys.
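For snapshots small enough to collect(), one in-memory sketch of that comparison:
old <- tbl(snap_v1) |> collect()
new <- tbl(snap_v2) |> collect()
shared <- intersect(old$obs_id, new$obs_id)
old_s <- old[match(shared, old$obs_id), ]
new_s <- new[match(shared, new$obs_id), ]
# keys whose non-key columns differ between the two snapshots
modified <- shared[rowSums(old_s != new_s, na.rm = TRUE) > 0]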
External sort
Sorting a dataset that does not fit in memory requires an external
merge sort. vectra’s arrange() handles this automatically.
The engine maintains a 1 GB memory budget. As data flows through, it
accumulates rows in memory. When the budget is reached, the accumulated
rows are sorted and written to a temporary .vtr file on
disk (a “sorted run”). After all input has been consumed, a k-way merge
reads from all sorted runs simultaneously using a min-heap, producing
the final sorted output one batch at a time.
From the user’s perspective, none of this is visible. We just call
arrange():
sorted_path <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
arrange(species, desc(abundance)) |>
write_vtr(sorted_path)
tbl(sorted_path) |>
select(species, abundance, site) |>
slice_head(n = 8) |>
collect()
#> species abundance site
#> 1 Acer pseudoplatanus 33 SITE_050
#> 2 Acer pseudoplatanus 29 SITE_023
#> 3 Acer pseudoplatanus 29 SITE_038
#> 4 Acer pseudoplatanus 28 SITE_040
#> 5 Acer pseudoplatanus 28 SITE_030
#> 6 Acer pseudoplatanus 28 SITE_006
#> 7 Acer pseudoplatanus 28 SITE_031
#> 8 Acer pseudoplatanus 28 SITE_010
The sorted output streams to disk. Peak memory stays bounded at the spill budget (1 GB) plus overhead for the merge heap, regardless of input size. For our 50,000-row example the data fits entirely in memory and no spill occurs. But the same code would work on a 500 million row file; the sort would produce multiple temporary runs and merge them transparently.
The 1 GB budget governs how much unsorted data the engine accumulates before flushing to a temporary file. Each flush produces one sorted run. A 10 GB dataset with a 1 GB budget produces roughly 10 runs. The final merge opens all runs simultaneously and maintains a min-heap with one entry per run. At each step, the smallest element across all runs is popped from the heap and emitted as the next output row; the run that contributed it advances by one row and re-inserts into the heap. Because the heap has only as many entries as there are runs (not as many as there are rows), the merge phase uses very little memory.
Data that is already partially sorted produces fewer runs. If the input is sorted on a prefix of the sort key, long stretches of rows will already be in order, so the engine can absorb more rows before hitting the budget and flushing. In the best case (fully sorted input), no spill occurs at all and the sort reduces to a streaming pass-through. There is nothing to tune here: the engine enforces the budget internally and manages spill files in the system temp directory, cleaning them up after the merge completes.
Writing the sorted data to a new .vtr file also improves
query performance. Once the file is sorted on species,
zone-map statistics on that column become tight, and filters on species
can skip most row groups. Sorting on a frequently-queried column is one
of the most effective optimizations available.
Streaming joins
vectra’s join engine uses a build-right, probe-left strategy. The right-side table is fully materialized into a hash table in memory. Then the left-side table streams through, probing the hash table for matches one batch at a time. This means the left side can be arbitrarily large, because only the right side needs to fit in memory.
The natural pattern for large-data joins is: huge fact table on the left, small dimension table on the right.
# Small reference table: site metadata
site_meta <- data.frame(
site = sites,
region = sample(c("North", "South", "East", "West"),
length(sites), replace = TRUE),
elevation = round(runif(length(sites), 100, 2500)),
stringsAsFactors = FALSE
)
site_path <- tempfile(fileext = ".vtr")
write_vtr(site_meta, site_path)
enriched <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
left_join(tbl(site_path), by = "site") |>
write_vtr(enriched)
tbl(enriched) |>
select(obs_id, site, region, elevation, species) |>
slice_head(n = 5) |>
collect()
#> obs_id site region elevation species
#> 1 1 SITE_049 East 998 Pinus sylvestris
#> 2 2 SITE_037 East 1914 Fraxinus excelsior
#> 3 3 SITE_001 South 2374 Alnus glutinosa
#> 4 4 SITE_025 West 1165 Pinus sylvestris
#> 5 5 SITE_010 East 1976 Carpinus betulus
The 50 site metadata rows become the hash table (right side). The
observation table streams through on the left, each batch probing the
hash table for its site key. Memory usage is proportional
to the site metadata, not the observation count.
What happens when both sides are large? The right side still must fit in memory, so we need to reduce it. The most direct approach is to filter the right side before the join:
joined_path <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
filter(year >= 2022) |>
inner_join(
tbl(site_path) |> filter(region == "North"),
by = "site"
) |>
write_vtr(joined_path)
nrow(tbl(joined_path) |> collect())
#> [1] 2944
The filtered right side is much smaller, so the hash table stays small. The left-side filter also reduces the number of rows that flow through the probe, but that affects throughput rather than memory.
For self-joins or joins between two truly large tables, the options are more constrained. The core problem is that one side must fit in memory as a hash table. When neither side is small, we need a strategy to make one of them small.
The most general approach is split-apply-combine: partition the left
side by a key (say, year), and for each partition, join
against only the matching rows from the right side. If both tables span
10 years, each partition pair is roughly 1/10th the size of the full
join. We write each partial result to disk and consolidate with
bind_rows() afterward. This trades a single large join for
many small joins, each of which fits in memory.
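A sketch of that pattern, with hypothetical left_path and right_path tables that both carry site and year columns:
part_paths <- character(0)
for (y in 2015:2024) {
  out <- tempfile(fileext = ".vtr")
  tbl(left_path) |>
    filter(year == y) |>
    inner_join(tbl(right_path) |> filter(year == y),
               by = c("site", "year")) |>
    write_vtr(out)
  part_paths <- c(part_paths, out)
}
# consolidate the partial results into a single file
do.call(bind_rows, lapply(part_paths, tbl)) |> write_vtr(final_path)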
Pre-aggregation is another powerful option. If we only need summary
statistics from the right table,
group_by() |> summarise() can reduce millions of rows to
a handful of groups before the join. A right-side table with 50 million
observations across 10,000 sites collapses to 10,000 rows after a
site-level aggregation. That fits comfortably in a hash table.
# Right side: per-site-year summary from a second dataset
summary_path <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
group_by(site, year) |>
summarise(site_year_avg = mean(abundance)) |>
write_vtr(summary_path)
# Join the summary back to the detail table
tbl(vtr_path) |>
left_join(tbl(summary_path), by = c("site", "year")) |>
select(obs_id, site, year, abundance, site_year_avg) |>
slice_head(n = 5) |>
collect()
#> obs_id site year abundance site_year_avg
#> 1 1 SITE_049 2023 13 15.86957
#> 2 2 SITE_037 2024 11 14.44330
#> 3 3 SITE_001 2015 12 14.48718
#> 4 4 SITE_025 2018 17 15.00000
#> 5 5 SITE_010 2023 9 14.81579
Multi-file workflows
Partitioned data is common: monthly exports, regional splits, sensor-specific files. The partitioning dimension matters for query performance. Partitioning by date works well when most queries filter on a time range, because we can skip entire files that fall outside the range. Partitioning by category (region, species, sensor type) works when queries target a specific category. The wrong partitioning dimension forces us to read every file for every query, negating the benefit.
bind_rows() combines multiple vectra nodes into a single
streaming pipeline. When all inputs share the same schema (column names
and types), vectra creates a C-level ConcatNode that reads
from each child sequentially. No data is copied; each source’s batches
flow through in order.
monthly_paths <- character(6)
for (m in 1:6) {
monthly_paths[m] <- tempfile(fileext = ".vtr")
idx <- which(obs$year == 2024 & ((obs$obs_id %% 6) + 1) == m)
month_data <- obs[idx[seq_len(min(200, length(idx)))], ]
write_vtr(month_data, monthly_paths[m])
}
We can combine all monthly files and aggregate without loading them all:
nodes <- lapply(monthly_paths, tbl)
combined <- do.call(bind_rows, nodes)
combined |>
group_by(species) |>
summarise(
total_obs = n(),
mean_abun = mean(abundance)
) |>
arrange(desc(total_obs)) |>
slice_head(n = 5) |>
collect()
#> species total_obs mean_abun
#> 1 Quercus robur 110 14.77273
#> 2 Ulmus glabra 110 15.44545
#> 3 Carpinus betulus 108 14.41667
#> 4 Tilia cordata 106 15.39623
#> 5 Fagus sylvatica 103 14.68932
The bind_rows() call does not read any data. It returns
a new node that, when pulled, first exhausts all batches from the first
child, then the second, and so on. Downstream operations (the grouping,
aggregation, sort) see a single continuous stream.
We can also consolidate the partitioned files into one:
consolidated <- tempfile(fileext = ".vtr")
nodes2 <- lapply(monthly_paths, tbl)
do.call(bind_rows, nodes2) |> write_vtr(consolidated)
nrow(tbl(consolidated) |> collect())
#> [1] 1200
For partitioned files that arrive on a schedule, a common workflow
combines bind_rows() with append_vtr(). We
process the latest batch of files and append the result to a running
archive:
running_archive <- tempfile(fileext = ".vtr")
initial_nodes <- lapply(monthly_paths[1:3], tbl)
do.call(bind_rows, initial_nodes) |> write_vtr(running_archive)
# Next month's files arrive
new_nodes <- lapply(monthly_paths[4:6], tbl)
do.call(bind_rows, new_nodes) |> append_vtr(running_archive)
nrow(tbl(running_archive) |> collect())
#> [1] 1200
This gives us a single growing file that can be queried as a unit, while each monthly append is a lightweight operation.
At some point we may want to consolidate partitioned files into a
single sorted .vtr for better query performance.
bind_rows() piped into arrange() and then
write_vtr() does this in one streaming pass: the concat
node reads from each partition, the sort accumulates and spills as
needed, and the writer produces a single file with tight zone-map
statistics. The original partitions can then be archived or deleted.
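Using the monthly files from above, that consolidation pass might look like this (sort keys are illustrative):
sorted_archive <- tempfile(fileext = ".vtr")
do.call(bind_rows, lapply(monthly_paths, tbl)) |>
  arrange(site, year) |>
  write_vtr(sorted_archive)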
Format conversion ETL
A one-time conversion from CSV to .vtr pays off every
time we query the data afterwards. The .vtr format supports
dictionary encoding, delta encoding, zstd compression, and zone-map
statistics. Repeated queries on a .vtr file are faster
because the engine can skip row groups and decompress only the columns
it needs. CSV requires a full parse every time.
A minimal conversion:
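Something along these lines (output path illustrative):
vtr_out <- tempfile(fileext = ".vtr")
tbl_csv(csv_path) |> write_vtr(vtr_out)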
That single line streams the entire CSV through the vectra writer. The output file is typed, compressed, and has zone-map statistics. For repeated queries, the difference is substantial.
In practice, we rarely want a raw copy. The ingest step is a natural place to clean, filter, and reshape:
clean_archive <- tempfile(fileext = ".vtr")
tbl_csv(csv_path) |>
filter(quality != "poor") |>
mutate(
abundance_log = log(abundance + 1),
cover_frac = cover_pct / 100
) |>
select(-quality) |>
arrange(site, year) |>
write_vtr(clean_archive, batch_size = 10000)
We filtered out poor-quality records, computed derived columns, dropped the raw quality flag, sorted by site and year for better zone-map pruning, and chose a 10,000-row batch size. The CSV was read once, streaming, and the clean result was written directly to disk.
The inverse direction is also useful. We might need to export a subset to a colleague who uses SQLite:
sqlite_export <- tempfile(fileext = ".sqlite")
tbl(clean_archive) |>
filter(year >= 2020) |>
write_sqlite(sqlite_export, "observations")
Or produce a CSV of summary statistics for a report:
summary_csv <- tempfile(fileext = ".csv")
tbl(clean_archive) |>
group_by(site, year) |>
summarise(
n_species = n_distinct(species),
total_abundance = sum(abundance)
) |>
write_csv(summary_csv)
read.csv(summary_csv) |> head()
#> site year n_species total_abundance
#> 1 SITE_001 2015 12 1509
#> 2 SITE_001 2016 12 1509
#> 3 SITE_001 2017 12 1249
#> 4 SITE_001 2018 12 1311
#> 5 SITE_001 2019 12 1252
#> 6 SITE_001 2020 12 1266
Multi-source ETL pipelines combine several inputs. Suppose we have regional CSV files that we want to merge and convert:
csv_north <- tempfile(fileext = ".csv")
csv_south <- tempfile(fileext = ".csv")
write.csv(obs[1:25000, ], csv_north, row.names = FALSE)
write.csv(obs[25001:50000, ], csv_south, row.names = FALSE)
merged_vtr <- tempfile(fileext = ".vtr")
bind_rows(
tbl_csv(csv_north),
tbl_csv(csv_south)
) |>
filter(abundance > 0) |>
write_vtr(merged_vtr, batch_size = 25000)
nrow(tbl(merged_vtr) |> collect())
#> [1] 50000
Both CSVs streamed through the concat node, the filter, and the writer. Neither was fully loaded into R at any point.
Memory budget planning
Knowing which operations consume memory and how much lets us design pipelines that stay within our system’s limits. The central principle is “stream early, materialize late.” Every row that we can filter out before it reaches a materializing operation (a sort, a join build, a grouping hash table) is a row that never occupies memory. Pushing filters upstream is not just a performance optimization; for larger-than-RAM data, it can be the difference between a pipeline that completes and one that exhausts memory.
Here is a breakdown by operation type.
Constant-memory (streaming) operations.
filter, select, mutate,
limit, slice_head, and concat
(bind_rows) all process one batch at a time. Their memory cost is
proportional to the batch size and the number of columns, not the total
row count. For a typical batch of 50,000 rows with 10 columns, each
batch might occupy a few megabytes. These operations are safe for any
dataset size.
External sort (arrange). vectra’s sort node accumulates data in memory up to a 1 GB budget. When the budget is exceeded, it flushes a sorted run to a temporary file and continues. The final merge reads from all runs simultaneously, using a heap that holds one row per run. Peak memory is bounded at 1 GB plus the merge overhead. For datasets smaller than 1 GB the sort completes entirely in memory.
Hash aggregation (group_by + summarise). The
group_agg node maintains one accumulator per distinct group
key, so memory scales with the number of distinct groups rather than the
number of input rows. If we group by species (12 values),
the hash table is tiny. If we group by obs_id (50,000
distinct values), it is larger. Grouping by a high-cardinality column on
a billion-row dataset could create millions of accumulators, so it is
worth checking the expected group count.
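To sanity-check a grouping before running it, we can estimate the accumulator table from the expected number of distinct keys. The per-entry constant below is an assumption for illustration, not a vectra internal:

```r
# Rough group_agg footprint: one hash entry per distinct key plus one
# 8-byte accumulator per aggregate. bytes_per_entry is an assumed constant.
est_group_agg_mb <- function(n_groups, n_aggs, bytes_per_entry = 64) {
  n_groups * (bytes_per_entry + 8 * n_aggs) / 1024^2
}
est_group_agg_mb(12, 1)      # group_by(species): negligible
est_group_agg_mb(5e7, 3)     # 50 million keys, 3 aggregates: ~4 GB
#> [1] 4196.167
```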
Hash join (build side). The right-side table is fully materialized in a hash table. Memory cost equals the right-side data size. A 1 million row reference table with 5 columns might consume 50 to 100 MB. A 100 million row table would require several gigabytes. The left side streams and adds no persistent memory. String columns on the build side cost more than numeric columns because each string value has variable length and requires its own allocation. A build side with 1 million rows and a 200-character text column will consume substantially more memory than one with only integer and double columns.
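The string penalty is visible at the R level too. A small base-R comparison (the 100,000-row size and column contents are illustrative; object.size() reports the R footprint, not vectra's hash table):

```r
# Compare R-level footprints of a numeric vs a long-string build side.
n <- 1e5
num_side <- data.frame(id = seq_len(n), x = runif(n))
txt_side <- data.frame(
  id = seq_len(n),
  # distinct ~200-character strings, so nothing is shared in R's string cache
  note = sprintf("obs-%06d-%s", seq_len(n), strrep("x", 190))
)
format(object.size(num_side), units = "MB")
format(object.size(txt_side), units = "MB")   # many times larger
```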
Window functions. Window operations
(row_number, lag, lead,
cumsum, etc.) operate on the current batch. Memory scales
with batch size. Partitioned windows (via group_by) hold
data for the current partition. If partitions are balanced and
moderately sized, memory stays bounded.
To estimate memory for a pipeline, we identify the materializing operations and estimate their footprints:
# Example pipeline:
# tbl(huge.vtr) |>
#   filter(year == 2023) |>          -> streaming, ~5 MB per batch
#   left_join(sites, by = "site") |> -> build side = 50 sites, ~1 KB
#   group_by(species) |>             -> 12 groups, ~1 KB
#   summarise(total = sum(abun)) |>  -> 12 accumulators, ~1 KB
#   arrange(desc(total))             -> 12 rows, in-memory
#
# Total peak: ~5 MB (one batch) + negligible join/agg overhead
# Compare to:
# tbl(huge.vtr) |>
#   arrange(species) |>              -> external sort, up to 1 GB
#   left_join(big_ref, by = "id")    -> build side = big_ref size
#
# Total peak: 1 GB (sort) + big_ref size
The general strategy: stream everything we can, materialize only what we must. Push filters as early as possible to reduce the volume flowing through materializing operations. Put the smaller table on the right side of joins. Pre-aggregate before joining when we only need summaries from the right side.
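The pre-aggregation tactic looks like this in outline, using the same annotation style as above (table and column names here are hypothetical):

```r
# Instead of building a hash table over all of big_ref...
# tbl(obs.vtr) |> left_join(tbl(big_ref), by = "id")   -> build side = big_ref size
#
# ...shrink the build side first:
# ref_summary <- tbl(big_ref) |>
#   group_by(id) |>
#   summarise(ref_mean = mean(value))   -> one row per distinct id
# tbl(obs.vtr) |>
#   left_join(ref_summary, by = "id")   -> build side = distinct ids only
```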
To profile actual memory use, explain() prints the query
plan tree before execution. We can read the plan to count the
materializing nodes and estimate their size. For the build side of a
join, we can collect() the right-side node in isolation and
check object.size() on the resulting data.frame. That gives
us the R-level footprint, which is a reasonable upper bound on the
C-level hash table size. For sort operations, the question is simpler:
if the total input is under 1 GB, the sort completes in memory;
otherwise, it spills.
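Put together, the build-side check described above might look like the following sketch (site_path is the reference table from the running example; this assumes it is small enough to collect on its own):

```r
# Size the build side of a planned join before running the full pipeline.
build <- tbl(site_path) |> collect()       # materialize the right side only
format(object.size(build), units = "KB")   # upper bound on hash table size
# If this is a large fraction of available RAM, pre-aggregate the right
# side or swap the join sides before running the real pipeline.
```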
A practical example that puts these principles together:
final_path <- tempfile(fileext = ".vtr")
tbl(vtr_path) |>
filter(year >= 2020, quality != "poor") |>
left_join(tbl(site_path), by = "site") |>
group_by(region, species) |>
summarise(
n_obs = n(),
mean_cov = mean(cover_pct)
) |>
arrange(region, desc(n_obs)) |>
write_vtr(final_path)
tbl(final_path) |> collect() |> head(10)
#>    region             species n_obs mean_cov
#> 1    East       Quercus robur   763 47.77667
#> 2    East      Betula pendula   762 47.67244
#> 3    East     Fagus sylvatica   747 47.37965
#> 4    East    Carpinus betulus   737 48.03609
#> 5    East    Sorbus aucuparia   731 48.28468
#> 6    East Acer pseudoplatanus   727 47.93122
#> 7    East    Pinus sylvestris   717 47.46304
#> 8    East        Ulmus glabra   715 48.95790
#> 9    East     Alnus glutinosa   712 45.88975
#> 10   East  Fraxinus excelsior   707 48.80877
The filter runs first, reducing the data volume before the join. The join’s build side is 50 rows of site metadata. The aggregation produces at most 4 regions times 12 species = 48 groups. The sort handles 48 rows in memory. Total peak memory: one batch from the scan plus a few kilobytes. This pipeline would work unchanged on a table with billions of rows.
Cleanup
all_files <- c(
csv_path, vtr_path, clean_path, db_path, export_csv,
del_demo, paste0(del_demo, ".del"),
snap_v1, snap_v2,
sorted_path, site_path, enriched, joined_path,
summary_path, monthly_paths, consolidated, running_archive,
vtr_archive, clean_archive, sqlite_export, summary_csv,
csv_north, csv_south, merged_vtr,
small_groups, large_groups,
archive, csv_2017, archive_diff, compacted, compacted_del,
final_path
)
unlink(all_files[file.exists(all_files)])