O Gemma 2 é a versão mais recente da família de modelos abertos leves e de última geração do Google, criados a partir da mesma pesquisa e tecnologia usadas para criar os modelos Gemini. Os modelos de linguagem grandes (LLMs), como o Gemma, são notavelmente versáteis e possibilitam muitas integrações potenciais para processos de negócios. Este blog explora como você pode usar o Gemma para medir o sentimento de uma conversa, resumir o conteúdo dela e ajudar na criação de uma resposta para uma conversa difícil que possa ser aprovada por uma pessoa. Um dos principais requisitos é que os clientes que expressarem um sentimento negativo tenham suas necessidades atendidas quase em tempo real, o que significa que precisaremos fazer uso de um pipeline de dados de streaming que aproveite os LLMs com o mínimo de latência.
O Gemma 2 oferece desempenho incomparável para seu tamanho. Já foi demonstrado que os modelos Gemma alcançam resultados excepcionais em comparativos de mercado, superando até mesmo alguns modelos maiores. O tamanho reduzido dos modelos permite arquiteturas nas quais o modelo é implantado ou incorporado diretamente no pipeline de processamento de dados de streaming, oferecendo benefícios como:
O Dataflow fornece uma plataforma de processamento de streaming em lote escalonável e unificada. Com o Dataflow, você pode usar o Apache Beam Python SDK para desenvolver dados de streaming e pipelines de processamento de eventos. O Dataflow oferece os seguintes benefícios:
O exemplo a seguir mostra como incorporar o modelo Gemma ao pipeline de dados de streaming para executar inferências usando o Dataflow.
Este cenário gira em torno de uma movimentada cadeia de alimentos que lida com a análise e o armazenamento de um alto volume de solicitações de suporte ao cliente por meio de vários canais de chat. Essas interações incluem chats gerados por bots de chat automatizados e conversas com mais nuances, que exigem a atenção da equipe de suporte em tempo real. Em resposta a esse desafio, estabelecemos metas ambiciosas:
A solução usa um pipeline que processa mensagens de chat concluídas quase em tempo real. O Gemma é usado na primeira instância para realizar trabalhos de análise monitorando o sentimento desses chats. Todos os chats são, então, resumidos, e os chats com sentimentos positivos ou neutros são enviados diretamente para uma plataforma de dados, o BigQuery, usando as E/S prontas com o Dataflow. Para chats que relatam um sentimento negativo, usamos o Gemma para pedir ao modelo que crie uma resposta contextualmente apropriada para o cliente insatisfeito. Essa resposta é, então, enviada a um ser humano para análise, permitindo que a equipe de suporte refine a mensagem antes que ela chegue a um cliente potencialmente insatisfeito.
Com esse caso de uso, exploramos alguns aspectos interessantes do uso de um LLM dentro de um pipeline. Por exemplo, há desafios quando é preciso processar as respostas em código, dadas as respostas não determinísticas que podem ser aceitas. Por exemplo, pedimos ao nosso LLM que responda em JSON, o que não é garantido. Essa solicitação exige que analisemos e validemos a resposta, um processo semelhante a como você normalmente processaria dados de fontes que podem não estar corretamente estruturados.
Com essa solução, os clientes podem ter tempos de resposta menores e receber atenção personalizada quando surgirem problemas. A automação do resumo positivo de chats libera tempo para a equipe de suporte, permitindo que ela se concentre em interações mais complexas. Além disso, a análise aprofundada dos dados do chat pode promover a tomada de decisões baseada em dados, enquanto a escalonabilidade do sistema permite que ele se adapte facilmente ao aumento dos volumes de chats sem comprometer a qualidade da resposta.
O fluxo do pipeline pode ser visto abaixo:
O pipeline de alto nível pode ser descrito com algumas linhas:
2. O pipeline transmite o texto dessa mensagem para o Gemma com um prompt. O pipeline solicita que duas tarefas sejam concluídas.
3. Em seguida, o pipeline se ramifica, dependendo da pontuação de sentimento:
Em nosso exemplo, usamos o Gemma por meio do KerasNLP e usamos a variante "ajustada por instrução" do Kaggle gemma2_keras_gemma2_instruct_2b_en. Você deve fazer o download do modelo e armazená-lo em um local que o pipeline possa acessar.
Embora seja possível usar CPUs para testes e desenvolvimento, dados os tempos de inferência, para um sistema de produção, precisamos usar GPUs no serviço Dataflow ML. O uso de GPUs com o Dataflow é facilitado por um contêiner personalizado. Os detalhes para essa configuração estão disponíveis no suporte a GPU do Dataflow. Recomendamos seguir o guia de desenvolvimento local para desenvolvimento, que permite o teste rápido do pipeline. Você também pode consultar o guia para o uso do Gemma no Dataflow, que inclui links para um Dockerfile de exemplo.
A transformação RunInference no Apache Beam está no centro desta solução, fazendo uso de um gerenciador de modelos para configuração e abstraindo o usuário do código boilerplate necessário para a produção. A maioria dos tipos de modelos pode ter suporte com a configuração usando apenas os gerenciadores de modelos incorporados ao Beam. Mas, para o Gemma, este blog faz uso de um gerenciador de modelos personalizado, o que nos dá controle total de nossas interações com o modelo, enquanto ainda usamos todo o maquinário que o RunInference fornece para processamento. O pipeline custom_model_gemma.py tem um exemplo de GemmModelHandler
que você pode usar. Observe o uso do valor max_length na chamada model.generate () desse GemmModelHandler
. Esse valor controla o comprimento máximo da resposta do Gemma às consultas e precisará ser modificado para corresponder às necessidades do caso de uso. Para este blog, usamos o valor 512.
Dica: para este blog, descobrimos que o uso do back-end jax keras teve um desempenho significativamente melhor. Para ativar isso, o Dockerfile deve conter a instrução ENV KERAS_BACKEND="jax"
. Isso deve ser definido no contêiner antes que o worker inicialize o Beam (que importa o Keras).
A primeira etapa no pipeline é padrão para sistemas de processamento de eventos: precisamos ler as mensagens JSON que nossos sistemas upstream criaram, que empacotam as mensagens de chats em uma estrutura simples que inclui o ID do chat.
chats = ( pipeline | "Read Topic" >>
beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
)
O exemplo a seguir mostra uma dessas mensagens JSON, bem como uma discussão muito importante sobre abacaxi e pizza, sendo o ID 221 o nosso cliente.
{
"id": 1,
"user_id": 221,
"chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"
}
Agora, temos uma PCollection de objetos de chat do python. Na próxima etapa, extraímos os valores necessários dessas mensagens de chat e os incorporamos a um prompt para transmissão para nosso LLM ajustado por instruções. Para executar essa etapa, criamos um modelo de prompt que fornece instruções para o modelo.
prompt_template = """
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user {}
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@{}@@@
<answer>
"""
Este é um exemplo de um prompt sendo enviado para o modelo:
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user 221
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@
<answer>
Algumas observações sobre o prompt:
2. Com modelos menores e menos poderosos, você pode obter respostas melhores simplificando as instruções para uma única tarefa e fazendo várias chamadas ao modelo.
3. Limitamos os resumos de mensagens de chat a no máximo 512 caracteres. Combine esse valor com o valor fornecido na configuração max_length para a chamada generate do Gemma.
4. Os três símbolos, "@@@", são usados como um truque para nos permitir extrair os chats originais da mensagem após o processamento. Outras maneiras de executar essa tarefa incluem:
5. Como precisamos processar a resposta em código, pedimos ao LLM que crie uma representação JSON de sua resposta com dois campos: sentimento e resumo.
Para criar o prompt, precisamos analisar as informações de nossa mensagem JSON de origem e inseri-las no modelo. Encapsulamos esse processo em um Beam DoFN e o usamos em nosso pipeline. Em nossa instrução yield, construímos uma estrutura de valor-chave, na qual o ID do chat é a chave. Essa estrutura nos permite combinar o chat com a inferência quando chamamos o modelo.
# Create the prompt using the information from the chat
class CreatePrompt(beam.DoFn):
def process(self, element, *args, **kwargs):
user_chat = json.loads(element)
chat_id = user_chat['id']
user_id = user_chat['user_id']
messages = user_chat['chat_message']
yield (chat_id, prompt_template.format(user_id, messages))
prompts = chats | "Create Prompt" >> beam.ParDo(CreatePrompt())
Agora, estamos prontos para chamar nosso modelo. Graças ao mecanismo RunInference, essa etapa é simples. Envolvemos o GemmaModelHandler
dentro de um KeyedModelhandler
, o que diz a RunInference para aceitar os dados recebidos como uma sequência de pares de chave-valor. Durante o desenvolvimento e teste, o modelo é armazenado no diretório gemma2
. Ao executar o modelo no serviço Dataflow ML, o modelo é armazenado no Google Cloud Storage, com o formato de URI gs://<your_bucket>/gemma-directory
.
keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results = prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)
A coleção results agora contém resultados da chamada ao LLM. Aqui, as coisas ficam bem interessantes: embora a chamada ao LLM seja em código, ao contrário da chamada apenas a outra função, os resultados não são determinísticos! Isso inclui a parte final de nossa solicitação de prompt "Output the results as a JSON with fields [sentiment, summary]". Em geral, a resposta corresponde a essa forma, mas isso não é garantido. Precisamos ser um pouco cautelosos aqui e validar nossa entrada. Se houver falha na validação, fazemos a saída dos resultados para uma coleção error. Neste exemplo, deixamos esses valores lá. Para um pipeline de produção, convém deixar o LLM tentar uma segunda vez, executar os resultados da coleção error em RunInference novamente e, depois, juntar a resposta com a coleção results. Como os pipelines do Beam são gráficos acíclicos dirigidos, não podemos criar um laço aqui.
Agora, pegamos a coleção results e processamos a saída do LLM. Para processar os resultados de RunInference, criamos um novo DoFn SentimentAnalysis
e a função extract_model_reply
. Essa etapa retorna um objeto do tipo PredictionResult:
def extract_model_reply(model_inference):
match = re.search(r"(\{[\s\S]*?\})", model_inference)
json_str = match.group(1)
result = json.loads(json_str)
if all(key in result for key in ['sentiment', 'summary']):
return result
raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
def process(self, element):
key = element[0]
match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
chats = match.group(1)
try:
# The result will contain the prompt, replace the prompt with ""
result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
processed_result = (key, chats, result['sentiment'], result['summary'])
if (result['sentiment'] <0):
output = beam.TaggedOutput('negative', processed_result)
else:
output = beam.TaggedOutput('main', processed_result)
except Exception as err:
print("ERROR!" + str(err))
output = beam.TaggedOutput('error', element)
yield output
Vale a pena dedicar alguns minutos à necessidade de extract_model_reply()
. Como o modelo é auto-hospedado, não podemos garantir que o texto seja uma saída em JSON. Para assegurar que obtenhamos uma saída em JSON, precisamos executar algumas verificações. Um benefício de usar a API Gemini é que ela inclui um recurso que garante que a saída seja sempre em JSON, algo conhecido como decodificação restrita.
Agora, vamos usar essas funções em nosso pipeline:
filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))
Usar with_outputs
cria várias coleções acessíveis em filtered_results
. A coleção main tem sentimentos e resumos para avaliações positivas e neutras, enquanto error contém quaisquer respostas não analisáveis do LLM. Você pode enviar essas coleções para outras fontes, como o BigQuery, com uma transformação write. Este exemplo não demonstra essa etapa, mas a coleção negativa é algo que desejamos fazer mais dentro deste pipeline.
Garantir a satisfação dos clientes é fundamental para a retenção. Embora tenhamos usado um exemplo mais leve com nosso debate sobre o abacaxi na pizza, as interações diretas com um cliente devem sempre buscar empatia e respostas positivas de todas as partes de uma organização. Nesta fase, transmitimos esse chat para um dos representantes de suporte treinados, mas ainda podemos ver se o LLM é capaz de ajudar essa pessoa do suporte a reduzir o tempo até a resolução.
Para essa etapa, fazemos uma chamada para o modelo e pedimos que ele formule uma resposta. Novamente, usamos o modelo Gemma 2B para essa chamada no código.
generated_responses = (results.negative
| "Generate Response" >> beam.Map (lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))
| "Gemma-Response" >> RunInference (keyed_model_handler)
Em geral, você envolve o código de criação de prompts em um DoFn, mas também é possível usar um lambda simples no próprio código do pipeline. Aqui, geramos um prompt que contém a mensagem do chat original, que foi extraída na função SentimentAnalysis
.
Para execução e teste locais, podemos fazer uso de algumas instruções print simples para ver as saídas nas várias PCollections:
generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)
É claro que, para uso real, essas saídas serão enviadas para vários coletores, como o Pub/Sub e o BigQuery.
Vejamos como o modelo se sai com a mensagem JSON anterior:
Etapa 1: Análise de sentimento e resumo
"sentiment": -1,
"summary": "User 221 is very unhappy about the presence of pineapple on pizza."
As respostas geradas pelo modelo 2B não são ruins. O sentimento está correto e, como os resultados do resumo são mais subjetivos, a exatidão da resposta depende dos usos posteriores dessas informações.
Etapa 2: Resposta gerada
"I understand that you're upset about the pineapple pizza. It's a very personal preference, and I apologize that it might have caused you some frustration. We strive to offer a diverse menu to cater to a wide range of tastes, and we're always open to feedback. Would you like to share your thoughts on the pizza with pineapple?"
Essas respostas são aceitáveis? Nesta fase, nosso objetivo é enviar todo o pacote de dados para análise por um representante de suporte e, se ele estiver satisfeito, poderá enviar o pacote como está ou fazer algumas edições e ajustes.
Nesta fase, podemos usar um modelo com mais parâmetros, como o Gemma2 9B ou 27B. Também podemos usar um modelo grande o suficiente para exigir uma chamada de API para uma chamada de serviço externo, como o Gemini, em vez do carregamento para um worker. No final, reduzimos o trabalho necessário para o envio a esses modelos maiores usando o modelo menor como filtro. Fazer essas escolhas não é apenas uma decisão técnica, mas também uma decisão de negócios. Os custos e benefícios precisam ser medidos. Podemos, novamente, usar o Dataflow para configurar mais facilmente o teste A/B.
Você também pode optar por ajustar um modelo personalizado para seu caso de uso. Essa é uma maneira de mudar a "voz" do modelo de acordo com as suas necessidades.
Em nossa etapa de geração, transmitimos todos os chats negativos recebidos para nosso modelo 2B. Se quisermos enviar uma parte da coleção para outro modelo, podemos usar a função Partition no Beam com a coleção filtered_responses.negative
. Ao direcionar algumas mensagens de clientes para diferentes modelos e fazer com que a equipe de suporte classifique as respostas geradas antes de enviá-las, podemos coletar feedbacks valiosos sobre a qualidade das respostas e as margens para melhorias.
Com essas poucas linhas de código, criamos um sistema capaz de processar dados de sentimentos de clientes em alta velocidade e variabilidade. Ao usar o modelo aberto Gemma 2, com seu "desempenho incomparável para seu tamanho", conseguimos incorporar esse poderoso LLM a um caso de uso de processamento de stream que ajuda a criar uma experiência melhor para os clientes.