OpenAvatarChat:系统架构和Handler协作机制的详细说明

📋 目录

  1. 整体架构
  2. 数据运行流程
  3. Handler的本质
  4. Handler协同工作机制
  5. Handler详解
  6. 快速参考

一、整体架构

1.1 系统层次结构

OpenAvatarChat采用分层架构,从顶层到底层分为三个层次:

image-nrWy.png

架构说明

  1. ChatEngine(顶层)

    • 系统核心,管理整个对话引擎
    • 负责初始化、配置加载、Handler管理
    • 支持多会话并发,每个会话独立运行
  2. ChatSession(中间层)

    • 对应一个用户会话(一个WebRTC连接)
    • 管理该会话中的所有Handler实例
    • 管理数据流转、线程、队列
  3. Handler(底层)

    • 功能模块,负责具体任务处理
    • 包括:RTC客户端、VAD、ASR、LLM、TTS、Avatar等
    • 每个Handler在会话启动时创建独立实例

1.2 核心组件说明

ChatEngine (src/chat_engine/chat_engine.py)

职责

  • 系统初始化和管理
  • HandlerManager的创建和初始化
  • 会话的创建和销毁
  • 多会话并发管理

关键方法

def initialize(engine_config, app=None, ui=None)
    # 初始化HandlerManager
    # 加载所有Handler
    # 设置客户端Handler

def create_client_session(session_info, client_handler)
    # 创建新的ChatSession
    # 准备Handler环境
    # 返回会话和Handler环境

def stop_session(session_id)
    # 停止并销毁会话

HandlerManager (src/chat_engine/core/handler_manager.py)

职责

  • 从配置文件动态加载Handler模块
  • 注册Handler实例
  • 管理Handler生命周期

关键数据结构

handler_registries = {
    "RtcClient": HandlerRegistry(
        base_info=HandlerBaseInfo(...),
        handler=RtcClient实例,
        handler_config=配置对象
    ),
    "SileroVad": HandlerRegistry(...),
    ...
}

ChatSession (src/chat_engine/core/chat_session.py)

职责

  • 管理单个会话的数据流
  • 创建和管理Handler实例
  • 数据路由和分发
  • 线程管理

关键数据结构

# 数据路由表:数据类型 → Handler输入队列
data_sinks = {
    ChatDataType.MIC_AUDIO: [
        DataSink(owner="SileroVad", sink_queue=vad_queue),
    ],
    ChatDataType.HUMAN_TEXT: [
        DataSink(owner="LLM_Bailian", sink_queue=llm_queue),
    ],
}

# Handler记录:Handler名称 → Handler环境
handlers = {
    "SileroVad": HandlerRecord(env=HandlerEnv(...)),
    ...
}

二、数据运行流程

2.1 完整数据流转架构图

完整的数据流转如下:

image-oicg.png

2.2 数据流转详细流程

步骤1:客户端输入

image-UwNQ.png

步骤2:数据分发(订阅分发)

image-teHb.png

关键机制

  • data_sinks是数据类型到Handler输入队列的映射表
  • 系统根据数据类型自动查找所有订阅者
  • 数据同时分发到所有订阅该类型的Handler

步骤3:Handler处理

每个Handler都有独立的处理线程,从自己的输入队列读取数据:

image-qwJM.png

步骤4:数据链式流转

数据按照Handler的输入输出定义,自动形成处理链:

image-KvYP.png

步骤5:客户端输出

image-Xkhp.png

2.3 关键数据结构:队列与路由

输入队列

# 客户端输入队列(由RTC客户端Handler创建)
input_queues = {
    EngineChannelType.AUDIO: asyncio.Queue(),
    EngineChannelType.VIDEO: asyncio.Queue(),
    EngineChannelType.TEXT: asyncio.Queue(),
}

Handler输入队列

# 每个Handler有自己的输入队列
vad_input_queue = queue.Queue()      # SileroVad的输入队列
asr_input_queue = queue.Queue()      # SenseVoice的输入队列
llm_input_queue = queue.Queue()      # LLM_Bailian的输入队列
tts_input_queue = queue.Queue()      # Edge_TTS的输入队列
avatar_input_queue = queue.Queue()   # AvatarMusetalk的输入队列

数据路由表(data_sinks)

# 数据类型 → 订阅该类型的Handler列表
data_sinks = {
    ChatDataType.MIC_AUDIO: [
        DataSink(owner="SileroVad", sink_queue=vad_input_queue),
    ],
    ChatDataType.HUMAN_AUDIO: [
        DataSink(owner="SenseVoice", sink_queue=asr_input_queue),
    ],
    ChatDataType.HUMAN_TEXT: [
        DataSink(owner="LLM_Bailian", sink_queue=llm_input_queue),
    ],
    ChatDataType.AVATAR_TEXT: [
        DataSink(owner="Edge_TTS", sink_queue=tts_input_queue),
    ],
    ChatDataType.AVATAR_AUDIO: [
        DataSink(owner="AvatarMusetalk", sink_queue=avatar_input_queue),
    ],
}

输出队列映射

# (Handler名称, 数据类型) → 输出队列
outputs = {
    ("AvatarMusetalk", ChatDataType.AVATAR_VIDEO): DataSink(
        sink_queue=output_queues[EngineChannelType.VIDEO]
    ),
}

三、Handler的本质

3.1 Handler是什么?

Handler是独立的功能模块,每个Handler负责一个特定的任务:

  • RTC客户端Handler:管理WebRTC连接,接收用户输入,发送输出
  • SileroVad Handler:语音活动检测(VAD),识别用户是否在说话
  • SenseVoice Handler:语音识别(ASR),将语音转换为文本
  • LLM Handler:大语言模型,生成回复文本
  • TTS Handler:文本转语音,将文本转换为音频
  • Avatar Handler:数字人驱动,根据音频生成视频

3.2 Handler的实质:独立线程

关键理解:每个Handler在会话启动时,会创建一个独立的线程

image-WTzR.png

线程运行模式

# handler_pumper线程的核心循环
def handler_pumper(session_context, handler_env, sinks, outputs):
    shared_states = session_context.shared_states
    input_queue = handler_env.input_queue  # Handler的输入队列
    
    while shared_states.active:  # 会话活跃时持续运行
        try:
            # 1. 从输入队列读取数据
            input_data = input_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.03)  # 队列为空时休眠30ms
            continue
        
        # 2. 调用Handler处理
        handler_result = handler_env.handler.handle(
            handler_env.context,
            input_data,
            handler_env.output_info
        )
        
        # 3. 提交处理结果
        ChatDataSubmitter.submit(handler_result)
            │
            └─→ distribute_data()  # 分发到下一个Handler

3.3 Handler的生命周期

阶段1:加载(load)

系统启动时,每个Handler执行一次加载:

handler.load(engine_config, handler_config)

作用

  • 加载模型文件
  • 初始化全局资源
  • 准备Handler运行环境

示例

  • SileroVad:加载VAD模型
  • SenseVoice:加载ASR模型
  • LLM:初始化API客户端
  • Avatar:加载数字人模型

阶段2:创建上下文(create_context)

每个会话创建时,为每个Handler创建独立的上下文:

handler_context = handler.create_context(session_context, handler_config)

作用

  • 创建会话相关的状态
  • 例如:LLM创建对话历史,ASR创建音频缓冲区

阶段3:处理(handle)

会话运行期间,Handler持续处理数据:

handler_result = handler.handle(context, inputs, output_definitions)

特点

  • 每个Handler在自己的线程中运行
  • 从自己的输入队列读取数据
  • 处理完成后输出结果

阶段4:销毁上下文(destroy_context)

会话结束时,清理Handler上下文:

handler.destroy_context(handler_context)

作用

  • 释放会话相关的资源
  • 清理状态数据

3.4 Handler的接口定义

所有Handler都继承自HandlerBase,必须实现以下接口:

class HandlerBase(ABC):
    @abstractmethod
    def load(self, engine_config, handler_config):
        """加载Handler(加载模型等)"""
        pass
    
    @abstractmethod
    def create_context(self, session_context, handler_config):
        """创建Handler上下文"""
        pass
    
    @abstractmethod
    def handle(self, context, inputs, output_definitions):
        """处理输入数据"""
        pass
    
    @abstractmethod
    def get_handler_detail(self, session_context, context):
        """声明输入输出数据类型"""
        return HandlerDetail(
            inputs={...},   # 输入类型定义
            outputs={...}   # 输出类型定义
        )
    
    @abstractmethod
    def destroy_context(self, context):
        """销毁Handler上下文"""
        pass

3.5 Handler的关键方法:get_handler_detail

这是Handler与系统交互的关键方法,Handler通过它声明自己的输入输出:

def get_handler_detail(self, session_context, context) -> HandlerDetail:
    return HandlerDetail(
        inputs={
            ChatDataType.MIC_AUDIO: HandlerDataInfo(
                type=ChatDataType.MIC_AUDIO,
                # 其他配置...
            )
        },
        outputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(
                type=ChatDataType.HUMAN_AUDIO,
                definition=数据定义,
            )
        }
    )

系统如何使用

  1. prepare_handler()阶段,系统调用get_handler_detail()
  2. 根据返回的inputs,系统创建数据路由:
    for input_type, input_info in io_detail.inputs.items():
        sink_list = data_sinks.setdefault(input_type, [])
        data_sink = DataSink(
            owner=handler_name,
            sink_queue=handler_input_queue
        )
        sink_list.append(data_sink)
    
  3. 当该类型的数据到达时,系统自动分发到Handler的输入队列

四、Handler协同工作机制

4.1 数据订阅机制

核心思想:Handler通过声明输入类型来"订阅"数据,系统自动建立数据路由。

订阅关系的建立

image-CiLh.png

订阅关系示例

以glut3.yaml配置为例:

# SileroVad订阅MIC_AUDIO
data_sinks[ChatDataType.MIC_AUDIO] = [
    DataSink(owner="SileroVad", sink_queue=vad_queue),
]

# SenseVoice订阅HUMAN_AUDIO(SileroVad的输出)
data_sinks[ChatDataType.HUMAN_AUDIO] = [
    DataSink(owner="SenseVoice", sink_queue=asr_queue),
]

# LLM_Bailian订阅HUMAN_TEXT(SenseVoice的输出)
data_sinks[ChatDataType.HUMAN_TEXT] = [
    DataSink(owner="LLM_Bailian", sink_queue=llm_queue),
]

# Edge_TTS订阅AVATAR_TEXT(LLM的输出)
data_sinks[ChatDataType.AVATAR_TEXT] = [
    DataSink(owner="Edge_TTS", sink_queue=tts_queue),
]

# AvatarMusetalk订阅AVATAR_AUDIO(TTS的输出)
data_sinks[ChatDataType.AVATAR_AUDIO] = [
    DataSink(owner="AvatarMusetalk", sink_queue=avatar_queue),
]

4.2 数据分发机制(订阅分发)

当数据到达时,系统通过distribute_data()自动分发:

def distribute_data(data: ChatData, sinks, outputs):
    # 1. 检查是否是最终输出(直接发送到客户端)
    source_key = (data.source, data.type)
    if source_key in outputs:
        outputs[source_key].sink_queue.put_nowait(data)
    
    # 2. 查找所有订阅该数据类型的Handler
    sink_list = sinks.get(data.type, [])
    
    # 3. 分发到所有订阅者
    for sink in sink_list:
        if sink.owner == data.source:
            continue  # 跳过数据源自身
        
        sink.sink_queue.put_nowait(data)  # 放入Handler的输入队列

关键点

  • 数据根据类型自动路由
  • 一个数据可以同时分发给多个订阅者
  • Handler之间完全解耦,不知道彼此的存在

4.3 Handler并行处理机制

并行运行

所有Handler线程同时运行,互不阻塞:

image-LhEY.png

数据流顺序保证

虽然Handler并行运行,但数据流是顺序的:

MIC_AUDIO → HUMAN_AUDIO → HUMAN_TEXT → AVATAR_TEXT → AVATAR_AUDIO → AVATAR_VIDEO

为什么顺序能保证

  1. 数据类型驱动

    • SileroVad输出HUMAN_AUDIO
    • SenseVoice订阅HUMAN_AUDIO(不是MIC_AUDIO
    • 只有当HUMAN_AUDIO数据产生时,SenseVoice才会收到
  2. 队列缓冲

    • 每个Handler有独立的输入队列
    • 队列自动缓冲数据,保证顺序
  3. VAD的语音结束标记

    • VAD在处理过程中会输出human_speech_end标记
    • ASR等待这个标记后才进行推理
    • 确保完整的语音段被处理

4.4 Handler之间的解耦

完全解耦

Handler之间不直接通信,只通过数据类型交互:

❌ 错误方式(紧耦合):
    SileroVad → 直接调用 → SenseVoice.handle()

✅ 正确方式(解耦):
    SileroVad → 输出HUMAN_AUDIO → 系统分发 → SenseVoice输入队列

解耦的好处

  1. 易于扩展:添加新Handler只需声明输入输出,无需修改现有Handler
  2. 灵活组合:通过配置文件灵活组合Handler
  3. 易于测试:每个Handler可以独立测试
  4. 易于维护:Handler职责清晰,互不干扰

4.5 会话结束机制

共享标志控制

所有线程共享一个标志:shared_states.active

# 会话运行时
shared_states.active = True

# 所有线程循环检查
while shared_states.active:
    # 处理数据
    ...

# 会话结束时
shared_states.active = False

# 所有线程自动退出循环

结束流程

image-PKiu.png


五、Handler详解

5.1 RTC客户端Handler

功能:管理WebRTC连接,负责与客户端的双向通信

输入:客户端音频/视频/文本(通过WebRTC接收)

输出:数字人视频/音频(通过WebRTC发送)

关键代码位置

  • src/handlers/client/rtc_client/client_handler_rtc.py
  • src/service/rtc_service/rtc_stream.py

工作流程

image-iGcN.png

5.2 SileroVad Handler(语音活动检测)

功能:检测用户是否在说话,过滤静音

输入ChatDataType.MIC_AUDIO(原始音频)

输出ChatDataType.HUMAN_AUDIO(人声音频,包含语音活动标记)

关键代码位置src/handlers/vad/silerovad/vad_handler_silero.py

关键方法

def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.MIC_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. 从输入中提取音频
    audio_data = inputs.data.get_main_data()
    
    # 2. VAD模型推理
    is_speech = self.model(audio_data)
    
    # 3. 如果检测到语音,输出HUMAN_AUDIO
    if is_speech:
        yield ChatData(type=HUMAN_AUDIO, data=audio_data)

特点

  • 实时处理,流式输出
  • 输出包含human_speech_starthuman_speech_end标记
  • ASR依赖这些标记来确定何时进行识别

5.3 SenseVoice Handler(语音识别)

功能:将语音转换为文本

输入ChatDataType.HUMAN_AUDIO(人声音频)

输出ChatDataType.HUMAN_TEXT(识别出的文本)

关键代码位置src/handlers/asr/sensevoice/asr_handler_sensevoice.py

关键方法

def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.HUMAN_TEXT: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. 累积音频数据
    context.audio_buffer.append(inputs.data.get_main_data())
    
    # 2. 检查是否有human_speech_end标记
    if inputs.data.has_meta('human_speech_end'):
        # 3. 进行ASR推理
        text = self.model(context.audio_buffer)
        
        # 4. 输出识别文本
        yield ChatData(type=HUMAN_TEXT, data=text)
        
        # 5. 清空缓冲区
        context.audio_buffer.clear()

特点

  • 累积音频,等待语音结束标记
  • 一次性识别完整语音段
  • 输出文本格式:<|zh|><|NEUTRAL|><|Speech|><|woitn|>你好

5.4 LLM Handler(大语言模型)

功能:理解用户输入,生成回复文本

输入ChatDataType.HUMAN_TEXT(用户文本)

输出ChatDataType.AVATAR_TEXT(AI回复文本)

关键代码位置src/handlers/llm/openai_compatible/llm_handler_openai_compatible.py

关键方法

def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.HUMAN_TEXT: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_TEXT: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. 更新对话历史
    context.history.add_user_message(inputs.data.get_main_data())
    
    # 2. 调用LLM API(流式)
    response = self.client.chat.completions.create(
        model=self.model_name,
        messages=context.history.get_messages(),
        stream=True
    )
    
    # 3. 流式输出文本
    for chunk in response:
        text = chunk.choices[0].delta.content
        if text:
            yield ChatData(type=AVATAR_TEXT, data=text)

特点

  • 维护对话历史
  • 支持流式输出
  • 可以配置不同的LLM模型(百炼、OpenAI兼容等)

5.5 Edge_TTS Handler(文本转语音)

功能:将文本转换为语音

输入ChatDataType.AVATAR_TEXT(AI回复文本)

输出ChatDataType.AVATAR_AUDIO(生成的音频)

关键代码位置src/handlers/tts/edgetts/tts_handler_edgetts.py

关键方法

def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.AVATAR_TEXT: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_AUDIO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. 累积文本
    context.text_buffer += inputs.data.get_main_data()
    
    # 2. 检查是否有文本结束标记
    if inputs.data.has_meta('text_end'):
        # 3. 调用TTS API生成音频
        audio = edge_tts.generate(
            text=context.text_buffer,
            voice=self.voice
        )
        
        # 4. 输出音频流
        for audio_chunk in audio:
            yield ChatData(type=AVATAR_AUDIO, data=audio_chunk)
        
        # 5. 清空缓冲区
        context.text_buffer = ""

特点

  • 累积文本,等待完整句子
  • 支持多种语音(通过配置选择)
  • 输出24kHz音频

5.6 AvatarMusetalk Handler(数字人驱动)

功能:根据音频生成数字人视频(唇形同步)

输入ChatDataType.AVATAR_AUDIO(TTS生成的音频)

输出ChatDataType.AVATAR_VIDEO(数字人视频帧)

关键代码位置src/handlers/avatar/musetalk/avatar_handler_musetalk.py

关键方法

def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.AVATAR_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_VIDEO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. 累积音频数据
    context.audio_buffer.append(inputs.data.get_main_data())
    
    # 2. 检查是否有音频结束标记
    if inputs.data.has_meta('audio_end'):
        # 3. MuseTalk模型处理
        video_frames = self.model(
            audio=context.audio_buffer,
            avatar_image=context.avatar_image
        )
        
        # 4. 输出视频帧流
        for frame in video_frames:
            yield ChatData(type=AVATAR_VIDEO, data=frame)
        
        # 5. 清空缓冲区
        context.audio_buffer.clear()

特点

  • 精确的唇形同步
  • 支持16fps视频输出
  • 使用MuseTalk模型进行推理

5.7 Handler处理链总结

image-sHOW.png


六、快速参考

6.1 关键代码位置

功能文件路径关键方法
主入口src/glut.pymain()
引擎初始化src/chat_engine/chat_engine.pyChatEngine.initialize()
Handler加载src/chat_engine/core/handler_manager.pyHandlerManager.initialize()
会话创建src/chat_engine/chat_engine.pyChatEngine.create_client_session()
数据分发src/chat_engine/core/chat_session.pyChatSession.distribute_data()
输入处理src/chat_engine/core/chat_session.pyChatSession.inputs_pumper()
Handler处理src/chat_engine/core/chat_session.pyChatSession.handler_pumper()

6.2 关键数据结构

数据类型(ChatDataType)

MIC_AUDIO        # 麦克风音频
HUMAN_AUDIO      # 人声音频
HUMAN_TEXT       # 用户文本
AVATAR_TEXT      # AI回复文本
AVATAR_AUDIO     # TTS音频
AVATAR_VIDEO     # 数字人视频

数据路由表(data_sinks)

data_sinks: Dict[ChatDataType, List[DataSink]]
# 数据类型 → 订阅该类型的Handler列表

Handler注册表

handler_registries: Dict[str, HandlerRegistry]
# Handler名称 → Handler注册信息

6.3 核心执行流程

image-UkmG.png

6.4 模块化关键特性

  1. 配置驱动:通过YAML配置文件定义Handler组合
  2. 动态加载:运行时根据配置动态导入和实例化Handler
  3. 数据驱动路由:基于数据类型自动分发,Handler无需知道其他Handler
  4. 异步处理:每个Handler在独立线程中运行,通过队列通信
  5. 松耦合:Handler之间不直接依赖,只依赖数据类型
  6. 易于扩展:添加新Handler只需实现HandlerBase接口

总结

OpenAvatarChat采用分层、模块化的架构设计:

  • 顶层(ChatEngine):管理整个系统,支持多会话并发
  • 中间层(ChatSession):管理单个会话,协调Handler协同工作
  • 底层(Handler):独立的功能模块,通过数据类型进行通信

核心机制

  1. 数据订阅:Handler通过声明输入类型订阅数据
  2. 自动路由:系统根据数据类型自动分发数据
  3. 并行处理:Handler在独立线程中并行运行
  4. 队列通信:通过队列实现异步、解耦的通信

这种设计实现了高内聚、低耦合的架构,使得系统易于扩展、维护和测试。