Passing NAudio Audio to OpenAI Realtime with C# .NET Core
This picks up where my previous article left off: 关于小程序和蓝牙信标的一些尝试 (CSDN blog), about mini programs and Bluetooth beacons.
There is an open area in the exhibition hall where OpenAI drives a virtual human for open-ended conversation, but the directional microphone in that area sometimes cannot pick up speech clearly.
The equipment room for the open area is about 50 meters from the amplifier and central controller from the previous article, and the cable run would be around 100 meters. Pulling that cable is a real hassle, so that plan was dropped.
As for wiring, the plan is to take an AUX feed from the XLR output into a sound card's mic input; done that way, the implementation becomes very easy.
So I need to rework my code. I can't share the exact details of my code, but most of it follows the AOAI Realtime Audio Sdk Sample, so I'll explain it in terms of that sample.
NuGet packages required:
<PackageReference Include="NAudio" Version="2.2.1" />
<PackageReference Include="NetCoreServer" Version="8.0.7" />
In the official sample, piping microphone audio straight through is implemented like this:
await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync())
{
    // session.created is the very first command on a session and lets us know that connection was successful.
    if (update is ConversationSessionStartedUpdate)
    {
        Console.WriteLine($" <<< Connected: session started");
        // This is a good time to start capturing microphone input and sending audio to the service. The
        // input stream will be chunked and sent asynchronously, so we don't need to await anything in the
        // processing loop.
        _ = Task.Run(async () =>
        {
            using MicrophoneAudioStream microphoneInput = MicrophoneAudioStream.Start();
            Console.WriteLine($" >>> Listening to microphone input");
            Console.WriteLine($" >>> (Just tell the app you're done to finish)");
            Console.WriteLine();
            await session.SendInputAudioAsync(microphoneInput);
        });
    }
    // ... omitted ...
}
using NAudio.Wave;

#nullable disable

/// <summary>
/// Uses the NAudio library (https://github.com/naudio/NAudio) to provide a rudimentary abstraction of microphone
/// input as a stream.
/// </summary>
public class MicrophoneAudioStream : Stream, IDisposable
{
    private const int SAMPLES_PER_SECOND = 24000;
    private const int BYTES_PER_SAMPLE = 2;
    private const int CHANNELS = 1;

    // For simplicity, this is configured to use a static 10-second ring buffer.
    private readonly byte[] _buffer = new byte[BYTES_PER_SAMPLE * SAMPLES_PER_SECOND * CHANNELS * 10];
    private readonly object _bufferLock = new();
    private int _bufferReadPos = 0;
    private int _bufferWritePos = 0;

    private readonly WaveInEvent _waveInEvent;

    private MicrophoneAudioStream()
    {
        _waveInEvent = new()
        {
            WaveFormat = new WaveFormat(SAMPLES_PER_SECOND, BYTES_PER_SAMPLE * 8, CHANNELS),
        };
        _waveInEvent.DataAvailable += (_, e) =>
        {
            lock (_bufferLock)
            {
                int bytesToCopy = e.BytesRecorded;
                if (_bufferWritePos + bytesToCopy >= _buffer.Length)
                {
                    int bytesToCopyBeforeWrap = _buffer.Length - _bufferWritePos;
                    Array.Copy(e.Buffer, 0, _buffer, _bufferWritePos, bytesToCopyBeforeWrap);
                    bytesToCopy -= bytesToCopyBeforeWrap;
                    _bufferWritePos = 0;
                }
                Array.Copy(e.Buffer, e.BytesRecorded - bytesToCopy, _buffer, _bufferWritePos, bytesToCopy);
                _bufferWritePos += bytesToCopy;
            }
        };
        _waveInEvent.StartRecording();
    }

    public static MicrophoneAudioStream Start() => new();

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotImplementedException();
    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalCount = count;
        int GetBytesAvailable() => _bufferWritePos < _bufferReadPos
            ? _bufferWritePos + (_buffer.Length - _bufferReadPos)
            : _bufferWritePos - _bufferReadPos;
        // For simplicity, we'll block until all requested data is available and not perform partial reads.
        while (GetBytesAvailable() < count)
        {
            Thread.Sleep(100);
        }
        lock (_bufferLock)
        {
            if (_bufferReadPos + count >= _buffer.Length)
            {
                int bytesBeforeWrap = _buffer.Length - _bufferReadPos;
                Array.Copy(
                    sourceArray: _buffer,
                    sourceIndex: _bufferReadPos,
                    destinationArray: buffer,
                    destinationIndex: offset,
                    length: bytesBeforeWrap);
                _bufferReadPos = 0;
                count -= bytesBeforeWrap;
                offset += bytesBeforeWrap;
            }
            Array.Copy(_buffer, _bufferReadPos, buffer, offset, count);
            _bufferReadPos += count;
        }
        return totalCount;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }

    protected override void Dispose(bool disposing)
    {
        _waveInEvent?.Dispose();
        base.Dispose(disposing);
    }
}
As you can see, the microphone audio reaches the session through MicrophoneAudioStream.
What I need to do is build a similar Stream of my own that receives the audio over UDP instead:
using Microsoft.Extensions.Logging;
using NAudio.Wave;

namespace MCCO.Avatar.Agent.Utility
{
    /// <summary>
    /// Registered as a singleton.
    /// Receives a binary audio stream in the same format as MicrophoneAudioStream.
    /// </summary>
    public class SocketAudioStream : Stream
    {
        private const int SAMPLES_PER_SECOND = 24000;
        private const int BYTES_PER_SAMPLE = 2;
        private const int CHANNELS = 1;

        // For simplicity, this is configured to use a static 10-second ring buffer.
        private readonly byte[] _buffer = new byte[BYTES_PER_SAMPLE * SAMPLES_PER_SECOND * CHANNELS * 10];
        private readonly object _bufferLock = new();
        private int _bufferReadPos = 0;
        private int _bufferWritePos = 0;

        private readonly ILogger<SocketAudioStream> _logger;

        public volatile bool IsRecording = false;
        public event Action? RecordingStarted;
        public event Action? RecordingStopped;
        public event Action<byte[], int, int>? OnDataAvailable;

        // Debug: save the incoming audio to a WAV file.
        private bool enableDebugSaveWav = true;
        private WaveFileWriter? _wavWriter;
        private readonly object _wavLock = new();
        private bool _wavWriterInitialized = false;
        private readonly string _wavFilePath = $"quic_debug_{DateTime.Now:yyyyMMdd_HHmmss}.wav";
        private readonly WaveFormat _waveFormat = new WaveFormat(24000, 16, 1); // 24 kHz, 16-bit, mono

        public SocketAudioStream(ILogger<SocketAudioStream> logger)
        {
            _logger = logger;
        }

        public void StartRecording()
        {
            IsRecording = true;
            _logger.LogWarning("Started receiving audio from the network");
        }

        public void StopRecording()
        {
            IsRecording = false;
            lock (_bufferLock)
            {
                _logger.LogDebug("Clearing the ring buffer");
                _bufferReadPos = 0;
                _bufferWritePos = 0;
                Array.Clear(_buffer, 0, _buffer.Length);
            }
            _logger.LogWarning("Stopped receiving audio from the network");
        }

        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        {
            int totalCount = count;
            int GetBytesAvailable() => _bufferWritePos < _bufferReadPos
                ? _bufferWritePos + (_buffer.Length - _bufferReadPos)
                : _bufferWritePos - _bufferReadPos;
            // For simplicity, we'll block until all requested data is available and not perform partial reads.
            // We also wait until recording has actually been started.
            while (GetBytesAvailable() < count || !IsRecording)
            {
                await Task.Delay(100, cancellationToken);
            }
            lock (_bufferLock)
            {
                if (_bufferReadPos + count >= _buffer.Length)
                {
                    int bytesBeforeWrap = _buffer.Length - _bufferReadPos;
                    Array.Copy(
                        sourceArray: _buffer,
                        sourceIndex: _bufferReadPos,
                        destinationArray: buffer,
                        destinationIndex: offset,
                        length: bytesBeforeWrap);
                    _bufferReadPos = 0;
                    count -= bytesBeforeWrap;
                    offset += bytesBeforeWrap;
                }
                Array.Copy(_buffer, _bufferReadPos, buffer, offset, count);
                _bufferReadPos += count;
            }
            return totalCount;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
        public override long Length => throw new NotSupportedException();
        public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
        public override void Flush() => throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();

        /// <summary>
        /// This plays the role of WaveInEvent raising its DataAvailable event.
        /// </summary>
        /// <param name="buffer">Equivalent to e.Buffer in WaveInEvent.DataAvailable.</param>
        /// <param name="offset">Will be 0 as passed here.</param>
        /// <param name="count">Equivalent to e.BytesRecorded in WaveInEvent.DataAvailable.</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (!IsRecording) return;
            lock (_bufferLock)
            {
                int bytesToCopy = count;
                if (_bufferWritePos + bytesToCopy >= _buffer.Length)
                {
                    int bytesToCopyBeforeWrap = _buffer.Length - _bufferWritePos;
                    Array.Copy(buffer, 0, _buffer, _bufferWritePos, bytesToCopyBeforeWrap);
                    bytesToCopy -= bytesToCopyBeforeWrap;
                    _bufferWritePos = 0;
                }
                Array.Copy(buffer, count - bytesToCopy, _buffer, _bufferWritePos, bytesToCopy);
                _bufferWritePos += bytesToCopy;
                OnDataAvailable?.Invoke(buffer, 0, count);
#if DEBUG
                if (enableDebugSaveWav)
                {
                    if (!_wavWriterInitialized)
                    {
                        _wavWriter = new WaveFileWriter(_wavFilePath, _waveFormat);
                        _wavWriterInitialized = true;
                    }
                    _wavWriter?.Write(buffer, offset, count);
                    _wavWriter?.Flush();
                }
#endif
            }
        }
    }
}
Then build a UDP service that receives the audio stream sent by the client:
using System.Net;
using NetCoreServer;

public class UdpService : UdpServer
{
    public event Action<byte[], int, int>? OnDataReceived;

    public UdpService(IPAddress address, int port) : base(address, port)
    {
    }

    protected override void OnStarted()
    {
        // Start receiving datagrams.
        ReceiveAsync();
    }

    protected override void OnReceived(EndPoint endpoint, byte[] buffer, long offset, long size)
    {
        OnDataReceived?.Invoke(buffer, (int)offset, (int)size);

        // Continue receiving datagrams.
        ReceiveAsync();
    }
}
With that in place, I only need to subscribe to the OnDataReceived event to get the audio and write it into SocketAudioStream:
public class Main
{
    // Pseudocode: declare and inject these fields as appropriate for your project.
    private readonly UdpService _udpService;
    private readonly SocketAudioStream _audioStream; // the singleton registered in DI

    public Main(SocketAudioStream audioStream)
    {
        _audioStream = audioStream;
        _udpService = new UdpService(IPAddress.Parse("127.0.0.1"), 8085);
        _udpService.OnDataReceived += UdpService_OnDataReceived;
        _udpService.Start();
    }

    private void UdpService_OnDataReceived(byte[] data, int offset, int count)
    {
        _audioStream.Write(data, offset, count);
    }
}
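The sender side (the machine whose sound card receives the AUX feed) is not code I can show here, but a minimal hypothetical sketch of it could look like the following: capture the sound card input with NAudio in the exact format SocketAudioStream expects (24 kHz, 16-bit, mono PCM) and push each captured chunk as a UDP datagram. The address, port, and buffer size below are assumptions, not values from my project:
using System.Net.Sockets;
using NAudio.Wave;

// Hypothetical sender: capture 24 kHz / 16-bit / mono PCM and push it over UDP.
var udpClient = new UdpClient();
udpClient.Connect("127.0.0.1", 8085); // address/port where UdpService listens (example values)

var waveIn = new WaveInEvent
{
    WaveFormat = new WaveFormat(24000, 16, 1), // must match SocketAudioStream's format
    BufferMilliseconds = 20                    // ~960 bytes per chunk, well below the UDP datagram limit
};
waveIn.DataAvailable += (_, e) =>
{
    // e.Buffer can be larger than the captured data, so only send e.BytesRecorded bytes.
    udpClient.Send(e.Buffer, e.BytesRecorded);
};
waveIn.StartRecording();

Console.WriteLine("Streaming audio over UDP, press Enter to stop...");
Console.ReadLine();
waveIn.StopRecording();
waveIn.Dispose();
udpClient.Dispose();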
Then, in OpenAI's ConversationSessionStartedUpdate phase, just decide which input source to use:
await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync())
{
    if (update is ConversationSessionStartedUpdate)
    {
        Console.WriteLine($" <<< Connected: session started");
        _ = Task.Run(async () =>
        {
            // Before starting Realtime, decide whether the input is the local microphone or UDP (the 'mic' flag below).
            if (mic)
            {
                using MicrophoneAudioStream microphoneInput = MicrophoneAudioStream.Start();
                await session.SendInputAudioAsync(microphoneInput);
            }
            else
            {
                // Remember to unsubscribe this handler when you're done with it.
                _audioStream.OnDataAvailable += OnDataAvailableStream;
                _audioStream.StartRecording();
                await session.SendInputAudioAsync(_audioStream);
            }
        });
    }
    // ... omitted ...
}
OnDataAvailableStream is only there to pass the audio back up to the UI so it can show a level meter; you can leave it out.
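If you do want a level meter, a handler like the following would work. This is just an illustrative sketch; the peak calculation and normalization are mine, not from the original project:
// Sketch: compute a simple 0..1 peak level from a 16-bit PCM chunk for a UI level meter.
private void OnDataAvailableStream(byte[] buffer, int offset, int count)
{
    int peak = 0;
    for (int i = offset; i + 1 < offset + count; i += 2)
    {
        int sample = BitConverter.ToInt16(buffer, i); // little-endian 16-bit sample
        peak = Math.Max(peak, Math.Abs(sample));
    }
    float level = peak / 32768f;
    // Update your level meter / progress bar with 'level' here.
}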
I registered SocketAudioStream as a singleton when wiring up DI (dependency injection). The exhibition only uses a single audio channel, so the stream doesn't need to be disposed; it simply stays open the whole time, because audio keeps flowing at all times (even during silence). To avoid write errors on the stream, I never dispose it here. If you are adapting my code, please dispose the Stream according to your own situation.
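For reference, the registration itself is a one-liner; this is a sketch assuming a generic host or WebApplication builder, not my actual startup code:
// Program.cs (sketch): one shared SocketAudioStream instance for the whole application lifetime.
builder.Services.AddSingleton<SocketAudioStream>();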
Note that Realtime needs audio at a 24000 Hz sample rate. A stream like this is fine over a LAN, but not over the public internet, where packet loss becomes severe. You would need proper audio buffering to cope with that, which I won't dig into here.