Gemma 2 adalah versi terbaru dalam keluarga model terbuka Google yang ringan dan canggih yang dibangun dari penelitian dan teknologi yang sama dengan yang digunakan untuk membuat model Gemini. Model bahasa besar (LLM) seperti Gemma sangatlah serbaguna, yang membuka banyak potensi integrasi untuk proses bisnis. Blog ini mengeksplorasi bagaimana Anda bisa menggunakan Gemma untuk mengukur sentimen percakapan, meringkas isi percakapan tersebut, dan membantu membuat balasan untuk percakapan yang sulit yang bisa disetujui oleh pengguna. Salah satu persyaratan utama adalah pelanggan yang telah mengungkapkan sentimen negatif dapat dilayani kebutuhannya dalam waktu cepat, yang berarti kita harus menggunakan pipeline data streaming yang memanfaatkan LLM dengan latensi minimal.
Gemma 2 menawarkan performa tak tertandingi untuk ukurannya. Model Gemma telah terbukti mencapai hasil tolok ukur yang luar biasa, bahkan mengungguli beberapa model yang lebih besar. Ukuran model yang kecil memungkinkan arsitektur model ini diterapkan atau disematkan secara langsung ke dalam pipeline pemrosesan data streaming, sehingga memberikan manfaat, seperti:
Dataflow menyediakan platform pemrosesan streaming dan batch terpadu yang skalabel. Dengan Dataflow, Anda bisa menggunakan Apache Beam Python SDK untuk mengembangkan data streaming dan pipeline pemrosesan peristiwa. Dataflow memberikan manfaat seperti berikut:
Contoh berikut menunjukkan cara menyematkan model Gemma ke dalam pipeline data streaming untuk menjalankan inferensi menggunakan Dataflow.
Skenario ini melibatkan rantai makanan yang sibuk bergulat dengan analisis dan menyimpan permintaan dukungan pelanggan bervolume tinggi melalui berbagai saluran chat. Interaksi ini mencakup chat yang dihasilkan oleh chatbot otomatis dan percakapan lebih kompleks yang membutuhkan perhatian staf dukungan langsung. Untuk menjawab tantangan ini, kami telah menetapkan tujuan yang ambisius:
Solusi ini menggunakan pipeline yang memproses pesan chat yang telah selesai dalam sekejap. Gemma digunakan pada instance pertama untuk melakukan pekerjaan analisis pemantauan sentimen chat ini. Semua chat kemudian diringkas, dengan chat yang bersentimen positif atau netral dikirim langsung ke platform data, BigQuery, dengan menggunakan I/O yang sudah tersedia dengan Dataflow. Untuk chat yang melaporkan sentimen negatif, kami menggunakan Gemma untuk meminta model membuat respons yang sesuai secara konteks untuk pelanggan yang tidak puas. Respons ini kemudian dikirim ke manusia untuk ditinjau, sehingga staf dukungan dapat menyaring pesan sebelum sampai ke pelanggan yang berpotensi tidak puas.
Dengan kasus penggunaan ini, kami mengeksplorasi beberapa aspek menarik dari penggunaan LLM dalam pipeline. Sebagai contoh, ada tantangan ketika kami harus memproses respons dalam kode, mengingat respons non-deterministik yang dapat diterima. Sebagai contoh, kami meminta LLM untuk merespons dalam JSON, yang tidak dijamin dapat melakukannya. Permintaan ini mengharuskan kami untuk mengurai dan memvalidasi respons, yang mirip dengan proses yang biasanya Anda lakukan untuk memproses data dari sumber yang mungkin tidak memiliki data terstruktur yang benar.
Dengan solusi ini, pelanggan bisa merasakan waktu respons yang lebih cepat dan menerima perhatian yang dipersonalisasi ketika masalah muncul. Otomatisasi ringkasan chat positif membuat staf dukungan memiliki lebih banyak waktu, sehingga mereka dapat fokus pada interaksi yang lebih kompleks. Selain itu, analisis mendalam terhadap data chat bisa mendorong pengambilan keputusan berbasis data, sementara skalabilitas sistem memungkinkannya beradaptasi dengan mudah terhadap volume chat yang terus meningkat tanpa mengorbankan kualitas respons.
Alur pipeline bisa dilihat di bawah ini:
Pipeline level tinggi bisa dijelaskan dengan beberapa baris:
2. Pipeline meneruskan teks dari pesan ini ke Gemma dengan perintah. Pipeline meminta agar dua tugas diselesaikan.
3. Berikutnya, pipeline akan bercabang, bergantung pada skor sentimen:
Dalam contoh, kami menggunakan Gemma melalui KerasNLP, dan menggunakan varian Kaggle 'Instruction tuned' gemma2_keras_gemma2_instruct_2b_en. Anda harus mendownload model dan menyimpannya di lokasi yang dapat diakses pipeline.
Meskipun bisa menggunakan CPU untuk pengujian dan pengembangan, mengingat waktu inferensi, untuk sistem produksi kami harus menggunakan GPU pada layanan Dataflow ML. Penggunaan GPU dengan Dataflow difasilitasi oleh sebuah penampung khusus. Detail untuk pengaturan ini tersedia di dukungan Dataflow GPU. Kami menyarankan Anda untuk mengikuti panduan pengembangan lokal untuk pengembangan, yang memungkinkan pengujian pipeline secara cepat. Anda juga dapat merujuk panduan untuk menggunakan Gemma di Dataflow, yang menyertakan link ke contoh file Docker.
Transformasi RunInference di Apache Beam merupakan inti dari solusi ini, memanfaatkan pengendali model untuk melakukan konfigurasi dan mengabstraksi pengguna dari kode boilerplate yang diperlukan untuk proses produksi. Sebagian besar tipe model bisa didukung hanya dengan konfigurasi menggunakan pengendali model bawaan Beam, tetapi untuk Gemma, blog ini menggunakan pengendali model khusus, yang memberikan kami kontrol penuh atas interaksi kami dengan model sembari tetap menggunakan semua komponen yang disediakan RunInference untuk pemrosesan. Pipeline custom_model_gemma.py memiliki contoh GemmModelHandler
yang dapat Anda gunakan. Harap perhatikan penggunaan nilai max_length yang digunakan dalam panggilan model.generate() dari GemmModelHandler
. Nilai ini mengontrol panjang maksimum respons Gemma terhadap kueri dan perlu diubah agar sesuai dengan kebutuhan kasus penggunaan, untuk blog ini kami menggunakan nilai 512.
Tips: Untuk blog ini, kami menemukan bahwa penggunaan backend keras jax berkinerja jauh lebih baik. Untuk mengaktifkannya, DockerFile harus berisi instruksi ENV KERAS_BACKEND="jax"
. Ini harus diatur dalam container Anda sebelum worker memulai Beam (yang mengimpor Keras)
Langkah pertama dalam pipeline adalah standar untuk sistem pemrosesan peristiwa: kita perlu membaca pesan JSON yang telah dibuat oleh sistem upstream, yang mengemas pesan chat ke dalam struktur sederhana yang mencakup ID chat.
chats = ( pipeline | "Read Topic" >>
beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
)
Contoh berikut menunjukkan salah satu pesan JSON ini, serta diskusi yang sangat penting tentang nanas dan pizza, dengan ID 221 sebagai pelanggan kami.
{
"id": 1,
"user_id": 221,
"chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"
}
Kami sekarang memiliki PCollection objek chat python. Pada langkah berikutnya, kami mengekstrak nilai yang diperlukan dari pesan chat ini dan memasukkannya ke dalam perintah untuk diteruskan ke LLM yang telah disetel dengan instruksi. Untuk melakukan langkah ini, kami membuat template perintah yang menyediakan instruksi untuk model.
prompt_template = """
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user {}
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@{}@@@
<answer>
"""
Berikut ini adalah contoh perintah yang dikirimkan ke model:
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user 221
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@
<answer>
Beberapa catatan mengenai perintah:
2. Dengan model yang lebih kecil dan kurang bertenaga, Anda mungkin akan mendapatkan respons yang lebih baik dengan menyederhanakan instruksi untuk satu tugas dan melakukan beberapa panggilan terhadap model tersebut.
3. Kami membatasi ringkasan pesan chat hingga maksimal 512 karakter. Cocokkan nilai ini dengan nilai yang disediakan dalam konfigurasi max_length untuk panggilan yang dibuat Gemma.
4. Tiga ampersan, '@@@' digunakan sebagai trik agar kita bisa mengekstrak chat asli dari pesan setelah diproses. Cara lain yang dapat kita lakukan untuk melakukan tugas ini antara lain:
5. Karena kita perlu memproses respons dalam kode, kita meminta LLM untuk membuat representasi JSON dari jawabannya dengan dua kolom: sentimen dan ringkasan.
Untuk membuat perintah, kita perlu mengurai informasi dari pesan JSON sumber kemudian menyisipkannya ke dalam template. Kami merangkum proses ini dalam Beam DoFN dan menggunakannya dalam pipeline. Dalam pernyataan hasil, kami membuat struktur kunci-nilai, dengan ID chat sebagai kuncinya. Struktur ini memungkinkan kami untuk mencocokkan chat dengan inferensi ketika kami memanggil model.
# Create the prompt using the information from the chat
class CreatePrompt(beam.DoFn):
def process(self, element, *args, **kwargs):
user_chat = json.loads(element)
chat_id = user_chat['id']
user_id = user_chat['user_id']
messages = user_chat['chat_message']
yield (chat_id, prompt_template.format(user_id, messages))
prompts = chats | "Create Prompt" >> beam.ParDo(CreatePrompt())
Kita sekarang siap untuk memanggil model. Berkat perangkat RunInference, langkah ini sangatlah praktis. Kita mengemas GemmaModelHandler
di dalam KeyedModelhandler
, yang memberi tahu RunInference untuk menerima data yang masuk sebagai tuple pasangan kunci-nilai. Selama pengembangan dan pengujian, model disimpan dalam direktori gemma2
. Ketika menjalankan model pada layanan Dataflow ML, model disimpan di Google Cloud Storage, dengan format URI gs://<your_bucket>/gemma-directory
.
keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results = prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)
Koleksi hasil sekarang berisi hasil dari panggilan LLM. Di sini ada hal yang sedikit menarik: meskipun panggilan LLM adalah kode, tidak seperti memanggil fungsi lain, hasilnya tidak bersifat deterministik! Ini termasuk bagian akhir dari permintaan perintah kami, yaitu "Output hasilnya sebagai JSON dengan kolom [sentimen, ringkasan]". Secara umum, responsnya sesuai dengan bentuk tersebut, tetapi tidak dijamin. Kami harus sedikit defensif di sini dan memvalidasi input. Jika gagal dalam validasi, kami akan menampilkan hasilnya ke koleksi error. Dalam contoh ini, kami membiarkan nilai tersebut di sana. Untuk pipeline produksi, sebaiknya Anda membiarkan LLM mencoba untuk kedua kalinya dan menjalankan hasil pengumpulan error di RunInference lagi kemudian meratakan respons dengan pengumpulan hasil. Karena pipeline Beam adalah Directed Acyclic Graphs, kami tidak bisa membuat loop di sini.
Sekarang kami mengambil koleksi hasil dan memproses output LLM. Untuk memproses hasil RunInference, kami membuat DoFn SentimentAnalysis
baru dan fungsi extract_model_reply
. Langkah ini mengembalikan objek bertipe PredictionResult:
def extract_model_reply(model_inference):
match = re.search(r"(\{[\s\S]*?\})", model_inference)
json_str = match.group(1)
result = json.loads(json_str)
if all(key in result for key in ['sentiment', 'summary']):
return result
raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
def process(self, element):
key = element[0]
match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
chats = match.group(1)
try:
# The result will contain the prompt, replace the prompt with ""
result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
processed_result = (key, chats, result['sentiment'], result['summary'])
if (result['sentiment'] <0):
output = beam.TaggedOutput('negative', processed_result)
else:
output = beam.TaggedOutput('main', processed_result)
except Exception as err:
print("ERROR!" + str(err))
output = beam.TaggedOutput('error', element)
yield output
Sebaiknya luangkan waktu beberapa menit untuk keperluan extract_model_reply()
. Karena model ini dihosting sendiri, kami tidak bisa menjamin bahwa teks akan menghasilkan output JSON. Untuk memastikan bahwa kami mendapatkan output JSON, kami perlu melakukan beberapa pengecekan. Salah satu keuntungan menggunakan Gemini API adalah adanya fitur yang memastikan output selalu dalam bentuk JSON, yang dikenal sebagai constrained decoding.
Sekarang mari kita gunakan fungsi ini dalam pipeline:
filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))
Menggunakan with_outputs
akan menghasilkan beberapa koleksi yang dapat diakses di filtered_results
. Koleksi utama berisi sentimen dan ringkasan untuk ulasan positif dan netral, sedangkan error berisi respons yang tidak dapat diurai dari LLM. Anda bisa mengirim koleksi ini ke sumber lain, seperti BigQuery, dengan transformasi penulisan. Contoh ini tidak mendemonstrasikan langkah-langkahnya, tetapi koleksi negatif adalah sesuatu yang ingin kita selesaikan lebih lanjut di dalam pipeline ini.
Memastikan pelanggan merasa puas sangatlah penting untuk retensi. Meskipun kami menggunakan contoh sederhana dengan debat nanas dan pizza, interaksi langsung dengan pelanggan sebaiknya selalu mengutamakan empati dan respons positif dari semua bagian organisasi. Pada tahap ini, kami meneruskan chat ini ke salah satu perwakilan dukungan yang terlatih, tetapi kami melihat jika LLM masih dapat membantu staf dukungan dalam mengurangi waktu resolusi.
Untuk langkah ini, kami membuat panggilan ke model dan memintanya untuk menyusun respons. Kami kembali menggunakan model Gemma 2B untuk panggilan ini di dalam kode.
generated_responses = (results.negative
| "Generate Response" >> beam.Map(lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))
| "Gemma-Response" >> RunInference(keyed_model_handler)
Pada umumnya, Anda mengemas kode pembuatan prompt dalam DoFn, tetapi Anda juga bisa menggunakan lambda sederhana dalam kode pipeline itu sendiri. Di sini kita membuat perintah yang berisi pesan chat asli, yang diekstrak dalam fungsi SentimentAnalysis
.
Untuk menjalankan dan menguji secara lokal, kita bisa menggunakan beberapa pernyataan print sederhana untuk melihat output pada berbagai PCollection:
generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)
Tentu saja untuk penggunaan yang sebenarnya, output ini akan dikirim ke berbagai sink seperti Pub/Sub dan BigQuery.
Mari kita lihat bagaimana model ini bekerja dengan pesan JSON sebelumnya:
Langkah 1: Analisis sentimen dan peringkasan
"sentiment": -1,
"summary": "User 221 is very unhappy about the presence of pineapple on pizza."
Respons yang dihasilkan oleh model 2B tidaklah buruk. Sentimennya benar, dan karena hasil ringkasannya lebih subjektif, ketepatan responsnya bergantung pada penggunaan downstream informasi ini.
Langkah 2: Respons yang dihasilkan
"I understand that you're upset about the pineapple pizza. It's a very personal preference, and I apologize that it might have caused you some frustration. We strive to offer a diverse menu to cater to a wide range of tastes, and we're always open to feedback. Would you like to share your thoughts on the pizza with pineapple?"
Apakah respons ini dapat diterima? Pada tahap ini, kami ingin mengirimkan seluruh paket data ke perwakilan dukungan untuk dianalisis dan jika mereka setuju, mereka bisa mengirimkannya sebagaimana adanya, atau mereka dapat melakukan beberapa edit dan penyesuaian.
Mungkin pada tahap ini kita ingin menggunakan model dengan lebih banyak parameter, seperti Gemma2 9B atau 27B. Kita juga bisa menggunakan model yang cukup besar sehingga membutuhkan panggilan API ke panggilan layanan eksternal, seperti Gemini, alih-alih dimuat ke dalam worker. Lagi pula, kami mengurangi pekerjaan yang diperlukan untuk mengirim ke model yang lebih besar ini dengan menggunakan model yang lebih kecil sebagai filter. Membuat pilihan-pilihan ini bukan hanya keputusan teknis, tetapi juga keputusan bisnis. Biaya dan manfaatnya perlu diukur. Kita dapat menggunakan Dataflow lagi untuk mengatur pengujian A/B dengan lebih mudah.
Anda juga bisa memilih untuk menyempurnakan model khusus bagi kasus penggunaan Anda. Ini adalah salah satu cara untuk mengubah "suara" model agar sesuai dengan kebutuhan Anda.
Pada langkah generate, kami meneruskan semua chat negatif yang masuk ke model 2B. Jika kita ingin mengirim sebagian koleksi ke model lain, kita bisa menggunakan fungsi Partition di Beam dengan koleksi filtered_responses.negative
. Dengan mengarahkan beberapa pesan pelanggan ke model yang berbeda dan meminta staf dukungan untuk menilai respons yang dibuat sebelum mengirimkannya, kami dapat mengumpulkan masukan yang berharga mengenai kualitas respons dan margin peningkatan.
Dengan beberapa baris kode ini, kami membangun sebuah sistem yang mampu memproses data sentimen pelanggan dengan sangat cepat dan bervariasi. Menggunakan model terbuka Gemma 2, dengan 'performa yang tak tertandingi untuk ukurannya', kami bisa menggabungkan LLM yang kuat ini dalam kasus penggunaan stream processing untuk membantu menciptakan pengalaman yang lebih baik bagi pelanggan.