一、项目背景与目标

1.1 为什么选择实时语音交互?

在智能语音助手、服务型机器人、语音用户界面(VUI)等领域,实时语音交互正逐渐取代传统的延迟式语音识别,成为更加自然和高效的交互方式。

传统语音系统通常存在以下问题:

  • 识别延迟较高,用户体验割裂;

  • 语音输入需要手动控制起止;

  • 无法及时响应用户指令,难以用于机器人等需要即时控制的场景。

而随着 AI 模型轻量化与实时通信能力的提升,"边说边做"的能力逐渐成为现实。实时语音交互的出现,不仅优化了人机对话体验,也极大拓宽了 AI 应用场景。

1.2 OpenAI Realtime API 有哪些能力?

为满足低延迟语音交互的需求,OpenAI 在 2024 年发布了 Realtime API,支持通过 WebSocket 实现流式语音通信。这一接口具有以下关键能力:

全双工 WebSocket 通信:同时发送音频输入和接收语音响应;

语音活动检测(VAD):无需用户手动标记开始与结束,系统自动识别说话段落;

Function Calling 支持:在语音会话中自动识别意图并调用指定函数,实现自然语言驱动的系统操作;

低延迟语音生成与识别:可流式接收 AI 的语音反馈,几乎同步响应。

这使得开发者可以在不依赖大型后端系统的前提下,快速实现一个高响应性、低延迟的语音控制系统。

1.3 本项目要实现什么?

本项目旨在构建一个基于 OpenAI Realtime API 的实时语音控制机器人系统,其主要目标包括:

  • 实时采集麦克风音频,上传至 OpenAI;

  • 接收语音识别结果及 AI 的语音反馈;

  • 自动解析语音中的控制意图与参数(如“前进三厘米”、“左转90°”);

  • 调用本地函数控制机器人或模拟执行;

  • 生成并播放语音反馈,构建完整语音交互闭环。

简而言之,本系统将实现:“我说话,机器人立即听懂并行动。”

1.4 读者能收获什么?

通过学习本项目,你将掌握以下核心技能:

  • 如何使用 OpenAI Realtime API 建立稳定的 WebSocket 实时连接;

  • 如何实时采集与播放 PCM 格式音频流;

  • 如何设计线程模型,实现语音的流式发送与响应接收;

  • 如何通过 Function Calling 技术,让语音指令触发具体动作;

  • 如何构建一个完整的语音输入 → AI 处理 → 指令执行 → 语音反馈的交互系统。

无论你是对语音助手、智能机器人、AI UI 交互,还是对 OpenAI 的新能力感兴趣,这篇教程都将为你打下坚实基础。

二、系统架构与实现思路

为了实现一个具备“语音控制 → 实时响应 → 动作执行”能力的机器人系统,我们首先需要清晰地定义系统的功能目标,然后合理地组织模块和数据流。以下将从系统结构图、关键技术组件和整体数据流程三方面进行梳理。

2.1 系统功能结构图

整个语音控制系统可分为五个关键模块,如下图所示:

┌────────────┐     ┌────────────┐      ┌────────────────┐     ┌──────────────┐      ┌────────────┐
│ 麦克风输入 │───▶ │ 音频编码    |───▶ │ OpenAI Realtime│───▶│ 音频播放输出 │───▶   │ 用户听反馈 │
└────────────┘     │ base64     │      │    API(WS)   │     └──────────────┘      └────────────┘
                   └────────────┘      │    ↓ Function  │
                                       │     Calling    │
                                       └────────────────┘
                                                │
                                                ▼
                                      ┌────────────────┐
                                      │ 动作指令解析与 │
                                      │ 机器人控制执行 │
                                      └────────────────┘

整体流程说明

  1. 用户通过麦克风发出语音指令;

  2. 系统将语音实时编码并发送至 OpenAI 的 Realtime WebSocket 接口;

  3. OpenAI 实时解析语音内容,并返回:

    • 音频响应(可选 TTS 反馈)

    • 或函数调用请求(Function Calling);

  4. 系统根据解析结果控制机器人执行相应动作;

  5. 同时,AI 可回传语音反馈提示,播放给用户听。

2.2 主要技术组件说明

系统的核心技术模块如下:

模块 技术 作用
音频输入/输出 PyAudio 实时采集麦克风输入与扬声器播放
实时通信 websocket-client 与 OpenAI 建立 WebSocket 双向通信
音频编码 base64 + PCM 将原始音频流编码为 API 可识别格式
语音识别与语义理解 OpenAI Realtime API 实时识别语音内容,解析指令意图
函数调用机制 OpenAI Function Calling 将自然语言转化为结构化调用指令
动作执行模块 自定义函数 / 控制接口 将指令映射为机器人动作,驱动执行

这些模块共同构建起一个“人说话、AI理解、机器人行动”的闭环控制系统。

2.3 数据流简要梳理

整个交互系统的数据流可以拆解为以下步骤:

  1. 语音输入阶段

    • 麦克风通过 PyAudio 持续采集 16kHz 的原始 PCM 音频;

    • 通过回调函数写入音频缓冲队列。

  2. 音频上传阶段

    • 后台线程从缓冲队列中取出音频数据;

    • 使用 base64 编码,并通过 WebSocket 发送至 OpenAI。

  3. AI 分析阶段

    • OpenAI 进行流式语音识别;

    • 若检测到语音意图中包含操作指令,则触发 Function Calling;

    • 同时生成响应语音,返回给客户端。

  4. 动作执行阶段

    • 客户端收到 response.function_call_arguments.done 事件;

    • 解析指令内容,如前进、转向等;

    • 调用本地动作控制接口,驱动机器人完成动作。

  5. 语音反馈阶段(可选)

    • 接收 AI 返回的 TTS 音频片段(response.audio.delta);

    • 播放音频,通过扬声器反馈给用户。

这一流程实现了语音到动作的“秒级响应”,确保语音控制体验的流畅性和自然性。

三、开发准备

在开始开发语音控制机器人系统之前,我们需要先完成环境配置与依赖安装。良好的开发环境不仅能减少运行错误,也有助于后续系统的稳定性和扩展性。

3.1 环境依赖

3.1.1 Python 版本要求

本项目使用 Python 3.8 及以上版本,建议使用虚拟环境(如 venvconda)以便隔离项目依赖。

python --version # 推荐输出:Python 3.8.x ~ 3.11.x

3.1.2 依赖库列表

本系统所需的核心依赖包括:

库名 用途
pyaudio 实时采集麦克风音频和播放语音响应
websocket-client 与 OpenAI Realtime API 建立 WebSocket 通信
python-dotenv 加载 .env 文件中的 API 密钥与配置信息
socks(可选) 在需要代理的网络环境下支持 SOCKS5 连接

3.1.3 安装命令

使用以下命令一次性安装所有依赖:

pip install pyaudio websocket-client python-dotenv

3.2 环境变量配置

为了安全地管理 API 密钥和相关配置,我们建议使用 .env 文件存储敏感信息,并通过 python-dotenv 动态加载。

3.2.1 .env 文件示例

在项目根目录下创建 .env 文件,内容如下:

OPENAI_API_KEY=sk-你的密钥

3.2.2 在代码中加载方式

在 Python 项目中通过如下方式引入环境变量:

from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

这样可以避免将密钥写死在代码中,提高项目的安全性和可维护性。

3.3 网络与连接优化建议

3.3.1 使用代理(如有)

如果你在受限网络环境下(如校园网、防火墙、国内部分地区)无法访问 OpenAI 的 WebSocket 接口,可配置 HTTP/SOCKS5 代理:

export http_proxy=socks5://127.0.0.1:1080
export https_proxy=socks5://127.0.0.1:1080

也可通过代码设置代理参数(使用 websocket-clienthttp_proxy_host 等参数)。


3.3.2 强制使用 IPv4 避免连接异常

部分用户在默认 IPv6 环境下连接 WebSocket 时可能遇到 Temporary failure in name resolutionhandshake failed 等问题。推荐在连接前封装 IPv4-only 的 getaddrinfo 方法:

import socket
import websocket

def create_connection_with_ipv4(*args, **kwargs):
    original_getaddrinfo = socket.getaddrinfo
    socket.getaddrinfo = lambda host, port, *args: original_getaddrinfo(host, port, socket.AF_INET, *args)
    try:
        return websocket.create_connection(*args, **kwargs)
    finally:
        socket.getaddrinfo = original_getaddrinfo

这样可确保在 WebSocket 建立时优先使用 IPv4,提升连接稳定性。

四、核心开发过程【重点章节】

本章是项目的核心部分,将逐步实现语音控制机器人的关键功能模块,包括 WebSocket 会话建立、音频采集与播放、通信线程管理,以及语音指令解析与动作执行。

4.1 建立 OpenAI WebSocket 连接

4.1.1 API 接口说明

我们使用的是 OpenAI 的 Realtime WebSocket 接口,官方地址格式如下:

WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview-2024-12-17"

你可以根据官方提供的模型更新日期替换其中的模型名,推荐使用支持语音和 Function Calling 的 gpt-4o-mini-realtime-preview 系列,其成本最低。

4.1.2 鉴权方式:Bearer Token

连接 WebSocket 时需要通过 HTTP Header 添加授权字段:

headers = { "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }

建议将密钥存储在 .env 文件中,并通过 python-dotenv 加载,避免硬编码。

4.1.3 初始化请求结构(session.update)

WebSocket 建立连接后,客户端需立即发送一条 session.update 消息,声明通信配置参数,包括音频编码格式、采样率、模型名、启用 VAD 等设置:

    session_config = {
        "type": "session.update",  # 消息类型
        "session": {
            # AI行为指令
            # 参考模版:
            # 1、它的角色和任务是什么?	识别用户的指令并调用合适的函数来控制机器人
            # 2、它有哪些能力?	能控制机器人动作(如转向、移动等)
            # 3、它的语气与风格?	可选:简洁直接、非冗余
            # 4、不该做什么?	不执行非功能类请求、不进行闲聊或讲笑话
            "instructions": (
                "你可以控制机器人执行动作,如左转、右转、前进、后退、抬手等。将动作通过 move_robot 函数表达。"
                "你的目标是理解用户的语音命令,并将其转换为对机器人行为的函数调用。"
                "不要回答与控制无关的问题,也不要解释规则。"
                "函数调用必须准确匹配用户的意图。"
                "如果用户没有给出明确命令,则保持安静。"
                "只要可以,就尽快触发函数调用。"
            ),
            # 语音活动检测配置
            "turn_detection": {
                "type": "server_vad",  # 使用服务器端VAD
                "threshold": 0.5,  # 语音检测阈值
                "prefix_padding_ms": 300,  # 前缀静音时长
                "silence_duration_ms": 500  # 静音判定时长
            },
            "voice": "alloy",  # 语音类型
            "temperature": 1,  # 响应随机性(0-2)
            "max_response_output_tokens": 4096,  # 最大输出token数
            "modalities": ["text", "audio"],  # 交互模式
            "input_audio_format": "pcm16",  # 输入音频格式
            "output_audio_format": "pcm16",  # 输出音频格式
            # 语音识别配置
            "input_audio_transcription": {
                "model": "whisper-1"  # 使用Whisper模型
            },
            "tool_choice": "auto",  # 工具选择模式
            # 可用工具列表
            "tools": [
                # 控制机器人转向、前进、后退功能
                {
                    "type": "function",
                    "name": "move_robot",
                    "description": "控制机器人运动,包括前进、后退、转向和腿部调节。所有距离参数单位为米,角度参数单位为度,腿长参数单位为厘米。",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "action": {
                                "type": "string",
                                "enum": [
                                    "advance",
                                    "retreat",
                                    "left_rotation",
                                    "right_rotation",
                                    "leg_length"
                                ],
                                "description": "动作类型:advance(前进)/retreat(后退)/left_rotation(左转)/right_rotation(右转)/leg_length(调节腿长)"
                            },
                            "value": {
                                "type": "number",
                                "description": "动作参数值:前进/后退距离(米),"+"默认值"+os.getenv("ADVANCE_DEFAULT_VALUE")+"米."+
                                               "或者转向角度(度),"+"默认值"+os.getenv("LEFT_ROTATION_DEFAULT_VALUE")+"度."+
                                               "或者腿长(厘米),"+"默认值"+os.getenv("LEG_LENGTH_DEFAULT_VALUE")+"厘米."
                            }
                        },
                        "required": ["action"]
                    }
                }
            ]
        }
    }

该消息用于告知服务端当前会话的能力需求及角色设定,是实时通信的第一步。

4.2 音频流采集与播放逻辑

4.2.1 麦克风音频采集:mic_callback 回调原理

使用 PyAudio 打开输入音频流时,我们注册一个回调函数,它会在每次采集到新音频块时被调用:

def mic_callback(in_data, frame_count, time_info, status):
    mic_queue.put(in_data)  # 音频写入队列
    return (None, pyaudio.paContinue)

该函数的核心是将原始音频数据写入队列,供发送线程编码并推送至 OpenAI。

4.2.2 扬声器语音播放:speaker_callback 缓冲设计

播放 AI 返回的语音响应也通过回调实现,系统维护一个缓冲区 audio_buffer,按需从中读取数据送入扬声器播放:

def speaker_callback(in_data, frame_count, time_info, status):
    global audio_buffer
    bytes_needed = frame_count * 2  # 16-bit PCM
    if len(audio_buffer) >= bytes_needed:
        chunk = audio_buffer[:bytes_needed]
        audio_buffer = audio_buffer[bytes_needed:]
    else:
        chunk = bytes(audio_buffer) + b'\x00' * (bytes_needed - len(audio_buffer))
        audio_buffer.clear()
    return (chunk, pyaudio.paContinue)

这种缓冲机制确保播放的音频不会中断,即便返回数据存在延迟。

4.2.3 音频数据在系统中的流转路径

数据流向可总结如下:

麦克风 → mic_callback → mic_queue → WebSocket 发送 → OpenAI → 音频响应(base64)→ 解码 → audio_buffer → speaker_callback → 扬声器

4.3 WebSocket 通信线程设计

为了确保音频实时传输不中断,系统通过两个后台线程分别处理音频上传与结果接收。

4.3.1 音频发送线程

从队列中读取音频数据,编码后发送给 OpenAI:

def send_mic_audio_to_websocket(ws):
    while not stop_event.is_set():
        if not mic_queue.empty():
            chunk = mic_queue.get()
            encoded = base64.b64encode(chunk).decode("utf-8")
            ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": encoded
            }))

4.3.2 音频接收线程

从 OpenAI 接收响应,根据事件类型处理数据:

def receive_audio_from_websocket(ws):
    global audio_buffer
    while not stop_event.is_set():
        message = ws.recv()
        event = json.loads(message)
        event_type = event.get("type")

        if event_type == "response.audio.delta":
            audio_chunk = base64.b64decode(event["delta"])
            audio_buffer.extend(audio_chunk)

        elif event_type == "response.function_call_arguments.done":
            handle_function_call(event, ws)
  • response.audio.delta:接收 TTS 音频片段,供扬声器播放;

  • response.function_call_arguments.done:Function Calling 完成,触发动作处理逻辑。

4.3.3 返回事件类型详解(重点补充)

OpenAI 的 Realtime WebSocket 接口是事件驱动型通信机制,服务端通过 WebSocket 发送一系列事件给客户端。每个事件都是一个结构化的 JSON 对象,字段 type 决定其处理方式。

以下是当前 API 中的常见事件类型,以及如何在 receive_audio_from_websocket() 中进行解析与处理:

事件类型 描述 常见用途 客户端处理方式
response.audio.delta 语音响应片段(TTS) 播放 AI 生成的音频 base64 解码 → 加入 audio_buffer
response.audio.done TTS 音频播放完成 可用于关闭扬声器流 可选:清理播放状态
response.function_call_arguments.done 函数调用完成,返回完整参数 控制机器人动作 解析 action/value → 执行控制函数
input_audio_buffer.speech_started 系统检测到开始说话 提示说话开始 可打印日志或更新 UI
input_audio_buffer.speech_stopped 系统检测到说话结束 用于分段识别 可记录时间点或触发提交
response.content_part.added 内容分段更新 生成文字或提示 可选用于字幕展示
response.done 当前响应完整结束 表示 AI 已完成一次响应 可作为一次任务终点标志

4.3.4 示例:事件分发逻辑

你可以在 receive_audio_from_websocket() 中使用如下方式进行结构化处理:

def receive_audio_from_websocket(ws):
    global audio_buffer
    while not stop_event.is_set():
        message = ws.recv()
        event = json.loads(message)
        event_type = event.get("type")

        if event_type == "response.audio.delta":
            audio_chunk = base64.b64decode(event["delta"])
            audio_buffer.extend(audio_chunk)

        elif event_type == "response.audio.done":
            print("语音播放完成")

        elif event_type == "response.function_call_arguments.done":
            handle_function_call(event, ws)

        elif event_type == "input_audio_buffer.speech_started":
            print("检测到开始说话")

        elif event_type == "input_audio_buffer.speech_stopped":
            print("说话已结束")

        elif event_type == "response.done":
            print("AI 响应结束")

        # 其他事件可按需扩展

4.4 指令解析与机器人控制(Function Calling)

4.4.1 Function Call 的事件结构

OpenAI 会在识别出动作意图时返回如下结构:

{
  "type": "response.function_call_arguments.done",
  "name": "move_robot",
  "arguments": "{\"action\": \"advance\", \"value\": 3}",
  "call_id": "abc123"
}

我们需要从中提取指令参数并调用本地函数。

4.4.2 参数提取与默认值支持

解析逻辑如下:

def handle_function_call(event, ws):
    name = event.get("name")
    args = json.loads(event.get("arguments", "{}"))
    action = args.get("action")
    value = args.get("value", os.getenv(f"{action.upper()}_DEFAULT_VALUE"))

    if action and value:
        move_robot(action, value)
        send_function_call_result("执行成功", event.get("call_id"), ws)

支持读取默认值(例如 .env 中配置 ADVANCE_DEFAULT_VALUE=2)以处理非显式指令。

4.4.3 控制函数设计(机器人动作接口)

def move_robot(action, number):
    if action == "advance":
        ES02_def_function.advance(number)
    elif action == "retreat":
        ES02_def_function.retreat(number)
    elif action == "left_rotation":
        ES02_def_function.left_rotation(number)
    elif action == "right_rotation":
        ES02_def_function.right_rotation(number)
    elif action == "leg_length":
        ES02_def_function.leg_length(number)

4.4.4 返回Function Call执行成功相应

def send_function_call_result(result, call_id, ws):
    """
    向服务器发送函数调用结果
    参数:
        result: 函数执行结果(字符串或可序列化对象)
        call_id: 对应的函数调用ID
        ws: WebSocket连接对象
    """
    # 构造结果消息结构
    result_json = {
        "type": "conversation.item.create",  # 消息类型
        "item": {
            "type": "function_call_output",  # 项目类型
            "output": result,  # 实际结果
            "call_id": call_id  # 关联的调用ID
        }
    }

    try:
        # 发送函数调用结果
        ws.send(json.dumps(result_json))
        print(f"已发送函数调用结果: {result_json}")

        # 构造并发送响应创建请求(通知服务器可以生成AI响应)
        rp_json = {"type": "response.create"}
        ws.send(json.dumps(rp_json))
        print(f"已发送响应创建请求: {rp_json}")
    except Exception as e:
        print(f"发送函数调用结果失败: {e}")

这将通知AI函数调用成功,以便其做下一步的处理和响应。

五、常见问题与优化建议

在开发和运行语音控制机器人系统的过程中,可能会遇到一些技术问题或性能瓶颈。本章整理了常见故障场景及应对策略,并提出一些值得探索的进阶优化方向,帮助你提升系统的稳定性和交互质量。

5.1 连接失败或超时

问题现象:

  • 程序卡在 WebSocket 连接阶段;

  • 报错如 handshake failedTemporary failure in name resolution

  • 或者服务端没有响应,连接超时。

原因分析:

  • 网络环境限制,无法访问 OpenAI API;

  • 使用 IPv6 网络而服务端不支持;

  • API key 配置错误或缺失;

  • 忘记设置请求头中的 Authorization 字段。

解决建议:

  • 确认 .env 文件中 OPENAI_API_KEY 配置正确,格式应为以 sk- 开头的密钥;

  • 检查是否使用了代理或 VPN,必要时设置 http_proxy/socks_proxy 环境变量;

  • 使用 create_connection_with_ipv4() 强制 WebSocket 使用 IPv4 连接;

  • 可添加重试机制,以应对偶发性网络故障。

5.2 音频卡顿或延迟

问题现象:

  • 播放的语音响应断断续续;

  • 用户说话时识别不完整或被截断;

  • 控制动作响应慢,甚至丢失命令。

原因分析:

  • 队列堆积,线程处理速度跟不上;

  • frames_per_buffer 太小或太大导致音频分块异常;

  • 播放缓冲区(audio_buffer)处理不及时。

解决建议:

  • 设置合理的 CHUNK_SIZE,一般为 320(20ms)或 512

  • 确保发送线程与播放线程为守护线程(daemon=True),避免阻塞;

  • 使用性能监控工具观察线程运行状态,及时发现瓶颈。

5.3 动作执行不准确

问题现象:

  • 用户说“向前走三步”,但机器人执行了错误指令;

  • 语义识别失败或参数缺失,导致 NoneType 错误;

  • 函数调用中 value 为空。

原因分析:

  • Prompt 设计不明确,模型不理解语义意图;

  • 缺乏上下文或明确指令格式;

  • 未设置函数默认参数,导致参数缺失时解析失败。

解决建议:

  • 提高 system_instruction 的明确性,例如:
    "你是一个机器人控制专家,请根据语音指令生成动作指令,如前进、后退、左转等。"

  • .env 中设置默认值,如 ADVANCE_DEFAULT_VALUE=2,提高容错率;

  • 增加日志打印,输出收到的参数内容,便于调试;

  • 考虑将 Function 调用设计为结构化 schema,明确字段范围。

一个高质量的系统 prompt 通常包含以下 5 个部分:

部分 内容 示例
1️⃣ 角色设定 明确 AI 的职责与定位 你是一个机器人控制系统,负责从用户语音中提取控制指令
2️⃣ 指令目标 明确需要“做什么” 从用户的语音指令中提取出动作类型和参数值
3️⃣ 可用动作说明 列出所有支持的动作及其含义/单位 advance(前进,步数),left_rotation(左转,角度)等
4️⃣ 输出规范 告诉模型输出形式,如 JSON、结构字段等 输出字段包括 action 和 value
5️⃣ 容错与缺省策略 没说清楚怎么办?数值缺失怎么办? 如用户未指定 value,请使用默认值

演示:

六、结语

通过本项目,我们从零搭建了一个基于 OpenAI Realtime API 的实时语音控制机器人系统。系统不仅能够接收用户语音指令,实时进行语义识别,还能触发结构化函数调用,驱动本地动作控制逻辑,最终形成“说话→理解→执行→反馈”的完整闭环。实时语音交互是人机交互领域的重要方向,结合 OpenAI Realtime API 与 Function Calling 技术,你将拥有一个极具潜力的开发平台。希望本教程不仅帮助你完成了一个有趣的项目,更激发你去构建更多基于语音和 AI 的创新应用。

现在,拿起麦克风,让你的机器人“听你说,照你做”。