C++实现基于多智能体强化学习的广告联盟动态收益分配系统
C++实现基于多智能体强化学习的广告联盟动态收益分配系统
·
#include <iostream>
#include <vector>
#include <unordered_map>
#include <memory>
#include <random>
#include <thread>
#include <mutex>
#include <chrono>
#include <algorithm>
#include <numeric>
#include <cmath>
#include <queue>
#include <Eigen/Dense>
#include <boost/asio.hpp>
#include <nlohmann/json.hpp>
using namespace std;
using namespace Eigen;
using json = nlohmann::json;
namespace asio = boost::asio;
using asio::ip::tcp;
// 广告联盟成员类型
enum class PartnerType {
PUBLISHER, // 内容发布商
AD_NETWORK, // 广告网络
DSP, // 需求方平台
SSP, // 供应方平台
DATA_PROVIDER // 数据提供商
};
// 成员智能体
class PartnerAgent {
private:
string id;
PartnerType type;
double contribution; // 贡献度
double revenue_share; // 当前收益分成比例
double min_share; // 最低可接受分成
double max_share; // 最高期望分成
double learning_rate;
vector<double> historical_shares;
vector<double> historical_contributions;
mutex mtx;
// 计算预期效用
double calculate_utility(double proposed_share) const {
// 效用 = 收益分成 * 贡献度 - 风险系数 * (max_share - proposed_share)^2
double risk_factor = 0.5;
return proposed_share * contribution -
risk_factor * pow(max_share - proposed_share, 2);
}
public:
PartnerAgent(string id, PartnerType type,
double min_share, double max_share,
double learning_rate = 0.01)
: id(id), type(type), contribution(0.0),
revenue_share((min_share + max_share) / 2),
min_share(min_share), max_share(max_share),
learning_rate(learning_rate) {}
// 提出收益分成建议
double propose_share() {
lock_guard<mutex> lock(mtx);
// 加入随机探索
random_device rd;
mt19937 gen(rd());
normal_distribution<double> dist(revenue_share, 0.05);
double proposal = dist(gen);
return max(min_share, min(max_share, proposal));
}
// 更新策略
void update_strategy(double accepted_share, double actual_contribution) {
lock_guard<mutex> lock(mtx);
// 记录历史数据
historical_shares.push_back(accepted_share);
historical_contributions.push_back(actual_contribution);
// 更新贡献度估计 (移动平均)
contribution = 0.9 * contribution + 0.1 * actual_contribution;
// 基于效用更新期望分成
double current_utility = calculate_utility(revenue_share);
double new_share = revenue_share + learning_rate * (accepted_share - revenue_share);
// 确保在合理范围内
revenue_share = max(min_share, min(max_share, new_share));
}
// 评估提议
bool evaluate_proposal(double proposed_share) {
lock_guard<mutex> lock(mtx);
return proposed_share >= min_share &&
calculate_utility(proposed_share) >= calculate_utility(revenue_share) * 0.8;
}
// 获取当前状态
json get_state() const {
return {
{"id", id},
{"type", static_cast<int>(type)},
{"contribution", contribution},
{"current_share", revenue_share},
{"min_share", min_share},
{"max_share", max_share}
};
}
const string& get_id() const { return id; }
PartnerType get_type() const { return type; }
double get_contribution() const { return contribution; }
double get_current_share() const { return revenue_share; }
};
// 广告交易记录
class AdTransaction {
private:
string transaction_id;
vector<string> involved_partners;
double total_revenue;
unordered_map<string, double> contributions;
unordered_map<string, double> final_shares;
chrono::system_clock::time_point timestamp;
public:
AdTransaction(string id, const vector<string>& partners, double revenue)
: transaction_id(id), involved_partners(partners), total_revenue(revenue),
timestamp(chrono::system_clock::now()) {}
// 记录成员贡献
void record_contribution(const string& partner_id, double contribution) {
contributions[partner_id] = contribution;
}
// 记录最终分成
void record_final_share(const string& partner_id, double share) {
final_shares[partner_id] = share;
}
// 获取交易信息
json get_info() const {
json info = {
{"transaction_id", transaction_id},
{"total_revenue", total_revenue},
{"timestamp", chrono::system_clock::to_time_t(timestamp)},
{"contributions", contributions},
{"final_shares", final_shares}
};
return info;
}
const string& get_id() const { return transaction_id; }
double get_total_revenue() const { return total_revenue; }
};
// 广告联盟协调器
class AdAllianceCoordinator {
private:
unordered_map<string, shared_ptr<PartnerAgent>> partners;
vector<shared_ptr<AdTransaction>> transactions;
double total_alliance_revenue;
mutex mtx;
thread learning_thread;
atomic<bool> running;
condition_variable cv;
queue<shared_ptr<AdTransaction>> learning_queue;
// 计算标准化贡献度
unordered_map<string, double> calculate_normalized_contributions(
const unordered_map<string, double>& raw_contributions) {
double total = accumulate(raw_contributions.begin(), raw_contributions.end(), 0.0,
[](double sum, const auto& pair) { return sum + pair.second; });
unordered_map<string, double> normalized;
for (const auto& [id, contrib] : raw_contributions) {
normalized[id] = total > 0 ? contrib / total : 1.0 / raw_contributions.size();
}
return normalized;
}
// 学习线程
void learning_loop() {
while (running) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [this]() { return !learning_queue.empty() || !running; });
if (!running) break;
auto transaction = learning_queue.front();
learning_queue.pop();
lock.unlock();
// 更新各参与方的策略
const auto& shares = transaction->get_info()["final_shares"];
const auto& contribs = transaction->get_info()["contributions"];
for (const auto& [id, share] : shares.items()) {
if (partners.count(id)) {
double contrib = contribs[id].get<double>();
partners[id]->update_strategy(share, contrib);
}
}
}
}
// 协商收益分配
unordered_map<string, double> negotiate_shares(
const vector<string>& participant_ids,
double total_revenue) {
unordered_map<string, double> proposed_shares;
unordered_map<string, double> accepted_shares;
// 多轮协商
for (int round = 0; round < 3; ++round) {
// 各方提出建议
for (const auto& id : participant_ids) {
if (partners.count(id)) {
proposed_shares[id] = partners[id]->propose_share();
}
}
// 检查总和是否合理
double total_proposed = accumulate(proposed_shares.begin(), proposed_shares.end(), 0.0,
[](double sum, const auto& pair) { return sum + pair.second; });
// 标准化比例
if (total_proposed > 0) {
for (auto& [id, share] : proposed_shares) {
share = share / total_proposed;
}
} else {
// 默认平均分配
double equal_share = 1.0 / participant_ids.size();
for (const auto& id : participant_ids) {
proposed_shares[id] = equal_share;
}
}
// 各方评估提议
bool all_accepted = true;
for (const auto& id : participant_ids) {
if (partners.count(id) &&
!partners[id]->evaluate_proposal(proposed_shares[id])) {
all_accepted = false;
break;
}
}
if (all_accepted) {
// 达成一致
for (const auto& [id, share] : proposed_shares) {
accepted_shares[id] = share * total_revenue;
}
break;
}
}
// 如果协商失败,按贡献度分配
if (accepted_shares.empty()) {
unordered_map<string, double> contributions;
for (const auto& id : participant_ids) {
contributions[id] = partners.count(id) ? partners[id]->get_contribution() : 1.0;
}
auto normalized = calculate_normalized_contributions(contributions);
for (const auto& [id, share] : normalized) {
accepted_shares[id] = share * total_revenue;
}
}
return accepted_shares;
}
public:
AdAllianceCoordinator() : total_alliance_revenue(0), running(false) {}
~AdAllianceCoordinator() { stop(); }
// 添加联盟成员
void add_partner(shared_ptr<PartnerAgent> partner) {
lock_guard<mutex> lock(mtx);
partners[partner->get_id()] = partner;
}
// 处理广告交易
shared_ptr<AdTransaction> process_transaction(
const vector<string>& participant_ids,
double total_revenue,
const unordered_map<string, double>& raw_contributions) {
// 创建交易记录
auto transaction = make_shared<AdTransaction>(
"tx_" + to_string(transactions.size() + 1),
participant_ids,
total_revenue);
// 记录原始贡献
for (const auto& [id, contrib] : raw_contributions) {
transaction->record_contribution(id, contrib);
}
// 协商收益分配
auto final_shares = negotiate_shares(participant_ids, total_revenue);
// 记录最终分成
for (const auto& [id, share] : final_shares) {
transaction->record_final_share(id, share);
}
// 更新联盟总收益
{
lock_guard<mutex> lock(mtx);
total_alliance_revenue += total_revenue;
transactions.push_back(transaction);
learning_queue.push(transaction);
cv.notify_one();
}
return transaction;
}
// 启动学习线程
void start() {
if (running) return;
running = true;
learning_thread = thread(&AdAllianceCoordinator::learning_loop, this);
}
// 停止学习线程
void stop() {
if (!running) return;
running = false;
cv.notify_one();
if (learning_thread.joinable()) {
learning_thread.join();
}
}
// 获取联盟状态报告
json get_alliance_report() const {
json report;
lock_guard<mutex> lock(mtx);
report["total_revenue"] = total_alliance_revenue;
report["num_transactions"] = transactions.size();
json partners_json;
for (const auto& [id, partner] : partners) {
partners_json[id] = partner->get_state();
}
report["partners"] = partners_json;
return report;
}
};
// HTTP服务器
class AllianceServer {
private:
AdAllianceCoordinator& coordinator;
asio::io_context io_context;
tcp::acceptor acceptor;
atomic<bool> running;
// 处理HTTP请求
string handle_request(const string& method, const string& target, const string& body) {
if (method == "POST") {
if (target == "/transaction") {
try {
json request = json::parse(body);
vector<string> participants = request["participants"];
double revenue = request["revenue"];
unordered_map<string, double> contributions = request["contributions"];
auto transaction = coordinator.process_transaction(participants, revenue, contributions);
return "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n" +
transaction->get_info().dump();
} catch (const exception& e) {
return "HTTP/1.1 400 Bad Request\r\n\r\nInvalid request: " + string(e.what());
}
}
else if (target == "/add_partner") {
try {
json request = json::parse(body);
string id = request["id"];
PartnerType type = static_cast<PartnerType>(request["type"].get<int>());
double min_share = request["min_share"];
double max_share = request["max_share"];
auto partner = make_shared<PartnerAgent>(id, type, min_share, max_share);
coordinator.add_partner(partner);
return "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n" +
R"({"status": "partner_added"})";
} catch (const exception& e) {
return "HTTP/1.1 400 Bad Request\r\n\r\nInvalid request: " + string(e.what());
}
}
}
else if (method == "GET") {
if (target == "/report") {
return "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n" +
coordinator.get_alliance_report().dump();
}
}
return "HTTP/1.1 404 Not Found\r\n\r\n";
}
// 处理客户端连接
void handle_connection(tcp::socket socket) {
try {
asio::streambuf buffer;
asio::read_until(socket, buffer, "\r\n\r\n");
istream is(&buffer);
string request_line;
getline(is, request_line);
vector<string> tokens;
boost::split(tokens, request_line, boost::is_any_of(" "));
if (tokens.size() < 3) return;
string method = tokens[0];
string target = tokens[1];
// 读取请求体
string body;
while (is) {
string line;
getline(is, line);
if (line == "\r") break;
}
while (is) {
string line;
getline(is, line);
body += line;
}
// 处理请求
string response = handle_request(method, target, body);
// 发送响应
asio::write(socket, asio::buffer(response));
} catch (const exception& e) {
cerr << "处理连接错误: " << e.what() << endl;
}
}
// 接受连接
void accept_connections() {
while (running) {
try {
tcp::socket socket(io_context);
acceptor.accept(socket);
thread([this, s = move(socket)]() mutable {
handle_connection(move(s));
}).detach();
} catch (const exception& e) {
if (running) {
cerr << "接受连接错误: " << e.what() << endl;
}
}
}
}
public:
AllianceServer(AdAllianceCoordinator& coord, unsigned short port)
: coordinator(coord), acceptor(io_context, tcp::endpoint(tcp::v4(), port)), running(false) {}
~AllianceServer() {
stop();
}
// 启动服务器
void start() {
if (running) return;
running = true;
thread([this]() { accept_connections(); }).detach();
}
// 停止服务器
void stop() {
if (!running) return;
running = false;
acceptor.close();
}
};
// 示例使用
int main() {
try {
// 1. 创建广告联盟协调器
AdAllianceCoordinator coordinator;
// 2. 添加联盟成员
coordinator.add_partner(make_shared<PartnerAgent>(
"pub1", PartnerType::PUBLISHER, 0.3, 0.7));
coordinator.add_partner(make_shared<PartnerAgent>(
"dsp1", PartnerType::DSP, 0.2, 0.5));
coordinator.add_partner(make_shared<PartnerAgent>(
"data1", PartnerType::DATA_PROVIDER, 0.1, 0.3));
// 3. 启动学习线程
coordinator.start();
// 4. 创建HTTP服务器
AllianceServer server(coordinator, 8080);
server.start();
cout << "广告联盟服务器已启动,监听端口8080" << endl;
// 5. 模拟交易 (在实际应用中应通过HTTP接口)
cout << "模拟交易处理..." << endl;
unordered_map<string, double> contributions = {
{"pub1", 0.6}, // 发布商贡献60%
{"dsp1", 0.3}, // DSP贡献30%
{"data1", 0.1} // 数据提供商贡献10%
};
auto tx1 = coordinator.process_transaction(
{"pub1", "dsp1", "data1"}, 1000.0, contributions);
cout << "交易1结果: " << tx1->get_info().dump(2) << endl;
// 6. 查看联盟状态
cout << "联盟状态报告: " << coordinator.get_alliance_report().dump(2) << endl;
// 7. 停止服务
cout << "按Enter键停止服务器..." << endl;
cin.get();
server.stop();
coordinator.stop();
} catch (const exception& e) {
cerr << "错误: " << e.what() << endl;
return 1;
}
return 0;
}
使用说明
功能特点
-
多智能体系统:
-
自主协商决策
-
个性化策略学习
-
分布式利益优化
-
-
动态收益分配:
-
多轮协商机制
-
贡献度评估
-
效用最大化
-
-
自适应学习:
-
在线策略更新
-
历史数据分析
-
长期收益优化
-
-
生产级特性:
-
线程安全设计
-
RESTful API
-
实时监控
-
核心组件
-
PartnerAgent:
-
成员智能体实现
-
收益策略管理
-
效用计算
-
-
AdTransaction:
-
交易记录
-
贡献度跟踪
-
收益分配
-
-
AdAllianceCoordinator:
-
联盟协调核心
-
协商协议
-
学习机制
-
-
AllianceServer:
-
HTTP接口服务
-
交易处理
-
状态查询
-
使用方法
-
初始化联盟:
AdAllianceCoordinator coordinator;
-
添加成员:
coordinator.add_partner(make_shared<PartnerAgent>("pub1", PartnerType::PUBLISHER, 0.3, 0.7));
-
启动服务:
coordinator.start(); AllianceServer server(coordinator, 8080); server.start();
-
处理交易:
POST /transaction { "participants": ["pub1", "dsp1"], "revenue": 1000.0, "contributions": {"pub1": 0.6, "dsp1": 0.4} }
-
查询状态:
GET /report
-
停止服务:
server.stop(); coordinator.stop();
应用场景
-
广告联盟管理:
-
跨平台收益分配
-
多方协作优化
-
价值公平分配
-
-
程序化广告:
-
SSP与DSP协作
-
数据货币化
-
流量变现优化
-
-
效果分成:
-
基于贡献的分配
-
动态调整比例
-
长期合作关系
-
技术亮点
-
多智能体系统:
-
自主决策
-
分布式学习
-
博弈论应用
-
-
现代C++:
-
智能指针
-
多线程
-
RAII管理
-
-
Boost.Asio:
-
异步网络
-
高性能IO
-
跨平台
-
-
JSON处理:
-
nlohmann/json
-
灵活数据交换
-
易读性高
-
扩展建议
-
高级协商:
-
实现拍卖机制
-
添加信誉系统
-
支持智能合约
-
-
监控分析:
-
实时仪表盘
-
异常检测
-
预测分析
-
-
安全机制:
-
身份验证
-
数据加密
-
防欺诈检测
-
-
分布式部署:
-
多节点协同
-
共识算法
-
容错机制
-
这个系统为广告联盟提供了一个智能化的收益分配解决方案,能够公平合理地分配广告收益,激励各方持续贡献,最大化联盟整体效益。
更多推荐
所有评论(0)