🔥 关键词:#Rust 工程级扫描器实战 #Rust 红队开发 #Rust 网络扫描 #扫描器工程骨架期 #红队工具开发

📌 专栏说明:本专栏聚焦 Rust 红队扫描器(LSCAN)从脚本到工程化落地的全流程,核心探讨工程架构、模块划分、并发模型与设计取舍,不涉及基础语法教学,适合有 Rust 基础的红队工具开发者参考。


🔥 前言

上一篇我们完成了聚焦 cli+session 模块!CLI 是扫描器与用户交互的入口,Session 是参数到业务逻辑的桥梁,二者共同构成「用户输入 → 业务可用数据」的完整链路。本文将基于 clap v4+ 实现工程级 CLI 参数解析,通过 Session 层完成业务规则落地,全程干货代码 + 设计思路解析,让扫描器具备灵活、健壮的命令行交互能力!

本篇内容聚焦并发框架升级!并发是红队扫描器的 “性能核心”,直接决定扫描效率与资源占用。本文将深度解析 Worker Pool 与 Stream 两种核心并发模式,结合扫描器三阶段流水线特性给出最优选型方案,并落地工程级并发架构代码,让扫描器兼具高性能与资源可控性!


一、背景与目标

1. 背景

红队扫描器作为高性能工具,需在 “扫描效率” 与 “资源消耗” 之间找到平衡:

  • 主机存活探测、端口扫描、服务识别三阶段任务特性差异大(任务量、耗时、IO/CPU 密集度不同);

  • 单一并发模式无法适配所有阶段(如端口扫描任务量超 10 万 +,主机探测仅千级);

  • 需参考 Nmap/Masscan/RustScan 等成熟工具的并发设计,兼顾性能与稳定性;

  • 需通过工程化代码落地并发架构,实现可扩展、可维护的异步扫描流程。

2. 核心目标

  • 深度解析 Worker Pool 与 Stream 两种并发模式的特性、优劣及适用场景;

  • 结合扫描器三阶段流水线特性,为每个阶段匹配最优并发模式;

  • 设计混合并发架构,基于 Tokio 异步运行时落地工程级实现;

  • 编写可扩展的异步扫描框架代码,支持各阶段并发度配置与性能统计;

  • 落地成熟扫描工具的并发最佳实践(速率限制、超时控制、批量处理)。

二、核心思路

1. 并发模式深度分析

(1)Worker Pool 模式

架构设计

┌─────────────────────────────────────┐
│         任务队列 (Task Queue)        │
│  [Task1][Task2][Task3]...[TaskN]   │
└──────────┬──────────────────────────┘
           │
    ┌──────┴──────┬──────┬──────┐
    ▼             ▼      ▼      ▼
┌────────┐  ┌────────┐  ┌────────┐
│Worker 1│  │Worker 2│  │Worker N│
└────────┘  └────────┘  └────────┘
    │             │           │
    └─────────────┴───────────┘
              ▼
        结果收集器

核心特性

  • 固定数量 Worker 线程 / 任务,从共享队列获取任务;

  • 资源可控,内存占用稳定,线程数量可预测;

  • 实现复杂度高,需手动管理队列和 Worker 生命周期;

  • 适配场景:端口扫描(1000+ 端口 × 多主机)等超大规模任务集。

(2)队列并发模式(Stream/Channel)

架构设计

输入流 → [并发处理] → 输出流
         ↓  ↓  ↓
      同时处理 N 个任务

核心特性

  • 基于 futures::stream/tokio::spawn,通过 buffer_unordered(N) 控制并发度;

  • 实现简洁、代码易读,灵活适配异步 IO 密集型任务;

  • 内存占用随并发数增加,不适合超大规模任务;

  • 适配场景:主机存活探测(IP 数量适中)、服务识别等中等规模任务集。

(3)两种模式对比
维度 Worker Pool Stream 并发
实现复杂度
内存占用 稳定 随并发数增加
适合任务数 超大规模 (10000+) 中等规模 (100-5000)
代码可读性 中等
灵活性
性能 极高

2. 扫描流程并发设计

(1)三阶段流水线特性
阶段1: 主机存活探测
  输入: ["192.168.1.1", "192.168.1.0/24"]
  输出: [IP1, IP2, IP3...] (存活主机)
         ↓
阶段2: 端口扫描
  输入: [IP1, IP2, IP3] × [80, 443, 8080...]
  输出: [(IP1:80), (IP1:443), (IP2:80)...] (开放端口)
         ↓
阶段3: 服务识别
  输入: [(IP1:80), (IP2:443)...]
  输出: [Service1, Service2...] (服务信息)
(2)各阶段并发选型
阶段 任务数量 推荐模式 原因 并发度
主机存活 10-1000 Stream 并发 任务数适中,需快速响应 50-200
端口扫描 1000-100000+ Worker Pool 任务量巨大,需资源控制 1000-5000
服务识别 10-1000 Stream 并发 任务数适中,IO 密集 50-100

3. 混合并发架构设计

最终架构

┌─────────────────────────────────────────────────────┐
│                  Application                         │
│  (异步运行时 - Tokio)                               │
└────────────┬────────────────────────────────────────┘
             │
    ┌────────┴────────┬──────────────┬──────────────┐
    ▼                 ▼              ▼              ▼
┌─────────┐      ┌──────────┐  ┌──────────┐  ┌──────────┐
│ 主机探测 │      │ 端口扫描 │  │ 服务识别 │  │ 结果收集 │
│ Stream  │      │ WorkPool │  │ Stream   │  │          │
└─────────┘      └──────────┘  └──────────┘  └──────────┘
  并发度:50-200    并发度:1000-5000  并发度:50-100

核心设计思路

  • 基于 Tokio 异步运行时统一调度,按阶段适配不同并发模式;

  • 抽象异步 Trait 定义各阶段扫描行为,解耦具体实现与框架逻辑;

  • 配置化并发度,支持各阶段独立调整;

  • 加入性能统计,实时监控各阶段耗时与扫描结果。

三、核心代码分析

use async_trait::async_trait;
use clap::Parser;
use tracing::{info, warn};
​
use crate::{
    cli::Cli,
    configs::config::AppConfig,
    model::{
        concurrency::ConcurrencyConfig,
        mode::{HostDiscovery, PortDiscovery, ScanResults, ServiceDiscovery, Session},
    },
    utils::{error::Result, output::print_scan_results},
};
​
// 异步应用结构体
pub struct Application {
    host_alive: Box<dyn HostAlive>,
    port_scan: Box<dyn PortScan>,
    service_detector: Box<dyn ServiceDetector>,
}
​
impl Application {
    pub fn new(config: AppConfig, _concurrency_config: ConcurrencyConfig) -> Result<Self> {
        Ok(Self {
            host_alive: crate::scanner::host::create_host_alive(config.clone())?,
            port_scan: crate::scanner::port::create_port_scan(config.clone())?,
            service_detector: crate::scanner::service::create_service_detector(config.clone())?,
        })
    }
​
    pub async fn runner(config: AppConfig, concurrency_config: ConcurrencyConfig) -> Result<()> {
        // 参数解析
        let cli = Cli::parse();
​
        // 创建扫描会话
        let session: Session = cli.clone().into();
​
        // 创建 Application 实例(加载扫描器实现)
        let app = Application::new(config, concurrency_config.clone())?;
        let mut results = ScanResults::new();
​
        let start_time = std::time::Instant::now();
        let mut host_elapsed = std::time::Duration::from_secs(0);
​
        // 主机存活探测
        if session.alive.is_some() {
            info!(
                "开始主机存活探测,并发度: {}",
                concurrency_config.host_concurrency
            );
            let host_start = std::time::Instant::now();
            let host_engine = app.host_alive.scan(&session, &cli.targets).await?;
            host_elapsed = host_start.elapsed();
​
            if host_engine.is_empty() {
                warn!("未探测到存活主机,扫描终止");
                return Ok(());
            }
            results.host_alive_num = results.record_hosts(host_engine.clone()).len();
            info!(
                "主机存活探测完成,发现 {} 个存活主机,耗时 {:.2}s",
                results.host_alive_num,
                host_elapsed.as_secs_f64()
            );
        } else {
            info!("跳过主机存活探测,直接扫描所有目标");
            // 创建虚拟的存活主机列表
            let hosts: Vec<HostDiscovery> = cli
                .targets
                .iter()
                .filter_map(|target| {
                    target
                        .parse::<std::net::IpAddr>()
                        .ok()
                        .map(|ip| HostDiscovery {
                            ip,
                            is_alive: true,
                            os_type: "unknown".to_string(),
                        })
                })
                .collect();
            results.host_alive_num = hosts.len();
            results.record_hosts(hosts.clone());
        }
​
        // 端口开放扫描
        info!(
            "开始端口扫描,并发度: {}",
            concurrency_config.port_concurrency
        );
        let port_start = std::time::Instant::now();
        let port_engine = app
            .port_scan
            .scan(&session, &results.host_discovery)
            .await?;
        let port_elapsed = port_start.elapsed();
​
        if port_engine.is_empty() {
            warn!("未探测到开放端口,扫描终止");
            return Ok(());
        }
        results.total_port_num = results.record_ports(port_engine.clone()).len();
        info!(
            "端口扫描完成,发现 {} 个开放端口,耗时 {:.2}s",
            results.total_port_num,
            port_elapsed.as_secs_f64()
        );
​
        // 服务类型识别
        info!(
            "开始服务识别,并发度: {}",
            concurrency_config.service_concurrency
        );
        let service_start = std::time::Instant::now();
        let service_engine = app.service_detector.scan(&session, &port_engine).await?;
        let service_elapsed = service_start.elapsed();
​
        results.record_services(service_engine);
        info!("服务识别完成,耗时 {:.2}s", service_elapsed.as_secs_f64());
​
        // 扫描结果打印
        print_scan_results(&results);
​
        // 性能统计
        let total_elapsed = start_time.elapsed();
        info!("=== 性能统计 ===");
        if session.alive.is_some() {
            info!("主机探测: {:.2}s", host_elapsed.as_secs_f64());
        }
        info!("端口扫描: {:.2}s", port_elapsed.as_secs_f64());
        info!("服务识别: {:.2}s", service_elapsed.as_secs_f64());
        info!("总耗时: {:.2}s", total_elapsed.as_secs_f64());
​
        Ok(())
    }
}
​
// ========== 异步 Trait 定义 ==========
​
/// 主机存活探测 trait
#[async_trait]
pub trait HostAlive: Send + Sync {
    async fn scan(&self, session: &Session, targets: &[String]) -> Result<Vec<HostDiscovery>>;
}
​
/// 端口扫描 trait
#[async_trait]
pub trait PortScan: Send + Sync {
    async fn scan(&self, session: &Session, hosts: &[HostDiscovery]) -> Result<Vec<PortDiscovery>>;
}
​
/// 服务识别 trait
#[async_trait]
pub trait ServiceDetector: Send + Sync {
    async fn scan(
        &self,
        session: &Session,
        ports: &[PortDiscovery],
    ) -> Result<Vec<ServiceDiscovery>>;
}

关键代码解读

异步 Trait 抽象

  • 定义 HostAlive/PortScan/ServiceDetector 异步 Trait,解耦扫描器框架与具体实现;

  • 基于 async_trait 实现异步 Trait,符合 Rust 异步编程规范;

  • Send + Sync 约束确保 Trait 对象可跨线程安全传递,适配并发场景。

Application 核心架构

  • 封装各阶段扫描器实例,通过工厂函数 create_xxx 创建具体实现,支持灵活扩展;

  • runner 方法作为扫描入口,串联三阶段扫描流程,统一处理并发配置与结果收集;

  • 配置化并发度:通过 ConcurrencyConfig 独立配置各阶段并发数,适配不同场景需求。

流程与性能管控

  • 分阶段耗时统计:记录主机探测、端口扫描、服务识别各阶段耗时,便于性能调优;

  • 空结果快速终止:存活主机 / 开放端口为空时及时终止扫描,避免无效资源消耗;

  • 日志可视化:通过 tracing 输出各阶段进度与统计信息,提升可观测性。

并发适配设计

  • 预留并发配置参数 ConcurrencyConfig,可直接对接 Worker Pool/Stream 模式的并发度控制;

  • 异步扫描方法返回 Result<Vec<T>>,便于上层统一处理错误与结果聚合;

  • Session 传递上下文:通过 Session 传递 CLI 参数与业务规则,确保各阶段行为一致。

四、核心特点与知识点总结

1. 核心特点

特点 说明
模式适配性 按扫描阶段特性匹配 Worker Pool/Stream 模式,最大化各阶段性能
架构扩展性 基于异步 Trait 抽象扫描行为,支持替换不同扫描实现(如 ping/icmp 存活探测)
资源可控性 配置化并发度,超大规模端口扫描用 Worker Pool 稳定内存,中等任务用 Stream 提升灵活度
工程化落地 完整的异步扫描框架,包含参数解析、流程串联、结果统计、错误处理
可观测性 分阶段性能统计 + 日志输出,便于问题定位与性能调优

2. 核心知识点

  • 并发模式选型:任务量超 1 万 + 选 Worker Pool,千级任务选 Stream 并发;

  • 异步 Trait 设计:基于 async_trait 实现跨阶段异步行为抽象,解耦框架与实现;

  • 并发度管控:参考 Nmap/Masscan/RustScan 最佳实践,端口扫描严格控并发,主机探测可激进;

  • 工程化流程:三阶段流水线设计,空结果快速终止,分阶段性能统计;

  • 架构设计原则:混合模式兼顾性能与资源,异步运行时统一调度降低整合成本。

五、下篇预告

本篇我们完成了并发框架升级,深度解析了 Worker Pool 与 Stream 两种核心并发模式,结合扫描器三阶段流水线特性设计了混合并发架构,并落地了工程级的异步扫描框架代码。下一篇【Rust 工程级扫描器实战】12 | 工程骨架期:主机存活探测并发实现,将聚焦主机存活探测阶段的并发落地,带你实现基于 Stream 模式的高性能存活探测,继续推进扫描器工程化落地!

📌 温馨提示:如需要本次升级后的完整源码,可关注微信公众号【砺石安全】回复:【20260105】 进行领取

📌 互动交流
停更这段时间里,一直在纠结一个问题:这套架构是不是有点 “重” 了?
最初的想法其实很纯粹 —— 做一个轻量化的扫描器,对标 fscan 的简洁实用。结果写着写着,不知不觉就借鉴了不少 Nmap 的设计思路,架构也越做越厚重。
不过转念一想,既然已经走到这一步,不如先把骨架打磨完整,先跑通最小可运行闭环。等核心流程稳定后,再针对性地做减法、轻量化优化。
如果本文对你理解 Rust 红队工具的工程化落地有帮助,欢迎点赞、收藏、关注!有任何架构优化建议或技术疑问,也欢迎在评论区一起交流~

📚 专栏导航:

上一篇:【Rust 工程级扫描器实战】10|cli+session 模块全解析(从设计到落地)

下一篇:【Rust 工程级扫描器实战】12 | 工程骨架期:主机存活探测并发实现

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐