[AI Coding+安全] 二.CodeBuddy赋能恶意代码分析与家族分类实践(肝货)
随着恶意代码规模化、家族化与变种化趋势日益显著,传统依赖人工经验与规则的恶意代码分析方法在效率、可扩展性和准确性方面面临严峻挑战。AI 辅助分析正逐渐成为安全研究的重要方向。本文以 CodeBuddy 为核心工具,系统介绍其在恶意代码分析与家族分类中的实践应用路径。文章首先回顾传统恶意代码分析的局限性,随后概述 CodeBuddy 的能力特征,并重点围绕动态与静态特征提取、AI 赋能的数据预处理、
随着恶意代码规模化、家族化与变种化趋势日益显著,传统依赖人工经验与规则的恶意代码分析方法在效率、可扩展性和准确性方面面临严峻挑战。AI 辅助分析正逐渐成为安全研究的重要方向。本文以 CodeBuddy 为核心工具,系统介绍其在恶意代码分析与家族分类中的实践应用路径。
文章首先回顾传统恶意代码分析的局限性,随后概述 CodeBuddy 的能力特征,并重点围绕动态与静态特征提取、AI 赋能的数据预处理、基于机器学习与深度学习的家族分类方法,以及聚类与可视化分析等关键环节展开详细讨论。通过 AI Coding 与安全分析的深度结合,展示 CodeBuddy 在提升恶意家族分析自动化与智能化水平方面的实际价值。
代码开源地址:
恶意代码数据集可以从下面申请下载:
文章目录
前文赏析:
- [AI Coding] 一.腾讯CodeBuddy IDE内测、安装及基本用法(国产AI IDE启航)
- [AI Coding+安全] 二.CodeBuddy赋能恶意代码分析与家族分类实践
一.传统恶意代码分析
恶意软件(Malware)或恶意代码分析是网络安全研究中的基础性问题,其核心目标在于刻画程序行为特征、识别潜在威胁并实现家族级别的归类与溯源。传统恶意代码分析方法通常从是否真实执行程序的角度,将特征划分为静态特征与动态特征两大类。二者分别从程序结构与运行行为两个层面描述恶意代码,为后续检测与分类提供依据。
然而,随着恶意代码混淆、加壳、变种生成与对抗技术的不断发展,单纯依赖人工经验和规则驱动的特征提取方式逐渐暴露出效率低、泛化能力弱、特征工程成本高等问题。在此背景下,有必要系统梳理传统静态与动态分析方法的技术路径及其局限性,为引入 AI Coding 与智能化分析方法奠定基础。
1.静态特征
静态特征是指在不真实运行程序的前提下,从二进制文件或中间表示中提取的特征信息。这类特征通常反映程序的结构组成与潜在功能意图,具有分析成本相对可控、不依赖运行环境的优点,但对混淆和对抗技术较为敏感。
常见的静态特征包括:
- 字节码特征:将二进制文件直接映射为字节序列,是最原始的表示形式,未引入任何语义抽象;
- IAT(Import Address Table)表信息:PE 文件结构中的关键组成部分,反映程序在运行时可能调用的外部函数,与功能行为密切相关;
- Android 权限声明:通过分析应用请求的权限集合,判断其是否存在权限滥用或潜在恶意意图;
- 可打印字符特征:将二进制内容转换为 ASCII 字符序列并进行统计分析,用于挖掘硬编码字符串或命令;
- 反汇编跳转块:基于 IDA 等工具获取的基本块与跳转关系,可构造成序列或图结构特征;
- 静态 API 调用特征:统计或建模程序中出现的关键系统 API;
- 恶意代码图像化表示:将二进制映射为图像,从视觉模式角度进行分析。
静态特征提取通常依赖 CAPA、IDA Pro 以及安全厂商提供的静态分析能力。这类方法高度依赖人工规则与经验,特征设计成本较高,且在面对重度混淆、加密或多态变种时,鲁棒性明显不足。
- CAPA
– https://github.com/mandiant/capa - IDA Pro
- 安全厂商沙箱
作者前文博客静态分析恶意软件的提取效果如下图所示:


2.动态特征
动态特征分析通过在真实或仿真环境中执行恶意代码,从运行行为层面捕获程序的实际操作模式。相较于静态分析,动态分析更贴近真实攻击行为,但其分析成本与环境依赖性更高。
典型的动态特征包括:
- API 调用序列及调用关系:通过记录程序运行过程中调用的系统 API,刻画其功能行为;
- 控制流图(CFG):描述程序执行路径的结构信息,可进一步转换为向量或图表示用于机器学习;
- 数据流图(DFG):刻画数据在程序内部的传播与依赖关系,用于分析信息泄露或恶意操作逻辑。
动态特征通常借助 Cuckoo、CAPE 等开源沙箱或安全厂商的专有沙箱环境进行采集。尽管动态分析在语义表达上更为充分,但其易受反沙箱技术影响,执行开销较大,且难以在大规模样本分析中高效应用。
- Cuckoo
– https://github.com/cuckoosandbox/cuckoo - CAPE
– https://github.com/kevoreilly/CAPEv2
– https://capev2.readthedocs.io/en/latest/ - 安全厂商沙箱
作者前文博客CAPE动态分析恶意软件的提取效果如下图所示:


二.CodeBuddy概述
随着大语言模型在代码理解与生成任务中的能力不断增强,AI Coding 逐渐从单点式代码补全工具演进为面向软件工程全过程的智能协同范式。在这一背景下,CodeBuddy 作为腾讯自研的智能编程助手,体现了大模型技术在工程级编程场景中的系统化落地路径。
从概念上看,CodeBuddy 是一类以大语言模型为核心、面向全开发生命周期的 AI Coding 平台。其核心目标在于通过自然语言理解、代码语义建模与上下文感知机制,将开发者的业务意图、设计约束与工程规范转化为可执行的代码结构与工程实现,从而实现“意图驱动编程(Intent-Driven Programming)”。与传统基于规则或局部上下文的代码补全工具不同,CodeBuddy 强调对项目级语义、工程结构与历史演化过程的整体理解,使 AI 能够深度参与到软件开发的多个关键阶段。
-
在系统形态上,CodeBuddy 通过插件、集成开发环境(IDE)与命令行工具(CLI)三端协同,覆盖本地开发、云端开发与自动化流水线等多种使用场景。这种多形态协同设计,使 AI 能够嵌入编码、调试、重构、测试、部署与运维等不同环节,突破了单一开发工具对编程辅助能力的限制,形成贯穿全流程的一体化智能开发环境。
-
在技术层面,CodeBuddy 以大语言模型作为认知中枢,结合代码理解、上下文建模与工程知识注入机制,实现对复杂软件项目的语义级建模能力。其工作过程强调对代码库、依赖关系、配置文件与历史修改记录的持续感知,并在此基础上完成自然语言需求与代码语义之间的双向映射,通过推理与生成机制输出符合工程规范的代码、注释、测试用例与重构建议。通过开发者反馈与运行结果的闭环迭代,CodeBuddy 能够不断优化生成质量与工程适配性。
-
基于上述架构,CodeBuddy 在多个维度上展现出显著优势。首先,其具备良好的全场景适配能力,能够在不同开发环境与工程规模下稳定工作。其次,其功能覆盖从需求理解到代码生成、错误定位与文档生成等多个环节,体现出全栈式智能辅助能力。再次,通过上下文感知与一致性约束,CodeBuddy 能在跨文件、跨模块甚至跨语言的生成过程中保持工程结构与风格的统一性,降低大模型“碎片化生成”带来的风险。最后,CodeBuddy 采用以人为中心的人机协同模式,将开发者从高频、重复和易错的实现细节中解放出来,使其更多聚焦于系统设计与问题抽象。

总体而言,CodeBuddy 代表了 AI Coding 从“局部辅助工具”向“工程级智能协作者”的重要演进方向。其在语义理解深度、工程一致性与开发流程覆盖范围方面的优势,为将 AI Coding 方法引入复杂安全分析任务(如恶意代码特征建模与家族分类)提供了坚实的技术基础。
温馨提示:作者团队近期与人民邮电出版社、腾讯AI团队合作撰写了《CodeBuddy领航:AI辅助编程从入门到精通》新书,希望早日与大家相遇,欢迎大家购买与指正!

三.CodeBuddy赋能恶意家族分类
接下来,我们将利用CodeBuddy进行恶意代码分析与家族分类实践。
1.动态与静态特征提取
首先,假设我们已经通过动态分析和静态分析提取了五个家族恶意软件的动态与静态融合特征,如下图所示。

打开恶意家族class2显示的结果如下图所示,包含8列特征,分别对应序号、家族、md5、战术、技术、tid(ATT&CK序号)、静态API序列、动态API序列。

接下来,我们就将利用AI Coding工具实现自动化的分析。注意,这里我们使用融合特征进行分析,体现特征融合的有效性。
2.AI赋能数据预处理
第一步,打开CodeBuddy。 随后,选择桌面的文件夹,整个分析过程生成的代码和执行结果将在该文件夹中实现。

第二步,在右下角对话框中选择大模型,并输入详细的数据预处理提示词。 注意,除CodeBuddy自带大模型外,读者可以自己搭建模型,各类模型均可使用。此外,动态特征与静态特征融合时需要按照相同md5值进行融合与交叉验证,该部分前面已完成,故不涉及。
该项目中具有data文件夹,存储5个恶意家族的特征序列,现在需要撰写Python代码对数据集进行预处理和划分。代码的具体要求如下:
(1)整个数据集包含8个特征,分别是 [no, label, md5, tactic, technique, tid, api, dynamic_api],现在需要撰写代码对五个CSV文件数据集进行融合。
(2)按照6:3:1的比例将整合后的数据集文件,随机划分成三个CSV文件,分别对应训练集、测试集和验证集。
(3)撰写代码统计分析处理后的数据集分布情况,并绘制相关的可视化图。
注意,CSV文件样本数量在合并前后一定要一致,数据集要随机划分。

第三步,输入提示词并运行程序。CodeBuddy会调用大模型深度思考,从而完成相关任务。

下图展示了其修改代码的过程,其自动生成了data_preprocessing.py文件,并将处理的文件保存至processed文件夹中。
注意,CodeBuddy会自动生成代码、调试代码、运行代码和优化代码,真的是一个非常棒的AI Coding工具。生成代码点击“Keep”按钮即表示接受,通常在所有程序执行完毕且效果符合需求后点击。

该工程目录如下:

第四步,程序自动运行并在线显示可视化分析结果。 如下图所示,详细展示了5个恶意家族的数据分布情况。注意,大家在真实的科研中,需要对其进行验证,确保数据真实可靠。

最终划分的数据集包含图和表,最终预处理的代码如下所示:


AI Coding生成的代码更多是辅助大家完成具体的功能,当然大家也可以学习AI生成代码的规范和逻辑,从而更好地帮助我们理解恶意代码智能分析。
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 读取数据文件夹中的所有CSV文件
data_dir = 'c:/Users/xiuzhang/Desktop/mal_analysis/data'
csv_files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv') and 'result_final' in f]
print("=" * 80)
print("步骤1: 读取并合并所有CSV文件")
print("=" * 80)
# 读取所有CSV文件
dfs = []
sample_counts = {}
total_before_clean = 0
total_after_clean = 0
for csv_file in csv_files:
file_name = os.path.basename(csv_file)
df = pd.read_csv(csv_file)
# 清理数据:删除label为NaN的行
before_clean = len(df)
df_cleaned = df.dropna(subset=['label'])
after_clean = len(df_cleaned)
sample_counts[file_name] = len(df_cleaned)
dfs.append(df_cleaned)
total_before_clean += before_clean
total_after_clean += after_clean
print(f"读取文件: {file_name}")
print(f" 原始样本数量: {before_clean}")
print(f" 清理后样本数量: {after_clean}")
print(f" 删除的NaN行数: {before_clean - after_clean}")
# 合并所有数据集
merged_df = pd.concat(dfs, ignore_index=True)
print(f"\n合并后总样本数量: {len(merged_df)}")
print(f"合并前各文件样本数量之和: {sum(sample_counts.values())}")
print(f"合并前后样本数量是否一致: {len(merged_df) == sum(sample_counts.values())}")
print(f"\n【数据清理统计】")
print(f"清理前总样本数: {total_before_clean}")
print(f"清理后总样本数: {total_after_clean}")
print(f"删除的无效行数: {total_before_clean - total_after_clean}")
print(f"保留率: {total_after_clean/total_before_clean*100:.2f}%")
# 移除样本数量过少的类别(样本数量小于10的类别)
label_counts = merged_df['label'].value_counts()
valid_labels = label_counts[label_counts >= 10].index
merged_df = merged_df[merged_df['label'].isin(valid_labels)]
print(f"\n【类别过滤】")
print(f"原始标签类别数: {len(label_counts)}")
print(f"保留的标签类别数: {len(valid_labels)}")
print(f"移除的标签类别数: {len(label_counts) - len(valid_labels)}")
print(f"过滤后样本数量: {len(merged_df)}")
# 保存合并后的数据集
output_dir = 'c:/Users/xiuzhang/Desktop/mal_analysis/processed'
os.makedirs(output_dir, exist_ok=True)
merged_df.to_csv(os.path.join(output_dir, 'merged_dataset.csv'), index=False)
print(f"\n合并后的数据集已保存至: {os.path.join(output_dir, 'merged_dataset.csv')}")
print("\n" + "=" * 80)
print("步骤2: 按照6:3:1比例划分数据集")
print("=" * 80)
# 首先打乱数据
merged_df_shuffled = merged_df.sample(frac=1, random_state=42).reset_index(drop=True)
# 按照6:3:1的比例划分
# 6:3:1 = 60%:30%:10%
train_ratio = 0.6
val_ratio = 0.3
test_ratio = 0.1
# 第一次划分: 训练集 + 临时集
train_df, temp_df = train_test_split(
merged_df_shuffled,
test_size=(val_ratio + test_ratio),
random_state=42,
stratify=merged_df_shuffled['label']
)
# 第二次划分: 从临时集中划分验证集和测试集
val_df, test_df = train_test_split(
temp_df,
test_size=(test_ratio / (val_ratio + test_ratio)), # 0.1 / (0.3 + 0.1) = 0.25
random_state=42,
stratify=temp_df['label']
)
print(f"训练集样本数量: {len(train_df)} ({len(train_df)/len(merged_df)*100:.1f}%)")
print(f"验证集样本数量: {len(val_df)} ({len(val_df)/len(merged_df)*100:.1f}%)")
print(f"测试集样本数量: {len(test_df)} ({len(test_df)/len(merged_df)*100:.1f}%)")
print(f"总计: {len(train_df) + len(val_df) + len(test_df)}")
print(f"划分前后样本数量是否一致: {len(merged_df) == len(train_df) + len(val_df) + len(test_df)}")
# 保存划分后的数据集
train_df.to_csv(os.path.join(output_dir, 'train_dataset.csv'), index=False)
val_df.to_csv(os.path.join(output_dir, 'val_dataset.csv'), index=False)
test_df.to_csv(os.path.join(output_dir, 'test_dataset.csv'), index=False)
print(f"\n训练集已保存至: {os.path.join(output_dir, 'train_dataset.csv')}")
print(f"验证集已保存至: {os.path.join(output_dir, 'val_dataset.csv')}")
print(f"测试集已保存至: {os.path.join(output_dir, 'test_dataset.csv')}")
print("\n" + "=" * 80)
print("步骤3: 统计分析数据集分布情况")
print("=" * 80)
# 3.1 标签分布统计
print("\n【合并数据集标签分布】")
label_distribution = merged_df['label'].value_counts().sort_index()
print(label_distribution)
print(f"\n标签数量: {len(label_distribution)}")
print(f"样本总数: {len(merged_df)}")
# 3.2 各数据集的标签分布
print("\n【训练集标签分布】")
train_label_dist = train_df['label'].value_counts().sort_index()
print(train_label_dist)
print("\n【验证集标签分布】")
val_label_dist = val_df['label'].value_counts().sort_index()
print(val_label_dist)
print("\n【测试集标签分布】")
test_label_dist = test_df['label'].value_counts().sort_index()
print(test_label_dist)
# 3.3 原始文件统计
print("\n【原始CSV文件样本统计】")
print(f"文件名{' '*20}样本数量")
print("-" * 40)
for file_name, count in sorted(sample_counts.items()):
print(f"{file_name:30s}{count:>10}")
print("\n" + "=" * 80)
print("步骤4: 绘制可视化图表")
print("=" * 80)
# 创建可视化图表保存目录
pic_dir = 'c:/Users/xiuzhang/Desktop/mal_analysis/pic/analysis'
os.makedirs(pic_dir, exist_ok=True)
# 4.1 原始文件样本数量分布
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
sorted_files = sorted(sample_counts.items(), key=lambda x: x[0])
file_names = [item[0] for item in sorted_files]
file_counts = [item[1] for item in sorted_files]
colors = plt.cm.Set3(range(len(file_names)))
bars = plt.bar(file_names, file_counts, color=colors)
plt.xlabel('恶意家族文件', fontsize=12)
plt.ylabel('样本数量', fontsize=12)
plt.title('各恶意家族样本数量分布', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
# 添加数值标签
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontsize=9)
plt.grid(axis='y', alpha=0.3)
# 4.2 合并数据集标签分布
plt.subplot(1, 2, 2)
label_names = label_distribution.index.tolist()
label_counts = label_distribution.values.tolist()
colors = plt.cm.Set2(range(len(label_names)))
bars = plt.bar(label_names, label_counts, color=colors)
plt.xlabel('标签类别', fontsize=12)
plt.ylabel('样本数量', fontsize=12)
plt.title('合并数据集标签分布', fontsize=14, fontweight='bold')
# 添加数值标签
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontsize=10)
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '原始数据集分布.png'), dpi=300, bbox_inches='tight')
print(f"\n图表已保存至: {os.path.join(pic_dir, '原始数据集分布.png')}")
# 4.3 数据集划分后的标签分布对比
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
datasets = [
('训练集', train_label_dist),
('验证集', val_label_dist),
('测试集', test_label_dist)
]
colors_set = plt.cm.Set2(range(len(label_names)))
for idx, (name, dist) in enumerate(datasets):
ax = axes[idx]
bars = ax.bar(dist.index, dist.values, color=colors_set)
ax.set_xlabel('标签类别', fontsize=12)
ax.set_ylabel('样本数量', fontsize=12)
ax.set_title(f'{name}标签分布', fontsize=14, fontweight='bold')
# 添加数值标签
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontsize=10)
ax.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '划分后数据集分布.png'), dpi=300, bbox_inches='tight')
print(f"图表已保存至: {os.path.join(pic_dir, '划分后数据集分布.png')}")
# 4.4 数据集划分比例对比
fig, ax = plt.subplots(figsize=(10, 6))
dataset_names = ['训练集', '验证集', '测试集']
dataset_sizes = [len(train_df), len(val_df), len(test_df)]
colors_pie = plt.cm.Pastel1([0, 1, 2])
wedges, texts, autotexts = ax.pie(dataset_sizes, labels=dataset_names, autopct='%1.1f%%',
colors=colors_pie, startangle=90, textprops={'fontsize': 12})
for autotext in autotexts:
autotext.set_fontsize(12)
autotext.set_fontweight('bold')
ax.set_title('数据集划分比例 (6:3:1)', fontsize=16, fontweight='bold', pad=20)
plt.savefig(os.path.join(pic_dir, '数据集划分比例.png'), dpi=300, bbox_inches='tight')
print(f"图表已保存至: {os.path.join(pic_dir, '数据集划分比例.png')}")
# 4.5 标签分布对比(所有数据集)
fig, ax = plt.subplots(figsize=(14, 8))
labels = sorted(list(set(train_label_dist.index) | set(val_label_dist.index) | set(test_label_dist.index)))
x = np.arange(len(labels))
width = 0.25
train_counts = [train_label_dist.get(label, 0) for label in labels]
val_counts = [val_label_dist.get(label, 0) for label in labels]
test_counts = [test_label_dist.get(label, 0) for label in labels]
bars1 = ax.bar(x - width, train_counts, width, label='训练集', color=colors_pie[0], alpha=0.8)
bars2 = ax.bar(x, val_counts, width, label='验证集', color=colors_pie[1], alpha=0.8)
bars3 = ax.bar(x + width, test_counts, width, label='测试集', color=colors_pie[2], alpha=0.8)
ax.set_xlabel('标签类别', fontsize=12)
ax.set_ylabel('样本数量', fontsize=12)
ax.set_title('各数据集标签分布对比', fontsize=14, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(labels)
ax.legend(fontsize=11)
ax.grid(axis='y', alpha=0.3)
# 添加数值标签
for bars in [bars1, bars2, bars3]:
for bar in bars:
height = bar.get_height()
if height > 0:
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom', fontsize=8)
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '各数据集标签分布对比.png'), dpi=300, bbox_inches='tight')
print(f"图表已保存至: {os.path.join(pic_dir, '各数据集标签分布对比.png')}")
# 4.6 数据集统计表格
fig, ax = plt.subplots(figsize=(12, 6))
ax.axis('tight')
ax.axis('off')
# 准备统计表格数据
table_data = []
table_data.append(['', '训练集', '验证集', '测试集', '总计'])
table_data.append(['样本数量', len(train_df), len(val_df), len(test_df), len(merged_df)])
table_data.append(['占比', f'{len(train_df)/len(merged_df)*100:.1f}%',
f'{len(val_df)/len(merged_df)*100:.1f}%',
f'{len(test_df)/len(merged_df)*100:.1f}%', '100.0%'])
# 添加各标签的统计
for label in sorted(merged_df['label'].unique()):
train_count = len(train_df[train_df['label'] == label])
val_count = len(val_df[val_df['label'] == label])
test_count = len(test_df[test_df['label'] == label])
table_data.append([f'标签{label}', train_count, val_count, test_count, train_count+val_count+test_count])
table = ax.table(cellText=table_data, cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.5)
# 设置表头样式
for i in range(len(table_data[0])):
table[(0, i)].set_facecolor('#4472C4')
table[(0, i)].set_text_props(weight='bold', color='white')
# 设置第一列样式
for i in range(len(table_data)):
table[(i, 0)].set_facecolor('#D9E2F3')
plt.title('数据集统计汇总表', fontsize=14, fontweight='bold', pad=20)
plt.savefig(os.path.join(pic_dir, '数据集统计汇总表.png'), dpi=300, bbox_inches='tight')
print(f"图表已保存至: {os.path.join(pic_dir, '数据集统计汇总表.png')}")
print("\n" + "=" * 80)
print("数据处理完成!")
print("=" * 80)
print(f"\n所有处理后的文件保存在: {output_dir}")
print(f"所有可视化图表保存在: {pic_dir}")
print("\n生成文件列表:")
print("数据文件:")
print(" - merged_dataset.csv (合并后的完整数据集)")
print(" - train_dataset.csv (训练集)")
print(" - val_dataset.csv (验证集)")
print(" - test_dataset.csv (测试集)")
print("\n可视化图表:")
print(" - 原始数据集分布.png")
print(" - 划分后数据集分布.png")
print(" - 数据集划分比例.png")
print(" - 各数据集标签分布对比.png")
print(" - 数据集统计汇总表.png")
您是否感受到了大模型和AI Coding的魅力和力量!
3.AI赋能基于机器学习的恶意家族分类
随后,我们将基于预处理的数据集开展基于机器学习的恶意代码分析。
第一步,构建精准的提示词。
请撰写python代码构建随机森林模型,读取processed中的训练集train_dataset.csv和测试集test_dataset.csv的数据,利用[tactic,technique,tid,api,dynamic_api]五维特征用来构建向量,五列特征融合了恶意代码的静态和动态特征,直接拼接成数据集,其分类家族为[label]列,总共5个家族。请利用sklearn构建随机森林算法评价性能,要求保留4位有效数字,包括精确率、召回率、F1值和准确率。请给出详细代码,并绘制可视化图形(包括混淆矩阵图)。
第二步,将提示词输入对话框中,选择大模型并进行提交。

第三步,大模型经过深度思考理解我们的需求,并自动生成机器学习代码,同时调试和运行代码。
- 自动生成 random_forest_model.py 文件

第四步,动态显示该代码运行的效果图。下图展示了随机森林模型针对五类数据集的预测结果,测试集准确率达到91.32%,超过90%,整体表现不错。

下面我们来检查代码分析的具体结果与可视化图形。
(1)混淆矩阵

(2)分类结果

(3)随机森林重要特征评估,该分析能进一步挖掘哪些关键特征对恶意家族分类存在影响,下面的序号可以转换为真实的特征及与ATT&CK映射关系,譬如:
- <T1083, Discovery, File and Directory Discovery>
- <T1027, Defense Evasion, Obfuscated Files or Information>
- <T1129, Execution, Shared Modules>

(4)实验结果,真实分析中以测试集和验证集为主,训练集通常不作比较

温馨提示:Codebuddy有一个非常强大的上下文关联功能(@按钮),比如关联已生成的随机森林Python文件,我们让其在改代码基础上生成SVM模型,即可利用如下的提示词实现。在大型系统开发中,该功能极其重要。

最后,我们给出完整代码。
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("=" * 80)
print("随机森林模型构建与评价")
print("=" * 80)
# 1. 读取数据
print("\n【步骤1: 读取数据】")
train_df = pd.read_csv('c:/Users/xiuzhang/Desktop/mal_analysis/processed/train_dataset.csv')
test_df = pd.read_csv('c:/Users/xiuzhang/Desktop/mal_analysis/processed/test_dataset.csv')
print(f"训练集样本数: {len(train_df)}")
print(f"测试集样本数: {len(test_df)}")
print(f"特征列: tactic, technique, tid, api, dynamic_api")
print(f"标签列: label")
# 2. 特征工程
print("\n【步骤2: 特征工程】")
# 定义特征列
feature_columns = ['tactic', 'technique', 'tid', 'api', 'dynamic_api']
# 检查缺失值
print("检查特征缺失值:")
for col in feature_columns:
train_nan = train_df[col].isna().sum()
test_nan = test_df[col].isna().sum()
print(f" {col}: 训练集缺失 {train_nan}, 测试集缺失 {test_nan}")
# 填充缺失值
for col in feature_columns:
train_df[col] = train_df[col].fillna('')
test_df[col] = test_df[col].fillna('')
# 对每个文本特征使用TF-IDF编码
print("\n使用TF-IDF对文本特征进行编码...")
# 为每个特征创建TF-IDF向量化器
tactic_tfidf = TfidfVectorizer(max_features=100, token_pattern=r'(?u)\b\w+\b|;')
technique_tfidf = TfidfVectorizer(max_features=100, token_pattern=r'(?u)\b\w+\b|;')
tid_tfidf = TfidfVectorizer(max_features=50, token_pattern=r'(?u)\b\w+\b|;')
api_tfidf = TfidfVectorizer(max_features=200, token_pattern=r'(?u)\b\w+\b|;')
dynamic_api_tfidf = TfidfVectorizer(max_features=200, token_pattern=r'(?u)\b\w+\b|;')
# 训练集TF-IDF转换
train_tactic = tactic_tfidf.fit_transform(train_df['tactic'].astype(str)).toarray()
train_technique = technique_tfidf.fit_transform(train_df['technique'].astype(str)).toarray()
train_tid = tid_tfidf.fit_transform(train_df['tid'].astype(str)).toarray()
train_api = api_tfidf.fit_transform(train_df['api'].astype(str)).toarray()
train_dynamic_api = dynamic_api_tfidf.fit_transform(train_df['dynamic_api'].astype(str)).toarray()
# 测试集TF-IDF转换(使用训练集拟合的向量化器)
test_tactic = tactic_tfidf.transform(test_df['tactic'].astype(str)).toarray()
test_technique = technique_tfidf.transform(test_df['technique'].astype(str)).toarray()
test_tid = tid_tfidf.transform(test_df['tid'].astype(str)).toarray()
test_api = api_tfidf.transform(test_df['api'].astype(str)).toarray()
test_dynamic_api = dynamic_api_tfidf.transform(test_df['dynamic_api'].astype(str)).toarray()
# 拼接所有特征向量
X_train = np.hstack([train_tactic, train_technique, train_tid, train_api, train_dynamic_api])
X_test = np.hstack([test_tactic, test_technique, test_tid, test_api, test_dynamic_api])
print(f"训练集特征向量维度: {X_train.shape}")
print(f"测试集特征向量维度: {X_test.shape}")
print(f"特征维度分布: tactic(100) + technique(100) + tid(50) + api(200) + dynamic_api(200) = {X_train.shape[1]}")
# 3. 标签编码
print("\n【步骤3: 标签编码】")
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(train_df['label'])
y_test = label_encoder.transform(test_df['label'])
print(f"标签类别: {label_encoder.classes_}")
print(f"标签数量: {len(label_encoder.classes_)}")
print(f"训练集标签分布: {np.bincount(y_train)}")
print(f"测试集标签分布: {np.bincount(y_test)}")
# 4. 构建随机森林模型
print("\n【步骤4: 构建随机森林模型】")
# 创建随机森林分类器
rf_model = RandomForestClassifier(
n_estimators=100,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
max_features='sqrt',
random_state=42,
n_jobs=-1,
class_weight='balanced'
)
# 训练模型
print("开始训练随机森林模型...")
rf_model.fit(X_train, y_train)
print("模型训练完成!")
# 5. 模型预测
print("\n【步骤5: 模型预测】")
y_train_pred = rf_model.predict(X_train)
y_test_pred = rf_model.predict(X_test)
# 6. 性能评价
print("\n【步骤6: 性能评价】")
print("\n【训练集性能】")
train_accuracy = accuracy_score(y_train, y_train_pred)
train_precision = precision_score(y_train, y_train_pred, average='weighted')
train_recall = recall_score(y_train, y_train_pred, average='weighted')
train_f1 = f1_score(y_train, y_train_pred, average='weighted')
print(f"准确率 (Accuracy): {train_accuracy:.4f}")
print(f"精确率 (Precision): {train_precision:.4f}")
print(f"召回率 (Recall): {train_recall:.4f}")
print(f"F1值 (F1-Score): {train_f1:.4f}")
print("\n【测试集性能】")
test_accuracy = accuracy_score(y_test, y_test_pred)
test_precision = precision_score(y_test, y_test_pred, average='weighted')
test_recall = recall_score(y_test, y_test_pred, average='weighted')
test_f1 = f1_score(y_test, y_test_pred, average='weighted')
print(f"准确率 (Accuracy): {test_accuracy:.4f}")
print(f"精确率 (Precision): {test_precision:.4f}")
print(f"召回率 (Recall): {test_recall:.4f}")
print(f"F1值 (F1-Score): {test_f1:.4f}")
# 详细分类报告
print("\n【详细分类报告】")
print(classification_report(y_test, y_test_pred, target_names=label_encoder.classes_, digits=4))
# 7. 特征重要性分析
print("\n【步骤7: 特征重要性分析】")
feature_names = []
feature_names.extend([f'tactic_{i}' for i in range(100)])
feature_names.extend([f'technique_{i}' for i in range(100)])
feature_names.extend([f'tid_{i}' for i in range(50)])
feature_names.extend([f'api_{i}' for i in range(200)])
feature_names.extend([f'dynamic_api_{i}' for i in range(200)])
feature_importance = rf_model.feature_importances_
top_indices = np.argsort(feature_importance)[-20:][::-1]
print("Top 20 重要特征:")
for idx in top_indices:
print(f" {feature_names[idx]}: {feature_importance[idx]:.6f}")
# 8. 可视化
print("\n【步骤8: 绘制可视化图表】")
# 创建可视化保存目录
pic_dir = 'c:/Users/xiuzhang/Desktop/mal_analysis/pic/random_forest'
os.makedirs(pic_dir, exist_ok=True)
# 8.1 混淆矩阵
cm_train = confusion_matrix(y_train, y_train_pred)
cm_test = confusion_matrix(y_test, y_test_pred)
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# 训练集混淆矩阵
sns.heatmap(cm_train, annot=True, fmt='d', cmap='Blues',
xticklabels=label_encoder.classes_,
yticklabels=label_encoder.classes_,
ax=axes[0])
axes[0].set_xlabel('预测标签', fontsize=12)
axes[0].set_ylabel('真实标签', fontsize=12)
axes[0].set_title(f'训练集混淆矩阵\n准确率: {train_accuracy:.4f}', fontsize=14, fontweight='bold')
# 测试集混淆矩阵
sns.heatmap(cm_test, annot=True, fmt='d', cmap='Greens',
xticklabels=label_encoder.classes_,
yticklabels=label_encoder.classes_,
ax=axes[1])
axes[1].set_xlabel('预测标签', fontsize=12)
axes[1].set_ylabel('真实标签', fontsize=12)
axes[1].set_title(f'测试集混淆矩阵\n准确率: {test_accuracy:.4f}', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '混淆矩阵.png'), dpi=300, bbox_inches='tight')
print(f"混淆矩阵已保存至: {os.path.join(pic_dir, '混淆矩阵.png')}")
# 8.2 性能指标对比
metrics = ['准确率', '精确率', '召回率', 'F1值']
train_scores = [train_accuracy, train_precision, train_recall, train_f1]
test_scores = [test_accuracy, test_precision, test_recall, test_f1]
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(len(metrics))
width = 0.35
bars1 = ax.bar(x - width/2, train_scores, width, label='训练集', color='steelblue', alpha=0.8)
bars2 = ax.bar(x + width/2, test_scores, width, label='测试集', color='lightcoral', alpha=0.8)
ax.set_xlabel('性能指标', fontsize=12)
ax.set_ylabel('分数', fontsize=12)
ax.set_title('随机森林模型性能对比', fontsize=14, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(metrics)
ax.legend(fontsize=11)
ax.set_ylim([0, 1.1])
ax.grid(axis='y', alpha=0.3)
# 添加数值标签
for bars in [bars1, bars2]:
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.4f}',
ha='center', va='bottom', fontsize=10, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '性能指标对比.png'), dpi=300, bbox_inches='tight')
print(f"性能指标对比图已保存至: {os.path.join(pic_dir, '性能指标对比.png')}")
# 8.3 每个类别的性能指标
precision_per_class = precision_score(y_test, y_test_pred, average=None)
recall_per_class = recall_score(y_test, y_test_pred, average=None)
f1_per_class = f1_score(y_test, y_test_pred, average=None)
fig, ax = plt.subplots(figsize=(12, 6))
x = np.arange(len(label_encoder.classes_))
width = 0.25
bars1 = ax.bar(x - width, precision_per_class, width, label='精确率', color='skyblue', alpha=0.8)
bars2 = ax.bar(x, recall_per_class, width, label='召回率', color='lightgreen', alpha=0.8)
bars3 = ax.bar(x + width, f1_per_class, width, label='F1值', color='salmon', alpha=0.8)
ax.set_xlabel('恶意家族类别', fontsize=12)
ax.set_ylabel('分数', fontsize=12)
ax.set_title('各恶意家族类别性能指标', fontsize=14, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(label_encoder.classes_)
ax.legend(fontsize=11)
ax.set_ylim([0, 1.1])
ax.grid(axis='y', alpha=0.3)
# 添加数值标签
for bars in [bars1, bars2, bars3]:
for bar in bars:
height = bar.get_height()
ax.text(bar.get_x() + bar.get_width()/2., height,
f'{height:.4f}',
ha='center', va='bottom', fontsize=9)
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '各类别性能指标.png'), dpi=300, bbox_inches='tight')
print(f"各类别性能指标图已保存至: {os.path.join(pic_dir, '各类别性能指标.png')}")
# 8.4 特征重要性Top 20
fig, ax = plt.subplots(figsize=(12, 8))
top_feature_names = [feature_names[i] for i in top_indices]
top_feature_importance = feature_importance[top_indices]
colors = plt.cm.viridis(np.linspace(0.3, 0.9, len(top_indices)))
bars = ax.barh(range(len(top_indices)), top_feature_importance, color=colors)
ax.set_yticks(range(len(top_indices)))
ax.set_yticklabels(top_feature_names, fontsize=9)
ax.set_xlabel('特征重要性', fontsize=12)
ax.set_ylabel('特征名称', fontsize=12)
ax.set_title('随机森林模型 - Top 20 重要特征', fontsize=14, fontweight='bold')
ax.invert_yaxis()
# 添加数值标签
for i, bar in enumerate(bars):
width = bar.get_width()
ax.text(width, bar.get_y() + bar.get_height()/2.,
f'{width:.4f}',
ha='left', va='center', fontsize=8, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '特征重要性.png'), dpi=300, bbox_inches='tight')
print(f"特征重要性图已保存至: {os.path.join(pic_dir, '特征重要性.png')}")
# 8.5 归一化混淆矩阵(百分比)
cm_test_normalized = cm_test.astype('float') / cm_test.sum(axis=1)[:, np.newaxis]
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm_test_normalized, annot=True, fmt='.2%', cmap='YlGnBu',
xticklabels=label_encoder.classes_,
yticklabels=label_encoder.classes_,
cbar_kws={'label': '比例'})
ax.set_xlabel('预测标签', fontsize=12)
ax.set_ylabel('真实标签', fontsize=12)
ax.set_title('测试集归一化混淆矩阵(百分比)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(pic_dir, '归一化混淆矩阵.png'), dpi=300, bbox_inches='tight')
print(f"归一化混淆矩阵已保存至: {os.path.join(pic_dir, '归一化混淆矩阵.png')}")
# 8.6 模型性能汇总表
fig, ax = plt.subplots(figsize=(12, 6))
ax.axis('tight')
ax.axis('off')
table_data = []
table_data.append(['数据集', '准确率', '精确率', '召回率', 'F1值'])
table_data.append(['训练集', f'{train_accuracy:.4f}', f'{train_precision:.4f}',
f'{train_recall:.4f}', f'{train_f1:.4f}'])
table_data.append(['测试集', f'{test_accuracy:.4f}', f'{test_precision:.4f}',
f'{test_recall:.4f}', f'{test_f1:.4f}'])
table = ax.table(cellText=table_data, cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(12)
table.scale(1.2, 1.5)
# 设置表头样式
for i in range(len(table_data[0])):
table[(0, i)].set_facecolor('#4472C4')
table[(0, i)].set_text_props(weight='bold', color='white')
plt.title('随机森林模型性能汇总', fontsize=16, fontweight='bold', pad=20)
plt.savefig(os.path.join(pic_dir, '模型性能汇总.png'), dpi=300, bbox_inches='tight')
print(f"模型性能汇总表已保存至: {os.path.join(pic_dir, '模型性能汇总.png')}")
print("\n" + "=" * 80)
print("模型训练和评价完成!")
print("=" * 80)
print(f"\n所有可视化图表保存在: {pic_dir}")
print("\n生成图表列表:")
print(" - 混淆矩阵.png")
print(" - 性能指标对比.png")
print(" - 各类别性能指标.png")
print(" - 特征重要性.png")
print(" - 归一化混淆矩阵.png")
print(" - 模型性能汇总.png")
print("\n" + "=" * 80)
print("最终性能评价总结")
print("=" * 80)
print(f"\n【测试集性能】")
print(f" 准确率 (Accuracy): {test_accuracy:.4f}")
print(f" 精确率 (Precision): {test_precision:.4f}")
print(f" 召回率 (Recall): {test_recall:.4f}")
print(f" F1值 (F1-Score): {test_f1:.4f}")
print(f"\n【模型结论】")
if test_accuracy > 0.9:
print(" 模型表现优秀!准确率超过90%")
elif test_accuracy > 0.8:
print(" 模型表现良好!准确率超过80%")
elif test_accuracy > 0.7:
print(" 模型表现中等,准确率超过70%")
else:
print(" 模型需要进一步优化")
4.AI赋能基于深度学习的恶意家族分类
接下来,我们利用CodeBuddy生成深度学习CNN-BiLSTM模型实现家族分类。
第一步,构建提示词。
请撰写python Pytorch代码构建CNN-BiLSTM模型,读取processed中的训练集train_dataset.csv和测试集test_dataset.csv的数据,利用[tactic,technique,tid,api,dynamic_api]五维特征用来构建向量,五列特征融合了恶意代码的静态和动态特征,直接拼接成数据集,其分类家族为[label]列,总共5个家族。请利用CNN-BiLSTM模型进行恶意家族分类并评价性能,要求保留4位有效数字,包括精确率、召回率、F1值和准确率。请给出详细代码,并绘制可视化图形(包括混淆矩阵图),保证程序能顺利运行。
第二步,在CodeBuddy中输入提示词,经过深度思考后生成代码。

第三步,自动运行生成的深度学习代码。

由于作者是用CPU,因此代码运行比较耗时,因此该代码请大家自行尝试。注意,如果代码运行报错,大家一定要学会与CodeBuddy对话,从而优化代码直至完成相关功能。

最终生成代码如下图所示,整个代码量500多行还是非常大的。

模型构建部分的关键代码如下,完整代码请参考作者的Github。
# 1. 定义自定义数据集类
class MalwareDataset(Dataset):
def __init__(self, features, labels):
self.features = torch.FloatTensor(features)
self.labels = torch.LongTensor(labels)
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
return self.features[idx], self.labels[idx]
# 2. 定义CNN-BiLSTM模型
class CNNBiLSTM(nn.Module):
def __init__(self, input_dim, hidden_dim, num_layers, num_classes, dropout=0.5):
super(CNNBiLSTM, self).__init__()
# CNN部分
self.conv1 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=3, padding=1)
self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
self.pool = nn.MaxPool1d(kernel_size=2)
self.dropout1 = nn.Dropout(dropout)
# 计算CNN输出维度
# 输入: (batch_size, 1, input_dim)
# Conv1: (batch_size, 64, input_dim)
# Pool1: (batch_size, 64, input_dim//2)
# Conv2: (batch_size, 128, input_dim//2)
# Pool2: (batch_size, 128, input_dim//4)
self.cnn_output_dim = 128 * (input_dim // 4)
# BiLSTM部分
self.bilstm = nn.LSTM(
input_size=self.cnn_output_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
dropout=dropout if num_layers > 1 else 0
)
# 全连接层
self.dropout2 = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_dim * 2, num_classes) # 双向LSTM输出是2倍hidden_dim
# 激活函数
self.relu = nn.ReLU()
def forward(self, x):
# x shape: (batch_size, input_dim)
# Reshape for CNN: (batch_size, 1, input_dim)
x = x.unsqueeze(1)
# CNN部分
x = self.conv1(x) # (batch_size, 64, input_dim)
x = self.relu(x)
x = self.pool(x) # (batch_size, 64, input_dim//2)
x = self.dropout1(x)
x = self.conv2(x) # (batch_size, 128, input_dim//2)
x = self.relu(x)
x = self.pool(x) # (batch_size, 128, input_dim//4)
x = self.dropout1(x)
# Flatten for LSTM: (batch_size, cnn_output_dim)
x = x.view(x.size(0), -1)
# Reshape for LSTM: (batch_size, 1, cnn_output_dim)
x = x.unsqueeze(1)
# BiLSTM部分
lstm_out, (h_n, c_n) = self.bilstm(x) # h_n shape: (num_layers*2, batch_size, hidden_dim)
# 使用最后一个时间步的输出
# 拼接前向和后向的最终隐藏状态
h_forward = h_n[-2] # 前向最后一层
h_backward = h_n[-1] # 后向最后一层
x = torch.cat([h_forward, h_backward], dim=1) # (batch_size, hidden_dim*2)
# 全连接层
x = self.dropout2(x)
x = self.fc(x) # (batch_size, num_classes)
return x
# 3. 读取数据
print("\n【步骤1: 读取数据】")
train_df = pd.read_csv('c:/Users/xiuzhang/Desktop/mal_analysis/processed/train_dataset.csv')
test_df = pd.read_csv('c:/Users/xiuzhang/Desktop/mal_analysis/processed/test_dataset.csv')
print(f"训练集样本数: {len(train_df)}")
print(f"测试集样本数: {len(test_df)}")
# 4. 特征工程
print("\n【步骤2: 特征工程】")
feature_columns = ['tactic', 'technique', 'tid', 'api', 'dynamic_api']
# 填充缺失值
for col in feature_columns:
train_df[col] = train_df[col].fillna('')
test_df[col] = test_df[col].fillna('')
# 使用TF-IDF编码
print("使用TF-IDF对文本特征进行编码...")
tactic_tfidf = TfidfVectorizer(max_features=100, token_pattern=r'(?u)\b\w+\b|;')
technique_tfidf = TfidfVectorizer(max_features=100, token_pattern=r'(?u)\b\w+\b|;')
tid_tfidf = TfidfVectorizer(max_features=50, token_pattern=r'(?u)\b\w+\b|;')
api_tfidf = TfidfVectorizer(max_features=200, token_pattern=r'(?u)\b\w+\b|;')
dynamic_api_tfidf = TfidfVectorizer(max_features=200, token_pattern=r'(?u)\b\w+\b|;')
# 训练集
train_tactic = tactic_tfidf.fit_transform(train_df['tactic'].astype(str)).toarray()
train_technique = technique_tfidf.fit_transform(train_df['technique'].astype(str)).toarray()
train_tid = tid_tfidf.fit_transform(train_df['tid'].astype(str)).toarray()
train_api = api_tfidf.fit_transform(train_df['api'].astype(str)).toarray()
train_dynamic_api = dynamic_api_tfidf.fit_transform(train_df['dynamic_api'].astype(str)).toarray()
X_train = np.hstack([train_tactic, train_technique, train_tid, train_api, train_dynamic_api])
# 测试集
test_tactic = tactic_tfidf.transform(test_df['tactic'].astype(str)).toarray()
test_technique = technique_tfidf.transform(test_df['technique'].astype(str)).toarray()
test_tid = tid_tfidf.transform(test_df['tid'].astype(str)).toarray()
test_api = api_tfidf.transform(test_df['api'].astype(str)).toarray()
test_dynamic_api = dynamic_api_tfidf.transform(test_df['dynamic_api'].astype(str)).toarray()
X_test = np.hstack([test_tactic, test_technique, test_tid, test_api, test_dynamic_api])
print(f"训练集特征向量维度: {X_train.shape}")
print(f"测试集特征向量维度: {X_test.shape}")
# 5. 标签编码
print("\n【步骤3: 标签编码】")
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(train_df['label'])
y_test = label_encoder.transform(test_df['label'])
print(f"标签类别: {label_encoder.classes_}")
print(f"标签数量: {len(label_encoder.classes_)}")
# 6. 创建数据加载器
print("\n【步骤4: 创建数据加载器】")
train_dataset = MalwareDataset(X_train, y_train)
test_dataset = MalwareDataset(X_test, y_test)
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
print(f"训练集批次数: {len(train_loader)}")
print(f"测试集批次数: {len(test_loader)}")
# 7. 创建模型
print("\n【步骤5: 创建CNN-BiLSTM模型】")
input_dim = X_train.shape[1] # 560
hidden_dim = 128
num_layers = 2
num_classes = len(label_encoder.classes_) # 5
model = CNNBiLSTM(input_dim, hidden_dim, num_layers, num_classes, dropout=0.5)
model = model.to(device)
print(f"模型结构:")
print(model)
print(f"\n模型参数数量: {sum(p.numel() for p in model.parameters()):,}")
# 8. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
5.AI赋能可视化聚类分析
最后,我们尝试进行可视化降维分析,提示词如下:
现在需要进行降维可视化分析,请读取test_dataset.csv文件中的特征[tactic,technique,tid,api,dynamic_api]来构建向量,利用t-SNE进行可视化分析,其分类的家族为label列,共5个家族。最终呈现美观的聚类效果图。注意,整个代码利用Python实现,并且家族之间颜色不同,呈现的效果美观。

运行结果如下图所示,还需要进一步结合实验特征优化表征。


四.总结
本文围绕 AI Coding 与安全分析的融合实践,系统探讨了 CodeBuddy 在恶意代码分析与家族分类中的应用路径。从传统静态与动态特征分析的局限性出发,文章展示了大语言模型驱动的 AI Coding 如何在特征提取、数据预处理、分类建模与可视化分析等环节中显著提升分析效率与工程一致性,体现了智能化方法在复杂安全任务中的现实价值。
未来,大语言模型(LLM)与智能体(Agent)将在恶意代码分析领域扮演更加核心的角色。
- 在特征建模层面,LLM 有望实现对二进制代码、反汇编结果和运行日志的语义级理解,从而减少对人工特征工程的依赖,提升对混淆、变种与对抗样本的鲁棒性。
- 在分析流程层面,引入具备规划与执行能力的安全智能体,可将恶意代码分析任务拆解为自动化的多步骤流程,实现从样本采集、行为分析到家族归因的自主协同分析。
- 在知识层面,LLM 可与知识图谱和威胁情报库深度融合,支持跨样本、跨家族的关联推理与攻击链重构,增强分析结果的可解释性与可追溯性。
此外,在工程实践中,AI Coding 平台与安全工具链的深度集成,将推动恶意代码分析从“工具驱动”向“智能协作”转变,使安全分析人员逐步从底层实现细节中解放出来,更多关注威胁建模与决策支持问题。总体而言,LLM 与智能体的引入不仅将重塑恶意代码分析的技术路径,也为构建高效、智能、可演化的安全分析体系提供了重要发展方向。
与此同时,Eastmount已正式开启《AI Coding》专栏,将持续发布关于大模型辅助编程、国产AI IDE工具评测、AI自动化开发实战等系列内容,欢迎关注专栏,一起探索智能开发的前沿趋势,不断学习与精进。基础性文章,希望对您有所帮助,写得不好的地方还请海涵!
2024年4月28日是Eastmount的安全星球——『网络攻防和AI安全之家』正式创建和运营的日子,该星球目前主营业务为 安全零基础答疑、安全技术分享、AI安全技术分享、AI安全论文交流、威胁情报每日推送、网络攻防技术总结、系统安全技术实战、面试求职、安全考研考博、简历修改及润色、学术交流及答疑、人脉触达、认知提升等。下面是星球的新人券,欢迎新老博友和朋友加入,一起分享更多安全知识,比较良心的星球,非常适合初学者和换安全专业的读者学习。
目前收到了很多博友、朋友和老师的支持和点赞,尤其是一些看了我文章多年的老粉,购买来感谢,真的很感动,类目。未来,我将分享更多高质量文章,更多安全干货,真心帮助到大家。虽然起步晚,但贵在坚持,像十多年如一日的博客分享那样,脚踏实地,只争朝夕。继续加油,再次感谢!
(By:Eastmount 2026-01-22 周四写于贵阳 http://blog.csdn.net/eastmount/ )
更多推荐


所有评论(0)