This is a web-novel crawler built with Python and Selenium. It scrapes an entire novel from a novel site and saves it as a TXT file. The code supports resuming from a saved checkpoint, exception handling, and random delays, which helps it cope with the site's anti-scraping measures.

Note: this program targets sites where the chapter list has to be collected across multiple listing pages, and where a single chapter is itself split across several pages (links).


Writing the Prompt

Since a single chapter spans multiple pages, you only need to know the URL structure of the later pages. Building on the article below, give Trae a small addition to the prompt, something like: on this site a single novel chapter has multiple pages, and the number of pages (page) differs per chapter. The later pages fall in the range (2, 9); each page's link is formed by appending "-{page}" to the chapter's first-page link, followed by ".html". When no title or content can be extracted for a page, stop trying further pages. Modify the code so that the complete chapters and their content are fetched. Trae may keep revising the code until the task is done.

Previous article: Trae+Python小说爬虫2:“翻页章节链接+单页小说章节”式 (https://blog.csdn.net/mz159_357/article/details/151176441?spm=1001.2014.3001.5502)
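
To make the page-URL rule above concrete, here is a minimal sketch of how the follow-up page links for one chapter can be generated (the example chapter URL is hypothetical; the (2, 9) range and the "-{page}.html" suffix follow the description above):

import os

def supplement_page_urls(chapter_url, first_page=2, last_page=9):
    """Build candidate follow-up page URLs for one chapter.

    First page:  .../123456.html
    Later pages: .../123456-2.html, .../123456-3.html, ... up to .../123456-9.html
    """
    base, ext = os.path.splitext(chapter_url)  # ('.../123456', '.html')
    return [f"{base}-{page}{ext}" for page in range(first_page, last_page + 1)]

# Hypothetical example:
# supplement_page_urls("https://example.com/book/1/123456.html")
# -> ["https://example.com/book/1/123456-2.html", ..., "https://example.com/book/1/123456-9.html"]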

After that, run and debug it several times, feeding the terminal output back as feedback; Trae can keep amending the code on its own until the task is complete.

Complete Code

To reuse it broadly, just change the site URL, the novel, the per-chapter page count, the element tags (XPaths) for the title and content, and the save path.
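
For example, assuming the complete code below is saved as novel_spider.py (the script name and output path here are placeholders; the URL and novel name are the defaults used in the code), a typical run looks like:

python novel_spider.py --url "https://www.biqugexs.net/book/6254323/" --name "赶尸客栈,带着铁三角去赶尸" --output "D:\小说文本\novel.txt" --timeout 30 --dump-links

Run without arguments, the script falls back to the defaults hard-coded in NovelSpider.__init__; add --debug for verbose logging.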

# 可设置翻页范围
# 获取章节列表,一个章节可能分多个分页链接

import time
import json
import random
import os
from urllib.parse import urljoin, urlparse
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import logging

# 命令行参数与日志配置
import argparse

# 解析命令行参数
parser = argparse.ArgumentParser(description='小说网站爬虫')
parser.add_argument('--debug', action='store_true', help='启用调试模式')
parser.add_argument('--url', type=str, help='要爬取的小说URL')
parser.add_argument('--name', type=str, help='小说名称(可选,默认从网页提取)')
parser.add_argument('--output', type=str, help='输出文件路径(可选)')
parser.add_argument('--dump-links', action='store_true', help='将所有找到的链接保存到文件')
parser.add_argument('--no-filter', action='store_true', help='不过滤链接,获取页面上所有链接')
parser.add_argument('--timeout', type=int, default=30, help='页面加载超时时间(秒)')
args, unknown = parser.parse_known_args()

# 根据命令行参数设置日志级别
log_level = logging.DEBUG if args.debug else logging.INFO

logging.basicConfig(
    level=log_level,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("spider_log.txt", encoding='utf-8'),
        logging.StreamHandler()
    ]
)

if args.debug:
    logging.info("调试模式已启用,将显示详细日志信息")

# 预定义的User-Agent列表
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
    # ... 省略部分User-Agent(保持不变)
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.7258.67 Safari/537.36'
]

# WebDriver在NovelSpider类内部初始化(见init_driver方法),无需在此创建全局实例


class NovelSpider(object):
    def __init__(self, url=None, filename=None, novel_name=None, timeout=30, dump_links=False, no_filter=False):
        self.url = url or 'https://www.biqugexs.net/book/6254323/'
        self.timeout = timeout
        self.dump_links = dump_links
        self.no_filter = no_filter

        logging.info(f"初始化爬虫: URL={self.url}, 超时={self.timeout}秒, 保存链接={self.dump_links}, 不过滤链接={self.no_filter}")

        # 直接使用提供的小说名称,或者设置为默认值
        self.novel_name = novel_name or '赶尸客栈,带着铁三角去赶尸'
        logging.info(f"使用小说名称: {self.novel_name}")

        # 设置输出文件名
        if filename:
            self.filename = filename
        else:
            # 清理小说名称,移除不适合作为文件名的字符
            safe_name = ''.join(c for c in self.novel_name if c.isalnum() or c in ' _-')
            safe_name = safe_name.strip()
            if not safe_name:
                safe_name = "novel"
            self.filename = fr'D:\文档\大创\小说文本\{safe_name}_2022.txt'

        logging.info(f"输出文件设置为: {self.filename}")
        self.progress_file = self.filename + '.progress'
        self.driver = None
        self.max_retries = 5  # 最大重试次数
        self.retry_delay = 3  # 重试间隔(秒)
        self.downloaded_chapters = set()
        self.init_driver()
        self.downloaded_chapters = self.load_progress()

    def init_driver(self):
        # 初始化WebDriver,添加更多配置选项
        options = webdriver.ChromeOptions()
        options.add_argument('--headless=new')
        options.add_argument('--disable-gpu')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument(f'user-agent={random.choice(USER_AGENTS)}')
        options.add_argument('--remote-allow-origins=*')

        # 添加更多配置以提高稳定性
        options.add_argument('--disable-extensions')  # 禁用扩展
        options.add_argument('--disable-popup-blocking')  # 禁用弹窗拦截
        options.add_argument('--disable-notifications')  # 禁用通知
        options.add_argument('--ignore-certificate-errors')  # 忽略证书错误
        options.add_argument('--disable-logging')  # 禁用日志
        options.add_argument('--log-level=3')  # 设置日志级别为最低
        options.add_argument('--disable-infobars')  # 禁用信息栏
        options.add_argument('--disable-blink-features=AutomationControlled')  # 禁用自动化控制检测

        # 添加实验性选项
        options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        options.add_experimental_option('useAutomationExtension', False)
        options.add_experimental_option('prefs', {
            'profile.default_content_setting_values.notifications': 2,  # 禁用通知
            'credentials_enable_service': False,  # 禁用保存密码提示
            'profile.password_manager_enabled': False  # 禁用密码管理器
        })

        if self.driver:
            try:
                self.driver.quit()
                logging.info("关闭旧WebDriver成功")
            except Exception as e:
                logging.warning(f"关闭旧WebDriver时出错: {e}")

        try:
            self.driver = webdriver.Chrome(options=options)
            # 设置超时时间
            self.driver.set_page_load_timeout(self.timeout)
            self.driver.set_script_timeout(self.timeout)
            logging.info(f"WebDriver初始化成功,超时设置为 {self.timeout} 秒")
        except Exception as e:
            logging.error(f"WebDriver初始化失败: {e}")
            raise

    def load_progress(self):
        # 加载进度(保持不变)
        if os.path.exists(self.progress_file):
            try:
                with open(self.progress_file, 'r', encoding='utf-8') as f:
                    return set(json.load(f))
            except Exception as e:
                logging.warning(f"加载进度文件失败: {e}")
                return set()
        logging.info("未找到进度文件,将创建新的进度记录")
        return set()

    def save_progress(self, chapter_url):
        # 保存进度(保持不变)
        self.downloaded_chapters.add(chapter_url)
        try:
            with open(self.progress_file, 'w', encoding='utf-8') as f:
                json.dump(list(self.downloaded_chapters), f)
            logging.debug(f"保存进度: {chapter_url}")
        except Exception as e:
            logging.error(f"保存进度失败: {e}")

    def random_delay(self, min_delay=2, max_delay=5):
        # 随机延迟(保持不变)
        delay = random.uniform(min_delay, max_delay)
        logging.debug(f"随机延迟 {delay:.2f} 秒")
        time.sleep(delay)

    def process_single_chapter(self, chapter_url, index, total_chapters, file_handle, is_supplement=False):
        """封装单个URL的处理逻辑(主URL或补充URL)"""
        if chapter_url in self.downloaded_chapters:
            supplement_tag = "(补充部分)" if is_supplement else ""
            logging.info(f'第 {index}/{total_chapters} 章{supplement_tag}已下载,跳过')
            return True

        supplement_tag = "(补充部分)" if is_supplement else ""
        logging.info(f'处理第 {index}/{total_chapters} 章{supplement_tag}: {chapter_url}')

        # 随机延迟
        self.random_delay()

        # 尝试请求URL
        retry_count = self.max_retries
        success = False
        while retry_count > 0 and not success:
            try:
                self.driver.get(chapter_url)
                success = True
            except Exception as e:
                retry_count -= 1
                logging.warning(f'请求章节URL{supplement_tag}失败 (剩余重试次数: {retry_count}): {e}')

                if 'invalid session id' in str(e).lower() or 'timeout' in str(e).lower():
                    logging.info('会话失效或超时,重建driver...')
                    self.init_driver()
                    # 重新访问首页建立会话
                    try:
                        self.driver.get(self.url)
                        time.sleep(2)
                    except Exception as e2:
                        logging.error(f'重新访问首页失败: {e2}')

                if retry_count > 0:
                    wait_time = self.retry_delay * (self.max_retries - retry_count + 1)
                    logging.info(f'等待 {wait_time} 秒后重试...')
                    time.sleep(wait_time)
                else:
                    logging.error(f'章节 {index}{supplement_tag} 请求失败,跳过')
                    return False

        if not success:
            return False

        # 获取章节标题(改进选择器)
        title_selectors = [
            '/html/body/div[3]/div[1]/div/div/h3',  # 原始选择器
            '//h1',  # 常见的标题标签
            '//div[contains(@class, "panel-heading")]',  # 基于类名的选择器
            '//div[contains(@class, "chapter-title")]',  # 另一个常见的标题选择器
            '//header//h1',  # 在header中的h1
            '//header//h2',  # 在header中的h2
            '//div[contains(@class, "header")]//h1'  # 在header类中的h1
        ]

        chapter_title = None
        chapter_title_text = None

        # 尝试获取标题文本,而不是保存元素引用
        for selector in title_selectors:
            try:
                element = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                if element and element.text and len(element.text.strip()) > 0:
                    # 直接获取文本,避免后续使用时元素已失效
                    chapter_title_text = element.text.strip()
                    logging.info(f'使用选择器 "{selector}" 获取标题成功: {chapter_title_text}')
                    break
            except Exception as e:
                logging.debug(f'选择器 "{selector}" 获取标题失败: {e}')

        if not chapter_title_text:
            # 如果所有选择器都失败,尝试从URL中提取章节信息
            try:
                chapter_number = chapter_url.split('/')[-1].split('.')[0]
                chapter_title_text = f"第{chapter_number}章"
                logging.warning(f'从URL提取章节标题: {chapter_title_text}')
            except Exception as e:
                logging.error(f'无法获取章节标题{supplement_tag},跳过此部分: {e}')
                return False

        # 获取章节内容(改进选择器和内容提取)
        content_selectors = [
            '/html/body/div[3]/div[1]/div/div//p',  # 原始选择器
            '//*[@id="content"]//p',  # 原始选择器的段落
            '//*[@id="chaptercontent"]',  # 常见的章节内容选择器
            '//div[contains(@class, "read_btn")]',  # 基于类名的选择器
            '//div[contains(@class, "chapter-content")]',  # 另一个常见的章节内容选择器
            '//div[contains(@class, "layout layout-col1")]',  # 包含content类的div
            '//div[@class="content"]',  # 精确匹配content类
            '//article',  # 一些网站使用article标签
            '//div[@id="chapter-content"]',  # 另一种常见ID
            '//div[@class="read-content"]',  # 阅读内容类
            '//div[@id="BookText"]',  # 另一种常见的小说内容ID
            '//div[@class="showtxt"]',  # 显示文本类
            '//div[@class="txt"]',  # 简单文本类
            '//div[@class="noveltext"]',  # 小说文本类
            '//div[@class="readcontent"]'  # 阅读内容类
        ]

        content = ""
        success = False

        for selector in content_selectors:
            try:
                chapter_content = self.driver.find_element(By.XPATH, selector)
                html = chapter_content.get_attribute("innerHTML")
                # 清理HTML
                html = html.replace('<p>', '\n').replace('</p>', '\n')
                html = html.replace('<br>', '\n').replace('<br/>', '\n').replace('<br />', '\n')
                html = html.replace('&nbsp;', ' ')

                # 解析内容
                tree = etree.HTML(f"<div>{html}</div>")
                if tree is not None:
                    content = tree.xpath("string(.)")
                    if content and len(content.strip()) > 50:
                        logging.info(f"使用选择器 '{selector}' 成功获取章节内容")
                        success = True
                        break
                    else:
                        logging.debug(f"选择器 '{selector}' 获取的内容过短: {len(content.strip())} 字符")
            except Exception as e:
                logging.debug(f"选择器 '{selector}' 获取内容失败: {e}")

        if not success:
            logging.warning(f'章节内容{supplement_tag}获取失败,尝试刷新页面重新获取')
            self.driver.refresh()
            time.sleep(3)

            # 刷新后再次尝试所有选择器
            for selector in content_selectors:
                try:
                    chapter_content = WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.XPATH, selector))
                    )
                    html = chapter_content.get_attribute("innerHTML")
                    # 清理HTML
                    html = html.replace('<p>', '\n').replace('</p>', '\n')
                    html = html.replace('<br>', '\n').replace('<br/>', '\n').replace('<br />', '\n')
                    html = html.replace('&nbsp;', ' ')

                    # 解析内容
                    tree = etree.HTML(f"<div>{html}</div>")
                    if tree is not None:
                        content = tree.xpath("string(.)")
                        if content and len(content.strip()) > 50:
                            logging.info(f'刷新后使用选择器 "{selector}" 成功获取章节内容')
                            success = True
                            break
                except Exception as e:
                    logging.debug(f'刷新后选择器 "{selector}" 获取内容失败: {e}')

            if not success:
                logging.error(f'刷新后仍无法获取章节内容{supplement_tag},跳过此部分')
                return False

        # 处理标题和内容
        chapter_title_text = chapter_title_text.replace("、", "") if chapter_title_text else f'未知章节{index}'
        if is_supplement:
            chapter_title_text += "(续)"  # 补充部分标题加标识

        # 清理内容
        # 保存原始内容长度,用于日志记录
        original_length = len(content.strip()) if content else 0

        try:
            # 移除常见的广告和无关文本
            content = content.replace("www.jingrou.me", "")
            content = content.replace("本章未完,请点击下一页继续阅读>>", "")
            content = content.replace("请记住本站域名:", "")
            content = content.replace("手机版访问:", "")
            content = content.replace("天才一秒记住本站地址:", "")
            content = content.replace("最快更新!无广告!", "")
            content = content.replace("请收藏本站:", "")
            content = content.replace("投推荐票", "")

            # 移除可能的网站名称和链接
            import re
            content = re.sub(r'https?://\S+', '', content)  # 移除URL
            content = re.sub(r'www\.\S+', '', content)  # 移除www开头的域名
            content = re.sub(r'[a-zA-Z0-9][-a-zA-Z0-9]{0,62}(\.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+\.?', '', content)  # 移除域名

            # 记录清理后的内容长度
            cleaned_length = len(content.strip()) if content else 0
            logging.info(f'内容清理完成,原始长度: {original_length} 字符,清理后长度: {cleaned_length} 字符')
        except Exception as e:
            logging.warning(f'内容清理过程中出错: {e},但将继续处理')
            # 继续处理,不返回False
        content = '\n'.join([line.strip() for line in content.split('\n') if line.strip()])

        # 写入文件
        file_handle.write(f'【{chapter_title_text}】\n\n')
        file_handle.write(content)
        file_handle.write('\n\n')

        # 保存进度
        self.save_progress(chapter_url)
        return True

    def get_parse_save_data(self, url):
        logging.info(f"开始处理小说: {url}")
        os.makedirs(os.path.dirname(self.filename), exist_ok=True)

        # 存储所有章节链接
        all_chapter_links = []

        # 处理第一页(基础URL)
        logging.info(f"处理第1页: {url}")
        chapter_links = self.get_chapter_links_from_page(url)
        if chapter_links:
            all_chapter_links.extend(chapter_links)
            logging.info(f"第1页获取到 {len(chapter_links)} 个章节链接")
        else:
            logging.warning("第1页未获取到任何章节链接")

        # 处理后续页面(翻页)
        for page in range(1, 4):  # 翻页范围,可根据需要调整
            page_url = f"{url}{page}/"
            logging.info(f"处理第{page}页: {page_url}")

            # 随机延迟,避免请求过快
            self.random_delay(3, 6)

            # 获取当前页的章节链接,添加重试机制
            max_page_retries = 3
            page_success = False

            for page_attempt in range(max_page_retries):
                try:
                    # 获取当前页的章节链接
                    page_chapter_links = self.get_chapter_links_from_page(page_url)

                    if page_chapter_links:
                        all_chapter_links.extend(page_chapter_links)
                        logging.info(f"第{page}页获取到 {len(page_chapter_links)} 个章节链接")
                        page_success = True
                        break
                    else:
                        logging.warning(f"第{page}页尝试{page_attempt + 1}未获取到任何章节链接")
                        if page_attempt < max_page_retries - 1:
                            logging.info(f"等待后重试第{page}页...")
                            time.sleep(5)
                            # 重新初始化driver以避免会话问题
                            self.init_driver()
                except Exception as e:
                    logging.error(f"处理第{page}页时出错 (尝试 {page_attempt + 1}/{max_page_retries}): {e}")
                    if page_attempt < max_page_retries - 1:
                        logging.info(f"等待后重试第{page}页...")
                        time.sleep(5)
                        # 重新初始化driver以避免会话问题
                        self.init_driver()

            # 如果多次尝试后仍未成功获取链接,停止翻页
            if not page_success:
                logging.warning(f"多次尝试后第{page}页未获取到任何章节链接,停止翻页")
                break

        logging.info(f"所有页面共获取到 {len(all_chapter_links)} 个章节链接")

        if not all_chapter_links:
            logging.critical("未找到任何有效章节链接")
            return False

        # 如果启用了链接保存功能,将所有链接保存到文件
        if self.dump_links:
            links_file = self.filename + '.links.txt'
            try:
                with open(links_file, 'w', encoding='utf-8') as f:
                    for i, (url, title) in enumerate(all_chapter_links, 1):
                        f.write(f"{i}. {url} - {title}\n")
                logging.info(f"已将 {len(all_chapter_links)} 个链接保存到文件: {links_file}")
            except Exception as e:
                logging.error(f"保存链接到文件失败: {e}")

        # 写入文件(核心修改:处理主URL和补充URL)
        mode = 'a' if os.path.exists(self.filename) and self.downloaded_chapters else 'w'
        with open(self.filename, mode, encoding='utf-8') as f:
            if mode == 'w':
                f.write(f'《{self.novel_name}》\n\n')

            total_chapters = len(all_chapter_links)
            for index, (chapter_url, title) in enumerate(all_chapter_links, 1):
                # 1. 处理主章节URL
                self.process_single_chapter(chapter_url, index, total_chapters, f, is_supplement=False)

                # 2. 生成并处理补充URL(xxx-2.html、xxx-3.html……)
                # 处理URL格式,避免xxx.html-2(改为xxx-2.html)
                max_supplement_page = 9  # 补充页page范围为2~9,可根据网站和小说调整
                page = 2
                while True:
                    base, ext = os.path.splitext(chapter_url)
                    chapter_url_2 = f"{base}-{page}{ext}"  # 正确格式:xxx-2.html

                    # 尝试处理补充章节,如果处理失败则退出循环
                    if not self.process_single_chapter(chapter_url_2, index, total_chapters, f, is_supplement=True):
                        logging.info(f"未找到第{page}页补充内容,停止获取后续页面")
                        break

                    page += 1
                    # 设置最大页数限制,避免无限循环
                    if page > max_supplement_page:
                        logging.warning(f"达到最大页数限制({max_supplement_page}),停止获取补充内容")
                        break
            logging.info(f"小说《{self.novel_name}》已完整下载到文件: {self.filename}")
            return True

    def get_chapter_links_from_page(self, url):
        """从指定页面获取章节链接"""
        logging.info(f"获取页面章节链接: {url}")

        # 请求页面
        for attempt in range(self.max_retries):
            try:
                self.driver.get(url)
                # 确保页面完全加载
                WebDriverWait(self.driver, 10).until(
                    lambda d: d.execute_script('return document.readyState') == 'complete'
                )
                logging.info(f"成功加载页面: {url} (尝试 {attempt + 1}/{self.max_retries})")
                break
            except Exception as e:
                logging.error(f'页面请求失败 (尝试 {attempt + 1}/{self.max_retries}): {e}')
                if attempt < self.max_retries - 1:
                    self.random_delay(3, 8)
                    self.init_driver()
                else:
                    logging.critical(f"无法访问页面,跳过: {url}")
                    return []

        # 加载章节列表(使用多个选择器)
        chapter_list_selectors = [
            '/html/body/div[3]/div[2]/div/ul[2]//a',  # 原始选择器
            '/html/body/div[3]/div[2]/div/ul[1]//a',  # 原始选择器
            '//div[contains(@class, "intro")]//a',  # 包含chapter的div下的链接
            '//ul[contains(@class, "chapter")]//a',  # 包含chapter的ul下的链接
            '//div[@id="chapterlist"]//a',  # 另一个常见的章节列表id
            '//div[@class="ml_content"]//a',  # 小说目录内容
            '//div[@class="catalog"]//a',  # 目录div
            '//div[@id="list"]//a',  # 列表div
            '//div[@class="listmain"]//a'  # 主列表div
        ]

        chapter_list_loaded = False
        successful_selector = None

        # 遍历所有选择器尝试加载章节列表
        for selector in chapter_list_selectors:
            try:
                logging.info(f"尝试使用选择器加载章节列表: {selector}")
                WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.XPATH, selector)))
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(2)

                # 验证是否真的找到了链接
                elements = self.driver.find_elements(By.XPATH, selector)
                if elements and len(elements) > 0:
                    logging.info(f"使用选择器 '{selector}' 成功加载章节列表,找到 {len(elements)} 个元素")
                    chapter_list_loaded = True
                    successful_selector = selector
                    break
                else:
                    logging.debug(f"选择器 '{selector}' 未找到任何元素")

            except Exception as e:
                logging.debug(f"选择器 '{selector}' 加载章节列表失败: {e}")

        if not chapter_list_loaded:
            logging.warning("所有选择器均未能加载章节列表,尝试刷新页面")
            self.driver.refresh()
            time.sleep(5)  # 给页面更多加载时间

            # 刷新后再次尝试所有选择器
            for selector in chapter_list_selectors:
                try:
                    logging.info(f"刷新后尝试使用选择器加载章节列表: {selector}")
                    WebDriverWait(self.driver, 20).until(EC.presence_of_element_located((By.XPATH, selector)))
                    self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                    time.sleep(2)

                    # 验证是否找到了链接
                    elements = self.driver.find_elements(By.XPATH, selector)
                    if elements and len(elements) > 0:
                        logging.info(f"刷新后使用选择器 '{selector}' 成功加载章节列表,找到 {len(elements)} 个元素")
                        chapter_list_loaded = True
                        successful_selector = selector
                        break
                    else:
                        logging.debug(f"刷新后选择器 '{selector}' 未找到任何元素")

                except Exception as e:
                    logging.debug(f"刷新后选择器 '{selector}' 加载章节列表失败: {e}")

            if not chapter_list_loaded:
                logging.warning("所有选择器均无法找到章节列表容器,将直接尝试获取页面上的所有链接")
                successful_selector = '//a'  # 兜底:直接抓取页面上的全部链接

        # 从当前页面URL中提取基础URL
        current_url = self.driver.current_url
        parsed_url = urlparse(current_url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        logging.info(f"使用基础URL: {base_url}")

        # 获取章节链接
        chapter_links = []
        max_attempts = 3  # 最大尝试次数

        for attempt in range(max_attempts):
            try:
                # 使用前面成功匹配的选择器(或兜底选择器),确保元素已出现
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_all_elements_located((By.XPATH, successful_selector))
                )

                # 获取所有链接元素
                elements = self.driver.find_elements(By.XPATH, successful_selector)
                logging.info(f"使用选择器 '{successful_selector}' 找到 {len(elements)} 个潜在章节链接")

                # 直接提取链接和文本,避免后续处理时元素失效
                link_data = []
                for element in elements:
                    try:
                        url = element.get_attribute('href')
                        text = element.text.strip()
                        if url:
                            link_data.append((url, text))
                    except Exception as e:
                        logging.debug(f"提取链接属性时出错: {e}")
                        continue

                # 处理所有提取到的链接
                for url, text in link_data:
                    # 处理相对URL
                    if url.startswith('/'):
                        url = base_url + url
                    elif not url.startswith('http'):
                        url = urljoin(current_url, url)

                    # 添加到章节链接列表
                    chapter_links.append((url, text))

                # 如果成功获取链接,跳出循环
                if chapter_links:
                    logging.info(f"成功获取 {len(chapter_links)} 个有效章节链接")
                    break

            except Exception as e:
                logging.warning(f"获取章节链接失败 (尝试 {attempt + 1}/{max_attempts}): {e}")
                if attempt < max_attempts - 1:
                    logging.info("刷新页面并重试...")
                    self.driver.refresh()
                    time.sleep(3)

        if not chapter_links:
            logging.error("多次尝试后仍无法获取章节链接")

        return chapter_links

    def run(self):
        try:
            success = self.get_parse_save_data(self.url)
            if success:
                logging.info("爬虫任务成功完成")
            else:
                logging.error("爬虫任务失败")
        except Exception as e:
            logging.critical(f"爬虫运行时发生严重错误: {e}")
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                    logging.info("WebDriver已关闭")
                except:
                    pass


if __name__ == '__main__':
    try:
        os.makedirs(r'D:\文档\大创\小说文本', exist_ok=True)

        # 使用命令行参数
        target_url = args.url if args.url else 'https://www.biqugexs.net/book/6254323/'
        logging.info(f"爬取小说URL: {target_url}")

        # 创建爬虫实例,传入命令行参数
        spider = NovelSpider(
            url=target_url,
            novel_name=args.name,
            filename=args.output,
            timeout=args.timeout,
            dump_links=args.dump_links,
            no_filter=args.no_filter
        )

        # 运行爬虫
        spider.run()
    except Exception as e:
        logging.critical(f"程序启动失败: {e}")
        import traceback

        logging.critical(traceback.format_exc())
