Python爬虫的基础用法

python爬虫一般通过第三方库进行完成,request等

方法一(request)

  1. 导入第三方库(如import requests
    requests用于处理http协议请求的第三方库,beautifulsoup4用来解析获取到的网页内容,用python解释器中查看是否有这个库,没有点击+安装
  2. 获取网站url(url一定要解析正确,如在百度网站中的某个图片,不是指www.baidu.com,而是这个图片的具体地址(可以点击查看,亦可以用浏览器自带的检查功能进行寻找))
  3. 之后通过requests.get打开网站,设置自己适用的encoding
  4. a = re.findall('<标签>正则表达式<标签>',文件名)通过HTML定义的标签,找到自己想要的内容,再赋值给一个变量a(并将获取到的数据以指定的数据类型存储到文件中)
  5. 将文件内容使用print输出进行查看

示例如下:

url = "http://weather.com.cn/weather1d/101010100.shtml#search"
resp = requests.get(url)
      # 打开浏览器访问该地址
resp.encoding = 'UTF-8'
print(resp)				# 返回访问代码 200表示成功 500 错误
# print(resp.text)		# 展示网页 (html代码的形式展示网页)

city = re.findall('<span class="name">([\u4e00-\u9fa5]*)</span>', resp.text)
weather = re.findall('<span class="weather">([\u4e00-\u9fa5]*)</span>', resp.text)
# 以上赋值操作中的内容必须在resp.text中存在
lst = []
for a,b in zip(city, weather,):  # 通过zip方式将爬取到的内容填充到list中
        lst.append([a,b])
for i in lst:
    print(i)

方法二(request和beautifulsoup)

使用request获取网页源码,beautifulsoup解析网站上的标签,从其中提取数据。

import requests
from bs4 import BeautifulSoup

# 获取网页
url = 'http://weather.com.cn/weather1d/101010100.shtml#search'
headers = {'User-Agent': 'network下的东西'}
response = requests.get(url, headers=headers)

print(response.status_code)
# 解析内容
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find_all("div",attrs={"class":"column_article_desc"})

with open('result.html', 'w', encoding='utf-8') as f:
    for item in title:
        f.write(item.string)

  • beautifulsoup支持多种解析器:html.parser(python内置,无需额外安装)、lxml(需要额外安装,速度快,功能好)、html5lib(需要额外安装,容错性最好)
  • beautifulsoup 能够通过标签名、属性、CSS选择器进行查找对应的元素 从元素中
  • 提取对应的数据可以使用text属性代表所有文本,string仅代表当前标签的文本
# 通过标签名查找
soup.find('div')          # 查找第一个 div
soup.find_all('div')      # 查找所有 div
# 通过属性查找
soup.find(id='content')
soup.find(class_='article')
soup.find(attrs={'data-id': '123'})
# 通过 CSS 选择器查找
soup.select('div.content')        # 类选择器
soup.select('#main')             # ID 选择器
soup.select('div p a')           # 后代选择器
soup.select('div > p')           # 子元素选择器

# 获取文本
element = soup.find('h1')
print(element.text)           # 获取所有文本(包括子标签)
print(element.get_text())     # 同 text
print(element.string)         # 仅获取当前标签的文本
# 获取属性
link = soup.find('a')
print(link['href'])           # 获取 href 属性
print(link.get('href'))       # 安全获取属性
print(link.attrs)             # 获取所有属性字典
# 获取多个元素
for item in soup.find_all('li'):
    print(item.text)

方法三(request+webdirver)

WebDriver 是一个浏览器自动化工具,可以让你用代码控制浏览器,模拟真实用户的操作。它是 Selenium 的核心组件.。
WebDriver的使用方法:初始化Chrome()浏览器——>.get(url) ——>获取url的一些信息——>.quit()关闭浏览器

import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

# 使用Selenium获取动态页面
url = 'http://weather.com.cn/weather1d/101010100.shtml#search'

# 1. 设置Selenium选项
options = webdriver.ChromeOptions()
options.add_argument('--headless')  # 无头模式,不显示浏览器窗口
options.add_argument('--disable-blink-features=AutomationControlled')  # 防止被检测

# 2. 创建浏览器驱动
driver = webdriver.Chrome(options=options)

try:
    # 3. 访问网页
    driver.get(url)
    print(f"访问页面: {url}")
    
    # 4. 等待页面加载(等待某个元素出现)
    wait = WebDriverWait(driver, 10)  # 最多等10秒
    
    # 这里根据实际网页调整,等待天气数据相关的元素
    # 示例:等待温度元素出现(需要根据实际网页修改选择器)
    # wait.until(EC.presence_of_element_located((By.CLASS_NAME, "tem")))
    
    # 如果不知道具体元素,简单等待几秒
    time.sleep(3)
    
    # 5. 获取渲染后的页面源代码
    page_html = driver.page_source
    print("获取页面成功")
    
    # 6. 保存到文件查看
    with open('result_selenium.html', 'w', encoding='utf-8') as f:
        f.write(page_html)
    print("已保存到 result_selenium.html")
    
    # 7. 也可以用BeautifulSoup解析
    from bs4 import BeautifulSoup
    soup = BeautifulSoup(page_html, 'html.parser')
    
    # 查找天气数据(根据实际网页调整)
    # 查看保存的HTML文件,找到正确的选择器
    weather_items = soup.find_all("div", class_="t")
    
    # 输出找到的内容
    if weather_items:
        print(f"找到 {len(weather_items)} 个天气元素")
        for item in weather_items[:3]:  # 只显示前3个
            print(item.text[:100])  # 只显示前100字符
    else:
        print("未找到天气元素,请检查选择器")
        # 也可以尝试其他常见的选择器
        all_divs = soup.find_all("div", limit=20)
        print("前20个div的class:")
        for div in all_divs:
            if div.get('class'):
                print(f"  class: {div.get('class')}")
    
finally:
    # 8. 关闭浏览器
    driver.quit()
    print("浏览器已关闭")

反爬机制

1、基础检测类

反爬机制 应对方法 代码示例
User-Agent检测 轮换UA池 headers = {‘User-Agent’: random.choice(ua_list)}
请求头完整性 补全常用headers headers = { ‘Accept’: ‘text/html,…’, ‘Accept-Language’: ‘zh-CN,zh;q=0.9’, ‘Connection’: ‘keep-alive’ }
Cookie验证 Session保持 session = requests.Session()

2、频率限制类

原因:网站通过检测单位时间内的请求频率来判断是否是机器行为(因为人类浏览有随即延迟,而程序请求通常规律性比较高)
如何解决:分时段策略,模仿人类的作息时间;越是深度的页面停留的越久;模拟人访问时候的随即延迟

class IntelligentRateLimiter:
    """智能频率控制器"""
    def __init__(self):
        self.request_history = []
        self.min_delay = 1.0  # 最小延迟
        self.max_delay = 5.0  # 最大延迟
    
    def human_like_delay(self):
        """模拟人类随机延迟"""
        # 1. 正态分布延迟(中心在2-3秒)
        delay = random.normalvariate(2.5, 0.8)
        delay = max(self.min_delay, min(delay, self.max_delay))
        
        # 2. 随机添加思考时间(10%概率长时间停留)
        if random.random() < 0.1:
            delay += random.uniform(3, 10)
        
        time.sleep(delay)
        return delay
    
    def adaptive_throttling(self, response):
        """根据响应自适应调整"""
        if response.status_code == 429:  # Too Many Requests
            # 指数退避
            wait_time = 60 * (2 ** len(self.request_history))
            print(f"触发限制,等待{wait_time}秒")
            time.sleep(wait_time)
            
        # 动态调整频率
        if "slow_down" in response.headers.get('Retry-After', ''):
            self.min_delay += 0.5
            self.max_delay += 1.0

# 使用策略
limiter = IntelligentRateLimiter()
for page in range(100):
    limiter.human_like_delay()  # 关键:随机延迟
    response = requests.get(url)
    limiter.adaptive_throttling(response)

3、IP限制类

原因:IP是网络身份的核心标识,限制IP是最直接最有效的方法(黑白名单机制/阈值限制/信誉评分系统)。限制IP的封禁级别一般有:临时封禁(几分钟-几小时)/长期封禁(几天)/永久封禁(加入黑名单)
解决策略:使用代理轮询策略/地理分散策略

class SmartProxyManager:
    """智能代理管理器"""
    def __init__(self):
        self.proxy_pool = []
        self.proxy_stats = {}  # 代理使用统计
        self.current_proxy = None
        self.init_proxy_sources()
    
    def init_proxy_sources(self):
        """初始化代理源"""
        # 1. 免费代理(测试用)
        free_proxies = [
            'http://free-proxy1.com:8080',
            'http://free-proxy2.com:3128'
        ]
        
        # 2. 付费代理API
        paid_proxies_api = 'http://paid-service.com/get_proxy'
        
        # 3. 自建代理池(推荐)
        # 使用squid/3proxy搭建
        
        self.proxy_pool.extend(free_proxies)
    
    def rotate_proxy(self):
        """轮换代理策略"""
        # 策略1:按权重选择(成功率高优先)
        if self.proxy_stats:
            working_proxies = [p for p, s in self.proxy_stats.items() 
                              if s.get('success_rate', 0) > 0.7]
            if working_proxies:
                self.current_proxy = random.choice(working_proxies)
                return
        
        # 策略2:轮询选择
        if self.proxy_pool:
            self.current_proxy = random.choice(self.proxy_pool)
        else:
            self.current_proxy = None  # 直连
    
    def validate_proxy(self, proxy_url):
        """验证代理有效性"""
        test_urls = [
            'http://httpbin.org/ip',
            'http://icanhazip.com',
            'https://api.ipify.org?format=json'
        ]
        
        for test_url in test_urls:
            try:
                response = requests.get(test_url, 
                                       proxies={'http': proxy_url, 'https': proxy_url},
                                       timeout=5)
                if response.status_code == 200:
                    print(f"代理 {proxy_url} 可用,IP: {response.text[:50]}")
                    return True
            except:
                continue
        return False
    
    def residential_proxy_strategy(self):
        """住宅代理策略(最难检测)"""
        # 住宅代理特点:
        # 1. 真实用户IP,有浏览历史
        # 2. IP段分散,无规律
        # 3. 可配合真实浏览器指纹
        
        # 获取住宅代理(如Luminati、SmartProxy)
        residential_api = "https://residential-proxy-provider.com/get"
        response = requests.get(residential_api)
        proxy_data = response.json()
        
        # 格式:username:password@host:port
        proxy = f"http://{proxy_data['user']}:{proxy_data['pass']}@{proxy_data['host']}:{proxy_data['port']}"
        return proxy

# 使用示例
proxy_manager = SmartProxyManager()
session = requests.Session()

for i in range(100):
    proxy_manager.rotate_proxy()
    if proxy_manager.current_proxy:
        session.proxies = {'http': proxy_manager.current_proxy,
                          'https': proxy_manager.current_proxy}
    
    # 添加代理验证头(有些服务需要)
    session.headers.update({
        'Proxy-Authorization': 'Basic ...',
        'X-Forwarded-For': generate_random_ip()  # 伪造XFF头
    })
    
    response = session.get(target_url)

4、动态内容类

原因:大量四用JS动态渲染内容,导致初始的HTML为空。例如:无限滚动页面/单页应用/异步分页加载;
解决:直接调用API;分析网络请求(用Charles/Fiddler抓包);模拟完整流程(包含页面交互);处理加密参数(找到signature/token生成逻辑)

# 方法1:直接调用API(最优解)
def extract_api_urls(page_url):
    """分析页面找到数据API"""
    # 步骤1:人工分析
    # 1. F12打开开发者工具
    # 2. Network面板 -> XHR/Fetch过滤
    # 3. 查看数据返回的请求
    
    # 步骤2:自动化分析
    response = requests.get(page_url)
    
    # 查找可能的API端点
    import re
    api_patterns = [
        r'api/v\d+/data',  # REST API
        r'graphql',  # GraphQL端点
        r'\.json\?',  # JSON数据
        r'loadMore|getList|fetchData',  # 数据加载函数
    ]
    
    api_endpoints = []
    for pattern in api_patterns:
        matches = re.findall(pattern, response.text)
        api_endpoints.extend(matches)
    
    return list(set(api_endpoints))

# 方法2:无头浏览器方案
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

class DynamicContentCrawler:
    def __init__(self, headless=True):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        
        self.driver = webdriver.Chrome(options=options)
        
        # 注入JS修改navigator属性
        self.driver.execute_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)
    
    def wait_for_element(self, selector, timeout=10):
        """等待元素加载"""
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, selector))
            )
            return element
        except TimeoutException:
            print(f"元素 {selector} 加载超时")
            return None
    
    def scroll_to_load(self, scroll_times=5):
        """模拟滚动加载更多内容"""
        for i in range(scroll_times):
            # 滚动到页面底部
            self.driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);"
            )
            
            # 等待新内容加载
            time.sleep(random.uniform(1, 3))
            
            # 检查是否有"加载更多"按钮
            load_more_buttons = self.driver.find_elements(
                By.XPATH, "//button[contains(text(), '加载更多') or contains(text(), 'Load More')]"
            )
            if load_more_buttons:
                load_more_buttons[0].click()
                time.sleep(2)
    
    def extract_data(self, url):
        """提取动态内容"""
        self.driver.get(url)
        
        # 等待主要内容加载
        self.wait_for_element(".content", timeout=15)
        
        # 滚动加载
        self.scroll_to_load(scroll_times=3)
        
        # 获取完整页面源码
        page_source = self.driver.page_source
        
        # 或者直接执行JS获取数据
        data = self.driver.execute_script("""
            // 直接访问JS中的全局变量
            if (window.__INITIAL_STATE__) {
                return window.__INITIAL_STATE__;
            }
            // 或者从DOM中提取
            var items = [];
            document.querySelectorAll('.item').forEach(function(el) {
                items.push({
                    title: el.querySelector('.title').innerText,
                    price: el.querySelector('.price').innerText
                });
            });
            return items;
        """)
        
        return data
    
    def close(self):
        self.driver.quit()

# 方法3:轻量级方案 - requests-html
from requests_html import HTMLSession

session = HTMLSession()
r = session.get('https://example.com')

# 执行JS(比Selenium轻量)
r.html.render(sleep=2, timeout=20)

# 提取渲染后的内容
items = r.html.find('.product-item')
for item in items:
    print(item.text)

# 方法4:逆向工程JS(高级)
def decrypt_js_data(encrypted_data):
    """解密JS加密的数据"""
    # 步骤1:找到JS加密函数
    # 步骤2:用Python重写或execjs调用
    
    import execjs
    
    # 读取JS解密函数
    with open('decrypt.js', 'r') as f:
        js_code = f.read()
    
    # 创建JS环境
    ctx = execjs.compile(js_code)
    
    # 执行解密
    result = ctx.call('decryptFunction', encrypted_data)
    return result

5、JS挑战类

原因:JS挑战类是通过环境检测(navigator属性)、代码混淆、行为验证(鼠标点击/移动)、加密通信(参数加密/每次不同)。典型的有Cloudflare 5秒盾;Akamai Bot Manager;PerimeterX等
处理:使用cloudscraper库绕过Cloudflare盾;修改WebDriver属性欺骗浏览器;模拟人的随机鼠标移动行为等

class JSAntiAntiSpider:
    """对抗JS挑战的策略集合"""
    
    def __init__(self):
        self.js_context = None
        
    def bypass_cloudflare(self, url):
        """绕过Cloudflare 5秒盾"""
        # 方法1:使用cloudscraper库
        import cloudscraper
        scraper = cloudscraper.create_scraper()
        response = scraper.get(url)
        return response.text
        
        # 方法2:手动处理
        # Cloudflare会返回一个计算挑战的JS
        # 需要解析JS并计算结果
    
    def fingerprint_spoofing(self, driver):
        """欺骗浏览器指纹检测"""
        # 修改WebDriver属性
        driver.execute_script("""
            // 修改navigator属性
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            
            // 修改plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            
            // 修改languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en']
            });
            
            // 修改屏幕属性
            Object.defineProperty(screen, 'width', {
                get: () => 1920
            });
            Object.defineProperty(screen, 'height', {
                get: () => 1080
            });
        """)
        
        # 添加更多指纹属性
        fingerprint_js = """
        window.chrome = {
            runtime: {},
            loadTimes: function() {},
            csi: function() {},
            app: {}
        };
        """
        driver.execute_script(fingerprint_js)
    
    def solve_js_challenge(self, challenge_js):
        """解析并解决JS挑战"""
        # 方法1:使用nodejs执行
        import subprocess
        
        with open('challenge.js', 'w') as f:
            f.write(challenge_js + '\nconsole.log(solve());')
        
        result = subprocess.run(['node', 'challenge.js'], 
                              capture_output=True, text=True)
        return result.stdout.strip()
        
        # 方法2:Python模拟JS逻辑
        # 分析JS代码,提取算法逻辑
        # 用Python重写
    
    def handle_obfuscated_js(self, js_code):
        """处理混淆的JS代码"""
        # 步骤1:去混淆(有限)
        # 简单替换
        deobfuscated = js_code
        replacements = {
            '_0xabc123': 'document',
            '_0xdef456': 'getElementById'
        }
        
        for old, new in replacements.items():
            deobfuscated = deobfuscated.replace(old, new)
        
        # 步骤2:使用AST分析
        # 需要esprima等库
        
        return deobfuscated
    
    def simulate_human_behavior(self, driver):
        """模拟人类行为模式"""
        # 1. 随机鼠标移动
        from selenium.webdriver.common.action_chains import ActionChains
        
        actions = ActionChains(driver)
        
        # 生成随机移动路径
        for i in range(random.randint(5, 10)):
            x = random.randint(0, 1000)
            y = random.randint(0, 700)
            actions.move_by_offset(x, y)
            time.sleep(random.uniform(0.1, 0.3))
        
        actions.perform()
        
        # 2. 随机滚动
        scroll_script = """
        window.scrollTo({
            top: %d,
            behavior: 'smooth'
        });
        """ % random.randint(0, 2000)
        driver.execute_script(scroll_script)
        
        # 3. 随机点击(非必要元素)
        if random.random() < 0.3:
            elements = driver.find_elements(By.TAG_NAME, 'a')
            if elements:
                random.choice(elements).click()
                time.sleep(random.uniform(1, 3))
                driver.back()
    
    def decrypt_api_params(self, url_pattern):
        """解密API参数生成逻辑"""
        # 常见参数加密方式:
        # 1. timestamp + token
        # 2. sign = md5(参数排序+密钥)
        # 3. 动态cookie
        
        import hashlib
        import time
        
        timestamp = int(time.time() * 1000)
        
        # 生成signature(需逆向分析JS)
        params = {
            'page': 1,
            'size': 20,
            't': timestamp
        }
        
        # 模拟sign生成(需根据实际JS调整)
        param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
        secret = '网站密钥(需逆向)'
        
        sign = hashlib.md5((param_str + secret).encode()).hexdigest()
        params['sign'] = sign
        
        return params

# 使用专用库
def use_undetected_chromedriver():
    """使用undetected-chromedriver绕过检测"""
    import undetected_chromedriver as uc
    
    options = uc.ChromeOptions()
    options.add_argument('--disable-blink-features=AutomationControlled')
    
    driver = uc.Chrome(options=options)
    driver.get('https://nowsecure.nl')  # 著名的反爬测试网站
    
    # 检查是否成功绕过
    if "you are not a bot" in driver.page_source:
        print("成功绕过检测!")
    
    return driver

6、验证码处理

  • 简单字符验证码:OCR识别(ddddocr > tesseract)
  • 滑块验证码:计算缺口位置 + 人类轨迹模拟
  • 点选验证码:目标检测模型(需训练)
  • 无感验证:全面模拟浏览器环境和人类行为
  • 终极方案:第三方打码平台(成本换时间)
class CaptchaSolver:
    """验证码解决方案集合"""
    
    def __init__(self):
        self.ocr_cache = {}  # 缓存识别结果
    
    def solve_simple_captcha(self, image_url_or_bytes):
        """解决简单字符验证码"""
        # 方法1:使用ddddocr(推荐)
        try:
            import ddddocr
            
            ocr = ddddocr.DdddOcr()
            if isinstance(image_url_or_bytes, str):
                # 下载图片
                response = requests.get(image_url_or_bytes)
                img_bytes = response.content
            else:
                img_bytes = image_url_or_bytes
                
            result = ocr.classification(img_bytes)
            return result
        except ImportError:
            pass
        
        # 方法2:使用tesseract(需训练)
        try:
            import pytesseract
            from PIL import Image, ImageFilter
            
            # 图像预处理
            img = Image.open(io.BytesIO(img_bytes))
            img = img.convert('L')  # 灰度化
            img = img.filter(ImageFilter.MedianFilter())  # 中值滤波
            
            # 二值化
            threshold = 150
            img = img.point(lambda x: 0 if x < threshold else 255)
            
            result = pytesseract.image_to_string(img, 
                                                config='--psm 7 --oem 3')
            return result.strip()
        except:
            pass
        
        # 方法3:使用打码平台API
        return self.use_captcha_service(img_bytes)
    
    def solve_slide_captcha(self, bg_url, slide_url):
        """解决滑块验证码"""
        # 步骤1:下载背景图和滑块图
        bg_img = self.download_image(bg_url)
        slide_img = self.download_image(slide_url)
        
        # 步骤2:计算滑动距离
        distance = self.calculate_slide_distance(bg_img, slide_img)
        
        # 步骤3:生成人类滑动轨迹
        track = self.generate_human_track(distance)
        
        return track
    
    def calculate_slide_distance(self, bg_img, slide_img):
        """计算滑块需要滑动的距离"""
        # 方法1:模板匹配
        import cv2
        import numpy as np
        
        # 转换图像
        bg_gray = cv2.cvtColor(bg_img, cv2.COLOR_BGR2GRAY)
        slide_gray = cv2.cvtColor(slide_img, cv2.COLOR_BGR2GRAY)
        
        # 模板匹配
        result = cv2.matchTemplate(bg_gray, slide_gray, 
                                  cv2.TM_CCOEFF_NORMED)
        
        # 获取最佳匹配位置
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        
        # 返回x坐标(距离)
        return max_loc[0]
    
    def generate_human_track(self, distance):
        """生成人类滑动轨迹"""
        # 人类滑动特征:
        # 1. 先快后慢
        # 2. 有微小抖动
        # 3. 最后可能回拉一点
        
        track = []
        current = 0
        t = 0
        
        # 分段滑动
        segments = [
            (0.3, 0.6),  # 初始加速段
            (0.4, 0.3),  # 匀速段
            (0.3, 0.1),  # 减速段
        ]
        
        for segment_ratio, speed_ratio in segments:
            segment_distance = distance * segment_ratio
            segment_time = random.uniform(0.5, 1.0)
            
            while current < distance * sum(s[0] for s in segments[:segments.index((segment_ratio, speed_ratio)) + 1]):
                # 计算这一步的位移
                step = random.uniform(1, 3) * speed_ratio
                
                # 添加随机抖动
                if random.random() < 0.1:
                    step *= random.uniform(0.5, 1.5)
                
                current += step
                t += random.uniform(0.01, 0.05)
                
                track.append({
                    'x': int(current),
                    'y': random.randint(-2, 2),  # Y轴微小偏移
                    't': int(t * 1000)  # 毫秒
                })
        
        # 最后可能有一点过冲和回拉
        if random.random() < 0.3:
            overshoot = random.randint(3, 10)
            track.append({'x': int(current + overshoot), 'y': 0, 't': int(t * 1000 + 50)})
            track.append({'x': int(current), 'y': 0, 't': int(t * 1000 + 100)})
        
        return track
    
    def solve_click_captcha(self, image_url, prompt):
        """解决点选验证码"""
        # 提示如:"点击图中的公交车"
        
        # 方法1:使用目标检测模型
        try:
            import torch
            from PIL import Image
            
            # 加载预训练模型(需提前训练)
            model = torch.hub.load('ultralytics/yolov5', 'custom', 
                                  path='click_captcha_model.pt')
            
            img = Image.open(io.BytesIO(requests.get(image_url).content))
            results = model(img)
            
            # 解析结果,返回坐标
            points = []
            for *box, conf, cls in results.xyxy[0]:
                if conf > 0.5:
                    x_center = (box[0] + box[2]) / 2
                    y_center = (box[1] + box[3]) / 2
                    points.append((x_center, y_center))
            
            return points[:4]  # 通常点选4个位置
        except:
            pass
        
        # 方法2:使用打码平台
        return self.use_click_captcha_service(image_url, prompt)
    
    def use_captcha_service(self, image_bytes, captcha_type='common'):
        """使用第三方打码平台"""
        # 平台列表(按推荐度排序)
        platforms = {
            '超级鹰': 'http://www.chaojiying.com/api/',
            '图鉴': 'http://www.ttshitu.com/',
            '联众': 'http://www.jsdati.com/',
            '2captcha': 'https://2captcha.com/',
        }
        
        # 以超级鹰为例
        def chaojiying_solve(img_bytes, soft_id='123456'):
            url = 'http://upload.chaojiying.net/Upload/Processing.php'
            data = {
                'user': 'your_username',
                'pass': 'your_password',
                'softid': soft_id,
                'codetype': '1004',  # 验证码类型代码
            }
            files = {'userfile': ('captcha.jpg', img_bytes)}
            
            response = requests.post(url, data=data, files=files)
            result = response.json()
            
            if result['err_no'] == 0:
                return result['pic_str']
            else:
                print(f"打码失败: {result['err_str']}")
                return None
        
        return chaojiying_solve(image_bytes)
    
    def bypass_captcha(self, session, url):
        """尝试绕过验证码"""
        strategies = [
            self.try_cookie_reuse,      # 复用已有cookie
            self.try_header_bypass,     # 修改请求头
            self.try_js_bypass,         # 执行JS绕过
            self.try_alternative_api,   # 找不需要验证码的API
        ]
        
        for strategy in strategies:
            result = strategy(session, url)
            if result:
                return result
        
        return None
    
    def try_cookie_reuse(self, session, url):
        """尝试复用cookie"""
        # 检查是否有历史cookie
        if session.cookies.get('login_token'):
            # 直接访问,看是否需要验证码
            response = session.get(url)
            if 'captcha' not in response.text:
                return response
        return None
    
    def try_alternative_api(self, session, url):
        """寻找替代接口"""
        # 有些网站有多个接口
        # 主站可能需要验证码,但API不需要
        
        # 尝试常见API端点
        endpoints = [
            url.replace('www.', 'api.'),
            url + '/api/data',
            url + '/json',
            url + '/v1/query',
        ]
        
        for endpoint in endpoints:
            try:
                response = session.get(endpoint, timeout=5)
                if response.status_code == 200:
                    return response
            except:
                continue
        
        return None

# 高级:机器学习方案
class ML_Captcha_Solver:
    """基于机器学习的验证码识别"""
    
    def __init__(self):
        self.models = {}
        
    def train_char_model(self, dataset_path):
        """训练字符验证码识别模型"""
        # 使用CNN网络
        import tensorflow as tf
        from tensorflow import keras
        
        # 数据加载和预处理
        # ...(实际需要大量标注数据)
        
        model = keras.Sequential([
            keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(60, 160, 3)),
            keras.layers.MaxPooling2D((2, 2)),
            keras.layers.Conv2D(64, (3, 3), activation='relu'),
            keras.layers.MaxPooling2D((2, 2)),
            keras.layers.Flatten(),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(4 * 36, activation='softmax')  # 4个字符,36种可能
        ])
        
        model.compile(optimizer='adam',
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])
        
        # 训练(略)
        return model
    
    def solve_with_ai(self, image_bytes):
        """使用AI模型识别"""
        # 这里简化,实际需调用训练好的模型
        pass

# 实战:综合验证码处理流程
def handle_captcha_in_workflow():
    """在爬虫流程中处理验证码"""
    session = requests.Session()
    captcha_solver = CaptchaSolver()
    
    for attempt in range(3):  # 最多尝试3次
        # 1. 先尝试无验证码访问
        response = session.get('https://example.com/login')
        
        # 2. 检查是否有验证码
        if 'captcha' in response.text:
            # 提取验证码图片
            soup = BeautifulSoup(response.text, 'html.parser')
            captcha_img = soup.find('img', {'id': 'captcha_image'})
            
            if captcha_img:
                img_url = captcha_img['src']
                
                # 3. 识别验证码
                captcha_text = captcha_solver.solve_simple_captcha(img_url)
                
                if captcha_text:
                    # 4. 提交表单
                    login_data = {
                        'username': 'user',
                        'password': 'pass',
                        'captcha': captcha_text
                    }
                    
                    response = session.post('https://example.com/login', 
                                           data=login_data)
                    
                    if '登录成功' in response.text:
                        print(f"第{attempt+1}次尝试:验证码识别成功")
                        return session
                    else:
                        print(f"第{attempt+1}次尝试:验证码错误")
                else:
                    print("验证码识别失败")
            else:
                # 可能是滑块或点选验证码
                print("检测到复杂验证码,使用Selenium处理")
                return handle_complex_captcha_with_selenium()
        
        time.sleep(2)
    
    print("验证码处理失败")
    return None
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐