Creación de canalizaciones de datos de alto rendimiento con Grain y ArrayRecord

7 DE OCTUBRE DE 2025
Jiyang Kang Technical Program Manager
Shivaji Dutta Field Solutions Architect
Ihor Indyk Software Engineer
Felix Chern Software Engineer

Al entrenar modelos grandes en aceleradores potentes, como GPU y TPU, lo último que quieres es que tu acelerador esté inactivo, esperando datos. Todo tu sistema es tan rápido como su parte más lenta y, a menudo, ese cuello de botella es el canal de entrada de datos. Por lo tanto, para el aprendizaje automático a gran escala, es esencial contar con una canalización de datos eficiente y reproducible. En esta guía, verás cómo resolver este desafío creando de una canalización de datos sólida y eficiente con Grain, una biblioteca de carga de datos flexible para JAX, y ArrayRecord, un formato de archivo altamente eficiente.


Comprensión de los componentes principales

Grain: un cargador de datos de alto rendimiento para JAX

Grain es una biblioteca de carga de datos liviana y de código abierto diseñada específicamente para resolver este problema para cargas de trabajo basadas en JAX. Garantiza que los datos se carguen, preprocesen e inserten en tu modelo de manera eficiente, lo que le permite maximizar el rendimiento de tu hardware.

¿Por qué usar Grain?

Grain se basa en una filosofía de rendimiento, reproducibilidad y flexibilidad. Estos son sus beneficios clave:

  • Rendimiento excepcional: Grain está diseñado para la velocidad. Utiliza un multiprocesamiento eficiente (a través del método .mp_prefetch()) para ejecutar la carga de datos y las transformaciones en paralelo, lo que asegura que siempre haya un búfer de datos preparado para tu modelo. Esto mantiene a tus aceleradores saturados y minimiza el tiempo de entrenamiento.

  • Determinismo y reproducibilidad garantizados: Grain proporciona una reproducibilidad completa, lo cual es fundamental para una investigación creíble. Con un valor inicial simple, garanizas que los datos siempre se elijan aleatoriamente de la misma manera. Fundamentalmente, sus iteradores de datos tienen estado y pueden tener puntos de control. Esto significa que si tu trabajo de entrenamiento se interrumpe, puedes reiniciar desde el mismo punto exacto en el flujo de datos.

  • Una API intuitiva y declarativa: Tú defines la canalización de datos encadenando métodos simples y legibles. A partir de un origen MapDataset, puedes agregar transformaciones de forma fluida como .shuffle (), .map() y .batch(). Este estilo declarativo hace que la canalización de datos sea fácil de entender, modificar y mantener.

  • Desbloquear la aleatoriedad global verdadera: Para obtener el mejor rendimiento de tus modelos, debes aleatorizar los datos de manera efectiva. Cuando se combina con un formato de archivo que admite acceso aleatorio, como ArrayRecord, Grain puede realizar una verdadera aleatoriedad global en todo tu conjunto de datos, incluso cuando no entran en la memoria del host. Esta es una característica poderosa que a menudo es computacionalmente poco práctica con otros cargadores de datos y formatos.


¿Qué es ArrayRecord y por qué usarlo?

Si bien TFRecord es un estándar conocido, su naturaleza secuencial no permite una verdadera aleatoriedad gloval. ArrayRecord es un formato de archivo moderno diseñado específicamente para resolver este problema, ya que una nueva frontera en la eficiencia de los datos.

ArrayRecord File Layout

Cómo funciona: diseñado para la velocidad y el paralelismo

El alto rendimiento de ArrayRecord se basa en su diseño central, que se basa en el formato de archivo Riegeli de Google. Esta estructura proporciona varias ventajas clave para el manejo de datos a gran escala:

  1. Acceso aleatorio eficiente: ArrayRecord cuenta con un índice de metadatos incorporado que asigna cada registro a su ubicación precisa. Esta es la opción de diseño clave que permite un acceso instantáneo y directo a cualquier registro del conjunto de datos, lo que evita por completo la necesidad de leer un archivo desde el principio.


2. Paralelismo masivo: Los registros se agrupan en fragmentos de datos. Esta estructura se diseñó inherentemente para que se lea en paralelo, lo que permite que múltiples procesos lean diferentes fragmentos del mismo archivo simultáneamente para aumentar en gran medida el rendimiento de lectura.


3. Rendimiento excepcional: Como resultado de este diseño indexado y fragmentado, las comparativas muestran que ArrayRecord puede lograr un rendimiento de lectura un orden de magnitud más alto que los formatos tradicionales, por lo que es ideal para los conjuntos de datos masivos de hoy en día.


4. Integridad de datos inteligente: El formato maneja la integridad de los datos de manera inteligente aprovechando la potente corrección de errores en los sistemas de almacenamiento en la nube subyacentes (como Google Cloud Storage), en lugar de agregar comprobaciones redundantes. De esta manera, se proporciona una protección sólida contra la corrupción sin sobrecargas de rendimiento innecesarias.


¿Por qué lo usamos?

Las características de ArrayRecord habilitan directamente las capacidades avanzadas requeridas por los cargadores de datos modernos, como Grain.

El beneficio más importante es lograr una aleatoriedad global verdadera y determinista. Debido a que se puede acceder a cualquier registro al instante, un cargador de datos puede generar índices perfectamente aleatorios en el conjunto de datos sobre la marcha a medida que se realiza el entrenamiento y luego obtener datos en ese orden específico. Esta capacidad, que es computacionalmente poco práctica con formatos secuenciales como TFRecord, es vital para la investigación reproducible y el entrenamiento óptimo del modelo.


ArrayRecord y TFRecord: una comparación detallada

Aquí hay un desglose detallado de cómo ArrayRecord y TFRecord se comparan en características clave:

  1. Estructura

  • ArrayRecord se basa en el formato de archivoRiegeli de Google, que está diseñado para almacenar secuencias de registros con un enfoque en la decodificación de alta velocidad, la integridad de los datos y la compresión fuerte. Agrupa los registros en fragmentos e incluye un índice de metadatos al final del archivo.

  • TFRecord es una secuencia de registros binarios, en la que cada registro suele ser un búfer de protocolo tf.train.Example.


2. Acceso aleatorio

  • ArrayRecord ofrece acceso aleatorio nativo y eficiente. Su estructura de archivos incluye un índice incorporado de posiciones de los registros, lo que permite un acceso directo y rápido a cualquier registro por su índice sin necesidad de leer todo el archivo.

  • TFRecord, por otro lado, carece de acceso aleatorio nativo. Como formato secuencial optimizado para la transmisión de datos, acceder a un registro específico requiere iterar a través del archivo desde el principio.


3. Aleatoriedad global

  • Con ArrayRecord, es posible realizar una verdadera aleatoriedad global. Gracias a su eficiente acceso aleatorio, un cargador de datos como Grain puede generar índices en un orden aleatorio y leer registros sobre la marcha.

  • Con TFRecord, es difícil lograr una verdadera aleatoriedad global. La aleatoriedad "global" a menudo se basa en aproximaciones, como aleatorizar una lista de nombres de archivo fragmentados y luego aleatorizar registros dentro de un pequeño búfer de memoria. Esta no es una verdadera aleatoriedad global.


4. E/S paralelas

  • ArrayRecord admite de forma nativa de E/S paralelas. La estructura interna fragmentada de un archivo ArrayRecord hace que sea inherentemente fácil para múltiples procesos leer diferentes partes del mismo archivo en paralelo, lo que simplifica la administración de datos.

  • TFRecord admite la lectura paralela, pero generalmente se logra dividiendo el conjunto de datos en muchos archivos TFRecord pequeños y haciendo que diferentes trabajadores lean de diferentes archivos. Esto puede resultar en una gran cantidad de archivos para administrar.


5. Integración

  • ArrayRecord está diseñado para E/S de alto rendimiento y funciona a la perfección con cargadores basados en JAX como Grain. También se puede utilizar dentro del ecosistema TensorFlow a través de tfds.data_source.

  • TFRecord está estrechamente integrado con el ecosistema de tf.data de TensorFlow.


6. Caso de uso principal

  • ArrayRecord es ideal para la carga de datos de alto rendimiento para el aprendizaje automático crítico para el rendimiento, especialmente cuando se requiere determinismo y una verdadera aleatoriedad global (por ejemplo, cargas de trabajo JAX/TPU).

  • TFRecord es adecuado para el almacenamiento de datos a gran escala de uso general para TensorFlow y está optimizado para lecturas secuenciales.


Cómo convertir conjuntos de datos TFRecord a ArrayRecord

El método para convertir conjuntos de datos dependerá de si se trata de un conjunto de datos estándar y registrado en el catálogo TensorFlow Datasets (TFDS) o de un conjunto de datos personalizado y patentado.

Método 1: Para conjuntos de datos estándar en el catálogo TFDS

Si usas un conjunto de datos conocido, como cifar10 o imagenet2012, la herramienta de línea de comandos tfds es el método más sencillo.

Requisito previo: Instalar los conjuntos de datos de TensorFlow

pip install -q --upgrade tfds-nightly
Shell

Uso de la CLI de compilación tfds

Este comando descarga los datos de origen, ejecuta la lógica de preparación y guarda la salida en el formato deseado.

# Generate the 'cifar10' dataset in ArrayRecord format
tfds build cifar10 --file_format=array_record
Shell

Los archivos ArrayRecord generados se guardarán en tu directorio ~/tensorflow_datasets/, listos para usar.

Método 2: Para conjuntos de datos TFRecord personalizados o patentados

Para la conversión a gran escala de tus propios conjuntos de datos TFRecord personalizados, el enfoque recomendado es utilizar Apache Beam. La biblioteca array_record proporciona canalizaciones de Beam listas para usar que hacen que esta conversión sea increíblemente simple y escalable. Este método es muy recomendable para conjuntos de datos masivos, ya que el procesamiento se puede distribuir entre muchos trabajadores que utilizan un servicio como Google Cloud Dataflow.

Requisitos previos: Instalar Apache Beam y Array Record Beam SDK

pip install -q apache-beam
pip install -q array-record-beam-sdk
Shell

Uso de la canalización de conversión lista para usar

El módulo array_record.beam.pipelines contiene la función convert_tf_to_arrayrecord_disk_match_shards, una utilidad especialmente diseñada que maneja todo el proceso de conversión. Lee archivos TFRecord y escribe un conjunto de datos de ArrayRecord fragmentado correspondiente.

Así se usa en una secuencia de comandos Python:

from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards
 
# 1. Define your input and output patterns.
# This example uses Google Cloud Storage (GCS) paths, which is common for large datasets.
input_pattern = 'gs://your-gcs-bucket/path/to/records-*.tfrecord'
output_path = 'gs://your-gcs-bucket/path/to/converted-records'
 
# Arguments dictionary for the conversion function.
args = {
    'input': input_pattern,
    'output': output_path,
}
 
# 2. Configure pipeline options for execution.
 
# To run locally on your machine (for smaller datasets or testing):
# No options are needed; the local runner is used by default.
local_pipeline_options = pipeline_options.PipelineOptions()
 
 
# To run at scale on Google Cloud Dataflow (for large datasets):
# Uncomment the following lines and fill in your project details.
#
# dataflow_pipeline_options = pipeline_options.PipelineOptions(
#     runner='DataflowRunner',
#     project='your-gcp-project-id',
#     region='your-gcp-region',
#     # A requirements.txt file may be needed for dependencies on Dataflow workers.
#     # requirements_file='requirements.txt',
#     temp_location='gs://your-gcs-bucket/path/to/temp'
# )
 
 
# 3. Define and run the main execution logic.
def main():
  print("Starting TFRecord to ArrayRecord conversion...")
  convert_tf_to_arrayrecord_disk_match_shards(
      args=args,
      # Pass the appropriate options here.
      # Use `local_pipeline_options` for local runs.
      # Use `dataflow_pipeline_options` for cloud runs.
      pipeline_options=local_pipeline_options,
  ).run()
  print(f"Conversion complete. ArrayRecord files written to '{output_path}'.")
 
if __name__ == '__main__':
  main()
Python

Este enfoque es más potente y robusto que escribir una canalización manual porque es una API probada y generalizada, diseñada específicamente para esta tarea, que maneja detalles como hacer coincidir fragmentos de salida con fragmentos de entrada automáticamente.


Cómo crear una canalización de Grain y ArrayRecord: guía conceptual

Una vez que tus datos estén en formato ArrayRecord, puedes definir tu canalización de entrada de alto rendimiento utilizando la API Dataset de Grain. El proceso implica crear una fuente y encadenar los métodos de transformación.

Paso 1: Crea un MapDataset a partir de una fuente de datos

Primero, apunta a tus archivos ArrayRecord para crear un MapDataset.

import grain 
 
# Path to your generated ArrayRecord files
file_paths = ["~/tensorflow_datasets/cifar10/3.0.2/cifar10-train.array_record-00000-of-00001"]
 
# Create a data source
data_source = grain.sources.ArrayRecordDataSource(file_paths)
 
# Create a MapDataset from the source
dataset = grain.MapDataset.source(data_source)
Python

Paso 2: Transformaciones de cadena (aleatoriedad, mapeo, lote)

Ahora, aplica las transformaciones al MapDataset. Cada método devuelve un nuevo MapDataset, lo que te permite encadenar llamadas de forma declarativa.

# Example parsing function
def parse_and_transform(record):
    # Your logic to parse features, augment data, etc.
    return {"record": record}
 
BATCH_SIZE = 32
 
# Chain transformations
# The order of operations matters.
dataset = (
    dataset.shuffle(seed=42)
           .map(parse_and_transform)
           .batch(batch_size=BATCH_SIZE, drop_remainder=True)
)
Python

Paso 3: Crea y usa DatasetIterator

Finalmente, crea un iterador a partir de tu conjunto de datos completamente definido para recorrer en bucle los datos de tu secuencia de comandos de entrenamiento.

# Create the stateful iterator
data_iterator = iter(dataset)
 
# You can now loop over the data
for batch in data_iterator:
    # Your training step with the batch...
    pass
 
    # For checkpoint saving/restoration, you can get/set the iterator's state
    # state = data_iterator.get_state()
    # data_iterator.set_state(state)
Python

Configuración de rendimiento

Para evitar que tu canalización de datos se convierta en un cuello de botella, debe utilizar el multiprocesamiento para cargar y preprocesar datos en paralelo con el ent|renamiento del modelo. En la API de Dataset, esto se logra agregando la transformación .mp_prefetch() a tu canalización.

Este método inicia un grupo de procesos de trabajo para preparar de forma asíncrona lotes de datos en segundo plano y almacenarlos en un búfer, de modo que estén listos en el momento en que tu ciclo de entrenamiento los necesite.

Así es como se aplica:

# The full pipeline with performance tuning.
dataset = (
    grain.MapDataset.source(data_source)
    .shuffle(seed=42)
    .map(parse_and_transform)
 
    # Convert to an iterable dataset to apply prefetching.
    .to_iter_dataset()
    .batch(batch_size=BATCH_SIZE, drop_remainder=True)
    # Apply multiprocessing and prefetching.
    .mp_prefetch(
        grain.multiprocessing.MultiprocessingOptions(
            num_workers=16   # Number of parallel worker processes.
        )
    )
)
 
# Create the final iterator
data_iterator = iter(dataset)
Python
  • num_workers: Especifica el número de procesos secundarios paralelos que se usarán para la carga de datos. Si notas que tu acelerador a menudo está inactivo esperando datos, aumentar este valor puede mejorar significativamente todo el proceso. La cantidad óptima dependerá de los núcleos de CPU disponibles en tu máquina y de la complejidad de tu función de mapeo.


Explorar más

¿Quieres profundizar y empezar a compilar? Consulta la documentación oficial y el código fuente de las tecnologías analizadas en esta guía.

Tecnologías fundamentales


Ejemplo del mundo real: Capacitación en LLM a gran escala

Las canalizaciones de datos deterministas y de rendimiento creadas con Grain y ArrayRecord son fundamentales para el entrenamiento de modelos a gran escala. Un buen ejemplo es MaxText, un modelo de lenguaje grande de código abierto y alto rendimiento escrito en JAX. MaxText aprovecha estas técnicas exactas de canalización de datos para ingresar datos de manera eficiente a grandes clústeres de TPU y GPU.