Gemma untuk Streaming ML dengan Dataflow

AGU 16, 2024
Reza Rokni Google Senior Staff Dataflow
Ravin Kumar Google Data Scientist Language Applications

Gemma 2 adalah versi terbaru dalam keluarga model terbuka Google yang ringan dan canggih yang dibangun dari penelitian dan teknologi yang sama dengan yang digunakan untuk membuat model Gemini. Model bahasa besar (LLM) seperti Gemma sangatlah serbaguna, yang membuka banyak potensi integrasi untuk proses bisnis. Blog ini mengeksplorasi bagaimana Anda bisa menggunakan Gemma untuk mengukur sentimen percakapan, meringkas isi percakapan tersebut, dan membantu membuat balasan untuk percakapan yang sulit yang bisa disetujui oleh pengguna. Salah satu persyaratan utama adalah pelanggan yang telah mengungkapkan sentimen negatif dapat dilayani kebutuhannya dalam waktu cepat, yang berarti kita harus menggunakan pipeline data streaming yang memanfaatkan LLM dengan latensi minimal.


Gemma

Gemma 2 menawarkan performa tak tertandingi untuk ukurannya. Model Gemma telah terbukti mencapai hasil tolok ukur yang luar biasa, bahkan mengungguli beberapa model yang lebih besar. Ukuran model yang kecil memungkinkan arsitektur model ini diterapkan atau disematkan secara langsung ke dalam pipeline pemrosesan data streaming, sehingga memberikan manfaat, seperti:

  • Pelokalan data dengan panggilan worker lokal daripada RPC data ke sistem terpisah

  • Sistem tunggal untuk penskalaan otomatis, memungkinkan penggunaan metrik seperti tekanan balik pada sumber untuk digunakan sebagai sinyal langsung ke autoscaler

  • Sistem tunggal untuk mengamati dan memantau produksi

Dataflow menyediakan platform pemrosesan streaming dan batch terpadu yang skalabel. Dengan Dataflow, Anda bisa menggunakan Apache Beam Python SDK untuk mengembangkan data streaming dan pipeline pemrosesan peristiwa. Dataflow memberikan manfaat seperti berikut:

  • Dataflow terkelola sepenuhnya, penskalaan otomatis naik dan turun berdasarkan permintaan

  • Apache Beam menyediakan rangkaian transformasi siap pakai dengan sedikit kode yang bisa menghemat waktu, tenaga, dan biaya daripada menulis kode boilerplate generik. Lagipula, kode terbaik adalah kode yang tidak perlu Anda tulis

  • Dataflow ML secara langsung mendukung GPU, menginstal driver yang diperlukan, dan menyediakan akses ke berbagai perangkat GPU

Contoh berikut menunjukkan cara menyematkan model Gemma ke dalam pipeline data streaming untuk menjalankan inferensi menggunakan Dataflow.


Skenario

Skenario ini melibatkan rantai makanan yang sibuk bergulat dengan analisis dan menyimpan permintaan dukungan pelanggan bervolume tinggi melalui berbagai saluran chat. Interaksi ini mencakup chat yang dihasilkan oleh chatbot otomatis dan percakapan lebih kompleks yang membutuhkan perhatian staf dukungan langsung. Untuk menjawab tantangan ini, kami telah menetapkan tujuan yang ambisius:

  • Pertama, kami ingin mengelola dan menyimpan data chat secara efisien dengan meringkas interaksi positif untuk memudahkan referensi dan analisis di masa mendatang.

  • Kedua, kami ingin mengimplementasikan deteksi dan penyelesaian masalah secara real-time, menggunakan analisis sentimen untuk mengidentifikasi pelanggan yang tidak puas secara cepat dan menghasilkan respons yang sesuai untuk mengatasi masalah mereka.

Solusi ini menggunakan pipeline yang memproses pesan chat yang telah selesai dalam sekejap. Gemma digunakan pada instance pertama untuk melakukan pekerjaan analisis pemantauan sentimen chat ini. Semua chat kemudian diringkas, dengan chat yang bersentimen positif atau netral dikirim langsung ke platform data, BigQuery, dengan menggunakan I/O yang sudah tersedia dengan Dataflow. Untuk chat yang melaporkan sentimen negatif, kami menggunakan Gemma untuk meminta model membuat respons yang sesuai secara konteks untuk pelanggan yang tidak puas. Respons ini kemudian dikirim ke manusia untuk ditinjau, sehingga staf dukungan dapat menyaring pesan sebelum sampai ke pelanggan yang berpotensi tidak puas.

Dengan kasus penggunaan ini, kami mengeksplorasi beberapa aspek menarik dari penggunaan LLM dalam pipeline. Sebagai contoh, ada tantangan ketika kami harus memproses respons dalam kode, mengingat respons non-deterministik yang dapat diterima. Sebagai contoh, kami meminta LLM untuk merespons dalam JSON, yang tidak dijamin dapat melakukannya. Permintaan ini mengharuskan kami untuk mengurai dan memvalidasi respons, yang mirip dengan proses yang biasanya Anda lakukan untuk memproses data dari sumber yang mungkin tidak memiliki data terstruktur yang benar.

Dengan solusi ini, pelanggan bisa merasakan waktu respons yang lebih cepat dan menerima perhatian yang dipersonalisasi ketika masalah muncul. Otomatisasi ringkasan chat positif membuat staf dukungan memiliki lebih banyak waktu, sehingga mereka dapat fokus pada interaksi yang lebih kompleks. Selain itu, analisis mendalam terhadap data chat bisa mendorong pengambilan keputusan berbasis data, sementara skalabilitas sistem memungkinkannya beradaptasi dengan mudah terhadap volume chat yang terus meningkat tanpa mengorbankan kualitas respons.


Pipeline pemrosesan data

Alur pipeline bisa dilihat di bawah ini:

Data processing pipeline architecture

Pipeline level tinggi bisa dijelaskan dengan beberapa baris:

  1. Baca data ulasan dari Pub/Sub, sumber fitur pesan peristiwa kami. Data ini berisi ID chat dan histori chat sebagai payload JSON. Payload ini diproses di dalam pipeline.

2. Pipeline meneruskan teks dari pesan ini ke Gemma dengan perintah. Pipeline meminta agar dua tugas diselesaikan.

  • Lampirkan skor sentimen pada pesan, dengan menggunakan tiga nilai berikut: 1 untuk chat positif, 0 untuk chat netral, dan -1 untuk chat negatif.

  • Ringkas chat dengan satu kalimat.

3. Berikutnya, pipeline akan bercabang, bergantung pada skor sentimen:

  • Jika skornya 1 atau 0, chat dengan ringkasan akan dikirim ke sistem analisis data kami untuk disimpan dan digunakan dalam analisis di masa mendatang.

  • Jika nilainya -1, kami meminta Gemma untuk memberikan respons. Respons ini, digabungkan dengan informasi chat, kemudian dikirim ke sistem pesan peristiwa yang bertindak sebagai perekat antara pipeline dan aplikasi lain. Langkah ini memungkinkan staf untuk meninjau isinya.


Kode pipeline

Penyiapan

Akses dan download Gemma

Dalam contoh, kami menggunakan Gemma melalui KerasNLP, dan menggunakan varian Kaggle 'Instruction tuned' gemma2_keras_gemma2_instruct_2b_en. Anda harus mendownload model dan menyimpannya di lokasi yang dapat diakses pipeline.


Gunakan layanan Dataflow

Meskipun bisa menggunakan CPU untuk pengujian dan pengembangan, mengingat waktu inferensi, untuk sistem produksi kami harus menggunakan GPU pada layanan Dataflow ML. Penggunaan GPU dengan Dataflow difasilitasi oleh sebuah penampung khusus. Detail untuk pengaturan ini tersedia di dukungan Dataflow GPU. Kami menyarankan Anda untuk mengikuti panduan pengembangan lokal untuk pengembangan, yang memungkinkan pengujian pipeline secara cepat. Anda juga dapat merujuk panduan untuk menggunakan Gemma di Dataflow, yang menyertakan link ke contoh file Docker.


Pengendali model khusus Gemma

Transformasi RunInference di Apache Beam merupakan inti dari solusi ini, memanfaatkan pengendali model untuk melakukan konfigurasi dan mengabstraksi pengguna dari kode boilerplate yang diperlukan untuk proses produksi. Sebagian besar tipe model bisa didukung hanya dengan konfigurasi menggunakan pengendali model bawaan Beam, tetapi untuk Gemma, blog ini menggunakan pengendali model khusus, yang memberikan kami kontrol penuh atas interaksi kami dengan model sembari tetap menggunakan semua komponen yang disediakan RunInference untuk pemrosesan. Pipeline custom_model_gemma.py memiliki contoh GemmModelHandler yang dapat Anda gunakan. Harap perhatikan penggunaan nilai max_length yang digunakan dalam panggilan model.generate() dari GemmModelHandler. Nilai ini mengontrol panjang maksimum respons Gemma terhadap kueri dan perlu diubah agar sesuai dengan kebutuhan kasus penggunaan, untuk blog ini kami menggunakan nilai 512.

Tips: Untuk blog ini, kami menemukan bahwa penggunaan backend keras jax berkinerja jauh lebih baik. Untuk mengaktifkannya, DockerFile harus berisi instruksi ENV KERAS_BACKEND="jax". Ini harus diatur dalam container Anda sebelum worker memulai Beam (yang mengimpor Keras)


Bangun pipeline

Langkah pertama dalam pipeline adalah standar untuk sistem pemrosesan peristiwa: kita perlu membaca pesan JSON yang telah dibuat oleh sistem upstream, yang mengemas pesan chat ke dalam struktur sederhana yang mencakup ID chat.

chats = ( pipeline | "Read Topic" >>
                        beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
   )

Contoh berikut menunjukkan salah satu pesan JSON ini, serta diskusi yang sangat penting tentang nanas dan pizza, dengan ID 221 sebagai pelanggan kami.

{
"id": 1, 
"user_id": 221, 
"chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"
}

Kami sekarang memiliki PCollection objek chat python. Pada langkah berikutnya, kami mengekstrak nilai yang diperlukan dari pesan chat ini dan memasukkannya ke dalam perintah untuk diteruskan ke LLM yang telah disetel dengan instruksi. Untuk melakukan langkah ini, kami membuat template perintah yang menyediakan instruksi untuk model.

prompt_template = """
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user {}
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
 
@@@{}@@@
<answer>
"""

Berikut ini adalah contoh perintah yang dikirimkan ke model:

<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user 221
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
 
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@
<answer>

Beberapa catatan mengenai perintah:

  1. Perintah ini bertujuan sebagai contoh ilustrasi. Untuk perintah Anda sendiri, jalankan analisis lengkap dengan data indikatif untuk aplikasi Anda.

  • Untuk pembuatan prototipe, Anda bisa menggunakan aistudio.google.com untuk menguji perilaku Gemma dan Gemini dengan cepat. Ada juga kunci API sekali klik jika Anda ingin menguji secara terprogram.

2. Dengan model yang lebih kecil dan kurang bertenaga, Anda mungkin akan mendapatkan respons yang lebih baik dengan menyederhanakan instruksi untuk satu tugas dan melakukan beberapa panggilan terhadap model tersebut.

3. Kami membatasi ringkasan pesan chat hingga maksimal 512 karakter. Cocokkan nilai ini dengan nilai yang disediakan dalam konfigurasi max_length untuk panggilan yang dibuat Gemma.

4. Tiga ampersan, '@@@' digunakan sebagai trik agar kita bisa mengekstrak chat asli dari pesan setelah diproses. Cara lain yang dapat kita lakukan untuk melakukan tugas ini antara lain:

  • Gunakan seluruh pesan chat sebagai kunci dalam pasangan kunci-nilai.

  • Gabungkan kembali hasilnya menjadi data asli. Pendekatan ini membutuhkan pengacakan.

5. Karena kita perlu memproses respons dalam kode, kita meminta LLM untuk membuat representasi JSON dari jawabannya dengan dua kolom: sentimen dan ringkasan.

Untuk membuat perintah, kita perlu mengurai informasi dari pesan JSON sumber kemudian menyisipkannya ke dalam template. Kami merangkum proses ini dalam Beam DoFN dan menggunakannya dalam pipeline. Dalam pernyataan hasil, kami membuat struktur kunci-nilai, dengan ID chat sebagai kuncinya. Struktur ini memungkinkan kami untuk mencocokkan chat dengan inferensi ketika kami memanggil model.

# Create the prompt using the information from the chat
class CreatePrompt(beam.DoFn):
  def process(self, element, *args, **kwargs):
    user_chat = json.loads(element)
    chat_id = user_chat['id']
    user_id = user_chat['user_id']
    messages = user_chat['chat_message']
    yield (chat_id, prompt_template.format(user_id, messages))
 
prompts = chats |  "Create Prompt" >> beam.ParDo(CreatePrompt())

Kita sekarang siap untuk memanggil model. Berkat perangkat RunInference, langkah ini sangatlah praktis. Kita mengemas GemmaModelHandler di dalam KeyedModelhandler, yang memberi tahu RunInference untuk menerima data yang masuk sebagai tuple pasangan kunci-nilai. Selama pengembangan dan pengujian, model disimpan dalam direktori gemma2. Ketika menjalankan model pada layanan Dataflow ML, model disimpan di Google Cloud Storage, dengan format URI gs://<your_bucket>/gemma-directory.

keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results =  prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)

Koleksi hasil sekarang berisi hasil dari panggilan LLM. Di sini ada hal yang sedikit menarik: meskipun panggilan LLM adalah kode, tidak seperti memanggil fungsi lain, hasilnya tidak bersifat deterministik! Ini termasuk bagian akhir dari permintaan perintah kami, yaitu "Output hasilnya sebagai JSON dengan kolom [sentimen, ringkasan]". Secara umum, responsnya sesuai dengan bentuk tersebut, tetapi tidak dijamin. Kami harus sedikit defensif di sini dan memvalidasi input. Jika gagal dalam validasi, kami akan menampilkan hasilnya ke koleksi error. Dalam contoh ini, kami membiarkan nilai tersebut di sana. Untuk pipeline produksi, sebaiknya Anda membiarkan LLM mencoba untuk kedua kalinya dan menjalankan hasil pengumpulan error di RunInference lagi kemudian meratakan respons dengan pengumpulan hasil. Karena pipeline Beam adalah Directed Acyclic Graphs, kami tidak bisa membuat loop di sini.

Sekarang kami mengambil koleksi hasil dan memproses output LLM. Untuk memproses hasil RunInference, kami membuat DoFn SentimentAnalysis baru dan fungsi extract_model_reply. Langkah ini mengembalikan objek bertipe PredictionResult:

def extract_model_reply(model_inference):
    match = re.search(r"(\{[\s\S]*?\})", model_inference)
    json_str = match.group(1)
    result = json.loads(json_str)
    if all(key in result for key in ['sentiment', 'summary']):
        return result
    raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
    def process(self, element):
        key = element[0]                          
        match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
        chats = match.group(1)
 
        try:
            # The result will contain the prompt, replace the prompt with ""
            result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
            processed_result = (key, chats, result['sentiment'], result['summary'])           
 
            if (result['sentiment'] <0):
              output = beam.TaggedOutput('negative', processed_result)
            else:
              output = beam.TaggedOutput('main', processed_result)
 
        except Exception as err:
            print("ERROR!" + str(err))
            output = beam.TaggedOutput('error', element)
 
        yield output

Sebaiknya luangkan waktu beberapa menit untuk keperluan extract_model_reply(). Karena model ini dihosting sendiri, kami tidak bisa menjamin bahwa teks akan menghasilkan output JSON. Untuk memastikan bahwa kami mendapatkan output JSON, kami perlu melakukan beberapa pengecekan. Salah satu keuntungan menggunakan Gemini API adalah adanya fitur yang memastikan output selalu dalam bentuk JSON, yang dikenal sebagai constrained decoding.

Sekarang mari kita gunakan fungsi ini dalam pipeline:

filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))

Menggunakan with_outputs akan menghasilkan beberapa koleksi yang dapat diakses di filtered_results. Koleksi utama berisi sentimen dan ringkasan untuk ulasan positif dan netral, sedangkan error berisi respons yang tidak dapat diurai dari LLM. Anda bisa mengirim koleksi ini ke sumber lain, seperti BigQuery, dengan transformasi penulisan. Contoh ini tidak mendemonstrasikan langkah-langkahnya, tetapi koleksi negatif adalah sesuatu yang ingin kita selesaikan lebih lanjut di dalam pipeline ini.


Pemrosesan sentimen negatif

Memastikan pelanggan merasa puas sangatlah penting untuk retensi. Meskipun kami menggunakan contoh sederhana dengan debat nanas dan pizza, interaksi langsung dengan pelanggan sebaiknya selalu mengutamakan empati dan respons positif dari semua bagian organisasi. Pada tahap ini, kami meneruskan chat ini ke salah satu perwakilan dukungan yang terlatih, tetapi kami melihat jika LLM masih dapat membantu staf dukungan dalam mengurangi waktu resolusi.

Untuk langkah ini, kami membuat panggilan ke model dan memintanya untuk menyusun respons. Kami kembali menggunakan model Gemma 2B untuk panggilan ini di dalam kode.

generated_responses = (results.negative 
       | "Generate Response" >> beam.Map(lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))
       | "Gemma-Response" >> RunInference(keyed_model_handler)

Pada umumnya, Anda mengemas kode pembuatan prompt dalam DoFn, tetapi Anda juga bisa menggunakan lambda sederhana dalam kode pipeline itu sendiri. Di sini kita membuat perintah yang berisi pesan chat asli, yang diekstrak dalam fungsi SentimentAnalysis.

Untuk menjalankan dan menguji secara lokal, kita bisa menggunakan beberapa pernyataan print sederhana untuk melihat output pada berbagai PCollection:

generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)

Tentu saja untuk penggunaan yang sebenarnya, output ini akan dikirim ke berbagai sink seperti Pub/Sub dan BigQuery.


Jalankan pipeline

Mari kita lihat bagaimana model ini bekerja dengan pesan JSON sebelumnya:

Langkah 1: Analisis sentimen dan peringkasan

"sentiment": -1,

"summary": "User 221 is very unhappy about the presence of pineapple on pizza."

Respons yang dihasilkan oleh model 2B tidaklah buruk. Sentimennya benar, dan karena hasil ringkasannya lebih subjektif, ketepatan responsnya bergantung pada penggunaan downstream informasi ini.

Langkah 2: Respons yang dihasilkan

"I understand that you're upset about the pineapple pizza. It's a very personal preference, and I apologize that it might have caused you some frustration. We strive to offer a diverse menu to cater to a wide range of tastes, and we're always open to feedback. Would you like to share your thoughts on the pizza with pineapple?"

Apakah respons ini dapat diterima? Pada tahap ini, kami ingin mengirimkan seluruh paket data ke perwakilan dukungan untuk dianalisis dan jika mereka setuju, mereka bisa mengirimkannya sebagaimana adanya, atau mereka dapat melakukan beberapa edit dan penyesuaian.


Langkah Berikutnya

Mungkin pada tahap ini kita ingin menggunakan model dengan lebih banyak parameter, seperti Gemma2 9B atau 27B. Kita juga bisa menggunakan model yang cukup besar sehingga membutuhkan panggilan API ke panggilan layanan eksternal, seperti Gemini, alih-alih dimuat ke dalam worker. Lagi pula, kami mengurangi pekerjaan yang diperlukan untuk mengirim ke model yang lebih besar ini dengan menggunakan model yang lebih kecil sebagai filter. Membuat pilihan-pilihan ini bukan hanya keputusan teknis, tetapi juga keputusan bisnis. Biaya dan manfaatnya perlu diukur. Kita dapat menggunakan Dataflow lagi untuk mengatur pengujian A/B dengan lebih mudah.

Anda juga bisa memilih untuk menyempurnakan model khusus bagi kasus penggunaan Anda. Ini adalah salah satu cara untuk mengubah "suara" model agar sesuai dengan kebutuhan Anda.


Pengujian A/B

Pada langkah generate, kami meneruskan semua chat negatif yang masuk ke model 2B. Jika kita ingin mengirim sebagian koleksi ke model lain, kita bisa menggunakan fungsi Partition di Beam dengan koleksi filtered_responses.negative. Dengan mengarahkan beberapa pesan pelanggan ke model yang berbeda dan meminta staf dukungan untuk menilai respons yang dibuat sebelum mengirimkannya, kami dapat mengumpulkan masukan yang berharga mengenai kualitas respons dan margin peningkatan.


Ringkasan

Dengan beberapa baris kode ini, kami membangun sebuah sistem yang mampu memproses data sentimen pelanggan dengan sangat cepat dan bervariasi. Menggunakan model terbuka Gemma 2, dengan 'performa yang tak tertandingi untuk ukurannya', kami bisa menggabungkan LLM yang kuat ini dalam kasus penggunaan stream processing untuk membantu menciptakan pengalaman yang lebih baik bagi pelanggan.