在我们的项目中,曾经依赖于 Java 实现的音视频处理功能已经成功迁移至 Python,经过这次转变,整个系统架构得到了显著的优化,尤其是在视频推流方面。最为关键的改进之一是,从 WebSocket 传输协议过渡到 WebRTC,实现了更高效、低延迟的音视频数据传输。此外,我们通过将所有音频和图像数据直接加载到内存中,避免了磁盘 I/O 操作,这对于实时性要求较高的系统来说,是一次革命性的提升。

一、框架重构

1.1 Java 迁移到 Python的挑战与解决方案

在将音视频处理从 Java 迁移到 Python 的过程中,我们遇到了一些技术挑战,特别是在性能优化和库支持方面的差异。Java 在音视频处理方面有强大的支持,但我们需要转向 Python,因为 Python 在处理高并发、数据处理和与 WebRTC 结合方面具有更强的灵活性。尤其是在音视频处理的实时性要求上,Python 的协程和异步编程模型(asyncio)大大简化了我们对实时数据流的管理。

迁移过程中,我们特别关注了如何在 Python 中更高效地管理音频和视频数据的处理。尤其是在避免磁盘 I/O 操作上,我们将所有数据加载到内存中,减少了与文件系统交互的延迟。尽管 Python 在多线程和多进程支持上与 Java 略有不同,但通过合理的线程池和异步编程设计,系统的性能得到了充分保障。

1.2 WebRTC vs WebSocket:为何选择 WebRTC?

WebSocket 是最初用于实时通信的协议,广泛应用于我们之前的音视频推送架构中。然而,随着音视频数据量和实时性要求的提升,我们发现 WebSocket 在传输音视频流时存在较高的延迟,尤其是在网络质量较差的环境下,WebSocket 无法有效保证实时性和多路流的同步。

与 WebSocket 相比,WebRTC 专为低延迟、高质量的音视频通信设计,能够在复杂的网络条件下保持稳定的连接。WebRTC 内置了针对网络抖动、丢包和延迟的容错机制,支持更加高效的数据传输,尤其适合实时音视频应用。因此,我们决定将 WebSocket 替换为 WebRTC,以确保音视频推送能够顺畅进行,且延迟最低。

WebRTC 的协议堆栈包括 STUN、TURN 和 ICE 三大核心协议。STUN 负责 NAT 穿越,TURN 用于通过中继服务器转发数据,而 ICE 则帮助 WebRTC 选择最佳的传输路径,以适应不同的网络环境。

二、音视频数据的实时处理

WebRTC 使用 VideoStreamTrackAudioStreamTrack 对象来处理音视频流。这些对象分别负责接收和发送视频帧和音频数据。RTCPeerConnection 将它们包装在会话中,处理媒体协商、编码、解码等任务。

  1. 音视频流发送与接收:在 Python 中,我们通过 VideoStreamTrackAudioStreamTrack 实现视频和音频的流传输。这些流通过 RTCPeerConnection 对象管理,以保证数据按时间顺序传送并保持同步。

  2. 协商与数据交换:在 WebRTC 连接中,两个端点需要通过 SDP(Session Description Protocol)交换媒体协商信息。通过这种方式,WebRTC 确定双方的编解码器、分辨率、帧率等参数。

2.1 音频处理

音频数据的实时处理同样至关重要。为了确保音视频同步传输,我们使用了 SingleFrameAudioStreamTrack 类来管理音频流。在每次接收到新的音频数据时,音频帧会被加入到队列中,并通过 recv() 方法实时推送。

与传统的磁盘存储不同,我们的音频数据完全通过内存进行传输。音频数据每20毫秒进行分帧推送,确保了音频数据的高效传输和低延迟。此外,音频帧的同步与视频帧的同步通过时间戳管理,确保了音视频同步播放。

以下是音频数据推送的代码实现:

class SingleFrameAudioStreamTrack(AudioStreamTrack):
    async def recv(self):
        while not self.audio_queue:  # Wait until there's audio data in the queue
            await asyncio.sleep(0.005)  # Sleep briefly and retry if no data

        audio_data = self.audio_queue.popleft()  # Get the next chunk of audio data
        samples = audio_data.shape[0]  # Get the number of samples in the audio data

        # Create an audio frame to send over the WebRTC stream
        frame = AudioFrame(format="s16", layout="mono", samples=samples)
        frame.sample_rate = self.sample_rate  # Set the sample rate
        frame.time_base = fractions.Fraction(1, self.sample_rate)  # Time base is set to sample rate

        # Add the audio data into the frame and update the frame
        frame.planes[0].update(audio_data.tobytes())  # Convert data to bytes and store in frame
        frame.pts = self._timestamp  # Set the timestamp (presentation time)
        self._timestamp += samples  # Increment timestamp for the next frame
        
        return frame  # Return the audio frame to be transmitted

2.2 视频处理

在视频处理方面,我们使用 SingleFrameVideoStreamTrack 类逐帧处理视频数据。每个视频帧的时间戳通过帧率控制,保证了视频的连续性。在推送过程中,视频帧直接从内存传输至 WebRTC 流,避免了磁盘操作的性能瓶颈。

音视频同步是此架构优化的核心之一。每当推送一个视频帧时,我们根据视频帧的时间戳同步推送音频数据。以下是关键的同步推送代码:

class SingleFrameVideoStreamTrack(VideoStreamTrack):
    async def recv(self):
        async with self._lock:  # Ensure thread-safe access to the frame
            if isinstance(self._current_frame, VideoFrame):
                frame = self._current_frame  # If current frame is a VideoFrame, use it
            else:
                # Otherwise, convert the current frame data (numpy array) to a VideoFrame
                frame = VideoFrame.from_ndarray(self._current_frame, format='bgr24')

            # Set the timestamp (PTS) for the video frame
            frame.pts = self._timestamp
            frame.time_base = self._time_base  # Time base for the frame
            
            # Increment timestamp based on the frame rate (30fps in this case)
            self._timestamp += 3300  # 30fps, so each frame's timestamp is incremented by 3300 units (33 ms per frame)
            
            return frame  # Return the video frame to be transmitted

2.3 音视频同步的实现

音视频同步是推送音视频数据时需要处理的核心问题。在实际应用中,音频和视频的推送需要保持在同一时间窗口内,以确保它们的同步播放。在代码中,我们通过以下方法来实现音视频同步:

  1. 同步推送音频和视频数据:

    • push_av_segment() 方法中,我们会根据视频帧的时间戳推送对应的音频数据。每一帧视频的时间戳会指引我们推送对应时间段内的音频数据。

    • 通过 await asyncio.sleep(0.02) 控制每段音频的推送间隔,确保音频数据在每 20 毫秒的间隔内被推送。这个时间间隔与视频帧的持续时间(33 毫秒)相配合,确保了同步。

  2. 控制音频帧的推送速率:

    • 我们根据视频帧的时间戳和音频数据的长度,动态调整每次音频数据的推送量,避免由于网络延迟或数据丢失导致的音视频不同步。

以下是音视频同步推送的完整代码:

async def push_av_segment(segment_index):
    """Synchronously push audio and video segment"""

    try:
        frames = global_frame_map[segment_index]
        waveform = global_audio_frame_map[segment_index]
        sample_rate = 24000  # Audio sample rate (24kHz)
        fps = 33  # Frames per second for video (33fps)
        
        # Calculate the audio duration in seconds
        audio_duration = len(waveform) / sample_rate
        
        # Calculate the total number of video frames required for this audio duration
        video_frame_count = min(len(frames), int(audio_duration * fps))

        # Define chunk size for audio (20ms per chunk)
        chunk_samples = int(0.02 * sample_rate)  # 20ms audio chunk
        audio_pos = 0

        # Define frame duration (in seconds)
        frame_duration = 1 / fps
        
        start_time = time.time()  # Start timing to ensure accurate frame pacing

        # Loop through the video frames and sync with audio
        for frame_idx in range(video_frame_count):
            # Convert video frame to WebRTC format and update the track
            video_frame = VideoFrame.from_ndarray(frames[frame_idx], format='bgr24')
            await track.update_frame(video_frame)

            # Calculate the expected position for the corresponding audio frame
            expected_audio_pos = int(frame_idx * frame_duration * sample_rate)
            
            # Push the corresponding audio chunks while the audio position is less than the expected position
            while audio_pos < expected_audio_pos and audio_pos < len(waveform):
                chunk_end = min(audio_pos + chunk_samples, len(waveform))
                chunk = waveform[audio_pos:chunk_end]
                
                # If the chunk size is smaller than expected, pad it to ensure consistency
                if len(chunk) < chunk_samples:
                    chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))

                # Push the audio data (converted to int16 format) to the audio track
                audio_track.push_audio_data((chunk * 32767).astype(np.int16).reshape(-1, 1))
                audio_pos = chunk_end

                # Sleep to maintain audio frame pacing (20ms delay)
                await asyncio.sleep(0.02)

            # Control video frame rate by comparing elapsed time with expected frame time
            elapsed = time.time() - start_time
            expected_time = (frame_idx + 1) * frame_duration
            if elapsed < expected_time:
                await asyncio.sleep(expected_time - elapsed)

    except Exception as e:
        print(f"❌ Segment {segment_index} push failed: {str(e)}")

三、Python 与 WebRTC 的协同工作

WebRTC 协议的优势在于它专门为实时数据传输而设计。我们通过 RTCPeerConnection 来建立 WebRTC 连接,并利用 VideoStreamTrackAudioStreamTrack 进行音视频流的发送与接收。以下是 WebRTC 配置的核心代码:

ice_servers = [RTCIceServer(
    urls="turn:freestun.net:3478",  # TURN 服务器的地址
    username="free",  # 用户名
    credential="free"  # 密码
)]
configuration = RTCConfiguration(ice_servers)
pc = RTCPeerConnection(configuration=configuration)

WebRTC 的协议堆栈包括 STUN、TURN 和 ICE,它们分别承担着以下任务:

  • STUN (Session Traversal Utilities for NAT):用来穿越 NAT(网络地址转换)和防火墙,确定客户端的公网地址。

  • TURN (Traversal Using Relays around NAT):在 STUN 无法穿越防火墙或 NAT 时,TURN 作为中继服务器转发数据包,确保通信稳定。

  • ICE (Interactive Connectivity Establishment):通过 ICE,WebRTC 可以寻找最合适的网络路径,从而优化数据传输。

通过配置 TURN 服务器,WebRTC 在复杂网络环境下依然能够稳定运行。RTCPeerConnection 的使用使得我们能够在保证音视频同步的情况下顺畅推送数据。

四、系统优化与展望

性能优化的其他方法

在去除磁盘 I/O 操作后,我们的系统已经大幅度提高了实时性,但我们依然可以进一步提升系统的性能。例如,利用内存池和高效的数据结构来减少内存分配的开销,使用异步 IO 来优化网络请求的处理速度等。此外,通过并行处理多个视频流,或者通过分布式处理来提升处理能力,可以进一步减少系统的负载,提升响应速度。

音视频同步的复杂性

音视频同步不仅仅是基于时间戳的简单操作。它还需要考虑网络延迟、缓冲区管理、数据丢失等因素。例如,我们可能需要在推送视频帧的过程中,通过适当的缓冲机制处理视频帧的突发数据。我们还可以通过更复杂的算法来调整音频和视频帧的速率,使其能够在不丢失数据的情况下实现流畅播放。

系统的可扩展性

随着音视频处理需求的提升,我们需要考虑如何使系统具备更高的可扩展性。通过水平扩展(如负载均衡、分布式部署等),我们能够支持更大规模的并发音视频流处理。对于大规模实时应用,云端部署和微服务架构将帮助我们更好地管理和分配资源,提高系统的稳定性和扩展能力。