010-网络命令与工具

难度:🟢 | 预计时间:90分钟 | 前置:009-网络设备与拓扑

学习目标

  • 掌握常用网络诊断命令的使用方法和参数
  • 理解各种网络工具的工作原理和适用场景
  • 能够使用网络命令进行故障诊断和性能分析
  • 学会编写网络监控和管理脚本
  • 了解图形化网络管理工具的使用

网络诊断命令概述

网络诊断命令是网络管理员和工程师必备的工具,用于测试网络连通性、分析网络性能、诊断网络故障。这些命令通常内置在操作系统中,提供了强大的网络分析能力。

命令分类

类别 命令 主要功能 适用场景
连通性测试 ping, traceroute 测试网络可达性 基础故障诊断
网络状态 netstat, ss 查看网络连接状态 连接分析
域名解析 nslookup, dig DNS查询和诊断 DNS问题排查
网络配置 ifconfig, ip 查看和配置网络接口 接口管理
流量分析 tcpdump, wireshark 数据包捕获分析 深度故障分析
路由表 route, ip route 查看和配置路由 路由问题诊断

ping命令详解

ping是最常用的网络诊断工具,基于ICMP协议测试网络连通性。

基本语法

# 基本用法
ping [选项] 目标主机

# 常用选项
-c count    # 发送指定数量的数据包
-i interval # 设置发送间隔(秒)
-s size     # 设置数据包大小
-t ttl      # 设置TTL值
-W timeout  # 设置超时时间

实用示例

# 文件路径: /scripts/ping_examples.sh
#!/bin/bash

# 基本连通性测试
ping -c 4 www.baidu.com

# 大数据包测试(测试MTU)
ping -c 3 -s 1472 www.baidu.com

# 快速ping测试
ping -c 1 -W 1 192.168.1.1

# 连续监控网络状态
ping -i 0.5 8.8.8.8

ping结果分析

# 文件路径: /tools/ping_analyzer.py
import subprocess
import re
import statistics
from typing import Dict, List, Optional

class PingAnalyzer:
    """Ping结果分析器"""
    
    def __init__(self):
        self.results = []
    
    def ping_host(self, host: str, count: int = 4) -> Dict:
        """执行ping命令并分析结果"""
        try:
            # 执行ping命令
            result = subprocess.run(
                ['ping', '-c', str(count), host],
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode == 0:
                return self._parse_ping_output(result.stdout, host)
            else:
                return {
                    'host': host,
                    'status': 'failed',
                    'error': result.stderr
                }
                
        except subprocess.TimeoutExpired:
            return {
                'host': host,
                'status': 'timeout',
                'error': 'Ping timeout'
            }
    
    def _parse_ping_output(self, output: str, host: str) -> Dict:
        """解析ping输出结果"""
        lines = output.strip().split('\n')
        
        # 提取RTT时间
        rtt_times = []
        for line in lines:
            if 'time=' in line:
                match = re.search(r'time=([\d.]+)', line)
                if match:
                    rtt_times.append(float(match.group(1)))
        
        # 提取统计信息
        stats_line = [line for line in lines if 'packet loss' in line]
        packet_loss = 0
        if stats_line:
            match = re.search(r'([\d.]+)% packet loss', stats_line[0])
            if match:
                packet_loss = float(match.group(1))
        
        # 计算统计值
        if rtt_times:
            return {
                'host': host,
                'status': 'success',
                'packet_loss': packet_loss,
                'rtt_min': min(rtt_times),
                'rtt_max': max(rtt_times),
                'rtt_avg': statistics.mean(rtt_times),
                'rtt_std': statistics.stdev(rtt_times) if len(rtt_times) > 1 else 0,
                'packets_sent': len(rtt_times),
                'quality': self._assess_quality(packet_loss, statistics.mean(rtt_times))
            }
        else:
            return {
                'host': host,
                'status': 'no_response',
                'packet_loss': 100
            }
    
    def _assess_quality(self, packet_loss: float, avg_rtt: float) -> str:
        """评估网络质量"""
        if packet_loss > 5:
            return 'poor'
        elif packet_loss > 1 or avg_rtt > 100:
            return 'fair'
        elif avg_rtt > 50:
            return 'good'
        else:
            return 'excellent'
    
    def batch_ping(self, hosts: List[str]) -> List[Dict]:
        """批量ping测试"""
        results = []
        for host in hosts:
            result = self.ping_host(host)
            results.append(result)
            print(f"Ping {host}: {result['status']}")
        return results

# 使用示例
if __name__ == "__main__":
    analyzer = PingAnalyzer()
    
    # 单个主机测试
    result = analyzer.ping_host('www.baidu.com')
    print(f"Ping结果: {result}")
    
    # 批量测试
    hosts = ['8.8.8.8', '114.114.114.114', 'www.google.com']
    results = analyzer.batch_ping(hosts)
    
    # 生成报告
    for result in results:
        if result['status'] == 'success':
            print(f"{result['host']}: 平均RTT {result['rtt_avg']:.2f}ms, "
                  f"丢包率 {result['packet_loss']}%, 质量 {result['quality']}")

traceroute命令详解

traceroute用于追踪数据包从源到目标的路径,显示经过的每个路由器。

工作原理

客户端 路由器1 路由器2 路由器3 目标服务器 TTL=1 ICMP包(TTL=1) TTL超时响应 TTL=2 ICMP包(TTL=2) ICMP包(TTL=1) TTL超时响应 TTL=3 ICMP包(TTL=3) ICMP包(TTL=2) ICMP包(TTL=1) TTL超时响应 TTL=4 ICMP包(TTL=4) ICMP包(TTL=3) ICMP包(TTL=2) ICMP包(TTL=1) 目标不可达或回应 客户端 路由器1 路由器2 路由器3 目标服务器

使用示例

# 基本traceroute
traceroute www.baidu.com

# 使用TCP而非ICMP
traceroute -T -p 80 www.baidu.com

# 设置最大跳数
traceroute -m 15 www.google.com

# 不解析主机名(加快速度)
traceroute -n 8.8.8.8

Python实现traceroute

# 文件路径: /tools/traceroute_tool.py
import socket
import struct
import time
import sys
from typing import List, Tuple, Optional

class Traceroute:
    """Traceroute工具实现"""
    
    def __init__(self, timeout: float = 3.0, max_hops: int = 30):
        self.timeout = timeout
        self.max_hops = max_hops
    
    def trace(self, destination: str, port: int = 33434) -> List[Tuple[int, str, float]]:
        """执行traceroute"""
        try:
            dest_addr = socket.gethostbyname(destination)
        except socket.gaierror:
            raise ValueError(f"无法解析主机名: {destination}")
        
        results = []
        
        for ttl in range(1, self.max_hops + 1):
            # 创建发送socket
            send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            send_socket.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
            
            # 创建接收socket
            recv_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
            recv_socket.settimeout(self.timeout)
            
            try:
                # 发送数据包
                start_time = time.time()
                send_socket.sendto(b'', (dest_addr, port))
                
                # 接收响应
                try:
                    data, addr = recv_socket.recvfrom(1024)
                    end_time = time.time()
                    rtt = (end_time - start_time) * 1000  # 转换为毫秒
                    
                    # 尝试解析主机名
                    try:
                        hostname = socket.gethostbyaddr(addr[0])[0]
                    except socket.herror:
                        hostname = addr[0]
                    
                    results.append((ttl, hostname, rtt))
                    
                    # 检查是否到达目标
                    if addr[0] == dest_addr:
                        break
                        
                except socket.timeout:
                    results.append((ttl, '*', 0))
                    
            finally:
                send_socket.close()
                recv_socket.close()
        
        return results
    
    def print_results(self, results: List[Tuple[int, str, float]], destination: str):
        """打印traceroute结果"""
        print(f"traceroute to {destination}")
        print("hop\thost\t\t\t\ttime")
        print("-" * 50)
        
        for hop, host, rtt in results:
            if host == '*':
                print(f"{hop}\t*\t\t\t\t*")
            else:
                print(f"{hop}\t{host:<25}\t{rtt:.2f} ms")

# 使用示例
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("用法: python traceroute_tool.py <目标主机>")
        sys.exit(1)
    
    target = sys.argv[1]
    tracer = Traceroute()
    
    try:
        results = tracer.trace(target)
        tracer.print_results(results, target)
    except Exception as e:
        print(f"错误: {e}")

netstat命令详解

netstat用于显示网络连接、路由表、接口统计等网络状态信息。

常用选项

# 显示所有连接
netstat -a

# 显示TCP连接
netstat -t

# 显示UDP连接
netstat -u

# 显示监听端口
netstat -l

# 显示数字地址(不解析主机名)
netstat -n

# 显示进程信息
netstat -p

# 组合使用:显示所有TCP监听端口及进程
netstat -tlnp

# 显示网络接口统计
netstat -i

# 显示路由表
netstat -r

现代替代工具:ss命令

# ss命令是netstat的现代替代品,性能更好

# 显示所有socket
ss -a

# 显示TCP连接
ss -t

# 显示监听socket
ss -l

# 显示进程信息
ss -p

# 组合使用
ss -tlnp

# 显示特定端口
ss -tlnp | grep :80

# 显示连接统计
ss -s

网络连接分析工具

# 文件路径: /tools/network_analyzer.py
import subprocess
import re
import json
from collections import defaultdict
from typing import Dict, List, Tuple

class NetworkAnalyzer:
    """网络连接分析工具"""
    
    def get_connections(self) -> List[Dict]:
        """获取网络连接信息"""
        try:
            # 使用ss命令获取连接信息
            result = subprocess.run(
                ['ss', '-tlnp'],
                capture_output=True,
                text=True
            )
            
            if result.returncode != 0:
                # 如果ss不可用,尝试netstat
                result = subprocess.run(
                    ['netstat', '-tlnp'],
                    capture_output=True,
                    text=True
                )
            
            return self._parse_connections(result.stdout)
            
        except FileNotFoundError:
            print("错误: ss和netstat命令都不可用")
            return []
    
    def _parse_connections(self, output: str) -> List[Dict]:
        """解析连接信息"""
        connections = []
        lines = output.strip().split('\n')[1:]  # 跳过标题行
        
        for line in lines:
            if not line.strip():
                continue
                
            parts = line.split()
            if len(parts) >= 4:
                connection = {
                    'protocol': parts[0],
                    'state': parts[1] if len(parts) > 4 else 'LISTEN',
                    'local_address': parts[-3] if len(parts) > 4 else parts[3],
                    'foreign_address': parts[-2] if len(parts) > 4 else '0.0.0.0:*',
                    'process': parts[-1] if len(parts) > 4 and '/' in parts[-1] else 'unknown'
                }
                connections.append(connection)
        
        return connections
    
    def analyze_ports(self, connections: List[Dict]) -> Dict:
        """分析端口使用情况"""
        port_stats = defaultdict(list)
        
        for conn in connections:
            if conn['state'] == 'LISTEN' or 'LISTEN' in conn['state']:
                # 提取端口号
                local_addr = conn['local_address']
                if ':' in local_addr:
                    port = local_addr.split(':')[-1]
                    if port != '*':
                        port_stats[port].append({
                            'protocol': conn['protocol'],
                            'process': conn['process']
                        })
        
        return dict(port_stats)
    
    def find_suspicious_connections(self, connections: List[Dict]) -> List[Dict]:
        """查找可疑连接"""
        suspicious = []
        
        for conn in connections:
            # 检查异常端口
            if conn['state'] == 'ESTABLISHED':
                foreign_addr = conn['foreign_address']
                if ':' in foreign_addr:
                    ip, port = foreign_addr.rsplit(':', 1)
                    
                    # 检查常见恶意端口
                    suspicious_ports = ['1337', '31337', '12345', '54321']
                    if port in suspicious_ports:
                        suspicious.append({
                            **conn,
                            'reason': f'可疑端口: {port}'
                        })
                    
                    # 检查非标准SSH端口的连接
                    if port not in ['22', '80', '443', '53', '25', '110', '143', '993', '995']:
                        if conn['process'] == 'unknown':
                            suspicious.append({
                                **conn,
                                'reason': '未知进程的非标准端口连接'
                            })
        
        return suspicious
    
    def generate_report(self) -> Dict:
        """生成网络连接报告"""
        connections = self.get_connections()
        
        report = {
            'total_connections': len(connections),
            'listening_ports': self.analyze_ports(connections),
            'suspicious_connections': self.find_suspicious_connections(connections),
            'connection_states': defaultdict(int)
        }
        
        # 统计连接状态
        for conn in connections:
            report['connection_states'][conn['state']] += 1
        
        return report
    
    def print_report(self, report: Dict):
        """打印报告"""
        print("=== 网络连接分析报告 ===")
        print(f"总连接数: {report['total_connections']}")
        
        print("\n=== 监听端口 ===")
        for port, processes in report['listening_ports'].items():
            print(f"端口 {port}:")
            for proc in processes:
                print(f"  - {proc['protocol']} {proc['process']}")
        
        print("\n=== 连接状态统计 ===")
        for state, count in report['connection_states'].items():
            print(f"{state}: {count}")
        
        if report['suspicious_connections']:
            print("\n=== 可疑连接 ===")
            for conn in report['suspicious_connections']:
                print(f"⚠️  {conn['local_address']} -> {conn['foreign_address']}")
                print(f"   原因: {conn['reason']}")
                print(f"   进程: {conn['process']}")

# 使用示例
if __name__ == "__main__":
    analyzer = NetworkAnalyzer()
    report = analyzer.generate_report()
    analyzer.print_report(report)

DNS诊断工具

DNS问题是网络故障的常见原因,需要专门的工具进行诊断。

nslookup命令

# 基本DNS查询
nslookup www.baidu.com

# 指定DNS服务器
nslookup www.baidu.com 8.8.8.8

# 查询特定记录类型
nslookup -type=MX baidu.com
nslookup -type=NS baidu.com
nslookup -type=TXT baidu.com

# 反向DNS查询
nslookup 8.8.8.8

dig命令(更强大的DNS工具)

# 基本查询
dig www.baidu.com

# 简洁输出
dig +short www.baidu.com

# 查询特定记录类型
dig MX baidu.com
dig NS baidu.com
dig TXT baidu.com

# 追踪DNS查询路径
dig +trace www.baidu.com

# 反向查询
dig -x 8.8.8.8

DNS诊断工具

# 文件路径: /tools/dns_diagnostic.py
import socket
import dns.resolver
import dns.reversename
import time
from typing import Dict, List, Optional

class DNSDiagnostic:
    """DNS诊断工具"""
    
    def __init__(self):
        self.resolver = dns.resolver.Resolver()
    
    def query_record(self, domain: str, record_type: str = 'A') -> Dict:
        """查询DNS记录"""
        try:
            start_time = time.time()
            answers = self.resolver.resolve(domain, record_type)
            end_time = time.time()
            
            records = [str(answer) for answer in answers]
            
            return {
                'domain': domain,
                'type': record_type,
                'status': 'success',
                'records': records,
                'query_time': (end_time - start_time) * 1000,
                'nameserver': str(answers.nameserver)
            }
            
        except dns.resolver.NXDOMAIN:
            return {
                'domain': domain,
                'type': record_type,
                'status': 'nxdomain',
                'error': '域名不存在'
            }
        except dns.resolver.Timeout:
            return {
                'domain': domain,
                'type': record_type,
                'status': 'timeout',
                'error': 'DNS查询超时'
            }
        except Exception as e:
            return {
                'domain': domain,
                'type': record_type,
                'status': 'error',
                'error': str(e)
            }
    
    def reverse_lookup(self, ip: str) -> Dict:
        """反向DNS查询"""
        try:
            start_time = time.time()
            reverse_name = dns.reversename.from_address(ip)
            answers = self.resolver.resolve(reverse_name, 'PTR')
            end_time = time.time()
            
            return {
                'ip': ip,
                'status': 'success',
                'hostname': str(answers[0]),
                'query_time': (end_time - start_time) * 1000
            }
            
        except Exception as e:
            return {
                'ip': ip,
                'status': 'error',
                'error': str(e)
            }
    
    def test_dns_servers(self, domain: str, dns_servers: List[str]) -> List[Dict]:
        """测试多个DNS服务器"""
        results = []
        
        for dns_server in dns_servers:
            # 创建新的resolver实例
            resolver = dns.resolver.Resolver()
            resolver.nameservers = [dns_server]
            
            try:
                start_time = time.time()
                answers = resolver.resolve(domain, 'A')
                end_time = time.time()
                
                results.append({
                    'dns_server': dns_server,
                    'status': 'success',
                    'query_time': (end_time - start_time) * 1000,
                    'records': [str(answer) for answer in answers]
                })
                
            except Exception as e:
                results.append({
                    'dns_server': dns_server,
                    'status': 'error',
                    'error': str(e)
                })
        
        return results
    
    def comprehensive_check(self, domain: str) -> Dict:
        """综合DNS检查"""
        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME']
        dns_servers = ['8.8.8.8', '114.114.114.114', '1.1.1.1']
        
        results = {
            'domain': domain,
            'records': {},
            'dns_servers': self.test_dns_servers(domain, dns_servers),
            'issues': []
        }
        
        # 查询各种记录类型
        for record_type in record_types:
            result = self.query_record(domain, record_type)
            results['records'][record_type] = result
            
            # 检查潜在问题
            if result['status'] == 'timeout':
                results['issues'].append(f"{record_type}记录查询超时")
            elif result['status'] == 'error':
                results['issues'].append(f"{record_type}记录查询失败: {result['error']}")
        
        # 检查DNS服务器一致性
        a_records = []
        for server_result in results['dns_servers']:
            if server_result['status'] == 'success':
                a_records.extend(server_result['records'])
        
        if len(set(a_records)) > 1:
            results['issues'].append('不同DNS服务器返回不一致的A记录')
        
        return results
    
    def print_comprehensive_report(self, results: Dict):
        """打印综合报告"""
        print(f"=== DNS综合检查报告: {results['domain']} ===")
        
        print("\n=== DNS记录 ===")
        for record_type, result in results['records'].items():
            if result['status'] == 'success':
                print(f"{record_type}: {', '.join(result['records'])} ({result['query_time']:.2f}ms)")
            else:
                print(f"{record_type}: {result['status']} - {result.get('error', '')}")
        
        print("\n=== DNS服务器测试 ===")
        for server_result in results['dns_servers']:
            if server_result['status'] == 'success':
                print(f"{server_result['dns_server']}: {server_result['query_time']:.2f}ms")
            else:
                print(f"{server_result['dns_server']}: {server_result['status']} - {server_result.get('error', '')}")
        
        if results['issues']:
            print("\n=== 发现的问题 ===")
            for issue in results['issues']:
                print(f"⚠️  {issue}")
        else:
            print("\n✅ 未发现DNS问题")

# 使用示例
if __name__ == "__main__":
    diagnostic = DNSDiagnostic()
    
    # 单个查询
    result = diagnostic.query_record('www.baidu.com', 'A')
    print(f"A记录查询: {result}")
    
    # 综合检查
    comprehensive_result = diagnostic.comprehensive_check('baidu.com')
    diagnostic.print_comprehensive_report(comprehensive_result)

网络接口配置工具

ifconfig命令(传统)

# 显示所有网络接口
ifconfig

# 显示特定接口
ifconfig eth0

# 配置IP地址
sudo ifconfig eth0 192.168.1.100 netmask 255.255.255.0

# 启用/禁用接口
sudo ifconfig eth0 up
sudo ifconfig eth0 down

# 配置别名接口
sudo ifconfig eth0:1 192.168.1.101 netmask 255.255.255.0

ip命令(现代)

# 显示所有接口
ip addr show
# 或简写
ip a

# 显示特定接口
ip addr show eth0

# 添加IP地址
sudo ip addr add 192.168.1.100/24 dev eth0

# 删除IP地址
sudo ip addr del 192.168.1.100/24 dev eth0

# 启用/禁用接口
sudo ip link set eth0 up
sudo ip link set eth0 down

# 显示路由表
ip route show

# 添加路由
sudo ip route add 192.168.2.0/24 via 192.168.1.1

# 删除路由
sudo ip route del 192.168.2.0/24

数据包捕获与分析

tcpdump命令

# 捕获所有数据包
sudo tcpdump

# 捕获特定接口
sudo tcpdump -i eth0

# 捕获特定主机
sudo tcpdump host 192.168.1.1

# 捕获特定端口
sudo tcpdump port 80

# 捕获HTTP流量
sudo tcpdump -i eth0 'port 80 or port 443'

# 保存到文件
sudo tcpdump -i eth0 -w capture.pcap

# 读取文件
tcpdump -r capture.pcap

# 详细输出
sudo tcpdump -i eth0 -v -n

简单的数据包分析工具

# 文件路径: /tools/packet_analyzer.py
import socket
import struct
import textwrap
from typing import Dict, Tuple

class PacketAnalyzer:
    """简单的数据包分析工具"""
    
    def __init__(self):
        # 协议号映射
        self.protocols = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
    
    def create_raw_socket(self) -> socket.socket:
        """创建原始套接字"""
        try:
            # 创建原始套接字(需要root权限)
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
            sock.bind((socket.gethostbyname(socket.gethostname()), 0))
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
            return sock
        except PermissionError:
            print("错误: 需要root权限来创建原始套接字")
            return None
    
    def parse_ip_header(self, packet: bytes) -> Dict:
        """解析IP头部"""
        # IP头部格式: 版本(4) + 头长度(4) + 服务类型(8) + 总长度(16) + ...
        ip_header = packet[0:20]
        iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
        
        version_ihl = iph[0]
        version = version_ihl >> 4
        ihl = version_ihl & 0xF
        iph_length = ihl * 4
        
        return {
            'version': version,
            'header_length': iph_length,
            'type_of_service': iph[1],
            'total_length': iph[2],
            'identification': iph[3],
            'flags': iph[4] >> 13,
            'fragment_offset': iph[4] & 0x1FFF,
            'ttl': iph[5],
            'protocol': iph[6],
            'checksum': iph[7],
            'source_ip': socket.inet_ntoa(iph[8]),
            'destination_ip': socket.inet_ntoa(iph[9]),
            'protocol_name': self.protocols.get(iph[6], f'Unknown({iph[6]})')
        }
    
    def parse_tcp_header(self, packet: bytes, ip_header_length: int) -> Dict:
        """解析TCP头部"""
        tcp_header = packet[ip_header_length:ip_header_length+20]
        tcph = struct.unpack('!HHLLBBHHH', tcp_header)
        
        return {
            'source_port': tcph[0],
            'destination_port': tcph[1],
            'sequence': tcph[2],
            'acknowledgment': tcph[3],
            'data_offset': tcph[4] >> 4,
            'flags': tcph[5],
            'window': tcph[6],
            'checksum': tcph[7],
            'urgent_pointer': tcph[8]
        }
    
    def parse_udp_header(self, packet: bytes, ip_header_length: int) -> Dict:
        """解析UDP头部"""
        udp_header = packet[ip_header_length:ip_header_length+8]
        udph = struct.unpack('!HHHH', udp_header)
        
        return {
            'source_port': udph[0],
            'destination_port': udph[1],
            'length': udph[2],
            'checksum': udph[3]
        }
    
    def analyze_packet(self, packet: bytes) -> Dict:
        """分析数据包"""
        # 解析IP头部
        ip_info = self.parse_ip_header(packet)
        
        analysis = {
            'ip': ip_info,
            'transport': None,
            'payload_length': 0
        }
        
        # 根据协议解析传输层头部
        if ip_info['protocol'] == 6:  # TCP
            tcp_info = self.parse_tcp_header(packet, ip_info['header_length'])
            analysis['transport'] = tcp_info
            analysis['payload_length'] = ip_info['total_length'] - ip_info['header_length'] - (tcp_info['data_offset'] * 4)
            
        elif ip_info['protocol'] == 17:  # UDP
            udp_info = self.parse_udp_header(packet, ip_info['header_length'])
            analysis['transport'] = udp_info
            analysis['payload_length'] = udp_info['length'] - 8
        
        return analysis
    
    def print_packet_info(self, analysis: Dict):
        """打印数据包信息"""
        ip = analysis['ip']
        print(f"\n=== IP包分析 ===")
        print(f"版本: {ip['version']}")
        print(f"协议: {ip['protocol_name']}")
        print(f"源IP: {ip['source_ip']}")
        print(f"目标IP: {ip['destination_ip']}")
        print(f"TTL: {ip['ttl']}")
        print(f"总长度: {ip['total_length']}字节")
        
        if analysis['transport']:
            transport = analysis['transport']
            print(f"\n=== {ip['protocol_name']}头部 ===")
            print(f"源端口: {transport['source_port']}")
            print(f"目标端口: {transport['destination_port']}")
            
            if ip['protocol'] == 6:  # TCP
                flags = []
                if transport['flags'] & 0x01: flags.append('FIN')
                if transport['flags'] & 0x02: flags.append('SYN')
                if transport['flags'] & 0x04: flags.append('RST')
                if transport['flags'] & 0x08: flags.append('PSH')
                if transport['flags'] & 0x10: flags.append('ACK')
                if transport['flags'] & 0x20: flags.append('URG')
                
                print(f"序列号: {transport['sequence']}")
                print(f"确认号: {transport['acknowledgment']}")
                print(f"标志: {', '.join(flags) if flags else 'None'}")
                print(f"窗口大小: {transport['window']}")
            
            elif ip['protocol'] == 17:  # UDP
                print(f"长度: {transport['length']}字节")
        
        print(f"\n载荷长度: {analysis['payload_length']}字节")
        print("-" * 50)

# 使用示例(需要root权限)
if __name__ == "__main__":
    analyzer = PacketAnalyzer()
    sock = analyzer.create_raw_socket()
    
    if sock:
        print("开始捕获数据包... (按Ctrl+C停止)")
        try:
            while True:
                packet, addr = sock.recvfrom(65565)
                analysis = analyzer.analyze_packet(packet)
                analyzer.print_packet_info(analysis)
        except KeyboardInterrupt:
            print("\n停止捕获")
        finally:
            sock.close()

网络性能测试工具

iperf3工具

# 服务器端
iperf3 -s

# 客户端测试
iperf3 -c server_ip

# 指定测试时间
iperf3 -c server_ip -t 30

# 指定带宽
iperf3 -c server_ip -b 100M

# UDP测试
iperf3 -c server_ip -u

# 双向测试
iperf3 -c server_ip --bidir

# 并行连接测试
iperf3 -c server_ip -P 4

网络延迟测试工具

# 文件路径: /tools/latency_tester.py
import time
import socket
import statistics
from typing import List, Dict
import threading

class LatencyTester:
    """网络延迟测试工具"""
    
    def __init__(self):
        self.results = []
    
    def tcp_connect_test(self, host: str, port: int, timeout: float = 5.0) -> float:
        """TCP连接延迟测试"""
        try:
            start_time = time.time()
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            end_time = time.time()
            sock.close()
            
            if result == 0:
                return (end_time - start_time) * 1000  # 转换为毫秒
            else:
                return -1  # 连接失败
                
        except Exception:
            return -1
    
    def http_response_test(self, url: str, timeout: float = 10.0) -> Dict:
        """HTTP响应时间测试"""
        import urllib.request
        import urllib.error
        
        try:
            start_time = time.time()
            
            # DNS解析时间
            dns_start = time.time()
            host = url.split('/')[2] if '://' in url else url.split('/')[0]
            socket.gethostbyname(host)
            dns_time = (time.time() - dns_start) * 1000
            
            # HTTP请求
            request = urllib.request.Request(url)
            request.add_header('User-Agent', 'LatencyTester/1.0')
            
            with urllib.request.urlopen(request, timeout=timeout) as response:
                # 读取响应头
                headers_time = (time.time() - start_time) * 1000
                
                # 读取内容(前1KB)
                content = response.read(1024)
                total_time = (time.time() - start_time) * 1000
                
                return {
                    'status': 'success',
                    'dns_time': dns_time,
                    'headers_time': headers_time,
                    'total_time': total_time,
                    'status_code': response.getcode(),
                    'content_length': len(content)
                }
                
        except urllib.error.URLError as e:
            return {
                'status': 'error',
                'error': str(e)
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def batch_latency_test(self, targets: List[Dict], iterations: int = 10) -> Dict:
        """批量延迟测试"""
        results = {}
        
        for target in targets:
            target_name = target['name']
            target_type = target['type']
            
            print(f"测试 {target_name}...")
            
            if target_type == 'tcp':
                latencies = []
                for i in range(iterations):
                    latency = self.tcp_connect_test(target['host'], target['port'])
                    if latency > 0:
                        latencies.append(latency)
                    time.sleep(0.1)  # 避免过于频繁的连接
                
                if latencies:
                    results[target_name] = {
                        'type': 'tcp',
                        'successful_tests': len(latencies),
                        'failed_tests': iterations - len(latencies),
                        'min_latency': min(latencies),
                        'max_latency': max(latencies),
                        'avg_latency': statistics.mean(latencies),
                        'std_latency': statistics.stdev(latencies) if len(latencies) > 1 else 0
                    }
                else:
                    results[target_name] = {
                        'type': 'tcp',
                        'status': 'all_failed'
                    }
            
            elif target_type == 'http':
                http_results = []
                for i in range(iterations):
                    result = self.http_response_test(target['url'])
                    if result['status'] == 'success':
                        http_results.append(result)
                    time.sleep(0.5)  # HTTP请求间隔稍长
                
                if http_results:
                    dns_times = [r['dns_time'] for r in http_results]
                    total_times = [r['total_time'] for r in http_results]
                    
                    results[target_name] = {
                        'type': 'http',
                        'successful_tests': len(http_results),
                        'failed_tests': iterations - len(http_results),
                        'avg_dns_time': statistics.mean(dns_times),
                        'avg_total_time': statistics.mean(total_times),
                        'min_total_time': min(total_times),
                        'max_total_time': max(total_times)
                    }
                else:
                    results[target_name] = {
                        'type': 'http',
                        'status': 'all_failed'
                    }
        
        return results
    
    def print_results(self, results: Dict):
        """打印测试结果"""
        print("\n=== 延迟测试结果 ===")
        
        for target_name, result in results.items():
            print(f"\n{target_name}:")
            
            if result.get('status') == 'all_failed':
                print("  ❌ 所有测试都失败了")
                continue
            
            if result['type'] == 'tcp':
                print(f"  成功/总计: {result['successful_tests']}/{result['successful_tests'] + result['failed_tests']}")
                print(f"  平均延迟: {result['avg_latency']:.2f}ms")
                print(f"  最小/最大: {result['min_latency']:.2f}ms / {result['max_latency']:.2f}ms")
                print(f"  标准差: {result['std_latency']:.2f}ms")
                
                # 评估网络质量
                avg_latency = result['avg_latency']
                if avg_latency < 20:
                    quality = "优秀"
                elif avg_latency < 50:
                    quality = "良好"
                elif avg_latency < 100:
                    quality = "一般"
                else:
                    quality = "较差"
                print(f"  网络质量: {quality}")
            
            elif result['type'] == 'http':
                print(f"  成功/总计: {result['successful_tests']}/{result['successful_tests'] + result['failed_tests']}")
                print(f"  平均DNS时间: {result['avg_dns_time']:.2f}ms")
                print(f"  平均总时间: {result['avg_total_time']:.2f}ms")
                print(f"  最小/最大总时间: {result['min_total_time']:.2f}ms / {result['max_total_time']:.2f}ms")

# 使用示例
if __name__ == "__main__":
    tester = LatencyTester()
    
    # 定义测试目标
    targets = [
        {
            'name': 'Google DNS',
            'type': 'tcp',
            'host': '8.8.8.8',
            'port': 53
        },
        {
            'name': '百度',
            'type': 'http',
            'url': 'https://www.baidu.com'
        },
        {
            'name': 'GitHub',
            'type': 'http',
            'url': 'https://github.com'
        }
    ]
    
    # 执行批量测试
    results = tester.batch_latency_test(targets, iterations=5)
    tester.print_results(results)

综合网络诊断脚本

# 文件路径: /tools/network_diagnostic_suite.py
import subprocess
import socket
import time
import json
from typing import Dict, List
from datetime import datetime

class NetworkDiagnosticSuite:
    """综合网络诊断工具套件"""
    
    def __init__(self):
        self.report = {
            'timestamp': datetime.now().isoformat(),
            'hostname': socket.gethostname(),
            'tests': {}
        }
    
    def test_basic_connectivity(self, targets: List[str]) -> Dict:
        """基础连通性测试"""
        results = {}
        
        for target in targets:
            try:
                result = subprocess.run(
                    ['ping', '-c', '4', target],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                
                if result.returncode == 0:
                    # 解析ping结果
                    output = result.stdout
                    if 'packet loss' in output:
                        loss_line = [line for line in output.split('\n') if 'packet loss' in line][0]
                        loss_match = __import__('re').search(r'([\d.]+)% packet loss', loss_line)
                        packet_loss = float(loss_match.group(1)) if loss_match else 100
                    else:
                        packet_loss = 100
                    
                    results[target] = {
                        'status': 'success',
                        'packet_loss': packet_loss
                    }
                else:
                    results[target] = {
                        'status': 'failed',
                        'error': result.stderr
                    }
                    
            except subprocess.TimeoutExpired:
                results[target] = {
                    'status': 'timeout'
                }
            except Exception as e:
                results[target] = {
                    'status': 'error',
                    'error': str(e)
                }
        
        return results
    
    def test_dns_resolution(self, domains: List[str]) -> Dict:
        """DNS解析测试"""
        results = {}
        
        for domain in domains:
            try:
                start_time = time.time()
                ip = socket.gethostbyname(domain)
                end_time = time.time()
                
                results[domain] = {
                    'status': 'success',
                    'ip': ip,
                    'resolution_time': (end_time - start_time) * 1000
                }
                
            except socket.gaierror as e:
                results[domain] = {
                    'status': 'failed',
                    'error': str(e)
                }
            except Exception as e:
                results[domain] = {
                    'status': 'error',
                    'error': str(e)
                }
        
        return results
    
    def test_port_connectivity(self, targets: List[Dict]) -> Dict:
        """端口连通性测试"""
        results = {}
        
        for target in targets:
            host = target['host']
            port = target['port']
            key = f"{host}:{port}"
            
            try:
                start_time = time.time()
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(5)
                result = sock.connect_ex((host, port))
                end_time = time.time()
                sock.close()
                
                if result == 0:
                    results[key] = {
                        'status': 'open',
                        'response_time': (end_time - start_time) * 1000
                    }
                else:
                    results[key] = {
                        'status': 'closed'
                    }
                    
            except Exception as e:
                results[key] = {
                    'status': 'error',
                    'error': str(e)
                }
        
        return results
    
    def get_network_interfaces(self) -> Dict:
        """获取网络接口信息"""
        try:
            result = subprocess.run(
                ['ip', 'addr', 'show'],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                return {
                    'status': 'success',
                    'output': result.stdout
                }
            else:
                # 尝试ifconfig
                result = subprocess.run(
                    ['ifconfig'],
                    capture_output=True,
                    text=True
                )
                return {
                    'status': 'success',
                    'output': result.stdout
                }
                
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def get_routing_table(self) -> Dict:
        """获取路由表"""
        try:
            result = subprocess.run(
                ['ip', 'route', 'show'],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                return {
                    'status': 'success',
                    'routes': result.stdout.strip().split('\n')
                }
            else:
                # 尝试route命令
                result = subprocess.run(
                    ['route', '-n'],
                    capture_output=True,
                    text=True
                )
                return {
                    'status': 'success',
                    'routes': result.stdout.strip().split('\n')
                }
                
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def run_comprehensive_test(self) -> Dict:
        """运行综合网络诊断"""
        print("开始综合网络诊断...")
        
        # 基础连通性测试
        print("1. 测试基础连通性...")
        connectivity_targets = ['8.8.8.8', '114.114.114.114', 'www.baidu.com']
        self.report['tests']['connectivity'] = self.test_basic_connectivity(connectivity_targets)
        
        # DNS解析测试
        print("2. 测试DNS解析...")
        dns_targets = ['www.baidu.com', 'www.google.com', 'github.com']
        self.report['tests']['dns'] = self.test_dns_resolution(dns_targets)
        
        # 端口连通性测试
        print("3. 测试端口连通性...")
        port_targets = [
            {'host': '8.8.8.8', 'port': 53},
            {'host': 'www.baidu.com', 'port': 80},
            {'host': 'www.baidu.com', 'port': 443}
        ]
        self.report['tests']['ports'] = self.test_port_connectivity(port_targets)
        
        # 网络接口信息
        print("4. 获取网络接口信息...")
        self.report['tests']['interfaces'] = self.get_network_interfaces()
        
        # 路由表信息
        print("5. 获取路由表信息...")
        self.report['tests']['routing'] = self.get_routing_table()
        
        print("诊断完成!")
        return self.report
    
    def print_report(self):
        """打印诊断报告"""
        print(f"\n=== 网络诊断报告 ===")
        print(f"时间: {self.report['timestamp']}")
        print(f"主机: {self.report['hostname']}")
        
        # 连通性测试结果
        print("\n=== 连通性测试 ===")
        for target, result in self.report['tests']['connectivity'].items():
            if result['status'] == 'success':
                print(f"✅ {target}: 丢包率 {result['packet_loss']}%")
            else:
                print(f"❌ {target}: {result['status']}")
        
        # DNS测试结果
        print("\n=== DNS解析测试 ===")
        for domain, result in self.report['tests']['dns'].items():
            if result['status'] == 'success':
                print(f"✅ {domain}: {result['ip']} ({result['resolution_time']:.2f}ms)")
            else:
                print(f"❌ {domain}: {result['status']}")
        
        # 端口测试结果
        print("\n=== 端口连通性测试 ===")
        for target, result in self.report['tests']['ports'].items():
            if result['status'] == 'open':
                print(f"✅ {target}: 开放 ({result['response_time']:.2f}ms)")
            elif result['status'] == 'closed':
                print(f"❌ {target}: 关闭")
            else:
                print(f"⚠️  {target}: {result['status']}")
        
        # 问题总结
        issues = []
        
        # 检查连通性问题
        for target, result in self.report['tests']['connectivity'].items():
            if result['status'] != 'success' or result.get('packet_loss', 0) > 5:
                issues.append(f"连通性问题: {target}")
        
        # 检查DNS问题
        for domain, result in self.report['tests']['dns'].items():
            if result['status'] != 'success':
                issues.append(f"DNS解析问题: {domain}")
        
        # 检查端口问题
        for target, result in self.report['tests']['ports'].items():
            if result['status'] != 'open':
                issues.append(f"端口连接问题: {target}")
        
        if issues:
            print("\n=== 发现的问题 ===")
            for issue in issues:
                print(f"⚠️  {issue}")
        else:
            print("\n✅ 网络状态良好,未发现明显问题")
    
    def save_report(self, filename: str = None):
        """保存诊断报告到文件"""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"network_diagnostic_{timestamp}.json"
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.report, f, indent=2, ensure_ascii=False)
        
        print(f"报告已保存到: {filename}")

# 使用示例
if __name__ == "__main__":
    diagnostic = NetworkDiagnosticSuite()
    
    # 运行综合诊断
    report = diagnostic.run_comprehensive_test()
    
    # 打印报告
    diagnostic.print_report()
    
    # 保存报告
    diagnostic.save_report()

图形化网络工具

Wireshark

Wireshark是最强大的图形化网络协议分析工具:

  • 安装: 从官网下载安装包
  • 基本使用: 选择网络接口开始捕获
  • 过滤器: 使用显示过滤器筛选数据包
  • 协议分析: 自动解析各层协议
  • 统计功能: 提供丰富的网络统计信息

常用过滤器语法

# 协议过滤
http
tcp
udp
icmp
dns

# IP地址过滤
ip.addr == 192.168.1.1
ip.src == 192.168.1.1
ip.dst == 192.168.1.1

# 端口过滤
tcp.port == 80
udp.port == 53

# 组合过滤
http and ip.addr == 192.168.1.1
tcp.port == 80 or tcp.port == 443

# 内容过滤
http contains "login"
tcp contains "password"

实践练习

练习1:基础网络诊断

目标:使用基础命令诊断网络连通性问题
步骤

  1. 使用ping测试到网关的连通性
  2. 使用traceroute追踪到外网的路径
  3. 使用nslookup测试DNS解析
  4. 分析结果并定位问题
    验收标准:能够准确识别网络故障点

练习2:端口扫描与分析

目标:使用netstat/ss分析本机网络连接
步骤

  1. 查看所有监听端口
  2. 识别各端口对应的服务
  3. 检查是否有异常连接
  4. 生成端口使用报告
    验收标准:完整的端口使用分析报告

练习3:DNS故障排除

目标:诊断和解决DNS解析问题
步骤

  1. 测试不同DNS服务器的响应
  2. 使用dig命令追踪DNS查询过程
  3. 检查本地DNS缓存
  4. 配置备用DNS服务器
    验收标准:DNS解析正常工作

练习4:网络性能测试

目标:使用工具测试网络性能
步骤

  1. 使用ping测试延迟和丢包率
  2. 使用iperf3测试带宽
  3. 分析网络质量指标
  4. 提出优化建议
    验收标准:完整的网络性能评估报告

练习5:数据包分析

目标:使用tcpdump或Wireshark分析网络流量
步骤

  1. 捕获HTTP访问的数据包
  2. 分析TCP三次握手过程
  3. 查看HTTP请求和响应内容
  4. 识别可能的安全问题
    验收标准:准确分析网络通信过程

常见问题

Q1: ping通但无法访问网页,可能是什么原因?
A: 可能的原因包括:

  • DNS解析问题:使用nslookup测试域名解析
  • 防火墙阻止:检查本地和远程防火墙设置
  • 代理设置:检查浏览器代理配置
  • 端口问题:使用telnet测试80/443端口
  • 路由问题:使用traceroute检查路由路径

Q2: 如何判断网络延迟是否正常?
A: 延迟评估标准:

  • 局域网内:< 1ms 优秀,1-5ms 良好
  • 同城网络:< 20ms 优秀,20-50ms 良好
  • 国内网络:< 50ms 优秀,50-100ms 良好
  • 国际网络:< 200ms 可接受,> 300ms 较差
  • 丢包率:< 1% 优秀,1-3% 可接受,> 5% 需要关注

Q3: netstat和ss命令有什么区别?
A: 主要区别:

  • 性能:ss命令比netstat更快,特别是在连接数多的情况下
  • 功能:ss提供更详细的socket信息
  • 输出格式:ss的输出更易于脚本处理
  • 维护状态:ss是现代Linux系统的推荐工具
  • 兼容性:netstat在老系统上更通用

Q4: 如何选择合适的DNS服务器?
A: 选择标准:

  • 响应速度:使用dig测试查询时间
  • 稳定性:长期监控可用性
  • 安全性:支持DNS over HTTPS/TLS
  • 功能特性:是否支持广告过滤、恶意网站拦截
  • 常用公共DNS:8.8.8.8(Google)、114.114.114.114(114DNS)、1.1.1.1(Cloudflare)

Q5: tcpdump和Wireshark如何选择?
A: 选择依据:

  • tcpdump:命令行工具,适合服务器环境、脚本自动化、远程诊断
  • Wireshark:图形界面,适合详细分析、协议学习、复杂故障排除
  • 性能:tcpdump占用资源更少
  • 功能:Wireshark分析功能更强大
  • 使用场景:tcpdump用于捕获,Wireshark用于分析

Q6: 如何提高网络诊断效率?
A: 效率提升方法:

  • 建立标准化诊断流程
  • 使用脚本自动化常见检查
  • 建立网络基线数据
  • 使用监控工具持续观察
  • 记录常见问题和解决方案
  • 定期更新工具和知识

总结

本章详细介绍了网络命令与工具的使用:

  • 基础命令:ping、traceroute、netstat等核心诊断工具
  • DNS工具:nslookup、dig等域名解析诊断工具
  • 网络配置:ifconfig、ip等接口配置工具
  • 数据包分析:tcpdump、Wireshark等流量分析工具
  • 性能测试:iperf3、自定义延迟测试工具
  • 综合诊断:集成多种工具的诊断套件
  • 图形化工具:Wireshark等可视化分析工具

掌握这些工具是网络故障排除和性能优化的基础,在实际工作中要根据具体情况选择合适的工具组合。

下一步

  • 深入学习:011-VLAN与交换技术
  • 扩展阅读:网络监控系统的搭建和使用
  • 实践建议:在实验环境中练习各种网络诊断场景
  • 工具推荐:安装和配置常用的网络分析工具

参考与引用

更新记录

  • 更新时间: 2024-01-20 | 更新内容: 创建网络命令与工具章节,包含常用诊断命令、Python工具实现、综合诊断套件等内容 | 更新人: Assistant
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐