1. Torchdata 0.10.1 and more
Torchdata has been our choice since March 2023 in Prescient-LM, where I have been training LLMs. It was great! It was somewhat difficult to use at times, it didn’t have full documentation, and so on, but it worked when it worked. Then, in July 2023, it was officially discontinued at 0.7.0. We were so sad. A year later, the maintainers announced that it would come back to life (yes!) while deprecating the existing approach (hm?) at 0.8.0, leaving me with mixed feelings.
Torchdata 0.10.1, the latest version as of February 2025, is an outcome of that new approach. It has the essential pieces implemented. I wasn’t sure whether it was ready to use, based on the README and the documentation. It turned out the code is much more ready than I thought. I gave it a shot, and gosh, it works! As an ENFP, my only natural action item is to write a blog post about it.
2. Why Torchdata
As of 2025, training data loading is a solved problem when the model and data are small: use torch to load the model onto the GPU, load the data into memory, and feed it to the GPU with utilities such as torch.utils.data.Dataset and DataLoader, then go. There are plenty of docs and examples online.
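For completeness, the small-scale recipe looks roughly like this (a minimal sketch; ToyDataset is a made-up stand-in):

import torch
from torch.utils.data import Dataset, DataLoader

class ToyDataset(Dataset):
    """Map-style dataset: the whole thing fits in memory."""
    def __init__(self, n: int = 1024):
        self.x = torch.randn(n, 16)
        self.y = torch.randint(0, 2, (n,))

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

loader = DataLoader(ToyDataset(), batch_size=32, shuffle=True, num_workers=2)
for xb, yb in loader:
    pass  # move to GPU, forward, backward, optimizer step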
But why is it not enough? Well, let’s see why Torchdata exists.
2.1 Requirements
Torchdata is a library of composable iterators (not iterables!) that let you chain together common dataloading and pre-proc operations. It follows a streaming programming model, although “sampler + Map-style” can still be configured if you desire.
torchdata.nodes adds more flexibility to the standard torch.utils.data offering, and introduces multi-threaded parallelism in addition to multi-process (the only supported approach in torch.utils.data.DataLoader), as well as first-class support for mid-epoch checkpointing through a state_dict/load_state_dict interface.
In other words, the existing torch.utils.data.Dataset is not ideal if you need what the README above promises: a streaming programming model rather than map-style random access, multi-threaded parallelism, and first-class mid-epoch checkpointing.
Often, this may mean your data is too large to fully materialize in memory, or your pre-processing is heavy enough that multi-process workers alone can’t keep the GPUs fed.
Now we’re talking about large-scale training, where you’d like to ensure everything is fine. One requirement may be: resume training from the exact sample where the previous run stopped, through the state_dict/load_state_dict interface.
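To make that last requirement concrete, here is roughly what mid-epoch resumption looks like. A minimal sketch using the building blocks torchdata.nodes ships (IterableWrapper, Batcher, Loader); the pipeline itself is a toy:

from torchdata.nodes import IterableWrapper, Batcher, Loader

node = IterableWrapper(range(100))
node = Batcher(node, batch_size=4)
loader = Loader(node)

it = iter(loader)
for _ in range(5):
    next(it)  # consume the first five batches

state = loader.state_dict()  # snapshot mid-epoch, save it next to the model checkpoint

# Later, a fresh loader restored from the snapshot resumes at batch six.
resumed = Loader(Batcher(IterableWrapper(range(100)), batch_size=4))
resumed.load_state_dict(state)
print(next(iter(resumed)))  # [20, 21, 22, 23]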
2.2 Can’t we do this with Dataset and DataLoader?
Technically, almost everything is possible, except for some limitations around multithreading. Perhaps that part is optional, and you could get by with Dataset and IterableDataset. But it would be cumbersome, and as Homo sapiens, we aspire to better tools.
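That multithreading part is where torchdata.nodes shines. Another minimal sketch, assuming ParallelMapper’s documented signature; fake_tokenize is a stand-in I made up:

from torchdata.nodes import IterableWrapper, ParallelMapper, Loader

def fake_tokenize(text: str):
    # Stand-in for real pre-processing / tokenization.
    return text.split()

node = IterableWrapper(["hello world"] * 1000)
# method="thread" runs map_fn in a thread pool, which is exactly what
# torch.utils.data.DataLoader (multi-process only) cannot give you.
node = ParallelMapper(node, map_fn=fake_tokenize, num_workers=4, method="thread")
loader = Loader(node)
tokens = list(loader)  # 1000 tokenized items, produced by 4 threads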
3. Example
I’ll show you the Node classes I’m using now. This is the data / functional flow for a dataset I use:
[list files]
↓
.jsonl files
↓
[load jsonl]
↓
json dict
↓
[text processor]
↓
processed and rendered text
↓
[tokenizer]
↓
token ids
↓
[pack it or trim it]
↓
token ids
↓
[batching]
↓
batch of token ids
↓
[end of data loader]
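Wired up with torchdata.nodes, each box above becomes a node in a chain. Roughly like this (a sketch: LocalFileListNode and JsonLinesReaderNode are built in 3.1 below, while process_text and tokenize are hypothetical stand-ins for my later stages; packing is omitted here because it would realistically need its own stateful node rather than a plain map):

from torchdata.nodes import ParallelMapper, Batcher, Loader

node = LocalFileListNode("data", patterns=["*.jsonl"])   # [list files]
node = JsonLinesReaderNode(node)                         # [load jsonl]
node = ParallelMapper(node, map_fn=process_text, num_workers=8, method="thread")  # [text processor]
node = ParallelMapper(node, map_fn=tokenize, num_workers=8, method="thread")      # [tokenizer]
node = Batcher(node, batch_size=32)                      # [batching]
loader = Loader(node)                                    # [end of data loader]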
3.1 Basic I/O
Based on the provided nodes and the README under torchdata.nodes, here are some basic custom nodes I wrote. First, LocalFileListNode.
import json
import logging
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union
import pyarrow.parquet as pq
from torchdata.nodes import BaseNode
logger = logging.getLogger(__name__)
class LocalFileListNode(BaseNode[str]):
    """Node that lists files from a local directory matching specified patterns."""

    def __init__(self, root_dir: Union[str, Path], patterns: List[str]):
        super().__init__()
        self.root_dir = Path(root_dir)
        self.patterns = patterns
        self._files: Optional[List[Path]] = None
        self._current_idx: int = 0

    def reset(self, initial_state: Optional[Dict[str, Any]] = None):
        super().reset(initial_state)
        if initial_state is not None:
            self._current_idx = initial_state["current_idx"]
        else:
            self._current_idx = 0
        if self._files is None:
            self._files = []
            for pattern in self.patterns:
                self._files.extend(self.root_dir.glob(pattern))
            self._files.sort()  # for deterministic ordering

    def next(self) -> str:
        if self._current_idx >= len(self._files):
            raise StopIteration
        file_path = self._files[self._current_idx]
        self._current_idx += 1
        return str(file_path)

    def get_state(self) -> Dict[str, Any]:
        return {"current_idx": self._current_idx}
Let’s move on and build the JSON loader. Note that its state has to remember which file it is reading, so that a restore can resume mid-file.
class JsonLinesReaderNode(BaseNode[Dict]):
    """Node that reads JSON Lines format files."""

    def __init__(self, source_node: BaseNode[str]):
        super().__init__()
        self.source = source_node
        self._current_file: str = ""
        self._current_lines: Optional[List[str]] = None
        self._current_line_idx: int = 0

    def reset(self, initial_state: Optional[Dict[str, Any]] = None):
        super().reset(initial_state)
        self.source.reset(initial_state.get("source_state") if initial_state else None)
        self._current_lines = None
        if initial_state is not None:
            self._current_file = initial_state["current_file"]
            self._current_line_idx = initial_state["current_line_idx"]
            # The restored source state already points past the file we were
            # reading, so re-open that file ourselves to resume mid-file.
            if self._current_file:
                with open(self._current_file, 'r') as f:
                    self._current_lines = f.readlines()
        else:
            self._current_file = ""
            self._current_line_idx = 0

    def _load_next_file(self) -> bool:
        try:
            self._current_file = next(self.source)
            # Load entire file into memory
            with open(self._current_file, 'r') as f:
                self._current_lines = f.readlines()
            self._current_line_idx = 0
            return True
        except StopIteration:
            return False

    def next(self) -> Dict:
        while True:
            # No file open yet, or the current file is exhausted: advance.
            if self._current_lines is None or self._current_line_idx >= len(self._current_lines):
                if not self._load_next_file():
                    raise StopIteration
            line = self._current_lines[self._current_line_idx]
            self._current_line_idx += 1
            try:
                data = json.loads(line)
                return {"json_dict": data,
                        "metadata":
                            {"file_path": self._current_file, "line_number": self._current_line_idx}
                        }
            except json.JSONDecodeError as e:
                logger.warning(f"Skipping invalid JSON line: {e}")
                continue

    def get_state(self) -> Dict[str, Any]:
        return {
            "source_state": self.source.state_dict(),
            "current_file": self._current_file,
            "current_line_idx": self._current_line_idx,
        }
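And chaining the two, with a checkpoint taken mid-file (same hypothetical data/ directory as above):

reader = JsonLinesReaderNode(LocalFileListNode("data", patterns=["*.jsonl"]))
reader.reset()
first = next(reader)        # {"json_dict": ..., "metadata": {...}}
state = reader.get_state()  # snapshot taken after one record

# A fresh reader restored from the snapshot picks up at record two,
# even though that lands in the middle of a file.
reader2 = JsonLinesReaderNode(LocalFileListNode("data", patterns=["*.jsonl"]))
reader2.reset(state)
second = next(reader2)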