Creación de canalizaciones de datos de alto rendimiento con Grain y ArrayRecord

3 DE OCTUBRE DE 2025
Jiyang Kang Technical Program Manager
Shivaji Dutta Field Solutions Architect
Ihor Indyk Software Engineer
Felix Chern Software Engineer

Al entrenar modelos grandes en aceleradores potentes, como GPU y TPU, lo último que quieres es que tu acelerador esté inactivo, esperando datos. Todo tu sistema es tan rápido como su parte más lenta y, a menudo, ese cuello de botella es el canal de entrada de datos. Por lo tanto, para el aprendizaje automático a gran escala, es esencial contar con una canalización de datos eficiente y reproducible. En esta guía, verás cómo resolver este desafío creando de una canalización de datos sólida y eficiente con Grain, una biblioteca de carga de datos flexible para JAX, y ArrayRecord, un formato de archivo altamente eficiente.


Comprensión de los componentes principales

Grain: un cargador de datos de alto rendimiento para JAX

Grain es una biblioteca de carga de datos liviana y de código abierto diseñada específicamente para resolver este problema para cargas de trabajo basadas en JAX. Garantiza que los datos se carguen, preprocesen e inserten en tu modelo de manera eficiente, lo que le permite maximizar el rendimiento de tu hardware.

¿Por qué usar Grain?

Grain se basa en una filosofía de rendimiento, reproducibilidad y flexibilidad. Estos son sus beneficios clave:

  • Exceptional performance: Grain is built for speed. It uses efficient multiprocessing (via the .mp_prefetch() method) to run data loading and transformations in parallel, ensuring that a buffer of prepared data is always ready for your model. This keeps your accelerators saturated and minimizes training time.

  • Guaranteed determinism and reproducibility: Grain provides full reproducibility, which is critical for credible research. By setting a simple seed, you ensure the data is always shuffled the same way. Crucially, its data iterators are stateful and can be checkpointed. This means if your training job is interrupted or preempted, you can restart from the exact same point in the data stream.

  • Una API intuitiva y declarativa: Tú defines la canalización de datos encadenando métodos simples y legibles. A partir de un origen MapDataset, puedes agregar transformaciones de forma fluida como .shuffle (), .map() y .batch(). Este estilo declarativo hace que la canalización de datos sea fácil de entender, modificar y mantener.

  • Unlocking true global shuffling: To get the best performance from your models, you need to shuffle your data effectively. When paired with a file format that supports random access, like ArrayRecord, Grain can perform a true global shuffle across your entire dataset, even when it doesn’t fit into host memory. This is a powerful feature that is often computationally impractical with other data loaders and formats.


¿Qué es ArrayRecord y por qué usarlo?

Si bien TFRecord es un estándar conocido, su naturaleza secuencial no permite una verdadera aleatoriedad gloval. ArrayRecord es un formato de archivo moderno diseñado específicamente para resolver este problema, ya que una nueva frontera en la eficiencia de los datos.

ArrayRecord File Layout

Cómo funciona: diseñado para la velocidad y el paralelismo

El alto rendimiento de ArrayRecord se basa en su diseño central, que se basa en el formato de archivo Riegeli de Google. Esta estructura proporciona varias ventajas clave para el manejo de datos a gran escala:

  1. Efficient random access: ArrayRecord features a built-in metadata index that maps every record to its precise location. This is the key design choice that enables instant, direct access to any record in the dataset, completely avoiding the need to read a file from the beginning.


2. Massive parallelism: Records are grouped into data chunks. This structure is inherently designed to be read in parallel, allowing multiple processes to read different chunks of the same file simultaneously to dramatically increase read throughput.


3. Exceptional performance: As a result of this indexed and chunked design, benchmarks show ArrayRecord can achieve a read throughput an order of magnitude higher than traditional formats, making it ideal for today's massive datasets.


4. Smart data integrity: The format handles data integrity intelligently by leveraging the powerful error correction in underlying cloud storage systems (like Google Cloud Storage) rather than adding redundant checks. This provides robust protection against corruption without unnecessary performance overhead.


¿Por qué lo usamos?

Las características de ArrayRecord habilitan directamente las capacidades avanzadas requeridas por los cargadores de datos modernos, como Grain.

The most important benefit is achieving true, deterministic global shuffling. Because any record can be accessed instantly, a data loader can generate perfectly randomized indices in the dataset on the fly as the training happens and then fetch data in that specific order. This capability, which is computationally impractical with sequential formats like TFRecord, is vital for reproducible research and optimal model training.


ArrayRecord y TFRecord: una comparación detallada

Aquí hay un desglose detallado de cómo ArrayRecord y TFRecord se comparan en características clave:

  1. Estructura

  • ArrayRecord se basa en el formato de archivoRiegeli de Google, que está diseñado para almacenar secuencias de registros con un enfoque en la decodificación de alta velocidad, la integridad de los datos y la compresión fuerte. Agrupa los registros en fragmentos e incluye un índice de metadatos al final del archivo.

  • TFRecord es una secuencia de registros binarios, en la que cada registro suele ser un búfer de protocolo tf.train.Example.


2. Acceso aleatorio

  • ArrayRecord ofrece acceso aleatorio nativo y eficiente. Su estructura de archivos incluye un índice incorporado de posiciones de los registros, lo que permite un acceso directo y rápido a cualquier registro por su índice sin necesidad de leer todo el archivo.

  • TFRecord, por otro lado, carece de acceso aleatorio nativo. Como formato secuencial optimizado para la transmisión de datos, acceder a un registro específico requiere iterar a través del archivo desde el principio.


3. Aleatoriedad global

  • Con ArrayRecord, es posible realizar una verdadera aleatoriedad global. Gracias a su eficiente acceso aleatorio, un cargador de datos como Grain puede generar índices en un orden aleatorio y leer registros sobre la marcha.

  • Con TFRecord, es difícil lograr una verdadera aleatoriedad global. La aleatoriedad "global" a menudo se basa en aproximaciones, como aleatorizar una lista de nombres de archivo fragmentados y luego aleatorizar registros dentro de un pequeño búfer de memoria. Esta no es una verdadera aleatoriedad global.


4. E/S paralelas

  • ArrayRecord admite de forma nativa de E/S paralelas. La estructura interna fragmentada de un archivo ArrayRecord hace que sea inherentemente fácil para múltiples procesos leer diferentes partes del mismo archivo en paralelo, lo que simplifica la administración de datos.

  • TFRecord admite la lectura paralela, pero generalmente se logra dividiendo el conjunto de datos en muchos archivos TFRecord pequeños y haciendo que diferentes trabajadores lean de diferentes archivos. Esto puede resultar en una gran cantidad de archivos para administrar.


5. Integración

  • ArrayRecord está diseñado para E/S de alto rendimiento y funciona a la perfección con cargadores basados en JAX como Grain. También se puede utilizar dentro del ecosistema TensorFlow a través de tfds.data_source.

  • TFRecord está estrechamente integrado con el ecosistema de tf.data de TensorFlow.


6. Caso de uso principal

  • ArrayRecord is ideal for high-throughput data loading for performance-critical machine learning, especially where determinism and true global shuffling are required (e.g., JAX/TPU workloads).

  • TFRecord is suited for general-purpose, large-scale data storage for TensorFlow and is optimized for sequential reads.


Cómo convertir conjuntos de datos TFRecord a ArrayRecord

El método para convertir conjuntos de datos dependerá de si se trata de un conjunto de datos estándar y registrado en el catálogo TensorFlow Datasets (TFDS) o de un conjunto de datos personalizado y patentado.

Método 1: Para conjuntos de datos estándar en el catálogo TFDS

Si usas un conjunto de datos conocido, como cifar10 o imagenet2012, la herramienta de línea de comandos tfds es el método más sencillo.

Requisito previo: Instalar los conjuntos de datos de TensorFlow

pip install -q --upgrade tfds-nightly
Shell

Uso de la CLI de compilación tfds

Este comando descarga los datos de origen, ejecuta la lógica de preparación y guarda la salida en el formato deseado.

# Generate the 'cifar10' dataset in ArrayRecord format
tfds build cifar10 --file_format=array_record
Shell

Los archivos ArrayRecord generados se guardarán en tu directorio ~/tensorflow_datasets/, listos para usar.

Método 2: Para conjuntos de datos TFRecord personalizados o patentados

Para la conversión a gran escala de tus propios conjuntos de datos TFRecord personalizados, el enfoque recomendado es utilizar Apache Beam. La biblioteca array_record proporciona canalizaciones de Beam listas para usar que hacen que esta conversión sea increíblemente simple y escalable. Este método es muy recomendable para conjuntos de datos masivos, ya que el procesamiento se puede distribuir entre muchos trabajadores que utilizan un servicio como Google Cloud Dataflow.

Requisitos previos: Instalar Apache Beam y Array Record Beam SDK

pip install -q apache-beam
pip install -q array-record-beam-sdk
Shell

Uso de la canalización de conversión lista para usar

El módulo array_record.beam.pipelines contiene la función convert_tf_to_arrayrecord_disk_match_shards, una utilidad especialmente diseñada que maneja todo el proceso de conversión. Lee archivos TFRecord y escribe un conjunto de datos de ArrayRecord fragmentado correspondiente.

Así se usa en una secuencia de comandos Python:

from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards
 
# 1. Define tus patrones de entrada y salida.
# Este ejemplo usa rutas de Google Cloud Storage (GCS), que son comunes para conjuntos de datos grandes.
input_pattern = 'gs://your-gcs-bucket/path/to/records-*.tfrecord'
output_path = 'gs://your-gcs-bucket/path/to/converted-records'
 
# Diccionario de argumentos para la función de conversión.
args = {
    'input': input_pattern,
    'output': output_path,
}
 
# 2. Configura opciones de canalización para su ejecución.
 
# Para ejecutar localmente en tu máquina (para conjuntos de datos más pequeños o pruebas):
# No se necesitan opciones; se usa el ejecutor local de manera predeterminada.
local_pipeline_options = pipeline_options.PipelineOptions()
 
 
# Para ejecutar a escala en Google Cloud Dataflow (para conjuntos de datos grandes):
# Elimina las siguientes líneas y completa con los detalles de tu proyecto.
#
# dataflow_pipeline_options = pipeline_options.PipelineOptions(
#     runner='DataflowRunner',
#     project='your-gcp-project-id',
#     region='your-gcp-region',
#     # Podría requerirse un archivo requirements.txt para dependencias en los trabajadores de Dataflow.
#     # requirements_file='requirements.txt',
#     temp_location='gs://your-gcs-bucket/path/to/temp'
# )
 
 
# 3. Define y ejecuta la principal lógica de ejecución.
def main():
  print("Starting TFRecord to ArrayRecord conversion...")
  convert_tf_to_arrayrecord_disk_match_shards(
      args=args,
      # Pasa aquí las opciones apropiadas.
      # Usa `local_pipeline_options` para ejecuciones locales.
      # Usa `dataflow_pipeline_options` para ejecuciones en la nube.
      pipeline_options=local_pipeline_options,
  ).run()
  print(f"Conversion complete. ArrayRecord files written to '{output_path}'.")
 
if __name__ == '__main__':
  main()
Python

Este enfoque es más potente y robusto que escribir una canalización manual porque es una API probada y generalizada, diseñada específicamente para esta tarea, que maneja detalles como hacer coincidir fragmentos de salida con fragmentos de entrada automáticamente.


Cómo crear una canalización de Grain y ArrayRecord: guía conceptual

Una vez que tus datos estén en formato ArrayRecord, puedes definir tu canalización de entrada de alto rendimiento utilizando la API Dataset de Grain. El proceso implica crear una fuente y encadenar los métodos de transformación.

Paso 1: Crea un MapDataset a partir de una fuente de datos

Primero, apunta a tus archivos ArrayRecord para crear un MapDataset.

import grain 
 
# Path to your generated ArrayRecord files
file_paths = ["~/tensorflow_datasets/cifar10/3.0.2/cifar10-train.array_record-00000-of-00001"]
 
# Create a data source
data_source = grain.sources.ArrayRecordDataSource(file_paths)
 
# Create a MapDataset from the source
dataset = grain.MapDataset.source(data_source)
Python

Paso 2: Transformaciones de cadena (aleatoriedad, mapeo, lote)

Ahora, aplica las transformaciones al MapDataset. Cada método devuelve un nuevo MapDataset, lo que te permite encadenar llamadas de forma declarativa.

# Example parsing function
def parse_and_transform(record):
    # Your logic to parse features, augment data, etc.
    return {"record": record}
 
BATCH_SIZE = 32
 
# Chain transformations
# The order of operations matters.
dataset = (
    dataset.shuffle(seed=42)
           .map(parse_and_transform)
           .batch(batch_size=BATCH_SIZE, drop_remainder=True)
)
Python

Step 3: Create and use the DatasetIterator

Finalmente, crea un iterador a partir de tu conjunto de datos completamente definido para recorrer en bucle los datos de tu secuencia de comandos de entrenamiento.

# Create the stateful iterator
data_iterator = iter(dataset)
 
# You can now loop over the data
for batch in data_iterator:
    # Your training step with the batch...
    pass
 
    # For checkpoint saving/restoration, you can get/set the iterator's state
    # state = data_iterator.get_state()
    # data_iterator.set_state(state)
Python

Configuración de rendimiento

Para evitar que tu canalización de datos se convierta en un cuello de botella, debe utilizar el multiprocesamiento para cargar y preprocesar datos en paralelo con el ent|renamiento del modelo. En la API de Dataset, esto se logra agregando la transformación .mp_prefetch() a tu canalización.

Este método inicia un grupo de procesos de trabajo para preparar de forma asíncrona lotes de datos en segundo plano y almacenarlos en un búfer, de modo que estén listos en el momento en que tu ciclo de entrenamiento los necesite.

Así es como se aplica:

# The full pipeline with performance tuning.
dataset = (
    grain.MapDataset.source(data_source)
    .shuffle(seed=42)
    .map(parse_and_transform)
 
    # Convert to an iterable dataset to apply prefetching.
    .to_iter_dataset()
    .batch(batch_size=BATCH_SIZE, drop_remainder=True)
    # Apply multiprocessing and prefetching.
    .mp_prefetch(
        grain.multiprocessing.MultiprocessingOptions(
            num_workers=16   # Number of parallel worker processes.
        )
    )
)
 
# Create the final iterator
data_iterator = iter(dataset)
Python
  • num_workers: Especifica el número de procesos secundarios paralelos que se usarán para la carga de datos. Si notas que tu acelerador a menudo está inactivo esperando datos, aumentar este valor puede mejorar significativamente todo el proceso. La cantidad óptima dependerá de los núcleos de CPU disponibles en tu máquina y de la complejidad de tu función de mapeo.


Explorar más

¿Quieres profundizar y empezar a compilar? Consulta la documentación oficial y el código fuente de las tecnologías analizadas en esta guía.

Tecnologías fundamentales


Ejemplo del mundo real: Capacitación en LLM a gran escala

Las canalizaciones de datos deterministas y de rendimiento creadas con Grain y ArrayRecord son fundamentales para el entrenamiento de modelos a gran escala. Un buen ejemplo es MaxText, un modelo de lenguaje grande de código abierto y alto rendimiento escrito en JAX. MaxText aprovecha estas técnicas exactas de canalización de datos para ingresar datos de manera eficiente a grandes clústeres de TPU y GPU.