Membangun Data Pipelines Berkinerja Tinggi dengan Grain dan ArrayRecord

7 OKT. 2025
Jiyang Kang Technical Program Manager
Shivaji Dutta Field Solutions Architect
Ihor Indyk Software Engineer
Felix Chern Software Engineer

Saat melatih model besar pada akselerator yang kuat, seperti GPU dan TPU, hal terakhir yang Anda inginkan adalah akselerator tidak ada aktivitas, menunggu data. Seluruh sistem Anda hanya akan berjalan secepat bagian yang paling lambat, dan sering kali, bottleneck tersebut adalah pipeline input data. Oleh karena itu, untuk machine learning berskala besar, data pipeline yang efisien dan dapat direproduksi sangatlah penting. Panduan ini akan menunjukkan kepada Anda cara mengatasi tantangan ini dengan membangun data pipeline yang kuat dan berkinerja baik menggunakan Grain, library pemuatan data yang fleksibel untuk JAX, dan ArrayRecord, sebuah format file yang sangat efisien.


Memahami komponen inti

Grain: Loader data berkinerja tinggi untuk JAX

Grain adalah library pemuatan data open source yang ringan dan dirancang khusus untuk mengatasi masalah ini untuk beban kerja berbasis JAX. Grain memastikan bahwa data dimuat, diproses sebelumnya, dan dimasukkan ke model Anda secara efisien, sehingga Anda bisa memaksimalkan performa hardware Anda.

Mengapa menggunakan Grain?

Grain dibangun berdasarkan filosofi performa, reproduktifitas, dan fleksibilitas. Berikut adalah manfaat utama yang diberikannya:

  • Performa yang luar biasa: Grain dibangun untuk kecepatan. Ia menggunakan multipemrosesan yang efisien (melalui metode .mp_prefetch()) untuk menjalankan pemuatan data dan transformasi secara paralel, memastikan bahwa buffer data yang telah disiapkan selalu siap untuk model Anda. Hal ini membuat akselerator Anda tetap jenuh dan meminimalkan waktu pelatihan.

  • Determinisme dan reproduktifitas yang terjamin: Grain memberikan reproduktifitas penuh, yang sangat penting untuk riset yang kredibel. Dengan menetapkan bibit sederhana, Anda memastikan data selalu diacak dengan cara yang sama. Yang terpenting, iterator datanya bersifat stateful dan dapat di-checkpoint. Ini berarti jika tugas pelatihan Anda diinterupsi atau preempted, Anda bisa memulai ulang dari titik yang sama dalam aliran data.

  • API deklaratif yang intuitif: Anda menetapkan data pipeline dengan merangkai metode yang sederhana dan mudah dibaca. Mulai dengan sumber MapDataset, Anda bisa dengan mudah menambahkan transformasi, seperti .shuffle(), .map(), dan .batch(). Gaya deklaratif ini membuat data pipeline Anda mudah dipahami, dimodifikasi, dan dipelihara.

  • Membuka pengacakan global yang sebenarnya: Untuk mendapatkan performa terbaik dari model, Anda perlu mengacak data secara efektif. Ketika dipasangkan dengan format file yang mendukung akses acak, seperti ArrayRecord, Grain bisa melakukan pengacakan global yang sebenarnya di seluruh set data Anda, meskipun tidak muat di memori host. Ini adalah fitur kuat yang sering kali tidak praktis secara komputasi dengan loader dan format data lainnya.


Apa itu ArrayRecord dan mengapa menggunakannya?

Meskipun TFRecord adalah standar yang familier, sifat sekuensialnya tidak memungkinkan pengacakan global yang sebenarnya. ArrayRecord adalah format file modern yang dirancang khusus untuk mengatasi masalah ini, menawarkan terobosan baru dalam efisiensi data.

ArrayRecord File Layout

Bagaimana cara kerjanya: Dirancang untuk kecepatan dan paralelisme

Performa tinggi ArrayRecord berakar pada desain intinya, yang berbasis format file Riegeli dari Google. Struktur ini memberikan beberapa keuntungan utama untuk penanganan data berskala besar:

  1. Akses acak yang efisien: ArrayRecord memiliki indeks metadata bawaan yang memetakan setiap catatan ke lokasi akurat. Ini adalah pilihan desain utama yang memungkinkan akses langsung secara instan ke setiap catatan dalam set data, sehingga tidak perlu membaca file dari awal.


2. Paralelisme skala besar: Catatan dikelompokkan ke dalam potongan data. Struktur ini secara inheren dirancang untuk dibaca secara paralel, memungkinkan beberapa proses untuk membaca bagian yang berbeda dari file yang sama secara bersamaan untuk secara dramatis meningkatkan throughput baca.


3. Performa yang luar biasa: Berkat desain terindeks dan terpotong-potong ini, tolok ukur menunjukkan bahwa ArrayRecord bisa mencapai throughput baca yang jauh lebih tinggi daripada format tradisional, membuatnya ideal untuk set data yang sangat besar saat ini.


4. Integritas data yang cerdas: Format ini menangani integritas data secara cerdas dengan memanfaatkan koreksi error yang kuat dalam sistem penyimpanan cloud yang mendasarinya (seperti Google Cloud Storage) alih-alih menambahkan pemeriksaan yang berlebihan. Ini memberikan perlindungan yang kuat terhadap korupsi tanpa overhead performa yang tidak perlu.


Mengapa kami menggunakannya?

Fitur ArrayRecord secara langsung mendukung kemampuan lanjutan yang dibutuhkan oleh loader data modern seperti Grain.

Manfaat yang terpenting adalah mencapai pengacakan global deterministik yang sebenarnya. Karena setiap catatan bisa diakses secara instan, loader data bisa menghasilkan indeks yang teracak sempurna dalam set data dengan cepat saat pelatihan berlangsung dan kemudian mengambil data dalam urutan tertentu. Kemampuan ini, yang secara komputasi tidak praktis dengan format sekuensial seperti TFRecord, sangatlah penting untuk penelitian yang dapat direproduksi dan pelatihan model yang optimal.


ArrayRecord vs. TFRecord: Perbandingan terperinci

Berikut adalah uraian detail tentang perbandingan ArrayRecord dan TFRecord pada berbagai fitur utama:

  1. Struktur

  • ArrayRecord dibangun berbasis format file Riegeli dari Google, yang dirancang untuk menyimpan urutan catatan dengan fokus pada decoding berkecepatan tinggi, integritas data, dan kompresi yang kuat. Ia mengelompokkan catatan ke dalam potongan-potongan kecil dan menyertakan indeks metadata di akhir file.

  • TFRecord adalah urutan catatan biner, dengan setiap catatan biasanya berupa buffering protokol tf.train.Example.


2. Akses Acak

  • ArrayRecord menyediakan akses acak native dan efisien. Struktur filenya mencakup indeks posisi catatan bawaan, memungkinkan akses langsung dan cepat ke setiap catatan berdasarkan indeksnya tanpa perlu membaca seluruh file.

  • TFRecord, di sisi lain, tidak memiliki akses acak native. Karena format sekuensial dioptimalkan untuk streaming data, mengakses catatan tertentu memerlukan iterasi file dari awal.


3. Pengacakan Global

  • Dengan ArrayRecord, pengacakan global yang sebenarnya dapat dilakukan. Berkat akses acak yang efisien, loader data seperti Grain bisa menghasilkan indeks dalam urutan acak dan membaca catatan dengan cepat.

  • Dengan TFRecord, pengacakan global yang sebenarnya sulit dicapai. Pengacakan "Global" sering kali bergantung pada perkiraan, seperti mengacak daftar nama file yang dipecah-pecah dan kemudian mengacak catatan dalam buffering memori kecil. Ini bukanlah pengacakan global yang sebenarnya.


4. I/O Paralel

  • ArrayRecord secara native mendukung I/O paralel. Struktur potongan internal file ArrayRecord memudahkan beberapa proses untuk membaca bagian yang berbeda dari file yang sama secara paralel, yang menyederhanakan manajemen data.

  • TFRecord mendukung pembacaan paralel, tetapi biasanya tercapai dengan melakukan sharding set data menjadi banyak file TFRecord kecil dan meminta worker yang berbeda membaca dari file yang berbeda. Hal ini bisa mengakibatkan banyaknya file yang harus dikelola.


5. Integrasi

  • ArrayRecord dirancang untuk I/O berkinerja tinggi dan bekerja secara mulus dengan loader berbasis JAX seperti Grain. Ia juga bisa digunakan dalam ekosistem TensorFlow melalui tfds.data_source.

  • TFRecord terintegrasi penuh dengan ekosistem tf.data TensorFlow.


6. Kasus Penggunaan Utama

  • ArrayRecord sangat ideal untuk pemuatan data dengan throughput tinggi untuk machine learning yang sangat membutuhkan performa, terutama ketika determinisme dan pengacakan global yang sebenarnya diperlukan (mis., beban kerja JAX/TPU).

  • TFRecord cocok untuk penyimpanan data berskala besar dengan tujuan umum untuk TensorFlow dan dioptimalkan untuk pembacaan sekuensial.


Cara mengonversi set data TFRecord ke ArrayRecord

Metode konversi set data Anda bergantung pada apakah set data tersebut merupakan set data standar yang terdaftar di katalog TensorFlow Datasets (TFDS) atau set data khusus dan berpemilik.

Metode 1: Untuk set data standar dalam katalog TFDS

Jika Anda menggunakan set data populer, seperti cifar10 atau imagenet2012, alat command line tfds adalah metode yang paling mudah.

Prasyarat: Instal set data TensorFlow

pip install -q --upgrade tfds-nightly
Shell

Menggunakan CLI build tfds

Perintah ini mendownload data sumber, menjalankan logika persiapan, dan menyimpan output dalam format yang Anda inginkan.

# Generate the 'cifar10' dataset in ArrayRecord format
tfds build cifar10 --file_format=array_record
Shell

File ArrayRecord yang dihasilkan akan disimpan di direktori ~/tensorflow_datasets/ Anda, siap untuk digunakan.

Metode 2: Untuk set data TFRecord khusus atau berpemilik

Untuk konversi berskala besar set data TFRecord khusus milik Anda sendiri, pendekatan yang disarankan adalah menggunakan Apache Beam. Library array_record menyediakan pipeline Beam yang sudah dipaketkan yang membuat konversi ini sangat sederhana dan skalabel. Metode ini sangat direkomendasikan untuk set data yang sangat besar, karena pemrosesan bisa didistribusikan ke banyak worker menggunakan layanan seperti Google Cloud Dataflow.

Prasyarat: Instal Apache Beam dan Array Record Beam SDK

pip install -q apache-beam
pip install -q array-record-beam-sdk
Shell

Menggunakan pipeline konversi yang sudah dipaketkan

Modul array_record.beam.pipelines berisi fungsi convert_tf_to_arrayrecord_disk_match_shards, aplikasi utilitas yang dibuat khusus untuk menangani seluruh proses konversi. Ia membaca file TFRecord dan menulis pecahan set data ArrayRecord yang sesuai.

Berikut cara menggunakan fungsi ini dalam skrip Python:

from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards
 
# 1. Define your input and output patterns.
# This example uses Google Cloud Storage (GCS) paths, which is common for large datasets.
input_pattern = 'gs://your-gcs-bucket/path/to/records-*.tfrecord'
output_path = 'gs://your-gcs-bucket/path/to/converted-records'
 
# Arguments dictionary for the conversion function.
args = {
    'input': input_pattern,
    'output': output_path,
}
 
# 2. Configure pipeline options for execution.
 
# To run locally on your machine (for smaller datasets or testing):
# No options are needed; the local runner is used by default.
local_pipeline_options = pipeline_options.PipelineOptions()
 
 
# To run at scale on Google Cloud Dataflow (for large datasets):
# Uncomment the following lines and fill in your project details.
#
# dataflow_pipeline_options = pipeline_options.PipelineOptions(
#     runner='DataflowRunner',
#     project='your-gcp-project-id',
#     region='your-gcp-region',
#     # A requirements.txt file may be needed for dependencies on Dataflow workers.
#     # requirements_file='requirements.txt',
#     temp_location='gs://your-gcs-bucket/path/to/temp'
# )
 
 
# 3. Define and run the main execution logic.
def main():
  print("Starting TFRecord to ArrayRecord conversion...")
  convert_tf_to_arrayrecord_disk_match_shards(
      args=args,
      # Pass the appropriate options here.
      # Use `local_pipeline_options` for local runs.
      # Use `dataflow_pipeline_options` for cloud runs.
      pipeline_options=local_pipeline_options,
  ).run()
  print(f"Conversion complete. ArrayRecord files written to '{output_path}'.")
 
if __name__ == '__main__':
  main()
Python

Pendekatan ini lebih kuat dan tangguh daripada menulis pipeline manual karena ini adalah API tingkat tinggi yang telah teruji serta dirancang khusus untuk tugas ini, menangani detail seperti mencocokkan pecahan output dengan pecahan input secara otomatis.


Membangun pipeline Grain dan ArrayRecord: Panduan konseptual

Setelah data Anda berada dalam format ArrayRecord, Anda bisa mendefinisikan pipeline input berkinerja tinggi menggunakan Grain Dataset API. Prosesnya melibatkan pembuatan sumber dan kemudian metode transformasi berantai.

Langkah 1: Buat MapDataset dari Sumber Data

Pertama, tentukan file ArrayRecord Anda untuk membuat MapDataset.

import grain 
 
# Path to your generated ArrayRecord files
file_paths = ["~/tensorflow_datasets/cifar10/3.0.2/cifar10-train.array_record-00000-of-00001"]
 
# Create a data source
data_source = grain.sources.ArrayRecordDataSource(file_paths)
 
# Create a MapDataset from the source
dataset = grain.MapDataset.source(data_source)
Python

Langkah 2: Rantai Transformasi (Acak, Peta, Batch)

Sekarang, terapkan transformasi ke MapDataset. Setiap metode menghasilkan MapDataset baru, yang memungkinkan Anda melakukan panggilan berantai bersama-sama secara deklaratif.

# Example parsing function
def parse_and_transform(record):
    # Your logic to parse features, augment data, etc.
    return {"record": record}
 
BATCH_SIZE = 32
 
# Chain transformations
# The order of operations matters.
dataset = (
    dataset.shuffle(seed=42)
           .map(parse_and_transform)
           .batch(batch_size=BATCH_SIZE, drop_remainder=True)
)
Python

Langkah 3: Buat dan gunakan DatasetIterator

Terakhir, buat iterator dari set data yang telah didefinisikan secara lengkap untuk mengulang data dalam skrip pelatihan Anda.

# Create the stateful iterator
data_iterator = iter(dataset)
 
# You can now loop over the data
for batch in data_iterator:
    # Your training step with the batch...
    pass
 
    # For checkpoint saving/restoration, you can get/set the iterator's state
    # state = data_iterator.get_state()
    # data_iterator.set_state(state)
Python

Setelan konfigurasi performa

Untuk mencegah data pipeline mengalami bottleneck, Anda harus menggunakan multipemrosesan untuk memuat dan melakukan praproses data secara paralel dengan pelatihan model. Dalam Dataset API, ini dapat dicapai dengan menambahkan transformasi .mp_prefetch() ke pipeline Anda.

Metode ini memulai kumpulan proses worker untuk menyiapkan kumpulan data secara asinkron di latar belakang dan menyimpannya dalam buffer, sehingga mereka siap pada saat loop pelatihan Anda membutuhkannya.

Berikut ini adalah cara menerapkannya:

# The full pipeline with performance tuning.
dataset = (
    grain.MapDataset.source(data_source)
    .shuffle(seed=42)
    .map(parse_and_transform)
 
    # Convert to an iterable dataset to apply prefetching.
    .to_iter_dataset()
    .batch(batch_size=BATCH_SIZE, drop_remainder=True)
    # Apply multiprocessing and prefetching.
    .mp_prefetch(
        grain.multiprocessing.MultiprocessingOptions(
            num_workers=16   # Number of parallel worker processes.
        )
    )
)
 
# Create the final iterator
data_iterator = iter(dataset)
Python
  • num_workers: Ini menentukan jumlah proses turunan paralel yang digunakan untuk memuat data. Jika Anda melihat akselerator sering tidak ada aktivitas karena menunggu data, meningkatkan nilai ini bisa meningkatkan performa secara signifikan. Jumlah optimal bergantung pada inti CPU yang tersedia pada mesin Anda dan kompleksitas fungsi peta Anda.


Jelajahi lebih jauh

Ingin mempelajari lebih dalam dan mulai membangun? Lihat dokumentasi resmi dan kode sumber untuk teknologi yang dibahas dalam panduan ini.

Teknologi dasar


Contoh dunia nyata: Pelatihan LLM berskala besar

Data pipeline deterministik serta berkinerja yang dibangun dengan Grain dan ArrayRecord sangatlah penting untuk pelatihan model berskala besar. Contoh terbaiknya adalah MaxText, Model Bahasa Besar open source berkinerja tinggi yang ditulis dalam JAX. MaxText memanfaatkan teknik data pipeline ini untuk memasukkan data secara efisien ke cluster TPU dan GPU besar.