flowchart LR
A["from_tensor_slices<br/>(path, label)"] --> B["map(decode_image)<br/>parallel · AUTOTUNE"]
B --> C["cache()<br/>epoch 1: record<br/>epoch 2+: replay"]
C --> D["shuffle(1000)<br/>fresh order each epoch"]
D --> E["batch(32)<br/>(180,180,3) → (32,180,180,3)"]
E --> F["prefetch(AUTOTUNE)<br/>prep batch n+1 during step n"]
F --> G(("model.fit"))
style B fill:#f59e0b33,stroke:#f59e0b
style C fill:#22c55e33,stroke:#22c55e
style F fill:#6366f133,stroke:#6366f1
📊 Deep Learning with TensorFlow & Keras · Day 3 — Feeding the Beast: Input Pipelines with tf.data
🏠 📊 Course home | ← Day 02 | Day 04 → | 📚 All mini-courses
Day 3 — Feeding the Beast: Input Pipelines with tf.data
Yesterday you built the same model three ways — Sequential, Functional, and subclassed — and every one of them ended with a call like model.fit(x, y) on a NumPy array sitting in memory. That works right up until your dataset stops fitting in RAM, or your GPU starts spending half of every training step waiting for the CPU to hand it the next batch. Today we fix the supply chain. tf.data is TensorFlow’s answer to the question “how do I stream, transform, and batch data fast enough that my accelerator never idles?” — the rough equivalent of PyTorch’s Dataset + DataLoader, except it’s a lazy, composable, graph-compiled pipeline rather than a multiprocessing worker pool. By the end of today you’ll have a real image pipeline reading thousands of JPEGs off disk, and a benchmark proving the optimized version is several times faster than the naive one.
🎯 Today you will: build datasets with from_tensor_slices, chain map/shuffle/batch/cache/prefetch in the right order, decode real JPEGs into a training-ready image pipeline, split train/val cleanly, and benchmark naive vs optimized pipelines to see AUTOTUNE earn its keep.
The mental model: a lazy conveyor belt
A tf.data.Dataset is not a container of data. It’s a recipe — a description of where elements come from and what happens to each one on the way out. Nothing is loaded or computed when you build it; work happens only when something iterates it (a for loop, or model.fit). Each transformation returns a new dataset wrapping the old one, so you build pipelines by chaining:
import tensorflow as tf
import numpy as np
print(tf.__version__) # 2.x — anything ≥ 2.16 is fine
# The simplest possible dataset: slice a tensor along axis 0
x = np.arange(10, dtype=np.float32)
ds = tf.data.Dataset.from_tensor_slices(x)
print(ds.element_spec)
# TensorSpec(shape=(), dtype=tf.float32, name=None)
for item in ds.take(3):
print(item.numpy())
# 0.0
# 1.0
# 2.0Two things to internalize here:
from_tensor_slicesslices along the first axis. Give it a(10, 28, 28)array and you get a dataset of ten(28, 28)elements. Give it a scalar-shaped array by mistake (from_tensor_slices(5.0)) and it errors — there’s no axis 0 to slice. Its siblingfrom_tensorsdoes no slicing: it makes a dataset with exactly one element containing the whole tensor. Confusing the two is a classic Day-3 bug: your “dataset” has one giant element and training finishes in one step.element_specis your shape debugger. Every dataset knows the structure, shape, and dtype of its elements. When a pipeline misbehaves, printelement_specat each stage the way you’d print.shapeon tensors. It’s free — remember, nothing has been computed yet.
Datasets slice structures, too. Pass a tuple and you get a dataset of tuples — this is how you pair features with labels:
features = np.random.rand(6, 4).astype(np.float32) # 6 examples, 4 features
labels = np.array([0, 1, 0, 1, 1, 0])
ds = tf.data.Dataset.from_tensor_slices((features, labels))
print(ds.element_spec)
# (TensorSpec(shape=(4,), dtype=tf.float32, name=None),
# TensorSpec(shape=(), dtype=tf.int64, name=None))model.fit(ds_batched) understands this (inputs, targets) tuple convention directly — no separate y= argument needed. That’s the contract you’ll use for the rest of the course.
One PyTorch contrast worth pausing on: in PyTorch, Dataset.__getitem__ is arbitrary Python, and DataLoader parallelizes it with worker processes. In tf.data, transformations are traced into a TensorFlow graph and parallelized with threads inside the runtime — no pickling, no worker startup cost, but also: the code inside map should be TensorFlow ops, not arbitrary Python, or you lose the speed (more on that trap below).
The transformation toolkit — and why order matters
Five methods do 95% of the work. Here they are on a toy dataset so you can see what each one does:
ds = tf.data.Dataset.range(10) # 0, 1, 2, ..., 9 (int64)
# map: apply a function to every element
ds_sq = ds.map(lambda x: x * x)
print(list(ds_sq.as_numpy_iterator()))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# shuffle: fill a buffer of N elements, emit random picks from it
ds_sh = ds.shuffle(buffer_size=10, seed=42)
print(list(ds_sh.as_numpy_iterator()))
# [1, 4, 0, 9, 2, 6, 8, 3, 5, 7] (order varies with seed/version)
# batch: group consecutive elements into one tensor with a new axis 0
ds_b = ds.batch(4)
for b in ds_b:
print(b.numpy())
# [0 1 2 3]
# [4 5 6 7]
# [8 9] <- last batch is short!Three details that bite people:
shuffle’s buffer is not the dataset size — but it should be big enough. shuffle(buffer_size=B) maintains a reservoir of B elements and samples uniformly from it. If B is much smaller than your dataset and your data is sorted by class (very common — files listed per directory!), early batches will be nearly all one class. Rule of thumb: buffer_size ≥ dataset size for full shuffling when elements are small (filenames, indices); a few thousand when elements are big (decoded images). Shuffling filenames fully and decoded images lightly is the standard trick — filenames are cheap.
The last batch is ragged. batch(4) on 10 elements yields shapes (4,), (4,), (2,). Usually fine; but if anything downstream requires a static batch dimension (TPUs, some custom layers), pass drop_remainder=True.
Shuffle before batch, always. This is the ordering mistake worth burning into memory:
# WRONG: shuffles the ORDER OF BATCHES, not the examples.
# Every epoch, examples 0-3 travel together forever.
bad = tf.data.Dataset.range(10).batch(4).shuffle(10)
# RIGHT: examples are shuffled, then grouped into fresh batches.
good = tf.data.Dataset.range(10).shuffle(10).batch(4)With .batch().shuffle(), each batch is an immutable brick; you’re only shuffling bricks. Your model sees the exact same example groupings every epoch, which measurably hurts generalization. shuffle also reshuffles automatically every epoch by default (reshuffle_each_iteration=True) — you get fresh epochs for free, another thing PyTorch’s DataLoader(shuffle=True) also does but that hand-rolled pipelines often forget.
Now the two performance methods, which change when work happens rather than what the elements are:
AUTOTUNE = tf.data.AUTOTUNE # sentinel: "runtime, you pick the parallelism"
ds = (
tf.data.Dataset.range(10)
.map(lambda x: x * x, num_parallel_calls=AUTOTUNE) # parallel map
.cache() # memoize results
.shuffle(10)
.batch(4)
.prefetch(AUTOTUNE) # overlap prep & consume
)map(..., num_parallel_calls=AUTOTUNE)runs the mapped function on multiple elements concurrently, with the runtime tuning the thread count dynamically. Omitnum_parallel_callsand mapping is strictly sequential — for image decoding this alone can be a 4–8× difference.cache()records the elements the first time through and replays them from memory (or from a file,cache("/path"), when they don’t fit in RAM) on every later epoch. Everything upstream of the cache runs exactly once, ever.prefetch(AUTOTUNE)decouples producer and consumer: while your model trains on batch \(n\), a background thread is already preparing batch \(n{+}1\). Without it, each training step waits for data prep, then data prep waits for the training step — pure serialized waste.
Prefetching changes the step-time arithmetic from a sum to a max:
\[ t_{\text{step}}^{\text{naive}} \approx t_{\text{prep}} + t_{\text{train}} \qquad\longrightarrow\qquad t_{\text{step}}^{\text{prefetch}} \approx \max(t_{\text{prep}},\, t_{\text{train}}) \]
Here’s that overlap visually — same work, different wall clock:
If t_prep ≤ t_train, prefetch hides data preparation entirely — the GPU never waits. That’s the whole game: make the input pipeline invisible.
A real pipeline: decoding images from disk
Toy ranges are fine for learning the API, but the transformations only start to matter when elements are expensive. Let’s build the pipeline you’ll actually reuse on Day 6 (CNNs) and Day 8 (transfer learning): the TensorFlow flowers dataset — ~3,670 JPEGs in five class-named directories, about 220 MB.
import pathlib
data_dir = tf.keras.utils.get_file(
"flower_photos",
origin="https://storage.googleapis.com/download.tensorflow.org/"
"example_images/flower_photos.tgz",
untar=True,
)
data_dir = pathlib.Path(data_dir)
class_names = sorted(
p.name for p in data_dir.iterdir() if p.is_dir() and p.name != "LICENSE.txt"
)
print(class_names)
# ['daisy', 'dandelion', 'roses', 'sunflowers', 'tulips']The directory layout is the labeling scheme: flower_photos/roses/xyz.jpg is a rose. We collect (filepath, label_index) pairs in plain Python — this part is cheap, so there’s no need to do it in-graph:
all_paths = [str(p) for p in data_dir.glob("*/*.jpg")]
all_labels = [class_names.index(pathlib.Path(p).parent.name) for p in all_paths]
print(len(all_paths))
# 3670
# Shuffle ONCE in Python with a fixed seed, so the train/val split
# is random but reproducible across runs.
rng = np.random.default_rng(42)
perm = rng.permutation(len(all_paths))
all_paths = [all_paths[i] for i in perm]
all_labels = [all_labels[i] for i in perm]Now the piece that must be fast, because it runs on every image on every epoch: the decode function. Everything in it is a TensorFlow op, so tf.data can trace it into a graph and run it in parallel threads:
IMG_SIZE = 180
def decode_image(path, label):
raw = tf.io.read_file(path) # bytes, shape ()
img = tf.io.decode_jpeg(raw, channels=3) # uint8, (H, W, 3) - varies!
img = tf.image.resize(img, [IMG_SIZE, IMG_SIZE]) # float32, (180, 180, 3)
img = img / 255.0 # scale to [0, 1]
return img, labelWalk the shapes: read_file gives a scalar string of raw bytes; decode_jpeg gives a uint8 tensor whose height and width differ per image; resize normalizes everyone to (180, 180, 3) and silently converts to float32 — which is why the / 255.0 comes after it, not before. If you skipped resize, batch() would crash the moment it tried to stack two differently-sized images into one tensor: Cannot batch tensors with different shapes. Ragged shapes must be resolved before batching, always.
The other trap: don’t write this function in plain Python (PIL.Image.open, np.array, …). It would work — wrapped in tf.py_function it even parallelizes poorly rather than not at all — but you’d forfeit graph execution and most of your throughput. Inside map, TensorFlow ops or bust. (This is the mirror image of PyTorch, where __getitem__ is plain Python by design and multiprocessing hides the cost.)
Train/val split, and the assembled pipeline
Because we shuffled the file list in Python with a fixed seed, splitting is just list slicing — no take/skip gymnastics, no risk of leaking examples between splits when someone changes a buffer size:
n_val = int(0.2 * len(all_paths))
train_paths, val_paths = all_paths[n_val:], all_paths[:n_val]
train_labels, val_labels = all_labels[n_val:], all_labels[:n_val]
print(len(train_paths), len(val_paths))
# 2936 734(You can split with ds.take(n) / ds.skip(n) on a dataset, but then correctness depends on the upstream dataset being deterministic — one stray shuffle before the split and your validation set bleeds into training. Splitting concrete lists is boring and safe. Boring wins.)
Now assemble the full training pipeline, with each stage in its load-bearing position:
BATCH = 32
AUTOTUNE = tf.data.AUTOTUNE
train_ds = (
tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
.map(decode_image, num_parallel_calls=AUTOTUNE) # decode in parallel
.cache() # epoch 2+: skip disk & JPEG
.shuffle(1000) # per-epoch reshuffle
.batch(BATCH)
.prefetch(AUTOTUNE) # overlap with training
)
val_ds = (
tf.data.Dataset.from_tensor_slices((val_paths, val_labels))
.map(decode_image, num_parallel_calls=AUTOTUNE)
.cache()
.batch(BATCH) # no shuffle: val order is irrelevant
.prefetch(AUTOTUNE)
)
print(train_ds.element_spec)
# (TensorSpec(shape=(None, 180, 180, 3), dtype=tf.float32, name=None),
# TensorSpec(shape=(None,), dtype=tf.int64, name=None))The batch axis shows as None because the final batch is short — exactly the ragged-last-batch behavior you saw on the toy example.
Why this exact order? Follow one image through the belt:
cacheaftermap, beforeshuffle. Decoding is expensive and deterministic → cache its output so epochs 2+ never touch disk or the JPEG decoder. Shuffling is cheap and must differ per epoch → it goes after the cache. Putcache()aftershuffle()and you freeze one particular shuffle order forever; put random augmentation (coming Day 7) beforecache()and you freeze one particular set of random crops forever — the single most commontf.databug in the wild. The rule: deterministic-and-expensive above the cache, random-or-cheap below it.prefetchlast, always. It should overlap the entire preparation pipeline with training. Prefetching in the middle only overlaps the stages above it.shuffle(1000)notshuffle(2936)because these elements are decoded float32 images: \(1000 \times 180 \times 180 \times 3 \times 4\) bytes ≈ 389 MB of buffer. That’s affordable but worth being deliberate about; the Python-side full shuffle of filenames already removed the sorted-by-class ordering, so the in-graph buffer only needs to provide per-epoch variety.
Sanity-check the output like you’d eyeball any tensor:
images, labels = next(iter(train_ds))
print(images.shape, images.dtype, float(tf.reduce_min(images)), float(tf.reduce_max(images)))
print(labels[:8].numpy())
# (32, 180, 180, 3) <dtype: 'float32'> 0.0 1.0
# [3 0 4 4 1 2 0 3]Shapes right, values in \([0,1]\), labels mixed across classes. This dataset plugs straight into yesterday’s models: model.fit(train_ds, validation_data=val_ds, epochs=5). And a Keras 3 bonus: tf.data pipelines feed fit() under any backend — TensorFlow, JAX, or PyTorch — so this day’s work travels with you even if the model side switches.
Benchmark: naive vs optimized
Claims about performance deserve numbers. Here’s a minimal benchmark harness — it just iterates the dataset, consuming batches as fast as the pipeline can produce them, and times two epochs so we can see the cache kick in:
import time
def benchmark(ds, name, epochs=2):
for epoch in range(epochs):
t0 = time.perf_counter()
n = 0
for _ in ds:
n += 1
dt = time.perf_counter() - t0
print(f"{name:>10s} epoch {epoch+1}: {dt:5.2f}s ({n} batches)")The naive pipeline is what someone writes on their first day with tf.data — same stages, minus every performance affordance:
naive_ds = (
tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
.map(decode_image) # sequential: one image at a time
.shuffle(1000)
.batch(BATCH) # no cache, no prefetch
)
benchmark(naive_ds, "naive")
benchmark(train_ds, "optimized")Representative output on a laptop with a warm filesystem cache (your absolute numbers will differ; the ratios are the story):
naive epoch 1: 22.41s (92 batches)
naive epoch 2: 21.87s (92 batches)
optimized epoch 1: 4.63s (92 batches)
optimized epoch 2: 1.02s (92 batches)
Read this table like a detective:
| Comparison | Speedup | Cause |
|---|---|---|
| naive → optimized, epoch 1 | ~5× | num_parallel_calls=AUTOTUNE: decoding uses all cores instead of one |
| optimized epoch 1 → epoch 2 | ~4.5× more | cache(): no disk reads, no JPEG decode — replay from memory |
| naive epoch 1 → epoch 2 | none | nothing was cached; every epoch pays full price forever |
And note what this benchmark understates: there’s no model here, so prefetch had nothing to overlap with. In real training, prefetch is often the biggest win of the three, because it converts \(t_{\text{prep}} + t_{\text{train}}\) into \(\max(t_{\text{prep}}, t_{\text{train}})\) on every single step. If you want to see your pipeline’s health during actual training, watch GPU utilization (nvidia-smi -l 1): a saw-tooth bouncing between 0% and 90% means the model is starving between batches — an input pipeline problem, not a model problem.
The distilled performance rules, in the order you should reach for them:
prefetch(AUTOTUNE)at the end. Always. It’s one line and it’s never wrong.num_parallel_calls=AUTOTUNEon every non-trivialmap. Sequential mapping is the default and it’s almost never what you want.cache()after the expensive deterministic work, before anything random. Memory cache if it fits,cache(filename)if it doesn’t.- Shuffle before batch, with a buffer sized to your element cost — filenames fully, decoded images in the low thousands.
- Keep Python out of
map. TF ops trace into a parallel graph;tf.py_functionserializes on the GIL. - Vectorize cheap maps after
batchwhen the per-element function is trivial (e.g./ 255.0on a whole batch beats 32 tiny per-image ops) — a refinement, not a requirement.
🧪 Your task
Take the flowers pipeline and answer an empirical question: how much does each optimization contribute on your machine? Build four pipelines — (a) fully naive, (b) naive + parallel map, (c) b + cache, (d) c + prefetch (i.e. the full optimized pipeline) — and benchmark each for two epochs with the benchmark() helper. Present the results as a small table of epoch-1 and epoch-2 times, and write one sentence identifying which single change bought the most.
Hint: build the four datasets in a loop or a dict so you don’t copy-paste five-stage chains four times — a small builder function taking parallel, use_cache, use_prefetch booleans keeps it honest, and guarantees the four variants differ only in the flags you’re testing.
Solution
import time
import tensorflow as tf
AUTOTUNE = tf.data.AUTOTUNE
BATCH = 32
def build_pipeline(parallel=False, use_cache=False, use_prefetch=False):
ds = tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
ds = ds.map(
decode_image,
num_parallel_calls=AUTOTUNE if parallel else None,
)
if use_cache:
ds = ds.cache()
ds = ds.shuffle(1000).batch(BATCH)
if use_prefetch:
ds = ds.prefetch(AUTOTUNE)
return ds
def benchmark(ds, name, epochs=2):
times = []
for epoch in range(epochs):
t0 = time.perf_counter()
for _ in ds:
pass
times.append(time.perf_counter() - t0)
return times
variants = {
"a) naive": build_pipeline(),
"b) +parallel map": build_pipeline(parallel=True),
"c) +cache": build_pipeline(parallel=True, use_cache=True),
"d) +prefetch (full)": build_pipeline(parallel=True, use_cache=True,
use_prefetch=True),
}
print(f"{'pipeline':<22s} {'epoch 1':>8s} {'epoch 2':>8s}")
for name, ds in variants.items():
t1, t2 = benchmark(ds, name)
print(f"{name:<22s} {t1:7.2f}s {t2:7.2f}s")Typical result shape (numbers machine-dependent):
pipeline epoch 1 epoch 2
a) naive 22.4s 21.9s
b) +parallel map 4.7s 4.6s
c) +cache 4.7s 1.0s
d) +prefetch (full) 4.6s 0.9s
Expected findings: parallel map dominates epoch 1 (it attacks the biggest cost — JPEG decode — with all your cores), cache dominates epoch 2+ (the decode cost drops to zero on replay), and prefetch looks like a no-op here — because this benchmark has no training step to overlap with. That last observation is the deepest lesson in the exercise: prefetch’s value is proportional to how much compute it can hide behind, so a data-only benchmark can’t see it. Re-run variant (c) vs (d) inside a real model.fit on Day 5 and the gap appears.
Key takeaways
- A
tf.data.Datasetis a lazy recipe, not a container;from_tensor_slicesslices along axis 0, andelement_specis your shape debugger at every stage. - Canonical order:
map(decode, num_parallel_calls=AUTOTUNE)→cache()→shuffle(buffer)→batch(B)→prefetch(AUTOTUNE). - Shuffle before batch — shuffling batches is not shuffling data. Cache after expensive deterministic work, never after random augmentation.
- Resolve ragged shapes (
tf.image.resize) beforebatch, and keepmapfunctions in TensorFlow ops, not Python. prefetchturns \(t_{\text{prep}} + t_{\text{train}}\) into \(\max(t_{\text{prep}}, t_{\text{train}})\); parallel map wins epoch 1, cache wins every epoch after.- Split train/val on concrete Python lists with a seeded shuffle — reproducible, leak-proof, boring.
Tomorrow: the pipeline meets the optimizer — compile()/fit() versus writing your own training loop with GradientTape, and when each is the right tool.