本地大模型生成漏洞信息库实战
本文介绍了一个基于本地部署大型语言模型的漏洞信息处理系统。系统采用 B/S + C/S 混合架构,基于 React、Axum 和 Tonic 框架构建,使用 Rust 和 TypeScript 语言编写,是一次全栈实战。
·
1 概述
利用本地部署的大型语言模型对 NVD/CVE 的英文原始数据进行解析与生成,输出经校验的中文漏洞条目,供 Web 平台展示与检索。
2 方案简述
2.1 数据展示
- 通过 Web 界面呈现漏洞信息
- 列表视图:CVE 编号、标题、状态、更新时间、发布日期
- 详情视图:CVE 编号、中英文标题、状态、更新时间、发布日期、中英文描述、受影响配置
列表展示
详情展示
2.2 数据生成
- 获取 NVD 与 CVE 的英文原始数据
- 抽取、清洗并合并来自各源的数据
- 使用本地大型语言模型对新增与变更项生成或更新中文信息
- 将生成结果写入并更新 Web 数据库
3 代码实现
3.1 库表定义
-- Vulnerability table: one row per CVE, keyed by the CVE identifier.
-- Every column is mandatory (NOT NULL).
CREATE TABLE "tbl_vuln" (
    -- CVE identifier; primary key of the table.
    "cve_id" varchar NOT NULL PRIMARY KEY,
    -- English / Chinese title.
    "title_en" varchar NOT NULL,
    "title_cn" varchar NOT NULL,
    -- English / Chinese description.
    "desc_en" varchar NOT NULL,
    "desc_cn" varchar NOT NULL,
    "status" varchar NOT NULL,
    -- Affected configurations, stored as text
    -- (presumably the serialized NVD configurations JSON; confirm against the writer).
    "configurations" varchar NOT NULL,
    -- Publication and last-modification timestamps.
    "published_at" datetime_text NOT NULL,
    "last_modified_at" datetime_text NOT NULL
);
3.2 RESTful API
RESTful API 用于 Web 数据展示
代码如下:
use axum::{ Json, Router, extract::{Path, Query, State}, http::StatusCode, response::IntoResponse, routing::get, }; use entity::tbl_vuln; use sea_orm::{ColumnTrait, Condition, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder}; use serde::{Deserialize, Serialize}; use serde_json::json; use validator::Validate; use crate::restful_server::AppState; pub fn routers(state: AppState) -> Router { Router::new() .route("/vulns", get(vuln_query)) .route("/vulns/{cve_id}", get(vuln_detail)) .with_state(state) } #[derive(Deserialize, Debug, Validate)] struct QueryInputDto { keyword: Option<String>, size: u64, page: u64, } #[derive(Serialize, Debug)] struct QueryOutputDto { cve_id: String, title: String, status: String, last_modified_at: i64, published_at: i64, } async fn vuln_query( app_state: State<AppState>, Query(query_input_dto): Query<QueryInputDto>, ) -> impl IntoResponse { let mut select = tbl_vuln::Entity::find(); if let Some(keyword) = query_input_dto.keyword { if !keyword.is_empty() { let like_pattern = format!("%{keyword}%"); let condition = Condition::any() .add(tbl_vuln::Column::CveId.like(&like_pattern)) .add(tbl_vuln::Column::TitleEn.like(&like_pattern)) .add(tbl_vuln::Column::TitleCn.like(like_pattern)); select = select.filter(condition); } } let paginator = select .order_by_desc(tbl_vuln::Column::LastModifiedAt) .paginate(&app_state.db_conn, query_input_dto.size); let num_pages = match paginator.num_pages().await { Ok(v) => v, Err(e) => { log::error!("num_pages err: {}", e); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } }; let num_items = match paginator.num_items().await { Ok(v) => v, Err(e) => { log::error!("num_items err: {}", e); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } }; let tbl_vulns = match paginator.fetch_page(query_input_dto.page).await { Ok(v) => v, Err(e) => { log::error!("fetch_page err: {}", e); return StatusCode::INTERNAL_SERVER_ERROR.into_response(); } }; let mut vulns = Vec::new(); for tbl_vuln in tbl_vulns 
{ vulns.push(QueryOutputDto { cve_id: tbl_vuln.cve_id, title: tbl_vuln.title_cn, status: tbl_vuln.status, last_modified_at: tbl_vuln.last_modified_at.and_utc().timestamp_millis(), published_at: tbl_vuln.published_at.and_utc().timestamp_millis(), }); } ( StatusCode::OK, Json(json!( { "page":{ "size":query_input_dto.size, "total_elements":num_items, "total_pages":num_pages }, "_embedded":{ "vuln":vulns } } )), ) .into_response() } async fn vuln_detail( Path(cve_id): Path<String>, State(app_state): State<AppState>, ) -> impl IntoResponse { match tbl_vuln::Entity::find() .filter(tbl_vuln::Column::CveId.eq(&cve_id)) .one(&app_state.db_conn) .await { Ok(op) => match op { Some(tbl_vuln) => ( StatusCode::OK, Json(json!({ "cve_id":tbl_vuln.cve_id, "title_en":tbl_vuln.title_en, "title_cn":tbl_vuln.title_cn, "desc_en":tbl_vuln.desc_en, "desc_cn":tbl_vuln.desc_cn, "status":tbl_vuln.status, "configurations":tbl_vuln.configurations, "last_modified_at":tbl_vuln.last_modified_at.and_utc().timestamp_millis(), "published_at":tbl_vuln.published_at.and_utc().timestamp_millis(), })), ) .into_response(), None => StatusCode::BAD_REQUEST.into_response(), }, Err(e) => { log::error!("find tbl_vuln {} db err: {}", cve_id, e); StatusCode::INTERNAL_SERVER_ERROR.into_response() } } }
3.3 gRPC API
proto 定义:
syntax = "proto3";

package vuln;

// Service used to synchronise vulnerability records with the server.
service VulnService {
  // Fetch the timestamps of all vulnerabilities (server-streaming).
  rpc PullVulnTime(Empty) returns (stream VulnTime) {}
  // Update vulnerability information (client-streaming).
  rpc PushVulnInfo(stream VulnInfo) returns (Empty) {}
}

// Publication / modification timestamps of one CVE.
message VulnTime {
  string cve_id = 1;
  int64 published_at = 2;
  int64 last_modified_at = 3;
}

// Full vulnerability record pushed to the server.
message VulnInfo {
  string cve_id = 1;
  string title_en = 2;
  string title_cn = 3;
  string desc_en = 4;
  string desc_cn = 5;
  string status = 6;
  string configurations = 7;
  int64 published_at = 8;
  int64 last_modified_at = 9;
}

// Placeholder for RPCs without a request or response payload.
message Empty {}
代码如下:
use std::fs; use chrono::{DateTime, NaiveDateTime}; use entity::tbl_vuln; use sea_orm::{ ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, }; use tokio_stream::wrappers::ReceiverStream; use tonic::{ Code, Request, Response, Status, codec::CompressionEncoding, metadata::MetadataMap, service::{Interceptor, interceptor::InterceptedService}, transport::{Identity, Server, ServerTlsConfig}, }; use crate::{ config::CONFIG_TOML, grpc_server::proto::{ Empty, VulnInfo, VulnTime, vuln_service_server::{VulnService, VulnServiceServer}, }, }; pub mod proto { tonic::include_proto!("vuln"); } #[derive(Debug, Clone)] pub struct VulnInterceptor {} impl Interceptor for VulnInterceptor { fn call(&mut self, req: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { let agent_id = extract_metadata_value(req.metadata(), "agent_id")?; if !CONFIG_TOML .grpc_server .agent_ids .contains(&agent_id.to_string()) { return Err(Status::new( Code::PermissionDenied, format!("agent_id {agent_id} not have permission"), )); } let token = extract_metadata_value(req.metadata(), "token")?; if !CONFIG_TOML.grpc_server.tokens.contains(&token.to_string()) { return Err(Status::new( Code::PermissionDenied, format!("token {token} not have permission"), )); } Ok(req) } } fn extract_metadata_value<'a>(metadata: &'a MetadataMap, key: &str) -> Result<&'a str, Status> { match metadata.get(key) { Some(v) => { if v.is_empty() { log::warn!("{} is empty", key); return Err(Status::new( Code::InvalidArgument, format!("{} is empty", key), )); } match v.to_str() { Ok(val) => Ok(val), Err(e) => { log::error!("{} to_str error: {}", key, e); Err(Status::new(Code::Internal, format!("{} to_str error", key))) } } } None => { log::warn!("{} not exist", key); Err(Status::new( Code::InvalidArgument, format!("{} not exist", key), )) } } } #[derive(Debug)] struct VulnServer { db_conn: sea_orm::DatabaseConnection, } #[tonic::async_trait] impl VulnService for VulnServer { type 
PullVulnTimeStream = ReceiverStream<Result<VulnTime, Status>>; async fn pull_vuln_time( &self, _request: Request<Empty>, ) -> Result<Response<Self::PullVulnTimeStream>, Status> { let (tx, rx) = tokio::sync::mpsc::channel(10); let db_conn_clone = self.db_conn.clone(); tokio::spawn(async move { if let Err(e) = get_vuln_time(db_conn_clone, tx).await { log::error!("get_vuln_time err: {}", e); } }); Ok(Response::new(ReceiverStream::new(rx))) } async fn push_vuln_info( &self, request: Request<tonic::Streaming<VulnInfo>>, ) -> Result<Response<Empty>, Status> { let mut stream = request.into_inner(); log::info!("start push vuln info"); while let Some(vuln_info) = stream.message().await? { if let Err(e) = upsert(&self.db_conn, &vuln_info).await { log::error!("upsert err: {}", e); return Err(Status::new(Code::Internal, format!("upsert err {}", e))); } } log::info!("push vuln info finish"); Ok(Response::new(Empty {})) } } async fn get_vuln_time( db_conn: sea_orm::DatabaseConnection, tx: tokio::sync::mpsc::Sender<Result<VulnTime, tonic::Status>>, ) -> anyhow::Result<()> { log::info!("start get vuln time"); let vuln_times = tbl_vuln::Entity::find() .select_only() .column(tbl_vuln::Column::CveId) .column(tbl_vuln::Column::PublishedAt) .column(tbl_vuln::Column::LastModifiedAt) .into_tuple::<(String, NaiveDateTime, NaiveDateTime)>() .all(&db_conn) .await?; for (cve_id, published_at, last_modified_at) in vuln_times { tx.send(Ok(VulnTime { cve_id, published_at: published_at.and_utc().timestamp_millis(), last_modified_at: last_modified_at.and_utc().timestamp_millis(), })) .await?; } log::info!("get vuln time finish"); Ok(()) } async fn upsert(db_conn: &sea_orm::DatabaseConnection, vuln_info: &VulnInfo) -> anyhow::Result<()> { let published_at = match DateTime::from_timestamp_millis(vuln_info.published_at) { Some(v) => v.naive_utc(), None => { return Err(anyhow::anyhow!("published_at to datetime none")); } }; let last_modified_at = match 
DateTime::from_timestamp_millis(vuln_info.last_modified_at) { Some(v) => v.naive_utc(), None => { return Err(anyhow::anyhow!("last_modified_at to datetime none")); } }; match tbl_vuln::Entity::find() .filter(tbl_vuln::Column::CveId.eq(&vuln_info.cve_id)) .one(db_conn) .await? { Some(v) => { let mut tbl_vuln_am = v.into_active_model(); tbl_vuln_am.title_en = Set(vuln_info.title_en.clone()); tbl_vuln_am.title_cn = Set(vuln_info.title_cn.clone()); tbl_vuln_am.desc_en = Set(vuln_info.desc_en.clone()); tbl_vuln_am.desc_cn = Set(vuln_info.desc_cn.clone()); tbl_vuln_am.status = Set(vuln_info.status.clone()); tbl_vuln_am.configurations = Set(vuln_info.configurations.clone()); tbl_vuln_am.published_at = Set(published_at); tbl_vuln_am.last_modified_at = Set(last_modified_at); tbl_vuln_am.save(db_conn).await?; } None => { let tbl_vuln_am = tbl_vuln::ActiveModel { cve_id: Set(vuln_info.cve_id.clone()), title_en: Set(vuln_info.title_en.clone()), title_cn: Set(vuln_info.title_cn.clone()), desc_en: Set(vuln_info.desc_en.clone()), desc_cn: Set(vuln_info.desc_cn.clone()), status: Set(vuln_info.status.clone()), configurations: Set(vuln_info.configurations.clone()), published_at: Set(published_at), last_modified_at: Set(last_modified_at), }; tbl_vuln::Entity::insert(tbl_vuln_am).exec(db_conn).await?; } } Ok(()) }
3.4 中文信息生成
use std::collections::{HashMap, HashSet}; use serde::{Deserialize, Serialize}; use serde_json::Value; use crate::nvd::get_part_vendor_product_from_configurations; #[derive(Debug, Clone, Serialize, Deserialize)] struct Message { role: String, content: String, } #[derive(Debug, Clone, Serialize, Deserialize)] struct ReqBody { model: String, messages: Vec<Message>, stream: bool, } pub async fn generate_title_cn_and_desc_cn( title_en: &str, desc_en: &str, ) -> anyhow::Result<Option<(String, String)>> { let ollama_url = "http://172.16.60.18:11434/api/chat"; let prompt: &str = r#" You are a security expert. You are good at translate English to Simplified Chinese. Please generate title_cn by title_en and desc_en. Please generate desc_cn by desc_en. Output strictly follows the xml format <title_cn></title_cn> and <desc_cn></desc_cn>."#; let model = "gemma2:27b"; let max_len = 2048; let desc_en = if desc_en.len() > max_len { log::warn!("desc_en len {} cut to 2048", desc_en.len()); desc_en.chars().take(max_len).collect() } else { desc_en.to_string() }; let content = format!("\ntitle: {}\ndescription: {}", title_en, desc_en); let client = reqwest::ClientBuilder::new().build()?; let system_msg = Message { role: "system".to_string(), content: prompt.to_string(), }; let user_msg = Message { role: "user".to_string(), content, }; let req_body = ReqBody { model: model.to_string(), messages: [system_msg, user_msg].to_vec(), stream: false, }; let req_body = serde_json::to_value(req_body)?; let rsp = client .post(ollama_url) .body(req_body.to_string()) .send() .await?; let text = rsp.text().await?; let json: serde_json::Value = serde_json::from_str(&text)?; let content = json["message"]["content"].as_str().unwrap_or_default(); // log::info!("content:\n{}", content); if let Some((_, tmp)) = content.split_once("<title_cn>") { if let Some((title_cn, tmp)) = tmp.split_once("</title_cn>") { let title_cn = title_cn.trim(); if title_cn.is_empty() { log::warn!("title_cn is empty: {}", 
content); } else if let Some((_, tmp)) = tmp.split_once("<desc_cn>") { if let Some((desc_cn, _)) = tmp.split_once("</desc_cn>") { let desc_cn = desc_cn.trim(); if desc_cn.is_empty() { log::warn!("desc_cn is empty: {}", content); } else { return Ok(Some((title_cn.to_string(), desc_cn.to_string()))); } } } } } log::warn!("generate_vul_title_to_cn failed, content:\n{}", content); Ok(None) } pub async fn generate_title_and_desc_cn( cve_id: &str, desc_en: &str, configurations: &str, part_vendor_product_title_map: &HashMap<String, String>, ) -> anyhow::Result<Option<(String, String, String)>> { let ollama_url = "http://172.16.60.18:11434/api/chat"; let client = reqwest::ClientBuilder::new().build()?; let action_map = HashMap::from([ ("Remote Code Execution", "代码执行"), ("Elevation of Privilege", "权限提升"), ("Information Disclosure", "信息泄露"), ("Spoofing", "欺骗"), ("Tampering", "篡改"), ("Denial of Service", "拒绝服务"), ("Cross-Site Scripting", "跨站脚本攻击"), ("Cross-Site Request Forgery", "跨站请求伪造"), ("Buffer Overflow", "缓冲区溢出"), ("SQL Injection", "SQL注入"), ("Path Traversal", "路径遍历"), ("Command Injection", "命令注入"), ("LDAP Injection", "LDAP注入"), ("Server-Side Request Forgery", "服务器端请求伪造"), ("Authorization Bypass", "授权绕过"), ("Insecure Deserialization", "不安全的反序列化"), ("Race Conditions", "竞态条件"), ]); let prompt_whoareyou = r#"You are a security expert."#; let prompt_product = r#"You are good at identify software product by CVE description, output strictly follows the xml format <product></product>."#; let prompt_action = format!( "You are good at summary action by CVE description, action is one of {:?} and other web vulunerbility basic type, output strictly follows the xml format <action></action>.", action_map.keys() ); let prompt_description = r#"You are good at translate CVE descrption to Simplified Chinese, output strictly follows the xml format <chs_description></chs_description>."#; let max_len = 2048; let description_en = if desc_en.len() > max_len { log::warn!("description_en len {} 
cut to 2048", desc_en.len()); desc_en.chars().take(max_len).collect() } else { desc_en.to_string() }; // 读取精确的product title let json: Value = serde_json::from_str(configurations)?; let cpe23s = get_part_vendor_product_from_configurations(&json); // log::info!("cpe23s: {:?}", cpe23s); let mut product_title_set = HashSet::new(); for cpe23 in cpe23s { if let Some(title) = part_vendor_product_title_map.get(&cpe23) { // log::info!("cpe23: {}, title: {}", cpe23, title); product_title_set.insert(title.clone()); }; if product_title_set.len() > 10 { log::warn!("product_title_set too many, break insert"); break; } } // log::info!("product_title_set: {:?}", product_title_set); let content = match product_title_set.len() { 0 => { // 涉及的product title字典里没有 format!( "{}\n{}\n{}\n{}", prompt_whoareyou, prompt_product, prompt_action, prompt_description ) } _ => { // 涉及的product title字典里有多个,让AI选择 format!( "{}\n{} Software product is one of{:?}\n{}\n{}", prompt_whoareyou, prompt_product, product_title_set, prompt_action, prompt_description ) } }; // log::info!("prompt:\n{}", content); let system_msg = Message { role: "system".to_string(), content, }; let user_msg = Message { role: "user".to_string(), content: description_en, }; let req_body = ReqBody { model: "gemma2:27b".to_string(), messages: [system_msg.clone(), user_msg].to_vec(), stream: false, }; let req_body = serde_json::to_value(req_body)?; let rsp = client .post(ollama_url) .body(req_body.to_string()) .send() .await?; let text = rsp.text().await?; let json: serde_json::Value = serde_json::from_str(&text)?; let content = json["message"]["content"].as_str().unwrap_or_default(); // log::info!("content:\n{}", content); if let Some((_, tmp)) = content.split_once("<product>") { if let Some((product, tmp)) = tmp.split_once("</product>") { // product非常确定的情况 let product = if product_title_set.len() == 1 { log::info!("use product is in dictionary"); product_title_set.iter().next().unwrap().as_str() } else { product }; if 
product.is_empty() { log::warn!("product is tempty"); return Ok(Some(( cve_id.to_string(), cve_id.to_string(), cve_id.to_string(), ))); } else if product.len() > 100 { log::warn!("product is too long {}, {}", product.len(), product); } if let Some((_, tmp)) = tmp.split_once("<action>") { if let Some((action, tmp)) = tmp.split_once("</action>") { if action.is_empty() { log::warn!("action is tempty"); return Ok(Some(( cve_id.to_string(), cve_id.to_string(), cve_id.to_string(), ))); } if action.len() > 100 { log::warn!("action is too long {}, {}", action.len(), action); } if let Some((_, tmp)) = tmp.split_once("<chs_description>") { if let Some((description_zh, _)) = tmp.split_once("</chs_description>") { if description_zh.is_empty() { log::warn!("description_zh is tempty"); return Ok(Some(( cve_id.to_string(), cve_id.to_string(), cve_id.to_string(), ))); } let action_zh = match action_map.get(action) { Some(v) => v, None => action, }; let title_zh = format!("{} {}漏洞({})", product, action_zh, cve_id); let title_en = format!("{} {} Vulnerability({})", product, action, cve_id); // log::info!("title_zh: {}", title_zh); // log::info!("title_en: {}", title_en); // log::info!("description_zh: {}", description_zh); return Ok(Some((title_zh, title_en, description_zh.to_string()))); } }; } }; } }; log::warn!("generate_title_and_desc_cn faild"); Ok(None) }
更多推荐
所有评论(0)