Al entrenar modelos grandes en aceleradores potentes, como GPU y TPU, lo último que quieres es que tu acelerador esté inactivo, esperando datos. Todo tu sistema es tan rápido como su parte más lenta y, a menudo, ese cuello de botella es el canal de entrada de datos. Por lo tanto, para el aprendizaje automático a gran escala, es esencial contar con una canalización de datos eficiente y reproducible. En esta guía, verás cómo resolver este desafío creando de una canalización de datos sólida y eficiente con Grain, una biblioteca de carga de datos flexible para JAX, y ArrayRecord, un formato de archivo altamente eficiente.
Grain es una biblioteca de carga de datos liviana y de código abierto diseñada específicamente para resolver este problema para cargas de trabajo basadas en JAX. Garantiza que los datos se carguen, preprocesen e inserten en tu modelo de manera eficiente, lo que le permite maximizar el rendimiento de tu hardware.
Grain se basa en una filosofía de rendimiento, reproducibilidad y flexibilidad. Estos son sus beneficios clave:
.mp_prefetch()) para ejecutar la carga de datos y las transformaciones en paralelo, lo que asegura que siempre haya un búfer de datos preparado para tu modelo. Esto mantiene a tus aceleradores saturados y minimiza el tiempo de entrenamiento..shuffle (), .map() y .batch(). Este estilo declarativo hace que la canalización de datos sea fácil de entender, modificar y mantener.Si bien TFRecord es un estándar conocido, su naturaleza secuencial no permite una verdadera aleatoriedad gloval. ArrayRecord es un formato de archivo moderno diseñado específicamente para resolver este problema, ya que una nueva frontera en la eficiencia de los datos.
El alto rendimiento de ArrayRecord se basa en su diseño central, que se basa en el formato de archivo Riegeli de Google. Esta estructura proporciona varias ventajas clave para el manejo de datos a gran escala:
2. Paralelismo masivo: Los registros se agrupan en fragmentos de datos. Esta estructura se diseñó inherentemente para que se lea en paralelo, lo que permite que múltiples procesos lean diferentes fragmentos del mismo archivo simultáneamente para aumentar en gran medida el rendimiento de lectura.
3. Rendimiento excepcional: Como resultado de este diseño indexado y fragmentado, las comparativas muestran que ArrayRecord puede lograr un rendimiento de lectura un orden de magnitud más alto que los formatos tradicionales, por lo que es ideal para los conjuntos de datos masivos de hoy en día.
4. Integridad de datos inteligente: El formato maneja la integridad de los datos de manera inteligente aprovechando la potente corrección de errores en los sistemas de almacenamiento en la nube subyacentes (como Google Cloud Storage), en lugar de agregar comprobaciones redundantes. De esta manera, se proporciona una protección sólida contra la corrupción sin sobrecargas de rendimiento innecesarias.
Las características de ArrayRecord habilitan directamente las capacidades avanzadas requeridas por los cargadores de datos modernos, como Grain.
El beneficio más importante es lograr una aleatoriedad global verdadera y determinista. Debido a que se puede acceder a cualquier registro al instante, un cargador de datos puede generar índices perfectamente aleatorios en el conjunto de datos sobre la marcha a medida que se realiza el entrenamiento y luego obtener datos en ese orden específico. Esta capacidad, que es computacionalmente poco práctica con formatos secuenciales como TFRecord, es vital para la investigación reproducible y el entrenamiento óptimo del modelo.
Aquí hay un desglose detallado de cómo ArrayRecord y TFRecord se comparan en características clave:
2. Acceso aleatorio
3. Aleatoriedad global
4. E/S paralelas
5. Integración
6. Caso de uso principal
El método para convertir conjuntos de datos dependerá de si se trata de un conjunto de datos estándar y registrado en el catálogo TensorFlow Datasets (TFDS) o de un conjunto de datos personalizado y patentado.
Si usas un conjunto de datos conocido, como cifar10 o imagenet2012, la herramienta de línea de comandos tfds es el método más sencillo.
Requisito previo: Instalar los conjuntos de datos de TensorFlow
pip install -q --upgrade tfds-nightly
Uso de la CLI de compilación tfds
Este comando descarga los datos de origen, ejecuta la lógica de preparación y guarda la salida en el formato deseado.
# Generate the 'cifar10' dataset in ArrayRecord format
tfds build cifar10 --file_format=array_record
Los archivos ArrayRecord generados se guardarán en tu directorio ~/tensorflow_datasets/, listos para usar.
Para la conversión a gran escala de tus propios conjuntos de datos TFRecord personalizados, el enfoque recomendado es utilizar Apache Beam. La biblioteca array_record proporciona canalizaciones de Beam listas para usar que hacen que esta conversión sea increíblemente simple y escalable. Este método es muy recomendable para conjuntos de datos masivos, ya que el procesamiento se puede distribuir entre muchos trabajadores que utilizan un servicio como Google Cloud Dataflow.
Requisitos previos: Instalar Apache Beam y Array Record Beam SDK
pip install -q apache-beam
pip install -q array-record-beam-sdk
Uso de la canalización de conversión lista para usar
El módulo array_record.beam.pipelines contiene la función convert_tf_to_arrayrecord_disk_match_shards, una utilidad especialmente diseñada que maneja todo el proceso de conversión. Lee archivos TFRecord y escribe un conjunto de datos de ArrayRecord fragmentado correspondiente.
Así se usa en una secuencia de comandos Python:
from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards
# 1. Define your input and output patterns.
# This example uses Google Cloud Storage (GCS) paths, which is common for large datasets.
input_pattern = 'gs://your-gcs-bucket/path/to/records-*.tfrecord'
output_path = 'gs://your-gcs-bucket/path/to/converted-records'
# Arguments dictionary for the conversion function.
args = {
'input': input_pattern,
'output': output_path,
}
# 2. Configure pipeline options for execution.
# To run locally on your machine (for smaller datasets or testing):
# No options are needed; the local runner is used by default.
local_pipeline_options = pipeline_options.PipelineOptions()
# To run at scale on Google Cloud Dataflow (for large datasets):
# Uncomment the following lines and fill in your project details.
#
# dataflow_pipeline_options = pipeline_options.PipelineOptions(
# runner='DataflowRunner',
# project='your-gcp-project-id',
# region='your-gcp-region',
# # A requirements.txt file may be needed for dependencies on Dataflow workers.
# # requirements_file='requirements.txt',
# temp_location='gs://your-gcs-bucket/path/to/temp'
# )
# 3. Define and run the main execution logic.
def main():
print("Starting TFRecord to ArrayRecord conversion...")
convert_tf_to_arrayrecord_disk_match_shards(
args=args,
# Pass the appropriate options here.
# Use `local_pipeline_options` for local runs.
# Use `dataflow_pipeline_options` for cloud runs.
pipeline_options=local_pipeline_options,
).run()
print(f"Conversion complete. ArrayRecord files written to '{output_path}'.")
if __name__ == '__main__':
main()
Este enfoque es más potente y robusto que escribir una canalización manual porque es una API probada y generalizada, diseñada específicamente para esta tarea, que maneja detalles como hacer coincidir fragmentos de salida con fragmentos de entrada automáticamente.
Una vez que tus datos estén en formato ArrayRecord, puedes definir tu canalización de entrada de alto rendimiento utilizando la API Dataset de Grain. El proceso implica crear una fuente y encadenar los métodos de transformación.
Primero, apunta a tus archivos ArrayRecord para crear un MapDataset.
import grain
# Path to your generated ArrayRecord files
file_paths = ["~/tensorflow_datasets/cifar10/3.0.2/cifar10-train.array_record-00000-of-00001"]
# Create a data source
data_source = grain.sources.ArrayRecordDataSource(file_paths)
# Create a MapDataset from the source
dataset = grain.MapDataset.source(data_source)
Ahora, aplica las transformaciones al MapDataset. Cada método devuelve un nuevo MapDataset, lo que te permite encadenar llamadas de forma declarativa.
# Example parsing function
def parse_and_transform(record):
# Your logic to parse features, augment data, etc.
return {"record": record}
BATCH_SIZE = 32
# Chain transformations
# The order of operations matters.
dataset = (
dataset.shuffle(seed=42)
.map(parse_and_transform)
.batch(batch_size=BATCH_SIZE, drop_remainder=True)
)
DatasetIteratorFinalmente, crea un iterador a partir de tu conjunto de datos completamente definido para recorrer en bucle los datos de tu secuencia de comandos de entrenamiento.
# Create the stateful iterator
data_iterator = iter(dataset)
# You can now loop over the data
for batch in data_iterator:
# Your training step with the batch...
pass
# For checkpoint saving/restoration, you can get/set the iterator's state
# state = data_iterator.get_state()
# data_iterator.set_state(state)
Para evitar que tu canalización de datos se convierta en un cuello de botella, debe utilizar el multiprocesamiento para cargar y preprocesar datos en paralelo con el ent|renamiento del modelo. En la API de Dataset, esto se logra agregando la transformación .mp_prefetch() a tu canalización.
Este método inicia un grupo de procesos de trabajo para preparar de forma asíncrona lotes de datos en segundo plano y almacenarlos en un búfer, de modo que estén listos en el momento en que tu ciclo de entrenamiento los necesite.
Así es como se aplica:
# The full pipeline with performance tuning.
dataset = (
grain.MapDataset.source(data_source)
.shuffle(seed=42)
.map(parse_and_transform)
# Convert to an iterable dataset to apply prefetching.
.to_iter_dataset()
.batch(batch_size=BATCH_SIZE, drop_remainder=True)
# Apply multiprocessing and prefetching.
.mp_prefetch(
grain.multiprocessing.MultiprocessingOptions(
num_workers=16 # Number of parallel worker processes.
)
)
)
# Create the final iterator
data_iterator = iter(dataset)
num_workers: Especifica el número de procesos secundarios paralelos que se usarán para la carga de datos. Si notas que tu acelerador a menudo está inactivo esperando datos, aumentar este valor puede mejorar significativamente todo el proceso. La cantidad óptima dependerá de los núcleos de CPU disponibles en tu máquina y de la complejidad de tu función de mapeo.¿Quieres profundizar y empezar a compilar? Consulta la documentación oficial y el código fuente de las tecnologías analizadas en esta guía.
Las canalizaciones de datos deterministas y de rendimiento creadas con Grain y ArrayRecord son fundamentales para el entrenamiento de modelos a gran escala. Un buen ejemplo es MaxText, un modelo de lenguaje grande de código abierto y alto rendimiento escrito en JAX. MaxText aprovecha estas técnicas exactas de canalización de datos para ingresar datos de manera eficiente a grandes clústeres de TPU y GPU.