Al entrenar modelos grandes en aceleradores potentes, como GPU y TPU, lo último que quieres es que tu acelerador esté inactivo, esperando datos. Todo tu sistema es tan rápido como su parte más lenta y, a menudo, ese cuello de botella es el canal de entrada de datos. Por lo tanto, para el aprendizaje automático a gran escala, es esencial contar con una canalización de datos eficiente y reproducible. En esta guía, verás cómo resolver este desafío creando de una canalización de datos sólida y eficiente con Grain, una biblioteca de carga de datos flexible para JAX, y ArrayRecord, un formato de archivo altamente eficiente.
Grain es una biblioteca de carga de datos liviana y de código abierto diseñada específicamente para resolver este problema para cargas de trabajo basadas en JAX. Garantiza que los datos se carguen, preprocesen e inserten en tu modelo de manera eficiente, lo que le permite maximizar el rendimiento de tu hardware.
Grain se basa en una filosofía de rendimiento, reproducibilidad y flexibilidad. Estos son sus beneficios clave:
.mp_prefetch()
method) to run data loading and transformations in parallel, ensuring that a buffer of prepared data is always ready for your model. This keeps your accelerators saturated and minimizes training time..shuffle ()
, .map()
y .batch()
. Este estilo declarativo hace que la canalización de datos sea fácil de entender, modificar y mantener.Si bien TFRecord es un estándar conocido, su naturaleza secuencial no permite una verdadera aleatoriedad gloval. ArrayRecord es un formato de archivo moderno diseñado específicamente para resolver este problema, ya que una nueva frontera en la eficiencia de los datos.
El alto rendimiento de ArrayRecord se basa en su diseño central, que se basa en el formato de archivo Riegeli de Google. Esta estructura proporciona varias ventajas clave para el manejo de datos a gran escala:
2. Massive parallelism: Records are grouped into data chunks. This structure is inherently designed to be read in parallel, allowing multiple processes to read different chunks of the same file simultaneously to dramatically increase read throughput.
3. Exceptional performance: As a result of this indexed and chunked design, benchmarks show ArrayRecord can achieve a read throughput an order of magnitude higher than traditional formats, making it ideal for today's massive datasets.
4. Smart data integrity: The format handles data integrity intelligently by leveraging the powerful error correction in underlying cloud storage systems (like Google Cloud Storage) rather than adding redundant checks. This provides robust protection against corruption without unnecessary performance overhead.
Las características de ArrayRecord habilitan directamente las capacidades avanzadas requeridas por los cargadores de datos modernos, como Grain.
The most important benefit is achieving true, deterministic global shuffling. Because any record can be accessed instantly, a data loader can generate perfectly randomized indices in the dataset on the fly as the training happens and then fetch data in that specific order. This capability, which is computationally impractical with sequential formats like TFRecord, is vital for reproducible research and optimal model training.
Aquí hay un desglose detallado de cómo ArrayRecord y TFRecord se comparan en características clave:
2. Acceso aleatorio
3. Aleatoriedad global
4. E/S paralelas
5. Integración
6. Caso de uso principal
El método para convertir conjuntos de datos dependerá de si se trata de un conjunto de datos estándar y registrado en el catálogo TensorFlow Datasets (TFDS) o de un conjunto de datos personalizado y patentado.
Si usas un conjunto de datos conocido, como cifar10
o imagenet2012
, la herramienta de línea de comandos tfds es el método más sencillo.
Requisito previo: Instalar los conjuntos de datos de TensorFlow
pip install -q --upgrade tfds-nightly
Uso de la CLI de compilación tfds
Este comando descarga los datos de origen, ejecuta la lógica de preparación y guarda la salida en el formato deseado.
# Generate the 'cifar10' dataset in ArrayRecord format
tfds build cifar10 --file_format=array_record
Los archivos ArrayRecord generados se guardarán en tu directorio ~/tensorflow_datasets/
, listos para usar.
Para la conversión a gran escala de tus propios conjuntos de datos TFRecord personalizados, el enfoque recomendado es utilizar Apache Beam. La biblioteca array_record
proporciona canalizaciones de Beam listas para usar que hacen que esta conversión sea increíblemente simple y escalable. Este método es muy recomendable para conjuntos de datos masivos, ya que el procesamiento se puede distribuir entre muchos trabajadores que utilizan un servicio como Google Cloud Dataflow.
Requisitos previos: Instalar Apache Beam y Array Record Beam SDK
pip install -q apache-beam
pip install -q array-record-beam-sdk
Uso de la canalización de conversión lista para usar
El módulo array_record.beam.pipelines
contiene la función convert_tf_to_arrayrecord_disk_match_shards
, una utilidad especialmente diseñada que maneja todo el proceso de conversión. Lee archivos TFRecord y escribe un conjunto de datos de ArrayRecord fragmentado correspondiente.
Así se usa en una secuencia de comandos Python:
from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards
# 1. Define tus patrones de entrada y salida.
# Este ejemplo usa rutas de Google Cloud Storage (GCS), que son comunes para conjuntos de datos grandes.
input_pattern = 'gs://your-gcs-bucket/path/to/records-*.tfrecord'
output_path = 'gs://your-gcs-bucket/path/to/converted-records'
# Diccionario de argumentos para la función de conversión.
args = {
'input': input_pattern,
'output': output_path,
}
# 2. Configura opciones de canalización para su ejecución.
# Para ejecutar localmente en tu máquina (para conjuntos de datos más pequeños o pruebas):
# No se necesitan opciones; se usa el ejecutor local de manera predeterminada.
local_pipeline_options = pipeline_options.PipelineOptions()
# Para ejecutar a escala en Google Cloud Dataflow (para conjuntos de datos grandes):
# Elimina las siguientes líneas y completa con los detalles de tu proyecto.
#
# dataflow_pipeline_options = pipeline_options.PipelineOptions(
# runner='DataflowRunner',
# project='your-gcp-project-id',
# region='your-gcp-region',
# # Podría requerirse un archivo requirements.txt para dependencias en los trabajadores de Dataflow.
# # requirements_file='requirements.txt',
# temp_location='gs://your-gcs-bucket/path/to/temp'
# )
# 3. Define y ejecuta la principal lógica de ejecución.
def main():
print("Starting TFRecord to ArrayRecord conversion...")
convert_tf_to_arrayrecord_disk_match_shards(
args=args,
# Pasa aquí las opciones apropiadas.
# Usa `local_pipeline_options` para ejecuciones locales.
# Usa `dataflow_pipeline_options` para ejecuciones en la nube.
pipeline_options=local_pipeline_options,
).run()
print(f"Conversion complete. ArrayRecord files written to '{output_path}'.")
if __name__ == '__main__':
main()
Este enfoque es más potente y robusto que escribir una canalización manual porque es una API probada y generalizada, diseñada específicamente para esta tarea, que maneja detalles como hacer coincidir fragmentos de salida con fragmentos de entrada automáticamente.
Una vez que tus datos estén en formato ArrayRecord, puedes definir tu canalización de entrada de alto rendimiento utilizando la API Dataset
de Grain. El proceso implica crear una fuente y encadenar los métodos de transformación.
Primero, apunta a tus archivos ArrayRecord para crear un MapDataset
.
import grain
# Path to your generated ArrayRecord files
file_paths = ["~/tensorflow_datasets/cifar10/3.0.2/cifar10-train.array_record-00000-of-00001"]
# Create a data source
data_source = grain.sources.ArrayRecordDataSource(file_paths)
# Create a MapDataset from the source
dataset = grain.MapDataset.source(data_source)
Ahora, aplica las transformaciones al MapDataset
. Cada método devuelve un nuevo MapDataset
, lo que te permite encadenar llamadas de forma declarativa.
# Example parsing function
def parse_and_transform(record):
# Your logic to parse features, augment data, etc.
return {"record": record}
BATCH_SIZE = 32
# Chain transformations
# The order of operations matters.
dataset = (
dataset.shuffle(seed=42)
.map(parse_and_transform)
.batch(batch_size=BATCH_SIZE, drop_remainder=True)
)
DatasetIterator
Finalmente, crea un iterador a partir de tu conjunto de datos completamente definido para recorrer en bucle los datos de tu secuencia de comandos de entrenamiento.
# Create the stateful iterator
data_iterator = iter(dataset)
# You can now loop over the data
for batch in data_iterator:
# Your training step with the batch...
pass
# For checkpoint saving/restoration, you can get/set the iterator's state
# state = data_iterator.get_state()
# data_iterator.set_state(state)
Para evitar que tu canalización de datos se convierta en un cuello de botella, debe utilizar el multiprocesamiento para cargar y preprocesar datos en paralelo con el ent|renamiento del modelo. En la API de Dataset, esto se logra agregando la transformación .mp_prefetch()
a tu canalización.
Este método inicia un grupo de procesos de trabajo para preparar de forma asíncrona lotes de datos en segundo plano y almacenarlos en un búfer, de modo que estén listos en el momento en que tu ciclo de entrenamiento los necesite.
Así es como se aplica:
# The full pipeline with performance tuning.
dataset = (
grain.MapDataset.source(data_source)
.shuffle(seed=42)
.map(parse_and_transform)
# Convert to an iterable dataset to apply prefetching.
.to_iter_dataset()
.batch(batch_size=BATCH_SIZE, drop_remainder=True)
# Apply multiprocessing and prefetching.
.mp_prefetch(
grain.multiprocessing.MultiprocessingOptions(
num_workers=16 # Number of parallel worker processes.
)
)
)
# Create the final iterator
data_iterator = iter(dataset)
num_workers
: Especifica el número de procesos secundarios paralelos que se usarán para la carga de datos. Si notas que tu acelerador a menudo está inactivo esperando datos, aumentar este valor puede mejorar significativamente todo el proceso. La cantidad óptima dependerá de los núcleos de CPU disponibles en tu máquina y de la complejidad de tu función de mapeo.¿Quieres profundizar y empezar a compilar? Consulta la documentación oficial y el código fuente de las tecnologías analizadas en esta guía.
Las canalizaciones de datos deterministas y de rendimiento creadas con Grain y ArrayRecord son fundamentales para el entrenamiento de modelos a gran escala. Un buen ejemplo es MaxText, un modelo de lenguaje grande de código abierto y alto rendimiento escrito en JAX. MaxText aprovecha estas técnicas exactas de canalización de datos para ingresar datos de manera eficiente a grandes clústeres de TPU y GPU.