使用 Dataflow 来流式传输 ML 内容的 Gemma

八月 14, 2024
Reza Rokni Google Senior Staff Dataflow
Ravin Kumar Google Data Scientist Language Applications

Gemma 2 是 Google 最先进的轻量级开放模型系列,利用创建 Gemini 模型所用的研究和技术创建而成。像 Gemma 这样的大型语言模型 (LLM) 用途广泛,为业务流程提供诸多的集成可能性。本博客探讨了如何使用 Gemma 来评估对话传达出的情绪总结对话的内容,并协助为棘手的对话生成响应(之后可以再由人工审批)。其中一个关键要求是,表达了负面情绪的客户希望他们的需求可以近乎实时地得到满足,这意味着我们需要利用流式数据流水线,从而以最低的延迟利用 LLM。


Gemma

Gemma 2 在大小方面的性能无可比拟。经证实,Gemma 模型可以实现出色的基准结果,甚至比一些大模型更为出色。由于模型的小尺寸架构,因而可以直接在流式数据处理流水线上部署或嵌入模型,并具有以下好处:

  • 通过本地工作器而不是远程过程来调用数据,以在单独的系统上实现数据局部性

  • 可自动调节的单个系统,允许使用诸如源头背压等指标作为自动调节程序的直接信号

  • 在生产环境中进行观察和监控的单个系统

Dataflow 提供一个可伸缩的统一批处理和流处理平台。借助 Dataflow,您可以使用 Apache Beam Python SDK 开发流式数据、事件处理流水线。Dataflow 提供以下好处:

  • Dataflow 为全代管式,可根据需求自动上下调节

  • Apache Beam 提供一组低代码一站式转换功能,可节省您编写通用样板代码的时间、精力和成本。毕竟,最好的代码是您无需编写的代码

  • Dataflow ML 直接支持 GPU,进而安装必要的驱动程序并提供对一系列 GPU 设备的访问权限

以下示例显示如何将 Gemma 模型嵌入流式数据流水线中,以便使用 Dataflow 运行推理。


场景

这个场景围绕一条繁忙的食物链展开,即通过各种聊天渠道分析和存储大量的客户支持请求。这些互动既包括自动聊天机器人生成的聊天,也包括更多需要实时支持人员关注的微妙对话。为了应对这一挑战,我们制定了宏大的目标:

  • 首先,我们希望通过总结正向互动来高效地管理和存储聊天数据,以供参考和未来分析。

  • 其次,我们希望部署实时问题检测和解决措施,使用情感分析快速发现感到不满意的客户,并生成量身定制的回复来解决他们的问题。

该解决方案使用流水线来近乎实时地处理已完成的聊天消息。首先,Gemma 会监控并分析这些聊天内容透露出的情绪。然后,模型会对所有聊天数据进行总结,并通过搭配使用现成的 I/O 与 Dataflow,将积极或中性情绪的聊天直接发送到数据平台 BigQuery。对于报告为消极情绪的聊天,我们会要求 Gemma 模型为感到不满意的客户生成符合情境的回复,并将此回复发送给支持人员进行审核,以便他们可以在优化消息之后再将其发送给可能感到不满意的客户。

在此用例中,我们探讨了在流水线中使用 LLM 的一些有趣方面。例如,考虑到客户可以接受的回复具有非确定性,而 LLM 又必须以代码处理回复,因此存在一些挑战。例如,我们要求 LLM 以 JSON 格式回复,但无法保证模型能做到这一点。要使用此请求,我们需要解析和验证回复,此过程与您通常需要处理来自数据结构可能不正确的数据源的数据类似。

借助此解决方案,客户可以体验更快的回复时间,并在问题出现时获得个性化的关注。正向聊天内容总结自动化为支持人员节省了时间,让他们可以专注于更复杂的互动。此外,深入分析聊天数据可以推动制定数据驱动的决策,而系统的可伸缩性使模型可以轻松应对不断增加的聊天量,而不会对回复质量造成影响。


数据处理流水线

流水线流如下所示:

Data processing pipeline architecture

高级流水线可以用几行内容来描述:

  1. 阅读来自 Pub/Sub(我们的事件消息传递来源)的评论数据。此数据包含作为 JSON 负载的聊天 ID 和聊天记录。此负载在流水线中进行处理。

2. 流水线通过提示将此消息中的文本传递给 Gemma。流水线要求完成两个任务。

  • 使用以下三个值,为消息附加情绪分数:1 表示积极情绪的聊天,0 表示中性情绪的聊天,-1 表示消极情绪的聊天。

  • 用一句话总结聊天内容。

3. 接下来,根据情绪分数,流水线会出现以下分支:

  • 如果分数为 1 或 0,带总结的聊天内容将被发送到我们的数据分析系统,用于存储和未来分析。

  • 如果分数为 -1,我们会要求 Gemma 提供回复。然后,模型将此回复与聊天信息一起发送到事件消息传递系统,该系统充当流水线与其他应用之间的粘合剂。在此步骤,用户可以查看内容。


流水线代码

设置

访问和下载 Gemma

在我们的示例中,我们通过 KerasNLP 使用 Gemma,并且还使用了 Kaggle 的“指令调整”gemma2_keras_gemma2_instruct_2b_en 变体。您必须下载模型并将其存储在流水线可以访问的位置中。


使用 Dataflow 服务

虽然可以使用 CPU 进行测试和开发,但考虑到推理时间,对于生产系统,我们需要在 Dataflow ML 服务上使用 GPU。自定义容器有助于将 GPU 与 Dataflow 搭配使用。有关此设置的详细信息,请访问 Dataflow GPU 支持。我们建议您按照本地开发指南进行开发,以便快速测试流水线。您还可以参考关于在 Dataflow 上使用 Gemma 的指南,其中包括指向示例 Docker 文件的链接。


Gemma 自定义模型处理程序

Apache Beam 中的 RunInference 转换是此解决方案的核心,可使用模型处理程序进行配置,并从生产所需的样板代码中提取用户。大多数模型类型仅支持使用 Beam 的内置模型处理程序进行配置,但对于 Gemma,此博客介绍的是如何使用自定义模型处理程序进行配置,这样让我们在仍然使用 RunInference 提供的所有处理机制的同时,还能完全控制与模型的交互。流水线 custom_model_gemma.py 有一个您可以使用的示例 GemmModelHandler。请注意,务必使用来自该 GemmModelHandlermodel.generate() 调用中使用的 max_length 值。此值控制 Gemma 的最大查询回复长度,并且需要经过更改才能匹配用例的需求。在本博客中,我们使用的值是 512。

提示:对于此博客,我们发现使用 jax keras 后端的效果明显更好。要启用此后端,DockerFile 必须包含指令 ENV KERAS_BACKEND="jax"。您必须在工作器启动 Beam(即导入 Keras)之前在容器中完成这项设置


构建流水线

流水线中的第一步是事件处理系统的标准:我们需要读取上游系统创建的 JSON 消息,并以包含聊天 ID 的简单结构打包聊天消息。

chats = ( pipeline | "Read Topic" >>
                        beam.io.ReadFromPubSub(subscription=args.messages_subscription)
| "Decode" >> beam.Map(lambda x: x.decode("utf-8")
   )

以下示例显示了其中一条 JSON 消息,以及关于菠萝和披萨的重要讨论(ID 221 是我们的客户)。

{
"id": 1, 
"user_id": 221, 
"chat_message": "\\nid 221:嘿,我真的很生气,菜单上居然有菠萝披萨!\\nid 331:抱歉,但加了菠萝的披萨很好吃\\nid 221:天啊!一点都不好吃,我现在很不开心!\\n"
}

我们现在有包含各种 python 聊天对象的 PCollection。在下一步,我们将从这些聊天消息中提取所需的值,并将其整合到提示中,以传递给经过指令调整的 LLM。为此,我们会创建一个为模型提供指令的提示模板。

prompt_template = """
<提示>
根据下面与用户 {} 相关的聊天记录,提供以下两项任务的完成结果
任务 1:评估用户语气是愉快 (1)、中性 (0) 还是生气 (-1)
任务 2:总结文本内容,最多 512 个字符
将结果输出为带有字段 [情绪、总结] 的 JSON 文件
 
@@@{}@@@
<回答>
"""

以下是向模型发送的提示示例:

<提示>
根据下面与用户 221 相关的聊天记录提供以下两项任务的完成结果
任务 1评估用户语气是愉快 (1)中性 (0) 还是生气 (-1)
任务 2总结文本内容最多 512 个字符
将结果输出为带有字段 [情绪总结]  JSON 文件
 
@@@"\\nid 221:嘿,我真的很生气,菜单上居然有菠萝披萨!\\nid 331:抱歉,但加了菠萝的披萨很好吃\\nid 221:一点都不好吃,我现在很不开心!\\n"@@@
<回答>

有关提示的一些注意事项:

  1. 本提示示例仅供参考。对于您自己的提示,请使用应用的指示性数据运行完整的分析。

  • 对于原型设计,您可以使用 aistudio.google.com 来快速测试 Gemma 和 Gemini 行为。如果您想以编程方式进行测试,还可以使用一键式 API 密钥。

2. 使用尺寸较小、功能稍逊的模型,您可以简化单个任务的指令并针对模型进行多次调用,以获得更好的回复。

3. 我们将聊天消息摘要限制为最多 512 个字符。将此值与 max_length 配置中提供的值相匹配,以便 Gemma 生成调用。

4. 使用三个“&”、“@@@”,以便我们在处理后从消息中提取原始聊天内容。可用于完成此任务的其他方法包括:

  • 将整个聊天消息用作键值对中的键。

  • 将结果连接回原始数据。这种方法需要进行重排。

5. 由于需要处理代码中的回复,所以我们要求 LLM 使用情绪和总结这两个字段,以 JSON 格式表示回答。

要创建提示,我们需要解析源 JSON 消息中的信息,然后将其插入模板中。我们将此过程封装在 Beam DoFN 中,并在我们的流水线中使用。在我们的 yield 语句中,我们构建了一个键值结构,其中聊天 ID 充当键。这种结构让我们能够在调用模型时将聊天与推理相匹配。

# 使用聊天信息创建提示
class CreatePrompt(beam.DoFn):
  def process(self, element, *args, **kwargs):
    user_chat = json.loads(element)
    chat_id = user_chat['id']
    user_id = user_chat['user_id']
    messages = user_chat['chat_message']
    yield (chat_id, prompt_template.format(user_id, messages))
 
prompts = chats |  "Create Prompt" >> beam.ParDo(CreatePrompt())

现在,我们可以调用自己的模型了。得益于 RunInference 机制,此步骤非常简单。我们将 GemmaModelHandler 封装在 KeyedModelhandler 中,并告诉 RunInference 接受传入的数据作为键值对元组。在开发和测试期间,模型存储在 gemma2 目录中。在 Dataflow ML 服务上运行模型时,模型存储在 Google Cloud Storage 中,URI 格式为 gs://<your_bucket>/gemma-directory

keyed_model_handler = KeyedModelHandler(GemmaModelHandler('gemma2'))
results =  prompts | "RunInference-Gemma" >> RunInference(keyed_model_handler)

结果集合现在包含 LLM 调用的结果。有一点十分有趣:虽然 LLM 调用是代码,但与仅调用另一个函数不同,因为结果具有不确定性!这包括我们的提示请求将结果输出为带有字段 [情绪、总结] 的 JSON 文件的最后一位。一般来说,回复与该形状匹配,但这一点不能保证。因此,我们需要采取一些防御措施,并验证我们的输入。如果验证失败,我们会将结果输出到错误集合。在此示例中,我们对这些值予以保留。对于生产流水线,您可能希望让 LLM 再次尝试并再次在 RunInference 中运行错误集合结果,然后使用结果集合压缩回复。由于 Beam 流水线为有向无环图,因此我们无法在此处创建循环。

现在,我们采用结果集合并处理 LLM 输出。为了处理 RunInference 的结果,我们创建了一个新的 DoFn SentimentAnalysis 和函数 extract_model_reply。在此步骤,模型会返回 PredictionResult 类型的对象:

def extract_model_reply(model_inference):
    match = re.search(r"(\{[\s\S]*?\})", model_inference)
    json_str = match.group(1)
    result = json.loads(json_str)
    if all(key in result for key in ['sentiment', 'summary']):
        return result
    raise Exception('Malformed model reply')
class SentimentAnalysis(beam.DoFn):
    def process(self, element):
        key = element[0]                          
        match = re.search(r"@@@([\s\S]*?)@@@", element[1].example)
        chats = match.group(1)
 
        try:
            # 结果将包含提示,并将提示替换为""
            result = extract_model_reply(element[1].inference.replace(element[1].example, ""))
            processed_result = (key, chats, result['sentiment'], result['summary'])           
 
            if (result['sentiment'] <0):
              output = beam.TaggedOutput('negative', processed_result)
            else:
              output = beam.TaggedOutput('main', processed_result)
 
        except Exception as err:
            print("ERROR!" + str(err))
            output = beam.TaggedOutput('error', element)
 
        yield output

不妨花几分钟时间来研究对 extract_model_reply() 的需求。由于模型为自托管式,我们无法保证文本将是 JSON 输出。为了确保我们获得 JSON 输出,我们需要运行多次检查。使用 Gemini API 的一个好处是,它包含一项被称为约束解码的功能,可确保输出始终是 JSON 格式。

现在,让我们在流水线中使用这些函数:

filtered_results = (results | "Process Results" >> beam.ParDo(SentimentAnalysis()).with_outputs('main','negative','error'))

使用 with_outputsfiltered_results 中创建多个可访问的集合。主集合包含正面和中性评价的情绪及摘要,而错误包含 LLM 中无法解析的回复。您可以使用写入转换功能,将这些集合发送到其他来源,例如 BigQuery。此示例中未演示此步骤,但我们希望在此流水线中实现更多与负面评价集合相关的功能。


负面情绪处理

确保客户满意对于留住客户至关重要。虽然我们在本次辩论中使用了菠萝披萨这样一个无关紧要的例子,但在与客户直接互动时,组织的各个部门应始终努力做到有同理心并积极回复。在此阶段,我们会将聊天内容传递给一位训练有素的支持代表,但我们仍然可以看到 LLM 是否能够帮助该支持人员缩短解决问题的时间。

对于此步骤,我们会调用模型并要求其生成回复。在代码中,我们再次使用 Gemma 2B 模型进行此调用。

generated_responses = (results.negative 
       | "Generate Response" >> beam.Map(lambda x: ((x[0], x[3]), "<prompt>在此聊天文本中为用户生成道歉回复: " + x[1] + "<answer>"))
       | "Gemma-Response" >> RunInference(keyed_model_handler)

一般来说,您可以将提示创建代码封装在 DoFn 中,但也可以在流水线代码本身中使用简单的 lambda。在这里,我们生成一个提示,其中包含从 SentimentAnalysis 函数中提取出来的原始聊天消息。

对于本地运行和测试,我们可以使用一些简单的 print 语句来查看各种 PCollection 上的输出:

generated_responses | "Print Response" >> beam.Map(print)
filtered_results.main | "Print Main" >> beam.Map(print)
filtered_results.error | "Print Errors" >> beam.Map(print)

当然,在实际使用中,这些输出将被发送到各种接收器,如 Pub/Sub 和 BigQuery。


运行流水线

我们来看看模型如何处理前面的 JSON 消息:

步骤 1:情感分析和总结

“情绪”:-1,

“总结”:用户 221 对披萨上有菠萝非常不满。”

2B 模型生成的回复也不错。情绪分析正确无误,并且由于总结的结果更加主观,所以回复的正确性取决于下游对此信息的使用情况。

步骤 2:生成的回复

我知道您对菠萝披萨很不满。虽然这是非常个人的偏好,但很抱歉没能让您感到满意。我们致力于提供多样化的菜单,以满足各种口味需求,同时也始终乐于接受大家的反馈意见。对于菠萝披萨,您有什么想说的吗?

这些回复是否可以接受?在这个阶段,我们打算将整个数据包发送给支持代表进行分析,如果他们满意,则可以按原样发送,否则可以进行一些编辑和调整。


后续工作

也许在这个阶段,我们希望使用有更多参数的模型,例如 Gemma2 9B 或 27B。我们还可以使用一个足够大的模型,需要调用 API 才能调用外部服务(如 Gemini),而不是加载到工作器上。最后,我们使用较小的模型作为过滤器,以减少发送数据到这些较大模型所需完成的工作。做出这些选择不仅是一项技术决策,也是一项业务决策,因为需要衡量成本和收益。我们可以再次利用 Dataflow 来更轻松地设置 A/B 测试。

您还可以选择根据用例对模型进行自定义微调。这是改变模型“作用”以满足您需求的一种方式。


A/B 测试

在生成步骤中,我们将所有传入的负面情绪的聊天传递给我们的 2B 模型。如果我们想将集合的一部分发送到另一个模型,则可以结合使用 Beam 中的 Partition 函数与 filtered_responses.negative 集合。通过将一些客户消息定向到不同的模型,并在发送之前让支持人员为生成的回复打分,我们可以收集有关回复质量和提高利润率的宝贵反馈。


总结

利用这几行代码,我们构建了一个能够以高速度和可变性处理客户情绪数据的系统。通过利用 Gemma 2 开放模型“无与伦比的性能”,我们能够将这一强大的 LLM 纳入流处理用例中,从而为客户打造更好的体验。