.png)
数字人系列(7):迁移架构与性能提升——从 Java 到 Python 的音视频实时推送优化
在我们的项目中,曾经依赖于 Java 实现的音视频处理功能已经成功迁移至 Python,经过这次转变,整个系统架构得到了显著的优化,尤其是在视频推流方面。最为关键的改进之一是,从 WebSocket 传输协议过渡到 WebRTC,实现了更高效、低延迟的音视频数据传输。此外,我们通过将所有音频和图像数据直接加载到内存中,避免了磁盘 I/O 操作,这对于实时性要求较高的系统来说,是一次革命性的提升。
一、框架重构
1.1 Java 迁移到 Python的挑战与解决方案
在将音视频处理从 Java 迁移到 Python 的过程中,我们遇到了一些技术挑战,特别是在性能优化和库支持方面的差异。Java 在音视频处理方面有强大的支持,但我们需要转向 Python,因为 Python 在处理高并发、数据处理和与 WebRTC 结合方面具有更强的灵活性。尤其是在音视频处理的实时性要求上,Python 的协程和异步编程模型(asyncio
)大大简化了我们对实时数据流的管理。
迁移过程中,我们特别关注了如何在 Python 中更高效地管理音频和视频数据的处理。尤其是在避免磁盘 I/O 操作上,我们将所有数据加载到内存中,减少了与文件系统交互的延迟。尽管 Python 在多线程和多进程支持上与 Java 略有不同,但通过合理的线程池和异步编程设计,系统的性能得到了充分保障。
1.2 WebRTC vs WebSocket:为何选择 WebRTC?
WebSocket 是最初用于实时通信的协议,广泛应用于我们之前的音视频推送架构中。然而,随着音视频数据量和实时性要求的提升,我们发现 WebSocket 在传输音视频流时存在较高的延迟,尤其是在网络质量较差的环境下,WebSocket 无法有效保证实时性和多路流的同步。
与 WebSocket 相比,WebRTC 专为低延迟、高质量的音视频通信设计,能够在复杂的网络条件下保持稳定的连接。WebRTC 内置了针对网络抖动、丢包和延迟的容错机制,支持更加高效的数据传输,尤其适合实时音视频应用。因此,我们决定将 WebSocket 替换为 WebRTC,以确保音视频推送能够顺畅进行,且延迟最低。
WebRTC 的协议堆栈包括 STUN、TURN 和 ICE 三大核心协议。STUN 负责 NAT 穿越,TURN 用于通过中继服务器转发数据,而 ICE 则帮助 WebRTC 选择最佳的传输路径,以适应不同的网络环境。
二、音视频数据的实时处理
WebRTC 使用 VideoStreamTrack
和 AudioStreamTrack
对象来处理音视频流。这些对象分别负责接收和发送视频帧和音频数据。RTCPeerConnection
将它们包装在会话中,处理媒体协商、编码、解码等任务。
音视频流发送与接收:在 Python 中,我们通过
VideoStreamTrack
和AudioStreamTrack
实现视频和音频的流传输。这些流通过RTCPeerConnection
对象管理,以保证数据按时间顺序传送并保持同步。协商与数据交换:在 WebRTC 连接中,两个端点需要通过 SDP(Session Description Protocol)交换媒体协商信息。通过这种方式,WebRTC 确定双方的编解码器、分辨率、帧率等参数。
2.1 音频处理
音频数据的实时处理同样至关重要。为了确保音视频同步传输,我们使用了 SingleFrameAudioStreamTrack
类来管理音频流。在每次接收到新的音频数据时,音频帧会被加入到队列中,并通过 recv()
方法实时推送。
与传统的磁盘存储不同,我们的音频数据完全通过内存进行传输。音频数据每20毫秒进行分帧推送,确保了音频数据的高效传输和低延迟。此外,音频帧的同步与视频帧的同步通过时间戳管理,确保了音视频同步播放。
以下是音频数据推送的代码实现:
class SingleFrameAudioStreamTrack(AudioStreamTrack):
async def recv(self):
while not self.audio_queue: # Wait until there's audio data in the queue
await asyncio.sleep(0.005) # Sleep briefly and retry if no data
audio_data = self.audio_queue.popleft() # Get the next chunk of audio data
samples = audio_data.shape[0] # Get the number of samples in the audio data
# Create an audio frame to send over the WebRTC stream
frame = AudioFrame(format="s16", layout="mono", samples=samples)
frame.sample_rate = self.sample_rate # Set the sample rate
frame.time_base = fractions.Fraction(1, self.sample_rate) # Time base is set to sample rate
# Add the audio data into the frame and update the frame
frame.planes[0].update(audio_data.tobytes()) # Convert data to bytes and store in frame
frame.pts = self._timestamp # Set the timestamp (presentation time)
self._timestamp += samples # Increment timestamp for the next frame
return frame # Return the audio frame to be transmitted
2.2 视频处理
在视频处理方面,我们使用 SingleFrameVideoStreamTrack
类逐帧处理视频数据。每个视频帧的时间戳通过帧率控制,保证了视频的连续性。在推送过程中,视频帧直接从内存传输至 WebRTC 流,避免了磁盘操作的性能瓶颈。
音视频同步是此架构优化的核心之一。每当推送一个视频帧时,我们根据视频帧的时间戳同步推送音频数据。以下是关键的同步推送代码:
class SingleFrameVideoStreamTrack(VideoStreamTrack):
async def recv(self):
async with self._lock: # Ensure thread-safe access to the frame
if isinstance(self._current_frame, VideoFrame):
frame = self._current_frame # If current frame is a VideoFrame, use it
else:
# Otherwise, convert the current frame data (numpy array) to a VideoFrame
frame = VideoFrame.from_ndarray(self._current_frame, format='bgr24')
# Set the timestamp (PTS) for the video frame
frame.pts = self._timestamp
frame.time_base = self._time_base # Time base for the frame
# Increment timestamp based on the frame rate (30fps in this case)
self._timestamp += 3300 # 30fps, so each frame's timestamp is incremented by 3300 units (33 ms per frame)
return frame # Return the video frame to be transmitted
2.3 音视频同步的实现
音视频同步是推送音视频数据时需要处理的核心问题。在实际应用中,音频和视频的推送需要保持在同一时间窗口内,以确保它们的同步播放。在代码中,我们通过以下方法来实现音视频同步:
同步推送音频和视频数据:
在
push_av_segment()
方法中,我们会根据视频帧的时间戳推送对应的音频数据。每一帧视频的时间戳会指引我们推送对应时间段内的音频数据。通过
await asyncio.sleep(0.02)
控制每段音频的推送间隔,确保音频数据在每 20 毫秒的间隔内被推送。这个时间间隔与视频帧的持续时间(33 毫秒)相配合,确保了同步。
控制音频帧的推送速率:
我们根据视频帧的时间戳和音频数据的长度,动态调整每次音频数据的推送量,避免由于网络延迟或数据丢失导致的音视频不同步。
以下是音视频同步推送的完整代码:
async def push_av_segment(segment_index):
"""Synchronously push audio and video segment"""
try:
frames = global_frame_map[segment_index]
waveform = global_audio_frame_map[segment_index]
sample_rate = 24000 # Audio sample rate (24kHz)
fps = 33 # Frames per second for video (33fps)
# Calculate the audio duration in seconds
audio_duration = len(waveform) / sample_rate
# Calculate the total number of video frames required for this audio duration
video_frame_count = min(len(frames), int(audio_duration * fps))
# Define chunk size for audio (20ms per chunk)
chunk_samples = int(0.02 * sample_rate) # 20ms audio chunk
audio_pos = 0
# Define frame duration (in seconds)
frame_duration = 1 / fps
start_time = time.time() # Start timing to ensure accurate frame pacing
# Loop through the video frames and sync with audio
for frame_idx in range(video_frame_count):
# Convert video frame to WebRTC format and update the track
video_frame = VideoFrame.from_ndarray(frames[frame_idx], format='bgr24')
await track.update_frame(video_frame)
# Calculate the expected position for the corresponding audio frame
expected_audio_pos = int(frame_idx * frame_duration * sample_rate)
# Push the corresponding audio chunks while the audio position is less than the expected position
while audio_pos < expected_audio_pos and audio_pos < len(waveform):
chunk_end = min(audio_pos + chunk_samples, len(waveform))
chunk = waveform[audio_pos:chunk_end]
# If the chunk size is smaller than expected, pad it to ensure consistency
if len(chunk) < chunk_samples:
chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))
# Push the audio data (converted to int16 format) to the audio track
audio_track.push_audio_data((chunk * 32767).astype(np.int16).reshape(-1, 1))
audio_pos = chunk_end
# Sleep to maintain audio frame pacing (20ms delay)
await asyncio.sleep(0.02)
# Control video frame rate by comparing elapsed time with expected frame time
elapsed = time.time() - start_time
expected_time = (frame_idx + 1) * frame_duration
if elapsed < expected_time:
await asyncio.sleep(expected_time - elapsed)
except Exception as e:
print(f"❌ Segment {segment_index} push failed: {str(e)}")
三、Python 与 WebRTC 的协同工作
WebRTC 协议的优势在于它专门为实时数据传输而设计。我们通过 RTCPeerConnection
来建立 WebRTC 连接,并利用 VideoStreamTrack
和 AudioStreamTrack
进行音视频流的发送与接收。以下是 WebRTC 配置的核心代码:
ice_servers = [RTCIceServer(
urls="turn:freestun.net:3478", # TURN 服务器的地址
username="free", # 用户名
credential="free" # 密码
)]
configuration = RTCConfiguration(ice_servers)
pc = RTCPeerConnection(configuration=configuration)
WebRTC 的协议堆栈包括 STUN、TURN 和 ICE,它们分别承担着以下任务:
STUN (Session Traversal Utilities for NAT):用来穿越 NAT(网络地址转换)和防火墙,确定客户端的公网地址。
TURN (Traversal Using Relays around NAT):在 STUN 无法穿越防火墙或 NAT 时,TURN 作为中继服务器转发数据包,确保通信稳定。
ICE (Interactive Connectivity Establishment):通过 ICE,WebRTC 可以寻找最合适的网络路径,从而优化数据传输。
通过配置 TURN 服务器,WebRTC 在复杂网络环境下依然能够稳定运行。RTCPeerConnection
的使用使得我们能够在保证音视频同步的情况下顺畅推送数据。
四、系统优化与展望
性能优化的其他方法
在去除磁盘 I/O 操作后,我们的系统已经大幅度提高了实时性,但我们依然可以进一步提升系统的性能。例如,利用内存池和高效的数据结构来减少内存分配的开销,使用异步 IO 来优化网络请求的处理速度等。此外,通过并行处理多个视频流,或者通过分布式处理来提升处理能力,可以进一步减少系统的负载,提升响应速度。
音视频同步的复杂性
音视频同步不仅仅是基于时间戳的简单操作。它还需要考虑网络延迟、缓冲区管理、数据丢失等因素。例如,我们可能需要在推送视频帧的过程中,通过适当的缓冲机制处理视频帧的突发数据。我们还可以通过更复杂的算法来调整音频和视频帧的速率,使其能够在不丢失数据的情况下实现流畅播放。
系统的可扩展性
随着音视频处理需求的提升,我们需要考虑如何使系统具备更高的可扩展性。通过水平扩展(如负载均衡、分布式部署等),我们能够支持更大规模的并发音视频流处理。对于大规模实时应用,云端部署和微服务架构将帮助我们更好地管理和分配资源,提高系统的稳定性和扩展能力。
- 感谢你赐予我前进的力量