Kader Mohideen

In this lesson

  • Lesson 3 — Feeding the Beast: Input Pipelines with tf.data
    • The mental model: a lazy conveyor belt
    • The transformation toolkit — and why order matters
    • A real pipeline: decoding images from disk
    • Train/val split, and the assembled pipeline
    • Benchmark: naive vs optimized
    • 🧪 Your task
    • Key takeaways

📊 Deep Learning with TensorFlow & Keras · Lesson 3 — Feeding the Beast: Input Pipelines with tf.data


Lesson 3 — Feeding the Beast: Input Pipelines with tf.data

In the previous lesson you built the same model three ways (Sequential, Functional, and subclassed), and every one of them ended with a call like model.fit(x, y) on a NumPy array sitting in memory. That works right up until your dataset stops fitting in RAM, or your GPU starts spending half of every training step waiting for the CPU to hand it the next batch. In this lesson we fix the supply chain. tf.data is TensorFlow’s answer to the question “how do I stream, transform, and batch data fast enough that my accelerator never idles?” It is the rough equivalent of PyTorch’s Dataset + DataLoader, except that it’s a lazy, composable, graph-compiled pipeline rather than a multiprocessing worker pool. By the end of this lesson you’ll have a real image pipeline reading thousands of JPEGs off disk, and a benchmark showing the optimized version running several times faster than the naive one.

🎯 In this lesson you will: build datasets with from_tensor_slices, chain map/shuffle/batch/cache/prefetch in the right order, decode real JPEGs into a training-ready image pipeline, split train/val cleanly, and benchmark naive vs optimized pipelines to see AUTOTUNE earn its keep.

The mental model: a lazy conveyor belt

A tf.data.Dataset is not a container of data. It’s a recipe — a description of where elements come from and what happens to each one on the way out. Nothing is loaded or computed when you build it; work happens only when something iterates it (a for loop, or model.fit). Each transformation returns a new dataset wrapping the old one, so you build pipelines by chaining:

import tensorflow as tf
import numpy as np

print(tf.__version__)  # 2.x — anything ≥ 2.16 is fine

# The simplest possible dataset: slice a tensor along axis 0
x = np.arange(10, dtype=np.float32)
ds = tf.data.Dataset.from_tensor_slices(x)

print(ds.element_spec)
# TensorSpec(shape=(), dtype=tf.float32, name=None)

for item in ds.take(3):
    print(item.numpy())
# 0.0
# 1.0
# 2.0

Two things to internalize here:

  • from_tensor_slices slices along the first axis. Give it a (10, 28, 28) array and you get a dataset of ten (28, 28) elements. Give it a scalar-shaped array by mistake (from_tensor_slices(5.0)) and it errors — there’s no axis 0 to slice. Its sibling from_tensors does no slicing: it makes a dataset with exactly one element containing the whole tensor. Confusing the two is a classic Lesson-3 bug: your “dataset” has one giant element and training finishes in one step.
  • element_spec is your shape debugger. Every dataset knows the structure, shape, and dtype of its elements. When a pipeline misbehaves, print element_spec at each stage the way you’d print .shape on tensors. It’s free — remember, nothing has been computed yet.
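To make the from_tensor_slices / from_tensors difference concrete, here is a small sketch on a zero-filled stand-in array (the array and the variable names are illustrative, not part of the lesson's pipeline). It only inspects cardinality and element_spec, so nothing expensive runs:

```python
import numpy as np
import tensorflow as tf

# Stand-in for ten 28x28 images (illustrative data only)
x = np.zeros((10, 28, 28), dtype=np.float32)

sliced = tf.data.Dataset.from_tensor_slices(x)  # ten elements of shape (28, 28)
whole = tf.data.Dataset.from_tensors(x)         # one element of shape (10, 28, 28)

n_sliced = int(sliced.cardinality().numpy())
n_whole = int(whole.cardinality().numpy())

print(n_sliced, sliced.element_spec.shape)
print(n_whole, whole.element_spec.shape)
```

If the second count is what you see on your real dataset, you almost certainly meant from_tensor_slices.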

Datasets slice structures, too. Pass a tuple and you get a dataset of tuples — this is how you pair features with labels:

features = np.random.rand(6, 4).astype(np.float32)   # 6 examples, 4 features
labels   = np.array([0, 1, 0, 1, 1, 0])

ds = tf.data.Dataset.from_tensor_slices((features, labels))
print(ds.element_spec)
# (TensorSpec(shape=(4,), dtype=tf.float32, name=None),
#  TensorSpec(shape=(), dtype=tf.int64, name=None))

model.fit(ds_batched) understands this (inputs, targets) tuple convention directly — no separate y= argument needed. That’s the contract you’ll use for the rest of the course.

One PyTorch contrast worth pausing on: in PyTorch, Dataset.__getitem__ is arbitrary Python, and DataLoader parallelizes it with worker processes. In tf.data, transformations are traced into a TensorFlow graph and parallelized with threads inside the runtime — no pickling, no worker startup cost, but also: the code inside map should be TensorFlow ops, not arbitrary Python, or you lose the speed (more on that trap below).

The transformation toolkit — and why order matters

Five methods do 95% of the work. Here they are on a toy dataset so you can see what each one does:

ds = tf.data.Dataset.range(10)          # 0, 1, 2, ..., 9 (int64)

# map: apply a function to every element
ds_sq = ds.map(lambda x: x * x)
print(list(ds_sq.as_numpy_iterator()))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# shuffle: fill a buffer of N elements, emit random picks from it
ds_sh = ds.shuffle(buffer_size=10, seed=42)
print(list(ds_sh.as_numpy_iterator()))
# [1, 4, 0, 9, 2, 6, 8, 3, 5, 7]   (order varies with seed/version)

# batch: group consecutive elements into one tensor with a new axis 0
ds_b = ds.batch(4)
for b in ds_b:
    print(b.numpy())
# [0 1 2 3]
# [4 5 6 7]
# [8 9]        <- last batch is short!

Three details that bite people:

shuffle’s buffer is not the dataset size — but it should be big enough. shuffle(buffer_size=B) maintains a reservoir of B elements and samples uniformly from it. If B is much smaller than your dataset and your data is sorted by class (very common — files listed per directory!), early batches will be nearly all one class. Rule of thumb: buffer_size ≥ dataset size for full shuffling when elements are small (filenames, indices); a few thousand when elements are big (decoded images). Shuffling filenames fully and decoded images lightly is the standard trick — filenames are cheap.
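To see why a small buffer fails on class-sorted data, here is a pure-Python model of a fill-then-sample buffer. It is a simplified sketch of the mechanism described above, not TensorFlow's actual implementation, and buffered_shuffle is a hypothetical helper name:

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Simplified model: fill a buffer, then emit a random slot and refill it."""
    rng = random.Random(seed)
    buffer, out = [], []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)       # still filling the buffer
            continue
        i = rng.randrange(len(buffer))
        out.append(buffer[i])         # emit a random buffered element
        buffer[i] = item              # replace it with the incoming one
    rng.shuffle(buffer)               # input exhausted: drain in random order
    out.extend(buffer)
    return out

# Class-sorted labels: 500 of class 0 followed by 500 of class 1
labels = [0] * 500 + [1] * 500

small = buffered_shuffle(labels, buffer_size=10)
full = buffered_shuffle(labels, buffer_size=1000)

print("first batch, buffer=10:  ", small[:32])
print("first batch, buffer=1000:", full[:32])
```

With buffer_size=10, the k-th output can only come from the first k + 10 inputs, so the first batch of 32 is guaranteed to be all class 0. Only a buffer covering the whole dataset can mix the classes from the very first batch.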

The last batch is ragged. batch(4) on 10 elements yields shapes (4,), (4,), (2,). Usually fine; but if anything downstream requires a static batch dimension (TPUs, some custom layers), pass drop_remainder=True.
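A quick sketch of what drop_remainder=True changes, on the same ten-element toy range (variable names are illustrative). Note how the static shape in element_spec changes too, not just the number of batches:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

ragged = ds.batch(4)                       # final batch has 2 elements
fixed = ds.batch(4, drop_remainder=True)   # final short batch is discarded

ragged_sizes = [int(b.shape[0]) for b in ragged]
fixed_sizes = [int(b.shape[0]) for b in fixed]

print(ragged.element_spec.shape, ragged_sizes)
print(fixed.element_spec.shape, fixed_sizes)
```

The trade-off is that the dropped examples are never seen in that epoch; with shuffling before batch, different examples are dropped each time.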

Shuffle before batch, always. This is the ordering mistake worth burning into memory:

# WRONG: shuffles the ORDER OF BATCHES, not the examples.
# Every epoch, examples 0-3 travel together forever.
bad = tf.data.Dataset.range(10).batch(4).shuffle(10)

# RIGHT: examples are shuffled, then grouped into fresh batches.
good = tf.data.Dataset.range(10).shuffle(10).batch(4)

With .batch().shuffle(), each batch is an immutable brick; you’re only shuffling bricks. Your model sees the exact same example groupings every epoch, which can hurt generalization. shuffle also reshuffles automatically every epoch by default (reshuffle_each_iteration=True), so you get fresh epochs for free. PyTorch’s DataLoader(shuffle=True) does the same, but hand-rolled pipelines often forget it.
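You can check the "bricks" claim empirically. The sketch below collects the set of example groupings seen over five epochs for each ordering; groupings is a hypothetical helper written for this demonstration:

```python
import tensorflow as tf

def groupings(ds):
    """Return the set of batches in one epoch, each as a frozenset of examples."""
    return {frozenset(int(v) for v in batch.numpy()) for batch in ds}

bad = tf.data.Dataset.range(10).batch(4).shuffle(10)
good = tf.data.Dataset.range(10).shuffle(10).batch(4)

bad_groups, good_groups = set(), set()
for _ in range(5):
    bad_groups |= groupings(bad)
    good_groups |= groupings(good)

print("batch-then-shuffle groupings:", len(bad_groups))
print("shuffle-then-batch groupings:", len(good_groups))
```

The batch-then-shuffle pipeline only ever produces the same three groupings, however many epochs you run; the shuffle-then-batch pipeline keeps producing new ones.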

Now the two performance methods, which change when work happens rather than what the elements are:

AUTOTUNE = tf.data.AUTOTUNE   # sentinel: "runtime, you pick the parallelism"

ds = (
    tf.data.Dataset.range(10)
      .map(lambda x: x * x, num_parallel_calls=AUTOTUNE)  # parallel map
      .cache()                                            # memoize results
      .shuffle(10)
      .batch(4)
      .prefetch(AUTOTUNE)                                 # overlap prep & consume
)

  • map(..., num_parallel_calls=AUTOTUNE) runs the mapped function on multiple elements concurrently, with the runtime tuning the thread count dynamically. Omit num_parallel_calls and mapping is strictly sequential; for image decoding this alone can be a 4–8× difference.
  • cache() records the elements the first time through and replays them from memory (or from a file, cache("/path"), when they don’t fit in RAM) on every later epoch. Everything upstream of the cache runs exactly once, ever.
  • prefetch(AUTOTUNE) decouples producer and consumer: while your model trains on batch \(n\), a background thread is already preparing batch \(n{+}1\). Without it, each training step waits for data prep, then data prep waits for the training step — pure serialized waste.
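The "everything upstream of the cache runs exactly once" claim is easy to verify by counting calls. This sketch uses tf.py_function purely as a call counter, which is exactly the kind of Python-in-map you should avoid in a real pipeline; count_call, tracked, and run_two_epochs are hypothetical helpers for the demonstration:

```python
import tensorflow as tf

calls = {"n": 0}

def count_call(x):
    calls["n"] += 1          # Python side effect: runs once per mapped element
    return x

def tracked(x):
    y = tf.py_function(count_call, [x], tf.int64)
    y.set_shape(())          # py_function loses the static shape; restore it
    return y

def run_two_epochs(ds):
    calls["n"] = 0
    for _ in range(2):
        for _ in ds:
            pass
    return calls["n"]

base = tf.data.Dataset.range(5).map(tracked)

uncached_calls = run_two_epochs(base)
cached_calls = run_two_epochs(base.cache())

print("map calls over 2 epochs, no cache:", uncached_calls)
print("map calls over 2 epochs, cached:  ", cached_calls)
```

Without the cache the map runs for every element in every epoch; with it, the map runs only during the first full pass.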

Prefetching changes the step-time arithmetic from a sum to a max:

\[ t_{\text{step}}^{\text{naive}} \approx t_{\text{prep}} + t_{\text{train}} \qquad\longrightarrow\qquad t_{\text{step}}^{\text{prefetch}} \approx \max(t_{\text{prep}},\, t_{\text{train}}) \]

Here’s that overlap visually — same work, different wall clock:

[Figure: CPU and GPU timelines for three batches. Naive: CPU prep and GPU train take turns, 6 units of time. With prefetch: the CPU preps batch n+1 while the GPU trains on batch n, 4 units of time. t_step ≈ max(prep, train) instead of prep + train.]

If t_prep ≤ t_train, prefetch hides data preparation entirely — the GPU never waits. That’s the whole game: make the input pipeline invisible.
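The sum-versus-max arithmetic can be checked with a tiny timeline simulation. This is a back-of-the-envelope model, assuming a single producer that never blocks on a full buffer and constant per-batch times; naive_time and prefetched_time are hypothetical names:

```python
def naive_time(n_batches, t_prep, t_train):
    """Prep and train take turns: every step pays both costs."""
    return n_batches * (t_prep + t_train)

def prefetched_time(n_batches, t_prep, t_train):
    """Prep runs ahead in the background; training waits only if data is late."""
    prep_done = 0.0
    train_done = 0.0
    for _ in range(n_batches):
        prep_done += t_prep                                # producer keeps going
        train_done = max(train_done, prep_done) + t_train  # consumer waits if needed
    return train_done

# Three batches, one time unit each for prep and train
print(naive_time(3, 1, 1))       # 6
print(prefetched_time(3, 1, 1))  # 4.0

# When training is the slower stage, prep is hidden after the first batch
print(prefetched_time(3, 1, 2))  # 7.0
```

The one unhidden unit in the prefetched case is the first prep: there is nothing to overlap it with yet, which is why the total is not exactly n × max(prep, train).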

A real pipeline: decoding images from disk

Toy ranges are fine for learning the API, but the transformations only start to matter when elements are expensive. Let’s build the pipeline you’ll actually reuse in Lesson 6 (CNNs) and Lesson 8 (transfer learning): the TensorFlow flowers dataset, with ~3,670 JPEGs in five class-named directories, about 220 MB.

import pathlib

data_dir = tf.keras.utils.get_file(
    "flower_photos",
    origin="https://storage.googleapis.com/download.tensorflow.org/"
           "example_images/flower_photos.tgz",
    untar=True,
)
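data_dir = pathlib.Path(data_dir)
# Some Keras 3 releases extract into a nested flower_photos/ folder;
# step into it if it exists so the class directories are direct children.
if (data_dir / "flower_photos").is_dir():
    data_dir = data_dir / "flower_photos"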

class_names = sorted(
    p.name for p in data_dir.iterdir() if p.is_dir() and p.name != "LICENSE.txt"
)
print(class_names)
# ['daisy', 'dandelion', 'roses', 'sunflowers', 'tulips']

The directory layout is the labeling scheme: flower_photos/roses/xyz.jpg is a rose. We collect (filepath, label_index) pairs in plain Python — this part is cheap, so there’s no need to do it in-graph:

all_paths = [str(p) for p in data_dir.glob("*/*.jpg")]
all_labels = [class_names.index(pathlib.Path(p).parent.name) for p in all_paths]
print(len(all_paths))
# 3670

# Shuffle ONCE in Python with a fixed seed, so the train/val split
# is random but reproducible across runs.
rng = np.random.default_rng(42)
perm = rng.permutation(len(all_paths))
all_paths  = [all_paths[i]  for i in perm]
all_labels = [all_labels[i] for i in perm]

Now the piece that must be fast, because it runs on every image on every epoch: the decode function. Everything in it is a TensorFlow op, so tf.data can trace it into a graph and run it in parallel threads:

IMG_SIZE = 180

def decode_image(path, label):
    raw = tf.io.read_file(path)                          # bytes, shape ()
    img = tf.io.decode_jpeg(raw, channels=3)             # uint8, (H, W, 3) - varies!
    img = tf.image.resize(img, [IMG_SIZE, IMG_SIZE])     # float32, (180, 180, 3)
    img = img / 255.0                                    # scale to [0, 1]
    return img, label

Walk the shapes: read_file gives a scalar string of raw bytes; decode_jpeg gives a uint8 tensor whose height and width differ per image; resize normalizes everyone to (180, 180, 3) and silently converts to float32 — which is why the / 255.0 comes after it, not before. If you skipped resize, batch() would crash the moment it tried to stack two differently-sized images into one tensor: Cannot batch tensors with different shapes. Ragged shapes must be resolved before batching, always.
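If you want to test the decode function without downloading anything, one option is to write a couple of synthetic JPEGs of different sizes and push them through it. The random-noise images and temp-file names below are stand-ins, not part of the flowers dataset; decode_image is repeated so the snippet runs on its own:

```python
import os
import tempfile

import numpy as np
import tensorflow as tf

IMG_SIZE = 180

def decode_image(path, label):
    raw = tf.io.read_file(path)
    img = tf.io.decode_jpeg(raw, channels=3)
    img = tf.image.resize(img, [IMG_SIZE, IMG_SIZE])
    img = img / 255.0
    return img, label

# Two fake "photos" with different heights and widths
tmp_dir = tempfile.mkdtemp()
rng = np.random.default_rng(0)
paths = []
for i, (h, w) in enumerate([(120, 200), (300, 240)]):
    pixels = rng.integers(0, 256, size=(h, w, 3), dtype=np.uint8)
    path = os.path.join(tmp_dir, f"fake_{i}.jpg")
    tf.io.write_file(path, tf.io.encode_jpeg(pixels))
    paths.append(path)

ds = tf.data.Dataset.from_tensor_slices((paths, [0, 1])).map(decode_image)

# Batching succeeds only because resize gave both images the same shape
images, labels = next(iter(ds.batch(2)))
print(images.shape, images.dtype)
```

Comment out the resize line and the batch(2) call fails, which is the shape error described above.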

The other trap: don’t write this function in plain Python (PIL.Image.open, np.array, …). It would work if you wrapped it in tf.py_function, but the Python code runs under the GIL, so it parallelizes poorly, and you’d forfeit graph execution and most of your throughput. Inside map, TensorFlow ops or bust. (This is the mirror image of PyTorch, where __getitem__ is plain Python by design and multiprocessing hides the cost.)

Train/val split, and the assembled pipeline

Because we shuffled the file list in Python with a fixed seed, splitting is just list slicing — no take/skip gymnastics, no risk of leaking examples between splits when someone changes a buffer size:

n_val = int(0.2 * len(all_paths))
train_paths,  val_paths  = all_paths[n_val:],  all_paths[:n_val]
train_labels, val_labels = all_labels[n_val:], all_labels[:n_val]
print(len(train_paths), len(val_paths))
# 2936 734

(You can split with ds.take(n) / ds.skip(n) on a dataset, but then correctness depends on the upstream dataset being deterministic — one stray shuffle before the split and your validation set bleeds into training. Splitting concrete lists is boring and safe. Boring wins.)
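The list-based split is small enough to wrap in a function and check. The sketch below uses made-up file names in place of the real flowers paths, and seeded_split is a hypothetical helper that mirrors the shuffle-then-slice steps above:

```python
import numpy as np

def seeded_split(paths, labels, val_fraction=0.2, seed=42):
    """Shuffle once with a fixed seed, then slice into (train, val) lists."""
    perm = np.random.default_rng(seed).permutation(len(paths))
    paths = [paths[i] for i in perm]
    labels = [labels[i] for i in perm]
    n_val = int(val_fraction * len(paths))
    return (paths[n_val:], labels[n_val:]), (paths[:n_val], labels[:n_val])

# Made-up paths standing in for the 3,670 flower images
fake_paths = [f"class_{i % 5}/img_{i}.jpg" for i in range(3670)]
fake_labels = [i % 5 for i in range(3670)]

(train_p, train_l), (val_p, val_l) = seeded_split(fake_paths, fake_labels)
(train_p2, _), (val_p2, _) = seeded_split(fake_paths, fake_labels)

print(len(train_p), len(val_p))
print("disjoint:", set(train_p).isdisjoint(val_p))
print("reproducible:", val_p == val_p2)
```

Both properties hold by construction: slicing one permuted list cannot put a path in both halves, and the fixed seed gives the same permutation on every run.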

Now assemble the full training pipeline, with each stage in its load-bearing position:

BATCH = 32
AUTOTUNE = tf.data.AUTOTUNE

train_ds = (
    tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
      .map(decode_image, num_parallel_calls=AUTOTUNE)  # decode in parallel
      .cache()                                         # epoch 2+: skip disk & JPEG
      .shuffle(1000)                                   # per-epoch reshuffle
      .batch(BATCH)
      .prefetch(AUTOTUNE)                              # overlap with training
)

val_ds = (
    tf.data.Dataset.from_tensor_slices((val_paths, val_labels))
      .map(decode_image, num_parallel_calls=AUTOTUNE)
      .cache()
      .batch(BATCH)                                    # no shuffle: val order is irrelevant
      .prefetch(AUTOTUNE)
)

print(train_ds.element_spec)
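# (TensorSpec(shape=(None, 180, 180, 3), dtype=tf.float32, name=None),
#  TensorSpec(shape=(None,), dtype=tf.int32, name=None))

The batch axis shows as None because batch() without drop_remainder=True cannot promise a fixed size: the final batch may be short, exactly the ragged-last-batch behavior you saw on the toy example. The labels are int32 rather than int64 this time because they came from a plain Python list of ints, not a NumPy array.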

Why this exact order? Follow one image through the belt:

flowchart LR
    A["from_tensor_slices<br/>(path, label)"] --> B["map(decode_image)<br/>parallel · AUTOTUNE"]
    B --> C["cache()<br/>epoch 1: record<br/>epoch 2+: replay"]
    C --> D["shuffle(1000)<br/>fresh order each epoch"]
    D --> E["batch(32)<br/>(180,180,3) → (32,180,180,3)"]
    E --> F["prefetch(AUTOTUNE)<br/>prep batch n+1 during step n"]
    F --> G(("model.fit"))
    style B fill:#f59e0b33,stroke:#f59e0b
    style C fill:#22c55e33,stroke:#22c55e
    style F fill:#6366f133,stroke:#6366f1

  • cache after map, before shuffle. Decoding is expensive and deterministic, so cache its output and epochs 2+ never touch disk or the JPEG decoder. Shuffling is cheap and must differ per epoch, so it goes after the cache. Put cache() after shuffle() and you freeze one particular shuffle order forever; put random augmentation (coming in Lesson 7) before cache() and you freeze one particular set of random crops forever, one of the most common tf.data bugs in the wild. The rule: deterministic-and-expensive above the cache, random-or-cheap below it.
  • prefetch last, always. It should overlap the entire preparation pipeline with training. Prefetching in the middle only overlaps the stages above it.
  • shuffle(1000) not shuffle(2936) because these elements are decoded float32 images: \(1000 \times 180 \times 180 \times 3 \times 4\) bytes ≈ 389 MB of buffer. That’s affordable but worth being deliberate about; the Python-side full shuffle of filenames already removed the sorted-by-class ordering, so the in-graph buffer only needs to provide per-epoch variety.
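The buffer-size arithmetic from the last bullet is worth keeping as a helper, so you can redo it whenever IMG_SIZE or the buffer changes. shuffle_buffer_bytes is a hypothetical name, and the estimate counts only raw float32 pixel data, ignoring any runtime overhead:

```python
def shuffle_buffer_bytes(buffer_size, height, width, channels, bytes_per_value=4):
    """Approximate memory held by a shuffle buffer of decoded float32 images."""
    return buffer_size * height * width * channels * bytes_per_value

light = shuffle_buffer_bytes(1000, 180, 180, 3)
full = shuffle_buffer_bytes(2936, 180, 180, 3)

print(f"shuffle(1000): {light / 1e6:.0f} MB")
print(f"shuffle(2936): {full / 1e6:.0f} MB")
```

A full-dataset buffer would need roughly 1.14 GB on top of whatever cache() already holds, which is why the pipeline shuffles decoded images lightly.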

Sanity-check the output like you’d eyeball any tensor:

images, labels = next(iter(train_ds))
print(images.shape, images.dtype, float(tf.reduce_min(images)), float(tf.reduce_max(images)))
print(labels[:8].numpy())
# (32, 180, 180, 3) <dtype: 'float32'> 0.0 1.0
# [3 0 4 4 1 2 0 3]

Shapes right, values in \([0,1]\), labels mixed across classes. This dataset plugs straight into the previous lesson’s models: model.fit(train_ds, validation_data=val_ds, epochs=5). And a Keras 3 bonus: tf.data pipelines feed fit() under any backend (TensorFlow, JAX, or PyTorch), so this lesson’s work travels with you even if the model side switches.

Benchmark: naive vs optimized

Claims about performance deserve numbers. Here’s a minimal benchmark harness — it just iterates the dataset, consuming batches as fast as the pipeline can produce them, and times two epochs so we can see the cache kick in:

import time

def benchmark(ds, name, epochs=2):
    for epoch in range(epochs):
        t0 = time.perf_counter()
        n = 0
        for _ in ds:
            n += 1
        dt = time.perf_counter() - t0
        print(f"{name:>10s}  epoch {epoch+1}: {dt:5.2f}s  ({n} batches)")

The naive pipeline is what someone writes on their first day with tf.data — same stages, minus every performance affordance:

naive_ds = (
    tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
      .map(decode_image)          # sequential: one image at a time
      .shuffle(1000)
      .batch(BATCH)               # no cache, no prefetch
)

benchmark(naive_ds, "naive")
benchmark(train_ds, "optimized")

Representative output on a laptop with a warm filesystem cache (your absolute numbers will differ; the ratios are the story):

     naive  epoch 1: 22.41s  (92 batches)
     naive  epoch 2: 21.87s  (92 batches)
 optimized  epoch 1:  4.63s  (92 batches)
 optimized  epoch 2:  1.02s  (92 batches)

Read this table like a detective:

Comparison                   | Speedup     | Cause
naive → optimized, epoch 1   | ~5×         | num_parallel_calls=AUTOTUNE: decoding uses all cores instead of one
optimized epoch 1 → epoch 2  | ~4.5× more  | cache(): no disk reads, no JPEG decode; replay from memory
naive epoch 1 → epoch 2      | none        | nothing was cached; every epoch pays full price forever

And note what this benchmark understates: there’s no model here, so prefetch had nothing to overlap with. In real training, prefetch is often the biggest win of the three, because it converts \(t_{\text{prep}} + t_{\text{train}}\) into \(\max(t_{\text{prep}}, t_{\text{train}})\) on every single step. If you want to see your pipeline’s health during actual training, watch GPU utilization (nvidia-smi -l 1): a saw-tooth bouncing between 0% and 90% means the model is starving between batches — an input pipeline problem, not a model problem.

The distilled performance rules, in the order you should reach for them:

  1. prefetch(AUTOTUNE) at the end. Always. It’s one line and it’s never wrong.
  2. num_parallel_calls=AUTOTUNE on every non-trivial map. Sequential mapping is the default and it’s almost never what you want.
  3. cache() after the expensive deterministic work, before anything random. Memory cache if it fits, cache(filename) if it doesn’t.
  4. Shuffle before batch, with a buffer sized to your element cost — filenames fully, decoded images in the low thousands.
  5. Keep Python out of map. TF ops trace into a parallel graph; tf.py_function serializes on the GIL.
  6. Vectorize cheap maps after batch when the per-element function is trivial (e.g. / 255.0 on a whole batch beats 32 tiny per-image ops) — a refinement, not a requirement.
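Rule 6 is the least obvious of the list, so here is a minimal sketch of it on a stand-in uint8 array (the data and the scale helper are illustrative). Both pipelines produce the same tensors; the second applies the scaling once per batch instead of once per image:

```python
import numpy as np
import tensorflow as tf

# Stand-in for eight tiny all-white uint8 images
raw = np.full((8, 4, 4, 3), 255, dtype=np.uint8)

def scale(x):
    # Works on a single image or a whole batch: the ops are elementwise
    return tf.cast(x, tf.float32) / 255.0

per_element = tf.data.Dataset.from_tensor_slices(raw).map(scale).batch(4)
vectorized = tf.data.Dataset.from_tensor_slices(raw).batch(4).map(scale)

a = next(iter(per_element))
b = next(iter(vectorized))

print(a.shape, b.shape)
print(np.allclose(a.numpy(), b.numpy()))
```

This only applies to functions that are indifferent to the batch axis. Anything that must see one image at a time, such as decode_jpeg or a resize of variable-sized inputs, still has to run before batch.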

🧪 Your task

Take the flowers pipeline and answer an empirical question: how much does each optimization contribute on your machine? Build four pipelines — (a) fully naive, (b) naive + parallel map, (c) b + cache, (d) c + prefetch (i.e. the full optimized pipeline) — and benchmark each for two epochs with the benchmark() helper. Present the results as a small table of epoch-1 and epoch-2 times, and write one sentence identifying which single change bought the most.

Hint: build the four datasets in a loop or a dict so you don’t copy-paste five-stage chains four times — a small builder function taking parallel, use_cache, use_prefetch booleans keeps it honest, and guarantees the four variants differ only in the flags you’re testing.

Solution
import time
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE
BATCH = 32

def build_pipeline(parallel=False, use_cache=False, use_prefetch=False):
    ds = tf.data.Dataset.from_tensor_slices((train_paths, train_labels))
    ds = ds.map(
        decode_image,
        num_parallel_calls=AUTOTUNE if parallel else None,
    )
    if use_cache:
        ds = ds.cache()
    ds = ds.shuffle(1000).batch(BATCH)
    if use_prefetch:
        ds = ds.prefetch(AUTOTUNE)
    return ds

def benchmark(ds, name, epochs=2):
    times = []
    for epoch in range(epochs):
        t0 = time.perf_counter()
        for _ in ds:
            pass
        times.append(time.perf_counter() - t0)
    return times

variants = {
    "a) naive":            build_pipeline(),
    "b) +parallel map":    build_pipeline(parallel=True),
    "c) +cache":           build_pipeline(parallel=True, use_cache=True),
    "d) +prefetch (full)": build_pipeline(parallel=True, use_cache=True,
                                          use_prefetch=True),
}

print(f"{'pipeline':<22s} {'epoch 1':>8s} {'epoch 2':>8s}")
for name, ds in variants.items():
    t1, t2 = benchmark(ds, name)
    print(f"{name:<22s} {t1:7.1f}s {t2:7.1f}s")   # one decimal, matching the table below

Typical result shape (numbers machine-dependent):

pipeline                epoch 1  epoch 2
a) naive                 22.4s    21.9s
b) +parallel map          4.7s     4.6s
c) +cache                 4.7s     1.0s
d) +prefetch (full)       4.6s     0.9s

Expected findings: parallel map dominates epoch 1 (it attacks the biggest cost, JPEG decode, with all your cores), cache dominates epoch 2+ (the decode cost drops to zero on replay), and prefetch looks like a no-op here, because this benchmark has no training step to overlap with. That last observation is the deepest lesson in the exercise: prefetch’s value is proportional to how much compute it can hide behind, so a data-only benchmark can’t see it. Re-run variant (c) vs (d) inside a real model.fit in Lesson 5 and the gap appears.

Key takeaways

  • A tf.data.Dataset is a lazy recipe, not a container; from_tensor_slices slices along axis 0, and element_spec is your shape debugger at every stage.
  • Canonical order: map(decode, num_parallel_calls=AUTOTUNE) → cache() → shuffle(buffer) → batch(B) → prefetch(AUTOTUNE).
  • Shuffle before batch — shuffling batches is not shuffling data. Cache after expensive deterministic work, never after random augmentation.
  • Resolve ragged shapes (tf.image.resize) before batch, and keep map functions in TensorFlow ops, not Python.
  • prefetch turns \(t_{\text{prep}} + t_{\text{train}}\) into \(\max(t_{\text{prep}}, t_{\text{train}})\); parallel map wins epoch 1, cache wins every epoch after.
  • Split train/val on concrete Python lists with a seeded shuffle — reproducible, leak-proof, boring.

In the next lesson: the pipeline meets the optimizer — compile()/fit() versus writing your own training loop with GradientTape, and when each is the right tool.



© Kader Mohideen