Gemma para ML de streaming com o Dataflow

AGO 16, 2024
Reza Rokni Google Senior Staff Dataflow
Ravin Kumar Google Data Scientist Language Applications

O Gemma 2 é a versão mais recente da família de modelos abertos leves e de última geração do Google, criados a partir da mesma pesquisa e tecnologia usadas para criar os modelos Gemini. Os modelos de linguagem grandes (LLMs), como o Gemma, são notavelmente versáteis e possibilitam muitas integrações potenciais para processos de negócios. Este blog explora como você pode usar o Gemma para medir o sentimento de uma conversa, resumir o conteúdo dela e ajudar na criação de uma resposta para uma conversa difícil que possa ser aprovada por uma pessoa. Um dos principais requisitos é que os clientes que expressarem um sentimento negativo tenham suas necessidades atendidas quase em tempo real, o que significa que precisaremos fazer uso de um pipeline de dados de streaming que aproveite os LLMs com o mínimo de latência.


Gemma

O Gemma 2 oferece desempenho incomparável para seu tamanho. Já foi demonstrado que os modelos Gemma alcançam resultados excepcionais em comparativos de mercado, superando até mesmo alguns modelos maiores. O tamanho reduzido dos modelos permite arquiteturas nas quais o modelo é implantado ou incorporado diretamente no pipeline de processamento de dados de streaming, oferecendo benefícios como:

  • Localidade de dados com chamadas worker locais em vez de RPCs (remote procedure calls) de dados para um sistema separado.

  • Um único sistema para escalonamento automático, permitindo o uso de métricas como pressão reversa na origem como sinais diretos para o escalonador automático.

  • Um único sistema a observar e monitorar na produção.

O Dataflow fornece uma plataforma de processamento de streaming em lote escalonável e unificada. Com o Dataflow, você pode usar o Apache Beam Python SDK para desenvolver dados de streaming e pipelines de processamento de eventos. O Dataflow oferece os seguintes benefícios:

  • O Dataflow é totalmente gerenciado, fazendo o escalonamento automático com base na demanda.

  • O Apache Beam fornece um conjunto de transformações prontas com pouca codificação que podem economizar tempo, esforços e custos na escrita de código boilerplate. Afinal, o melhor código é aquele que você não precisa escrever.

  • O Dataflow ML dá suporte direto a GPUs, instalando os drivers necessários e fornecendo acesso a uma variedade de dispositivos de GPU.

O exemplo a seguir mostra como incorporar o modelo Gemma ao pipeline de dados de streaming para executar inferências usando o Dataflow.


Cenário

Este cenário gira em torno de uma movimentada cadeia de alimentos que lida com a análise e o armazenamento de um alto volume de solicitações de suporte ao cliente por meio de vários canais de chat. Essas interações incluem chats gerados por bots de chat automatizados e conversas com mais nuances, que exigem a atenção da equipe de suporte em tempo real. Em resposta a esse desafio, estabelecemos metas ambiciosas:

  • Primeiro, queremos gerenciar e armazenar dados de chat de forma eficiente, resumindo as interações positivas para facilitar a referência e a análise futuras.

  • Em segundo lugar, queremos implementar a detecção e resolução de problemas em tempo real, usando a análise de sentimento para identificar rapidamente os clientes insatisfeitos e gerar respostas personalizadas para lidar com as preocupações deles.

A solução usa um pipeline que processa mensagens de chat concluídas quase em tempo real. O Gemma é usado na primeira instância para realizar trabalhos de análise monitorando o sentimento desses chats. Todos os chats são, então, resumidos, e os chats com sentimentos positivos ou neutros são enviados diretamente para uma plataforma de dados, o BigQuery, usando as E/S prontas com o Dataflow. Para chats que relatam um sentimento negativo, usamos o Gemma para pedir ao modelo que crie uma resposta contextualmente apropriada para o cliente insatisfeito. Essa resposta é, então, enviada a um ser humano para análise, permitindo que a equipe de suporte refine a mensagem antes que ela chegue a um cliente potencialmente insatisfeito.

Com esse caso de uso, exploramos alguns aspectos interessantes do uso de um LLM dentro de um pipeline. Por exemplo, há desafios quando é preciso processar as respostas em código, dadas as respostas não determinísticas que podem ser aceitas. Por exemplo, pedimos ao nosso LLM que responda em JSON, o que não é garantido. Essa solicitação exige que analisemos e validemos a resposta, um processo semelhante a como você normalmente processaria dados de fontes que podem não estar corretamente estruturados.

Com essa solução, os clientes podem ter tempos de resposta menores e receber atenção personalizada quando surgirem problemas. A automação do resumo positivo de chats libera tempo para a equipe de suporte, permitindo que ela se concentre em interações mais complexas. Além disso, a análise aprofundada dos dados do chat pode promover a tomada de decisões baseada em dados, enquanto a escalonabilidade do sistema permite que ele se adapte facilmente ao aumento dos volumes de chats sem comprometer a qualidade da resposta.


O pipeline de processamento de dados

O fluxo do pipeline pode ser visto abaixo:

Data processing pipeline architecture

O pipeline de alto nível pode ser descrito com algumas linhas:

  1. Leia os dados da avaliação do Pub/Sub, nossa fonte de mensagens de eventos. Esses dados contêm o ID e o histórico do chat como um payload do JSON. Esse payload é processado no pipeline.

2. O pipeline transmite o texto dessa mensagem para o Gemma com um prompt. O pipeline solicita que duas tarefas sejam concluídas.

  • Anexar uma pontuação de sentimento à mensagem, usando os três valores a seguir: 1 para um chat positivo, 0 para um chat neutro e -1 para um chat negativo.

  • Resumir o chat em uma única frase.

3. Em seguida, o pipeline se ramifica, dependendo da pontuação de sentimento:

  • Se a pontuação for 1 ou 0, o chat com o resumo é enviado para nosso sistema de análise de dados para armazenamento e análise futura.

  • Se a pontuação for -1, pedimos ao Gemma que forneça uma resposta. Essa resposta, combinada com as informações do chat, é enviada para um sistema de mensagens de eventos que atua como uma junção entre o pipeline e outros aplicativos. Esta etapa permite que uma pessoa analise o conteúdo.


O código do pipeline

Configuração

Acesse e faça o download do Gemma

Em nosso exemplo, usamos o Gemma por meio do KerasNLP e usamos a variante "ajustada por instrução" do Kaggle gemma2_keras_gemma2_instruct_2b_en. Você deve fazer o download do modelo e armazená-lo em um local que o pipeline possa acessar.


Use o serviço Dataflow

Embora seja possível usar CPUs para testes e desenvolvimento, dados os tempos de inferência, para um sistema de produção, precisamos usar GPUs no serviço Dataflow ML. O uso de GPUs com o Dataflow é facilitado por um contêiner personalizado. Os detalhes para essa configuração estão disponíveis no suporte a GPU do Dataflow. Recomendamos seguir o guia de desenvolvimento local para desenvolvimento, que permite o teste rápido do pipeline. Você também pode consultar o guia para o uso do Gemma no Dataflow, que inclui links para um Dockerfile de exemplo.


Gerenciador de modelos personalizados do Gemma

A transformação RunInference no Apache Beam está no centro desta solução, fazendo uso de um gerenciador de modelos para configuração e abstraindo o usuário do código boilerplate necessário para a produção. A maioria dos tipos de modelos pode ter suporte com a configuração usando apenas os gerenciadores de modelos incorporados ao Beam. Mas, para o Gemma, este blog faz uso de um gerenciador de modelos personalizado, o que nos dá controle total de nossas interações com o modelo, enquanto ainda usamos todo o maquinário que o RunInference fornece para processamento. O pipeline custom_model_gemma.py tem um exemplo de GemmModelHandler que você pode usar. Observe o uso do valor max_length na chamada model.generate () desse GemmModelHandler. Esse valor controla o comprimento máximo da resposta do Gemma às consultas e precisará ser modificado para corresponder às necessidades do caso de uso. Para este blog, usamos o valor 512.

Dica: para este blog, descobrimos que o uso do back-end jax keras teve um desempenho significativamente melhor. Para ativar isso, o Dockerfile deve conter a instrução ENV KERAS_BACKEND="jax". Isso deve ser definido no contêiner antes que o worker inicialize o Beam (que importa o Keras).


Crie o pipeline

A primeira etapa no pipeline é padrão para sistemas de processamento de eventos: precisamos ler as mensagens JSON que nossos sistemas upstream criaram, que empacotam as mensagens de chats em uma estrutura simples que inclui o ID do chat.

chats = ( pipeline | "Read Topic" >>
                        beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
   )

O exemplo a seguir mostra uma dessas mensagens JSON, bem como uma discussão muito importante sobre abacaxi e pizza, sendo o ID 221 o nosso cliente.

{
"id": 1, 
"user_id": 221, 
"chat_message": "\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"
}

Agora, temos uma PCollection de objetos de chat do python. Na próxima etapa, extraímos os valores necessários dessas mensagens de chat e os incorporamos a um prompt para transmissão para nosso LLM ajustado por instruções. Para executar essa etapa, criamos um modelo de prompt que fornece instruções para o modelo.

prompt_template = """
<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user {}
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
 
@@@{}@@@
<answer>
"""

Este é um exemplo de um prompt sendo enviado para o modelo:

<prompt>
Provide the results of doing these two tasks on the chat history provided below for the user 221
task 1 : assess if the tone is happy = 1 , neutral = 0 or angry = -1
task 2 : summarize the text with a maximum of 512 characters
Output the results as a json with fields [sentiment, summary]
 
@@@"\\nid 221: Hay I am really annoyed that your menu includes a pizza with pineapple on it! \\nid 331: Sorry to hear that , but pineapple is nice on pizza\\nid 221: What a terrible thing to say! Its never ok, so unhappy right now! \\n"@@@
<answer>

Algumas observações sobre o prompt:

  1. O objetivo desse prompt é ser um exemplo ilustrativo. Para seus próprios prompts, execute uma análise completa com dados indicativos para seu aplicativo.

  • Para prototipagem, você pode usar o aistudio.google.com para testar rapidamente o comportamento do Gemma e do Gemini. Há também uma chave de API de um clique se você quiser testar programaticamente.

2. Com modelos menores e menos poderosos, você pode obter respostas melhores simplificando as instruções para uma única tarefa e fazendo várias chamadas ao modelo.

3. Limitamos os resumos de mensagens de chat a no máximo 512 caracteres. Combine esse valor com o valor fornecido na configuração max_length para a chamada generate do Gemma.

4. Os três símbolos, "@@@", são usados como um truque para nos permitir extrair os chats originais da mensagem após o processamento. Outras maneiras de executar essa tarefa incluem:

  • Usar a mensagem do chat inteira como uma chave no par de chave-valor.

  • Mesclar os resultados novamente com os dados originais. Essa abordagem requer um embaralhamento.

5. Como precisamos processar a resposta em código, pedimos ao LLM que crie uma representação JSON de sua resposta com dois campos: sentimento e resumo.

Para criar o prompt, precisamos analisar as informações de nossa mensagem JSON de origem e inseri-las no modelo. Encapsulamos esse processo em um Beam DoFN e o usamos em nosso pipeline. Em nossa instrução yield, construímos uma estrutura de valor-chave, na qual o ID do chat é a chave. Essa estrutura nos permite combinar o chat com a inferência quando chamamos o modelo.

# Create the prompt using the information from the chat
class CreatePrompt(beam.DoFn):
  def process(self, element, *args, **kwargs):
    user_chat = json.loads(element)
    chat_id = user_chat['id']
    user_id = user_chat['user_id']
    messages = user_chat['chat_message']
    yield (chat_id, prompt_template.format(user_id, messages))
 
prompts = chats |  "Create Prompt" >> beam.ParDo(CreatePrompt())

Agora, estamos prontos para chamar nosso modelo. Graças ao mecanismo RunInference, essa etapa é simples. Envolvemos o GemmaModelHandler dentro de um KeyedModelhandler, o que diz a RunInference para aceitar os dados recebidos como uma sequência de pares de chave-valor. Durante o desenvolvimento e teste, o modelo é armazenado no diretório gemma2. Ao executar o modelo no serviço Dataflow ML, o modelo é armazenado no Google Cloud Storage, com o formato de URI gs://<your_bucket>/gemma-directory.

keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results =  prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)

A coleção results agora contém resultados da chamada ao LLM. Aqui, as coisas ficam bem interessantes: embora a chamada ao LLM seja em código, ao contrário da chamada apenas a outra função, os resultados não são determinísticos! Isso inclui a parte final de nossa solicitação de prompt "Output the results as a JSON with fields [sentiment, summary]". Em geral, a resposta corresponde a essa forma, mas isso não é garantido. Precisamos ser um pouco cautelosos aqui e validar nossa entrada. Se houver falha na validação, fazemos a saída dos resultados para uma coleção error. Neste exemplo, deixamos esses valores lá. Para um pipeline de produção, convém deixar o LLM tentar uma segunda vez, executar os resultados da coleção error em RunInference novamente e, depois, juntar a resposta com a coleção results. Como os pipelines do Beam são gráficos acíclicos dirigidos, não podemos criar um laço aqui.

Agora, pegamos a coleção results e processamos a saída do LLM. Para processar os resultados de RunInference, criamos um novo DoFn SentimentAnalysis e a função extract_model_reply. Essa etapa retorna um objeto do tipo PredictionResult:

def extract_model_reply(model_inference):
    match = re.search(r"(\{[\s\S]*?\})", model_inference)
    json_str = match.group(1)
    result = json.loads(json_str)
    if all(key in result for key in ['sentiment', 'summary']):
        return result
    raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
    def process(self, element):
        key = element[0]                          
        match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
        chats = match.group(1)
 
        try:
            # The result will contain the prompt, replace the prompt with ""
            result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
            processed_result = (key, chats, result['sentiment'], result['summary'])           
 
            if (result['sentiment'] <0):
              output = beam.TaggedOutput('negative', processed_result)
            else:
              output = beam.TaggedOutput('main', processed_result)
 
        except Exception as err:
            print("ERROR!" + str(err))
            output = beam.TaggedOutput('error', element)
 
        yield output

Vale a pena dedicar alguns minutos à necessidade de extract_model_reply(). Como o modelo é auto-hospedado, não podemos garantir que o texto seja uma saída em JSON. Para assegurar que obtenhamos uma saída em JSON, precisamos executar algumas verificações. Um benefício de usar a API Gemini é que ela inclui um recurso que garante que a saída seja sempre em JSON, algo conhecido como decodificação restrita.

Agora, vamos usar essas funções em nosso pipeline:

filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))

Usar with_outputs cria várias coleções acessíveis em filtered_results. A coleção main tem sentimentos e resumos para avaliações positivas e neutras, enquanto error contém quaisquer respostas não analisáveis do LLM. Você pode enviar essas coleções para outras fontes, como o BigQuery, com uma transformação write. Este exemplo não demonstra essa etapa, mas a coleção negativa é algo que desejamos fazer mais dentro deste pipeline.


Processamento de sentimentos negativos

Garantir a satisfação dos clientes é fundamental para a retenção. Embora tenhamos usado um exemplo mais leve com nosso debate sobre o abacaxi na pizza, as interações diretas com um cliente devem sempre buscar empatia e respostas positivas de todas as partes de uma organização. Nesta fase, transmitimos esse chat para um dos representantes de suporte treinados, mas ainda podemos ver se o LLM é capaz de ajudar essa pessoa do suporte a reduzir o tempo até a resolução.

Para essa etapa, fazemos uma chamada para o modelo e pedimos que ele formule uma resposta. Novamente, usamos o modelo Gemma 2B para essa chamada no código.

generated_responses = (results.negative
       | "Generate Response" >> beam.Map (lambda x: ((x[0], x[3]), "<prompt>Generate an apology response for the user in this chat text: " + x[1] + "<answer>"))
       | "Gemma-Response" >> RunInference (keyed_model_handler)

Em geral, você envolve o código de criação de prompts em um DoFn, mas também é possível usar um lambda simples no próprio código do pipeline. Aqui, geramos um prompt que contém a mensagem do chat original, que foi extraída na função SentimentAnalysis.

Para execução e teste locais, podemos fazer uso de algumas instruções print simples para ver as saídas nas várias PCollections:

generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)

É claro que, para uso real, essas saídas serão enviadas para vários coletores, como o Pub/Sub e o BigQuery.


Execução do pipeline

Vejamos como o modelo se sai com a mensagem JSON anterior:

Etapa 1: Análise de sentimento e resumo

"sentiment": -1,

"summary": "User 221 is very unhappy about the presence of pineapple on pizza."

As respostas geradas pelo modelo 2B não são ruins. O sentimento está correto e, como os resultados do resumo são mais subjetivos, a exatidão da resposta depende dos usos posteriores dessas informações.

Etapa 2: Resposta gerada

"I understand that you're upset about the pineapple pizza. It's a very personal preference, and I apologize that it might have caused you some frustration. We strive to offer a diverse menu to cater to a wide range of tastes, and we're always open to feedback. Would you like to share your thoughts on the pizza with pineapple?"

Essas respostas são aceitáveis? Nesta fase, nosso objetivo é enviar todo o pacote de dados para análise por um representante de suporte e, se ele estiver satisfeito, poderá enviar o pacote como está ou fazer algumas edições e ajustes.


Próximas etapas

Nesta fase, podemos usar um modelo com mais parâmetros, como o Gemma2 9B ou 27B. Também podemos usar um modelo grande o suficiente para exigir uma chamada de API para uma chamada de serviço externo, como o Gemini, em vez do carregamento para um worker. No final, reduzimos o trabalho necessário para o envio a esses modelos maiores usando o modelo menor como filtro. Fazer essas escolhas não é apenas uma decisão técnica, mas também uma decisão de negócios. Os custos e benefícios precisam ser medidos. Podemos, novamente, usar o Dataflow para configurar mais facilmente o teste A/B.

Você também pode optar por ajustar um modelo personalizado para seu caso de uso. Essa é uma maneira de mudar a "voz" do modelo de acordo com as suas necessidades.


Teste A/B

Em nossa etapa de geração, transmitimos todos os chats negativos recebidos para nosso modelo 2B. Se quisermos enviar uma parte da coleção para outro modelo, podemos usar a função Partition no Beam com a coleção filtered_responses.negative. Ao direcionar algumas mensagens de clientes para diferentes modelos e fazer com que a equipe de suporte classifique as respostas geradas antes de enviá-las, podemos coletar feedbacks valiosos sobre a qualidade das respostas e as margens para melhorias.


Resumo

Com essas poucas linhas de código, criamos um sistema capaz de processar dados de sentimentos de clientes em alta velocidade e variabilidade. Ao usar o modelo aberto Gemma 2, com seu "desempenho incomparável para seu tamanho", conseguimos incorporar esse poderoso LLM a um caso de uso de processamento de stream que ajuda a criar uma experiência melhor para os clientes.