在UE5.2(不包含5.2)之后,UE的HTTP模块已经实现了对流式接口的支持,我也是偶然间在写流式接接口时发现的,不过没有发现这方面的资料,现在来记录一下,实现流式接口接收百度的文心一言大模型。

注意:5.2之前的版本貌似是没有实现的,5.2之前的版本可能需要自己去扩展引擎的源码或者借助第三方库。

引入模块

首先在代码中引入对应模块,如果是在插件中实现,就在插件的Build.cs文件引入,如果在项目的Source中实现,就在项目的Build.cs文件引入,我这里是插件,所以在插件中引入

插件实现

我们在插件中创建名为AIFunction的文件,文件的类是继承UBlueprintAsyncActionBase

在头文件中,首先我们创建一个结构体,用于我们传入的请求参数,

USTRUCT(BlueprintType)
struct FAIRequest
{
	GENERATED_BODY()


	UPROPERTY(BlueprintReadWrite)
	FString URL;
	UPROPERTY(BlueprintReadWrite)
	TArray<FString> Message;
	UPROPERTY(BlueprintReadWrite)
	FString APIKey;
	UPROPERTY(BlueprintReadWrite)
	FString APISecret;
	UPROPERTY(BlueprintReadWrite)
	FString AppID;
	UPROPERTY(BlueprintReadWrite)
	FString access_token;
	UPROPERTY(BlueprintReadWrite)
	bool stream;
};

接着声明两个枚举类型,用来表示我们请求的AI类型与返回类型,用于蓝图中的多输出节点

UENUM(BlueprintType)
enum class EAIType  : uint8
{
  Baidu UMETA(DisplayName = "百度"),
	Tencent UMETA(DisplayName = "腾讯"),
	Ali UMETA(DisplayName = "阿里"),
	Dianxin UMETA(DisplayName = "电信"),
	XunFei UMETA(DisplayName = "讯飞")
};
UENUM()
enum class EResponseType  : uint8
{
	Completed,
	Failed,
	Stream
};

声明一个多播代理,用于在http返回时将数据传出

DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FAIReceive,FString, Response);

接着我们声明一个SendAI函数,GetBaiduToken函数,BaiduAI函数,来实现调用百度的接口

UCLASS()
class PAASAIMODULE_API UAIFunction: public UBlueprintAsyncActionBase
{
	GENERATED_BODY()


public:


	UPROPERTY(BlueprintAssignable)
	FAIReceive OnCompleted;  //请求完成时
	UPROPERTY(BlueprintAssignable)
	FAIReceive OnFailed;    //请求失败时
	UPROPERTY(BlueprintAssignable)
	FAIReceive OnStream;     //流式请求时


	FHttpRequestStreamDelegate StreamDelegate; //用于http流式请求的代理


	
	UFUNCTION(BlueprintCallable,meta=( BlueprintInternalUseOnly="true" ))
	static  UAIFunction* SendAI( EAIType Type,const FAIRequest& Request);
	UFUNCTION(BlueprintCallable,meta=( BlueprintInternalUseOnly="true" ))
	static  UAIFunction* GetBaiduToken(const FString& apikey, const FString& secretkey);
	
protected:
	
	//自动生成token待完成
	void BaiDuAI(const FAIRequest& Request);


	void DianXinAI(const FAIRequest& Request);
};

我这里使用的异步蓝图节点,非阻塞式的请求http。对于蓝图的异步节点,大家可以自行了解。

完整的.h文件内容为下

// Fill out your copyright notice in the Description page of Project Settings.


#pragma once


#include "CoreMinimal.h"
#include "Interfaces/IHttpRequest.h"
#include "Kismet/BlueprintFunctionLibrary.h"
#include "Kismet/BlueprintAsyncActionBase.h"
#include "AIFunction.generated.h"




USTRUCT(BlueprintType)
struct FAIRequest
{
	GENERATED_BODY()


	UPROPERTY(BlueprintReadWrite)
	FString URL;
	UPROPERTY(BlueprintReadWrite)
	TArray<FString> Message;
	UPROPERTY(BlueprintReadWrite)
	FString APIKey;
	UPROPERTY(BlueprintReadWrite)
	FString APISecret;
	UPROPERTY(BlueprintReadWrite)
	FString AppID;
	UPROPERTY(BlueprintReadWrite)
	FString access_token;
	UPROPERTY(BlueprintReadWrite)
	bool stream;
};


UENUM()
enum class EResponseType  : uint8
{
	Completed,
	Failed,
	Stream
};


UENUM(BlueprintType)
enum class EAIType  : uint8
{
	Baidu UMETA(DisplayName = "百度"),
	Tencent UMETA(DisplayName = "腾讯"),
	Ali UMETA(DisplayName = "阿里"),
	Dianxin UMETA(DisplayName = "电信"),
	XunFei UMETA(DisplayName = "讯飞")
	
};
/**
 * 
 */


DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FAIReceive,FString, Response);






UCLASS()
class PAASAIMODULE_API UAIFunction: public UBlueprintAsyncActionBase
{
	GENERATED_BODY()


public:


    UPROPERTY(BlueprintAssignable)
	FAIReceive OnCompleted;  //请求完成时
	UPROPERTY(BlueprintAssignable)
	FAIReceive OnFailed;    //请求失败时
	UPROPERTY(BlueprintAssignable)
	FAIReceive OnStream;     //流式请求时


	FHttpRequestStreamDelegate StreamDelegate;//用于http流式请求的代理


	
	UFUNCTION(BlueprintCallable,meta=( BlueprintInternalUseOnly="true" ))
	static  UAIFunction* SendAI( EAIType Type,const FAIRequest& Request);
	UFUNCTION(BlueprintCallable,meta=( BlueprintInternalUseOnly="true" ))
	static  UAIFunction* GetBaiduToken(const FString& apikey, const FString& secretkey);
	
protected:
	
	//自动生成token待完成
	void BaiDuAI(const FAIRequest& Request);

   TArray<uint8> StreamBuffer;

};

接着我们在cpp文件中实现相应的函数

//函数根据AI类型去调用相应的AI请求
UAIFunction* UAIFunction::SendAI( EAIType Type,const FAIRequest& Request)
{
	UAIFunction* AIFunction = NewObject<UAIFunction>();
	switch (Type)
	{
	case EAIType::Baidu:
		AIFunction->BaiDuAI(Request);
		break;
	case EAIType::Ali:
		break;
	case EAIType::Tencent:
		break;
	case EAIType::Dianxin:
		break;
	default:
		break;
	}
	
	return AIFunction;
}
//获取Baidu的Token
UAIFunction* UAIFunction::GetBaiduToken(const FString& apikey, const FString& secretkey)
{
	UAIFunction* AIFunction = NewObject<UAIFunction>();
	TSharedRef<IHttpRequest, ESPMode::ThreadSafe> HttpRequest = FHttpModule::Get().CreateRequest();
	FString URL = FString::Printf(TEXT("https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=%s&client_secret=%s"),*apikey,*secretkey );
	HttpRequest->SetURL(URL);
	HttpRequest->SetVerb(TEXT("POST"));
	HttpRequest->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
	HttpRequest->OnProcessRequestComplete().BindLambda(([AIFunction](FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful)
	{
		if (bWasSuccessful && Response.IsValid())
		{
			FString ResponseContent =Response->GetContentAsString();
			TSharedPtr<FJsonObject> ResultObj;
			TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(ResponseContent);
			FJsonSerializer::Deserialize(Reader,ResultObj);
			FString DataString;
			if (ResultObj->TryGetStringField(TEXT("access_token"),DataString))
			{
				AIFunction->OnCompleted.Broadcast(DataString);
			}
		}
		else
		{
			AIFunction->OnFailed.Broadcast(TEXT("失败"));
		}
		
	}));
	HttpRequest->ProcessRequest();
	return AIFunction;


}
//发送AI请求
void UAIFunction::BaiDuAI(const FAIRequest& Request)
{
	AddToRoot();
	TSharedRef<IHttpRequest, ESPMode::ThreadSafe> HttpRequest = FHttpModule::Get().CreateRequest();
	FString URL = FString::Printf(TEXT("%s?access_token=%s"), *Request.URL, *Request.access_token);
	HttpRequest->SetURL(URL);
	HttpRequest->SetVerb(TEXT("POST"));
	HttpRequest->SetHeader(TEXT("Content-Type"), TEXT("application/json"));
	FString OutJsonData;
	TSharedRef<TJsonWriter<>> Writer = TJsonWriterFactory<>::Create(&OutJsonData);


	Writer->WriteObjectStart();						// JSON对象开始
	//Writer->WriteValue(L"category", InfoCategory);  // 填充普通字段
	Writer->WriteArrayStart(L"messages");			// Json 数组字段开始	
	for (FString str : Request.Message)
	{
		Writer->WriteObjectStart();
		Writer->WriteValue(L"role", L"user");		// 填充普通字段
		Writer->WriteValue(L"content", str);	// 填充普通字段
		Writer->WriteObjectEnd();
	}
	Writer->WriteArrayEnd();// Json 数组字段结束
	Writer->WriteValue(L"stream", Request.stream); //是否流式
	Writer->WriteObjectEnd();              //JSON对象结束
	Writer->Close();
	HttpRequest->SetContentAsString(OutJsonData); //设置流式请求代理


	if (Request.stream)
	{
		StreamDelegate.BindLambda([=,this](void* st,int64 length)
		{
			/*if (length >0&& st)
			{
                
				//处理数据 
				FString str = FString(UTF8_TO_TCHAR(static_cast<const char*>(st)));
				          str.RemoveAt(0,5); //去掉返回数据的前置字符,因为百度返回的数据前有一个data:的字符,无法解析
				if (!str.IsEmpty())
				{
					str.ReplaceInline(TEXT("\\n"),TEXT(""));
					str.RemoveSpacesInline();
					TSharedPtr<FJsonObject> ResultObj;
					TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(str);
					FString DataString;
					bool success =  FJsonSerializer::Deserialize(Reader,ResultObj);
					FString Meaasge;
					if (!success)
					{
						return true;
					}
					if (ResultObj->TryGetStringField(TEXT("result"),Meaasge))
					{
						if (!Meaasge.IsEmpty())
						{
							//Meaasge.RemoveFromStart(TEXT("\n")); 从开始删除第一个匹配的字符
							this->OnStream.Broadcast(Meaasge);
						}
					}
					else
					{
						return true;
					}
              
				}
				return true;
			}
			return false;*/
        /*12.23更新处理过程,因为大模型sse返回的数据并不每次都是完整的,
*所以上面的处理方式可能有点问题,现在这种方式会将返回的数据进行缓存,返回完整的数据,
*但是这样似乎和直接使用OnRequestProgress64()函数去处理没什么区别,
*只是OnRequestProgress64()函数每次都会返回全部的数据,我们需要人为地去进行数据的裁剪,
*而StreamDelegate不用,他每次都会返回增量数据,StreamDelegate还对内存友好一些
*/
        if (!this.IsValid())
		{
			return false;
		}
		if (length >0 && st)
		{
			
			//处理数据 未完成
			int32 StartIndex = WeakThis->StreamBuffer.Num();
		    this->StreamBuffer.AddUninitialized(length);
		    FMemory::Memcpy(WeakThis->StreamBuffer.GetData() + StartIndex, st, length);


		    // 2. 尝试解析
		   this->ProcessBuffer();
		}
		return true;
		});
		HttpRequest->SetResponseBodyReceiveStreamDelegate(StreamDelegate);
	}
	HttpRequest->OnProcessRequestComplete().BindLambda(([=,this](FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful)
	{
		if (bWasSuccessful && Response.IsValid())
		{
			this->StreamDelegate.Unbind();
			FString ResponseContent =Response->GetContentAsString();
			this->OnCompleted.Broadcast(ResponseContent);
			RemoveFromRoot();
		}
		
	}));


	HttpRequest->ProcessRequest();
}


);
}
void UAIFunction::ProcessBuffer(FString &DataBuffer)
{
	// SSE 消息通常以双换行符结束 (\n\n)
	// 我们循环查找分隔符来分割完整的消息
	while (true)
	{
		// 查找换行符 (0x0A)
		int32 LineEndIndex = -1;
		for (int32 i = 0; i < StreamBuffer.Num(); i++)
		{
			if (StreamBuffer[i] == '\n')
			{
				LineEndIndex = i;
				break;
			}
		}


		// 如果没找到换行符,说明数据还没收完,等待下一个包
		if (LineEndIndex == -1)
		{
			break; 
		}


		// 4. 提取这一行 (不包含换行符)
		// 此时我们确定这是一段完整的 UTF-8 序列(假设 SSE 服务器按行发送且不截断字符)
		int32 DataLen = LineEndIndex;
        
		// 处理 Windows 风格的 \r\n,如果有 \r 则去掉
		if (DataLen > 0 && StreamBuffer[DataLen - 1] == '\r')
		{
			DataLen--;
		}


		FString LineStr;
		if (DataLen > 0)
		{
			// 安全转换:指定长度
			FUTF8ToTCHAR Converter((const char*)StreamBuffer.GetData(), DataLen);
			LineStr = FString(Converter.Length(), Converter.Get());
		}


		// 5. 从缓冲区移除已处理的数据 (包括 \n)
		StreamBuffer.RemoveAt(0, LineEndIndex + 1, false);


		// 6. 业务逻辑处理
		LineStr.TrimStartAndEndInline();
		if (LineStr.StartsWith("data:"))
		{
			FString Payload = LineStr.RightChop(5).TrimStart();
			if (!Payload.IsEmpty())
			{
				// 广播到蓝图
				OnStream.Broadcast(Payload);
			
			}
		}
        if (StreamBuffer.Num() == 0)
		{
			StreamBuffer.Reset();
		}
	}
}

12.23:更新处理过程,因为大模型sse返回的数据并不每次都是完整的,所以更新处理方式,代码已更新,现在这种方式会将返回的数据进行缓存,返回完整的数据,但是这样似乎和直接使用OnRequestProgress64()函数去处理没什么区别,只是OnRequestProgress64()函数每次都会返回全部的数据,我们需要人为地去进行数据的裁剪,而StreamDelegate不用,他每次都会返回增量数据,StreamDelegate还对内存友好一些

对于使用OnRequestProgress64的方法的大致流程就是对每一次返回的数据裁剪,因为OnRequestProgress64()返回的数据都是全量的,我们需要根据数据去将增量数据裁剪下来,并且拼合完整数据返回,直到结束,函数的原理其实都差不多。

完成后,编译成功后会有下面两个节点。

注意:在选择了流式接口返回后,正常在请求Completed的时候是不会在返回数据的,也就是说StreamCompleted两个只会有一个有数据。

注:对于百度的模型,大家还是去看一下官方文档会对代码有更多的理解,而且需要去申请相应的账号。

文心一言API接入指南-百度开发者中心https://developer.baidu.com/article/detail.html?id=1089328https://developer.baidu.com/article/detail.html?id=1089328

开局逆风的才是主角。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐