This continues from an earlier article on my CSDN blog: 关于小程序和蓝牙信标的一些尝试 ("Some experiments with Mini Programs and Bluetooth beacons").

The exhibition hall has an open area where an OpenAI-driven virtual human holds open-ended conversations with visitors, but the directional microphone in that area sometimes can't hear them clearly.

The open area's equipment room is roughly 50 meters from the amplifier and central control described in the previous article, which works out to about 100 meters of cable. Running that cable is a real hassle, so the wired approach was abandoned.

The wired plan would have been to take an AUX feed from an XLR output into the sound card's mic input, which would have made the software side trivial.

So instead I have to rework my code. I can't publish the full source, but most of it is modeled on the AOAI Realtime Audio SDK Sample, so the snippets below should be enough to follow.

The NuGet packages used:

<PackageReference Include="NAudio" Version="2.2.1" />

<PackageReference Include="NetCoreServer" Version="8.0.7" />

In the official sample, microphone pass-through audio is implemented like this:

await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync())
{
    // session.created is the very first command on a session and lets us know that connection was successful.
    if (update is ConversationSessionStartedUpdate)
    {
        Console.WriteLine($" <<< Connected: session started");
        // This is a good time to start capturing microphone input and sending audio to the service. The
        // input stream will be chunked and sent asynchronously, so we don't need to await anything in the
        // processing loop.
        _ = Task.Run(async () =>
        {
            using MicrophoneAudioStream microphoneInput = MicrophoneAudioStream.Start();
            Console.WriteLine($" >>> Listening to microphone input");
            Console.WriteLine($" >>> (Just tell the app you're done to finish)");
            Console.WriteLine();
            await session.SendInputAudioAsync(microphoneInput);
        });
    }
    // omitted
}
The MicrophoneAudioStream it uses is also part of the sample:

using NAudio.Wave;

#nullable disable

/// <summary>
/// Uses the NAudio library (https://github.com/naudio/NAudio) to provide a rudimentary abstraction of microphone
/// input as a stream.
/// </summary>
public class MicrophoneAudioStream : Stream, IDisposable
{
    private const int SAMPLES_PER_SECOND = 24000;
    private const int BYTES_PER_SAMPLE = 2;
    private const int CHANNELS = 1;

    // For simplicity, this is configured to use a static 10-second ring buffer.
    private readonly byte[] _buffer = new byte[BYTES_PER_SAMPLE * SAMPLES_PER_SECOND * CHANNELS * 10];
    private readonly object _bufferLock = new();
    private int _bufferReadPos = 0;
    private int _bufferWritePos = 0;

    private readonly WaveInEvent _waveInEvent;

    private MicrophoneAudioStream()
    {
        _waveInEvent = new()
        {
            WaveFormat = new WaveFormat(SAMPLES_PER_SECOND, BYTES_PER_SAMPLE * 8, CHANNELS),
        };
        _waveInEvent.DataAvailable += (_, e) =>
        {
            lock (_bufferLock)
            {
                int bytesToCopy = e.BytesRecorded;
                if (_bufferWritePos + bytesToCopy >= _buffer.Length)
                {
                    int bytesToCopyBeforeWrap = _buffer.Length - _bufferWritePos;
                    Array.Copy(e.Buffer, 0, _buffer, _bufferWritePos, bytesToCopyBeforeWrap);
                    bytesToCopy -= bytesToCopyBeforeWrap;
                    _bufferWritePos = 0;
                }
                Array.Copy(e.Buffer, e.BytesRecorded - bytesToCopy, _buffer, _bufferWritePos, bytesToCopy);
                _bufferWritePos += bytesToCopy;
            }
        };
        _waveInEvent.StartRecording();
    }

    public static MicrophoneAudioStream Start() => new();

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    public override long Length => throw new NotImplementedException();

    public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalCount = count;

        int GetBytesAvailable() => _bufferWritePos < _bufferReadPos
            ? _bufferWritePos + (_buffer.Length - _bufferReadPos)
            : _bufferWritePos - _bufferReadPos;

        // For simplicity, we'll block until all requested data is available and not perform partial reads.
        while (GetBytesAvailable() < count)
        {
            Thread.Sleep(100);
        }

        lock (_bufferLock)
        {
            if (_bufferReadPos + count >= _buffer.Length)
            {
                int bytesBeforeWrap = _buffer.Length - _bufferReadPos;
                Array.Copy(
                    sourceArray: _buffer,
                    sourceIndex: _bufferReadPos,
                    destinationArray: buffer,
                    destinationIndex: offset,
                    length: bytesBeforeWrap);
                _bufferReadPos = 0;
                count -= bytesBeforeWrap;
                offset += bytesBeforeWrap;
            }

            Array.Copy(_buffer, _bufferReadPos, buffer, offset, count);
            _bufferReadPos += count;
        }

        return totalCount;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }

    protected override void Dispose(bool disposing)
    {
        _waveInEvent?.Dispose();
        base.Dispose(disposing);
    }
}

As you can see, the microphone audio reaches the session through MicrophoneAudioStream, as raw 24 kHz, 16-bit, mono PCM — that is 24000 × 2 × 1 = 48000 bytes per second, which is why the 10-second ring buffer above is 480000 bytes.

What I need is a Stream look-alike that is fed by UDP packets instead of a microphone:

using Microsoft.Extensions.Logging;
using NAudio.Wave;

namespace MCCO.Avatar.Agent.Utility
{
	/// <summary>
	/// Registered as a singleton.
	/// Receives a binary audio stream in the same format as MicrophoneAudioStream.
	/// </summary>
	public class SocketAudioStream : Stream
	{
		private const int SAMPLES_PER_SECOND = 24000;
		private const int BYTES_PER_SAMPLE = 2;
		private const int CHANNELS = 1;

		// For simplicity, this is configured to use a static 10-second ring buffer.
		private readonly byte[] _buffer = new byte[BYTES_PER_SAMPLE * SAMPLES_PER_SECOND * CHANNELS * 10];
		private readonly object _bufferLock = new();

		private int _bufferReadPos = 0;
		private int _bufferWritePos = 0;

		private readonly ILogger<SocketAudioStream> _logger;

		public volatile bool IsRecording = false;

		public event Action? RecordingStarted;
		public event Action? RecordingStopped;
		public event Action<byte[], int, int>? OnDataAvailable;

		// Debug: save the incoming audio to a WAV file
		private bool enableDebugSaveWav = true;
		private WaveFileWriter? _wavWriter;
		private readonly object _wavLock = new();
		private bool _wavWriterInitialized = false;
		private readonly string _wavFilePath = $"quic_debug_{DateTime.Now:yyyyMMdd_HHmmss}.wav";
		private readonly WaveFormat _waveFormat = new WaveFormat(24000, 16, 1); // 24kHz, 16bit, 1ch

		public SocketAudioStream(ILogger<SocketAudioStream> logger)
		{
			_logger = logger;
		}


		public void StartRecording()
		{
			IsRecording = true;
			_logger.LogWarning("Started receiving audio over the network");
		}

		public void StopRecording()
		{
			IsRecording = false;
			lock (_bufferLock)
			{
				_logger.LogDebug("Clearing buffer");
				_bufferReadPos = 0;
				_bufferWritePos = 0;
				Array.Clear(_buffer, 0, _buffer.Length);
			}
			_logger.LogWarning("Stopped receiving audio over the network");
		}

		
		public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
		{
			int totalCount = count;

			int GetBytesAvailable() => _bufferWritePos < _bufferReadPos
				? _bufferWritePos + (_buffer.Length - _bufferReadPos)
				: _bufferWritePos - _bufferReadPos;

			// For simplicity, we'll block until all requested data is available and not perform partial reads.
			while (GetBytesAvailable() < count || !IsRecording)
			{
				await Task.Delay(100, cancellationToken);
			}

			lock (_bufferLock)
			{
				if (_bufferReadPos + count >= _buffer.Length && IsRecording)
				{
					int bytesBeforeWrap = _buffer.Length - _bufferReadPos;
					Array.Copy(
						sourceArray: _buffer,
						sourceIndex: _bufferReadPos,
						destinationArray: buffer,
						destinationIndex: offset,
						length: bytesBeforeWrap);
					_bufferReadPos = 0;
					count -= bytesBeforeWrap;
					offset += bytesBeforeWrap;
				}

				Array.Copy(_buffer, _bufferReadPos, buffer, offset, count);
				_bufferReadPos += count;
			}

			return totalCount;
		}

		public override int Read(byte[] buffer, int offset, int count)
		{
			return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
		}

		public override bool CanRead => true;
		public override bool CanSeek => false;
		public override bool CanWrite => true;

		public override long Length => throw new NotSupportedException();
		public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
		public override void Flush() => throw new NotSupportedException();
		public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
		public override void SetLength(long value) => throw new NotSupportedException();

		/// <summary>
		/// The equivalent of WaveInEvent raising its DataAvailable event.
		/// </summary>
		/// <param name="buffer">Equivalent to e.Buffer in waveInEvent.DataAvailable.</param>
		/// <param name="offset">Will always be 0 in practice.</param>
		/// <param name="count">Equivalent to e.BytesRecorded in waveInEvent.DataAvailable.</param>
		public override void Write(byte[] buffer, int offset, int count)
		{
			if (!IsRecording) return;
			lock (_bufferLock)
			{
				int bytesToCopy = count;
				if (_bufferWritePos + bytesToCopy >= _buffer.Length)
				{
					int bytesToCopyBeforeWrap = _buffer.Length - _bufferWritePos;
					Array.Copy(buffer, 0, _buffer, _bufferWritePos, bytesToCopyBeforeWrap);
					bytesToCopy -= bytesToCopyBeforeWrap;
					_bufferWritePos = 0;
				}
				Array.Copy(buffer, count - bytesToCopy, _buffer, _bufferWritePos, bytesToCopy);
				_bufferWritePos += bytesToCopy;

				OnDataAvailable?.Invoke(buffer, 0, count);
#if DEBUG
				if (enableDebugSaveWav)
				{
					if (!_wavWriterInitialized)
					{
						_wavWriter = new WaveFileWriter(_wavFilePath, _waveFormat);
						_wavWriterInitialized = true;
					}
					_wavWriter?.Write(buffer, offset, count);
					_wavWriter?.Flush();
				}
#endif
			}
		}
	}
}

Then build a UDP service that receives the audio stream sent by the client:

using System.Net;
using NetCoreServer;

public class UdpService : UdpServer
{
	public event Action<byte[], int, int>? OnDataReceived;

	public UdpService(IPAddress address, int port) : base(address, port)
	{
	}

	protected override void OnStarted()
	{
		// Start receive datagrams
		ReceiveAsync();
	}

	protected override void OnReceived(EndPoint endpoint, byte[] buffer, long offset, long size)
	{
		OnDataReceived?.Invoke(buffer, (int)offset, (int)size);
		ReceiveAsync();
	}

}
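
The article's focus is the receiving side, but for completeness, here is a minimal sketch of what the sending side on the open-area machine might look like — this sender is my own illustration, not part of the project's code. It captures 24 kHz / 16-bit / mono PCM with NAudio and forwards each captured chunk as one UDP datagram; the 20 ms BufferMilliseconds keeps each datagram (960 bytes) comfortably under a typical MTU:

using System.Net.Sockets;
using NAudio.Wave;

// Hypothetical sender for the machine whose sound card receives the AUX feed.
var udp = new UdpClient();
udp.Connect("127.0.0.1", 8085); // address/port of the UdpService above

var waveIn = new WaveInEvent
{
    WaveFormat = new WaveFormat(24000, 16, 1), // must match SocketAudioStream
    BufferMilliseconds = 20,                   // 20 ms = 960 bytes per datagram
};
waveIn.DataAvailable += (_, e) =>
{
    // e.Buffer can be larger than the captured data; send only the valid part.
    udp.Send(e.Buffer, e.BytesRecorded);
};
waveIn.StartRecording();

Console.ReadLine(); // keep capturing until Enter is pressed
waveIn.StopRecording();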

Now all I need is to subscribe to the OnDataReceived event to get the audio and write it into the SocketAudioStream:

public class MainForm
{
    // Pseudocode: _audioStream is the singleton SocketAudioStream, e.g. resolved from DI.
    private readonly UdpService _udpService;
    private readonly SocketAudioStream _audioStream;

    public MainForm(SocketAudioStream audioStream)
    {
        _audioStream = audioStream;
        _udpService = new UdpService(IPAddress.Parse("127.0.0.1"), 8085);
        _udpService.OnDataReceived += UdpService_OnDataReceived;
        _udpService.Start();
    }

    private void UdpService_OnDataReceived(byte[] data, int offset, int count)
    {
        _audioStream.Write(data, offset, count);
    }
}

At OpenAI's ConversationSessionStartedUpdate stage, just decide which input source to use:

await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync())
{
    if (update is ConversationSessionStartedUpdate)
    {
        Console.WriteLine($" <<< Connected: session started");
        _ = Task.Run(async () =>
        {
            // Before starting realtime, decide whether input comes from the microphone or from UDP.
            if (mic)
            {
                using MicrophoneAudioStream microphoneInput = MicrophoneAudioStream.Start();
                await session.SendInputAudioAsync(microphoneInput);
            }
            else
            {
                // Remember to unsubscribe this handler when you're done.
                _audioStream.OnDataAvailable += OnDataAvailableStream;
                _audioStream.StartRecording();
                await session.SendInputAudioAsync(_audioStream);
            }
        });
    }
    // omitted
}

OnDataAvailableStream passes the audio back up so a level meter can be displayed; it can be omitted.
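
The handler itself isn't shown above; here is a minimal sketch of what it might do, assuming a simple peak-based meter (the computation is my own illustration, not the project's actual code):

private void OnDataAvailableStream(byte[] buffer, int offset, int count)
{
    // Scan the 16-bit little-endian PCM samples for the peak amplitude.
    int peak = 0;
    for (int i = offset; i + 1 < offset + count; i += 2)
    {
        short s = (short)(buffer[i] | (buffer[i + 1] << 8));
        int sample = Math.Abs((int)s);
        if (sample > peak) peak = sample;
    }

    // Normalize to 0.0 (silence) .. 1.0 (full scale) and hand it to the UI.
    float level = peak / 32768f;
    // e.g. levelMeter.Value = level;  (hypothetical UI control)
}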

I register SocketAudioStream as a singleton when setting up DI (dependency injection), because the exhibit uses a single audio path. It never needs to be disposed; it simply stays open, because the stream carries data at all times (even during silence). To avoid writes landing on a disposed stream, I deliberately skip disposal. If you are adapting this code, dispose the Stream as your situation requires.
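
For reference, with Microsoft.Extensions.DependencyInjection the registration and wiring might look roughly like this — my project's actual container setup differs, so treat this as a sketch:

using Microsoft.Extensions.DependencyInjection;
using System.Net;

var services = new ServiceCollection();
services.AddLogging();                      // provides the ILogger<SocketAudioStream> dependency
services.AddSingleton<SocketAudioStream>(); // one shared instance for the whole app

var provider = services.BuildServiceProvider();
var audioStream = provider.GetRequiredService<SocketAudioStream>();

// Feed the shared stream from the UDP service.
var udpService = new UdpService(IPAddress.Any, 8085);
udpService.OnDataReceived += (data, offset, count) => audioStream.Write(data, offset, count);
udpService.Start();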

Note that the realtime API here expects audio at a 24000 Hz sample rate. Streaming that as raw PCM (24000 × 2 bytes ≈ 48 kB/s) works fine on a LAN, but not across the public internet, where packet loss is severe; you would need proper audio buffering, which I won't dig into here.
