爬虫的基础用法
爬虫主要通过requests、BeautifulSoup和Selenium等库实现。requests用于发送HTTP请求,BeautifulSoup解析HTML内容,Selenium处理动态网页。基本流程包括:获取目标URL→发送请求→解析响应→提取数据→存储结果。反爬机制应对策略包括:轮换User-Agent、补全请求头、使用Session保持会话、智能控制请求频率等。示例展示三种方法:纯req
Python爬虫的基础用法
python爬虫一般通过第三方库进行完成,request等
方法一(request)
- 导入第三方库(如
import requests)requests用于处理http协议请求的第三方库,beautifulsoup4用来解析获取到的网页内容,用python解释器中查看是否有这个库,没有点击+安装 - 获取网站
url(url一定要解析正确,如在百度网站中的某个图片,不是指www.baidu.com,而是这个图片的具体地址(可以点击查看,亦可以用浏览器自带的检查功能进行寻找)) - 之后通过
requests.get打开网站,设置自己适用的encoding - 用
a = re.findall('<标签>正则表达式<标签>',文件名)通过HTML定义的标签,找到自己想要的内容,再赋值给一个变量a(并将获取到的数据以指定的数据类型存储到文件中) - 将文件内容使用
print输出进行查看
示例如下:
url = "http://weather.com.cn/weather1d/101010100.shtml#search"
resp = requests.get(url)
# 打开浏览器访问该地址
resp.encoding = 'UTF-8'
print(resp) # 返回访问代码 200表示成功 500 错误
# print(resp.text) # 展示网页 (html代码的形式展示网页)
city = re.findall('<span class="name">([\u4e00-\u9fa5]*)</span>', resp.text)
weather = re.findall('<span class="weather">([\u4e00-\u9fa5]*)</span>', resp.text)
# 以上赋值操作中的内容必须在resp.text中存在
lst = []
for a,b in zip(city, weather,): # 通过zip方式将爬取到的内容填充到list中
lst.append([a,b])
for i in lst:
print(i)
方法二(request和beautifulsoup)
使用request获取网页源码,beautifulsoup解析网站上的标签,从其中提取数据。
import requests
from bs4 import BeautifulSoup
# 获取网页
url = 'http://weather.com.cn/weather1d/101010100.shtml#search'
headers = {'User-Agent': 'network下的东西'}
response = requests.get(url, headers=headers)
print(response.status_code)
# 解析内容
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.find_all("div",attrs={"class":"column_article_desc"})
with open('result.html', 'w', encoding='utf-8') as f:
for item in title:
f.write(item.string)
- beautifulsoup支持多种解析器:html.parser(python内置,无需额外安装)、lxml(需要额外安装,速度快,功能好)、html5lib(需要额外安装,容错性最好)
- beautifulsoup 能够通过标签名、属性、CSS选择器进行查找对应的元素 从元素中
- 提取对应的数据可以使用text属性代表所有文本,string仅代表当前标签的文本
# 通过标签名查找
soup.find('div') # 查找第一个 div
soup.find_all('div') # 查找所有 div
# 通过属性查找
soup.find(id='content')
soup.find(class_='article')
soup.find(attrs={'data-id': '123'})
# 通过 CSS 选择器查找
soup.select('div.content') # 类选择器
soup.select('#main') # ID 选择器
soup.select('div p a') # 后代选择器
soup.select('div > p') # 子元素选择器
# 获取文本
element = soup.find('h1')
print(element.text) # 获取所有文本(包括子标签)
print(element.get_text()) # 同 text
print(element.string) # 仅获取当前标签的文本
# 获取属性
link = soup.find('a')
print(link['href']) # 获取 href 属性
print(link.get('href')) # 安全获取属性
print(link.attrs) # 获取所有属性字典
# 获取多个元素
for item in soup.find_all('li'):
print(item.text)
方法三(request+webdirver)
WebDriver 是一个浏览器自动化工具,可以让你用代码控制浏览器,模拟真实用户的操作。它是 Selenium 的核心组件.。
WebDriver的使用方法:初始化Chrome()浏览器——>.get(url) ——>获取url的一些信息——>.quit()关闭浏览器
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
# 使用Selenium获取动态页面
url = 'http://weather.com.cn/weather1d/101010100.shtml#search'
# 1. 设置Selenium选项
options = webdriver.ChromeOptions()
options.add_argument('--headless') # 无头模式,不显示浏览器窗口
options.add_argument('--disable-blink-features=AutomationControlled') # 防止被检测
# 2. 创建浏览器驱动
driver = webdriver.Chrome(options=options)
try:
# 3. 访问网页
driver.get(url)
print(f"访问页面: {url}")
# 4. 等待页面加载(等待某个元素出现)
wait = WebDriverWait(driver, 10) # 最多等10秒
# 这里根据实际网页调整,等待天气数据相关的元素
# 示例:等待温度元素出现(需要根据实际网页修改选择器)
# wait.until(EC.presence_of_element_located((By.CLASS_NAME, "tem")))
# 如果不知道具体元素,简单等待几秒
time.sleep(3)
# 5. 获取渲染后的页面源代码
page_html = driver.page_source
print("获取页面成功")
# 6. 保存到文件查看
with open('result_selenium.html', 'w', encoding='utf-8') as f:
f.write(page_html)
print("已保存到 result_selenium.html")
# 7. 也可以用BeautifulSoup解析
from bs4 import BeautifulSoup
soup = BeautifulSoup(page_html, 'html.parser')
# 查找天气数据(根据实际网页调整)
# 查看保存的HTML文件,找到正确的选择器
weather_items = soup.find_all("div", class_="t")
# 输出找到的内容
if weather_items:
print(f"找到 {len(weather_items)} 个天气元素")
for item in weather_items[:3]: # 只显示前3个
print(item.text[:100]) # 只显示前100字符
else:
print("未找到天气元素,请检查选择器")
# 也可以尝试其他常见的选择器
all_divs = soup.find_all("div", limit=20)
print("前20个div的class:")
for div in all_divs:
if div.get('class'):
print(f" class: {div.get('class')}")
finally:
# 8. 关闭浏览器
driver.quit()
print("浏览器已关闭")
反爬机制
1、基础检测类
| 反爬机制 | 应对方法 | 代码示例 |
|---|---|---|
| User-Agent检测 | 轮换UA池 | headers = {‘User-Agent’: random.choice(ua_list)} |
| 请求头完整性 | 补全常用headers | headers = { ‘Accept’: ‘text/html,…’, ‘Accept-Language’: ‘zh-CN,zh;q=0.9’, ‘Connection’: ‘keep-alive’ } |
| Cookie验证 | Session保持 | session = requests.Session() |
2、频率限制类
原因:网站通过检测单位时间内的请求频率来判断是否是机器行为(因为人类浏览有随即延迟,而程序请求通常规律性比较高)
如何解决:分时段策略,模仿人类的作息时间;越是深度的页面停留的越久;模拟人访问时候的随即延迟
class IntelligentRateLimiter:
"""智能频率控制器"""
def __init__(self):
self.request_history = []
self.min_delay = 1.0 # 最小延迟
self.max_delay = 5.0 # 最大延迟
def human_like_delay(self):
"""模拟人类随机延迟"""
# 1. 正态分布延迟(中心在2-3秒)
delay = random.normalvariate(2.5, 0.8)
delay = max(self.min_delay, min(delay, self.max_delay))
# 2. 随机添加思考时间(10%概率长时间停留)
if random.random() < 0.1:
delay += random.uniform(3, 10)
time.sleep(delay)
return delay
def adaptive_throttling(self, response):
"""根据响应自适应调整"""
if response.status_code == 429: # Too Many Requests
# 指数退避
wait_time = 60 * (2 ** len(self.request_history))
print(f"触发限制,等待{wait_time}秒")
time.sleep(wait_time)
# 动态调整频率
if "slow_down" in response.headers.get('Retry-After', ''):
self.min_delay += 0.5
self.max_delay += 1.0
# 使用策略
limiter = IntelligentRateLimiter()
for page in range(100):
limiter.human_like_delay() # 关键:随机延迟
response = requests.get(url)
limiter.adaptive_throttling(response)
3、IP限制类
原因:IP是网络身份的核心标识,限制IP是最直接最有效的方法(黑白名单机制/阈值限制/信誉评分系统)。限制IP的封禁级别一般有:临时封禁(几分钟-几小时)/长期封禁(几天)/永久封禁(加入黑名单)
解决策略:使用代理轮询策略/地理分散策略
class SmartProxyManager:
"""智能代理管理器"""
def __init__(self):
self.proxy_pool = []
self.proxy_stats = {} # 代理使用统计
self.current_proxy = None
self.init_proxy_sources()
def init_proxy_sources(self):
"""初始化代理源"""
# 1. 免费代理(测试用)
free_proxies = [
'http://free-proxy1.com:8080',
'http://free-proxy2.com:3128'
]
# 2. 付费代理API
paid_proxies_api = 'http://paid-service.com/get_proxy'
# 3. 自建代理池(推荐)
# 使用squid/3proxy搭建
self.proxy_pool.extend(free_proxies)
def rotate_proxy(self):
"""轮换代理策略"""
# 策略1:按权重选择(成功率高优先)
if self.proxy_stats:
working_proxies = [p for p, s in self.proxy_stats.items()
if s.get('success_rate', 0) > 0.7]
if working_proxies:
self.current_proxy = random.choice(working_proxies)
return
# 策略2:轮询选择
if self.proxy_pool:
self.current_proxy = random.choice(self.proxy_pool)
else:
self.current_proxy = None # 直连
def validate_proxy(self, proxy_url):
"""验证代理有效性"""
test_urls = [
'http://httpbin.org/ip',
'http://icanhazip.com',
'https://api.ipify.org?format=json'
]
for test_url in test_urls:
try:
response = requests.get(test_url,
proxies={'http': proxy_url, 'https': proxy_url},
timeout=5)
if response.status_code == 200:
print(f"代理 {proxy_url} 可用,IP: {response.text[:50]}")
return True
except:
continue
return False
def residential_proxy_strategy(self):
"""住宅代理策略(最难检测)"""
# 住宅代理特点:
# 1. 真实用户IP,有浏览历史
# 2. IP段分散,无规律
# 3. 可配合真实浏览器指纹
# 获取住宅代理(如Luminati、SmartProxy)
residential_api = "https://residential-proxy-provider.com/get"
response = requests.get(residential_api)
proxy_data = response.json()
# 格式:username:password@host:port
proxy = f"http://{proxy_data['user']}:{proxy_data['pass']}@{proxy_data['host']}:{proxy_data['port']}"
return proxy
# 使用示例
proxy_manager = SmartProxyManager()
session = requests.Session()
for i in range(100):
proxy_manager.rotate_proxy()
if proxy_manager.current_proxy:
session.proxies = {'http': proxy_manager.current_proxy,
'https': proxy_manager.current_proxy}
# 添加代理验证头(有些服务需要)
session.headers.update({
'Proxy-Authorization': 'Basic ...',
'X-Forwarded-For': generate_random_ip() # 伪造XFF头
})
response = session.get(target_url)
4、动态内容类
原因:大量四用JS动态渲染内容,导致初始的HTML为空。例如:无限滚动页面/单页应用/异步分页加载;
解决:直接调用API;分析网络请求(用Charles/Fiddler抓包);模拟完整流程(包含页面交互);处理加密参数(找到signature/token生成逻辑)
# 方法1:直接调用API(最优解)
def extract_api_urls(page_url):
"""分析页面找到数据API"""
# 步骤1:人工分析
# 1. F12打开开发者工具
# 2. Network面板 -> XHR/Fetch过滤
# 3. 查看数据返回的请求
# 步骤2:自动化分析
response = requests.get(page_url)
# 查找可能的API端点
import re
api_patterns = [
r'api/v\d+/data', # REST API
r'graphql', # GraphQL端点
r'\.json\?', # JSON数据
r'loadMore|getList|fetchData', # 数据加载函数
]
api_endpoints = []
for pattern in api_patterns:
matches = re.findall(pattern, response.text)
api_endpoints.extend(matches)
return list(set(api_endpoints))
# 方法2:无头浏览器方案
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
class DynamicContentCrawler:
def __init__(self, headless=True):
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
self.driver = webdriver.Chrome(options=options)
# 注入JS修改navigator属性
self.driver.execute_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
def wait_for_element(self, selector, timeout=10):
"""等待元素加载"""
try:
element = WebDriverWait(self.driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, selector))
)
return element
except TimeoutException:
print(f"元素 {selector} 加载超时")
return None
def scroll_to_load(self, scroll_times=5):
"""模拟滚动加载更多内容"""
for i in range(scroll_times):
# 滚动到页面底部
self.driver.execute_script(
"window.scrollTo(0, document.body.scrollHeight);"
)
# 等待新内容加载
time.sleep(random.uniform(1, 3))
# 检查是否有"加载更多"按钮
load_more_buttons = self.driver.find_elements(
By.XPATH, "//button[contains(text(), '加载更多') or contains(text(), 'Load More')]"
)
if load_more_buttons:
load_more_buttons[0].click()
time.sleep(2)
def extract_data(self, url):
"""提取动态内容"""
self.driver.get(url)
# 等待主要内容加载
self.wait_for_element(".content", timeout=15)
# 滚动加载
self.scroll_to_load(scroll_times=3)
# 获取完整页面源码
page_source = self.driver.page_source
# 或者直接执行JS获取数据
data = self.driver.execute_script("""
// 直接访问JS中的全局变量
if (window.__INITIAL_STATE__) {
return window.__INITIAL_STATE__;
}
// 或者从DOM中提取
var items = [];
document.querySelectorAll('.item').forEach(function(el) {
items.push({
title: el.querySelector('.title').innerText,
price: el.querySelector('.price').innerText
});
});
return items;
""")
return data
def close(self):
self.driver.quit()
# 方法3:轻量级方案 - requests-html
from requests_html import HTMLSession
session = HTMLSession()
r = session.get('https://example.com')
# 执行JS(比Selenium轻量)
r.html.render(sleep=2, timeout=20)
# 提取渲染后的内容
items = r.html.find('.product-item')
for item in items:
print(item.text)
# 方法4:逆向工程JS(高级)
def decrypt_js_data(encrypted_data):
"""解密JS加密的数据"""
# 步骤1:找到JS加密函数
# 步骤2:用Python重写或execjs调用
import execjs
# 读取JS解密函数
with open('decrypt.js', 'r') as f:
js_code = f.read()
# 创建JS环境
ctx = execjs.compile(js_code)
# 执行解密
result = ctx.call('decryptFunction', encrypted_data)
return result
5、JS挑战类
原因:JS挑战类是通过环境检测(navigator属性)、代码混淆、行为验证(鼠标点击/移动)、加密通信(参数加密/每次不同)。典型的有Cloudflare 5秒盾;Akamai Bot Manager;PerimeterX等
处理:使用cloudscraper库绕过Cloudflare盾;修改WebDriver属性欺骗浏览器;模拟人的随机鼠标移动行为等
class JSAntiAntiSpider:
"""对抗JS挑战的策略集合"""
def __init__(self):
self.js_context = None
def bypass_cloudflare(self, url):
"""绕过Cloudflare 5秒盾"""
# 方法1:使用cloudscraper库
import cloudscraper
scraper = cloudscraper.create_scraper()
response = scraper.get(url)
return response.text
# 方法2:手动处理
# Cloudflare会返回一个计算挑战的JS
# 需要解析JS并计算结果
def fingerprint_spoofing(self, driver):
"""欺骗浏览器指纹检测"""
# 修改WebDriver属性
driver.execute_script("""
// 修改navigator属性
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 修改plugins
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
// 修改languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en']
});
// 修改屏幕属性
Object.defineProperty(screen, 'width', {
get: () => 1920
});
Object.defineProperty(screen, 'height', {
get: () => 1080
});
""")
# 添加更多指纹属性
fingerprint_js = """
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: {}
};
"""
driver.execute_script(fingerprint_js)
def solve_js_challenge(self, challenge_js):
"""解析并解决JS挑战"""
# 方法1:使用nodejs执行
import subprocess
with open('challenge.js', 'w') as f:
f.write(challenge_js + '\nconsole.log(solve());')
result = subprocess.run(['node', 'challenge.js'],
capture_output=True, text=True)
return result.stdout.strip()
# 方法2:Python模拟JS逻辑
# 分析JS代码,提取算法逻辑
# 用Python重写
def handle_obfuscated_js(self, js_code):
"""处理混淆的JS代码"""
# 步骤1:去混淆(有限)
# 简单替换
deobfuscated = js_code
replacements = {
'_0xabc123': 'document',
'_0xdef456': 'getElementById'
}
for old, new in replacements.items():
deobfuscated = deobfuscated.replace(old, new)
# 步骤2:使用AST分析
# 需要esprima等库
return deobfuscated
def simulate_human_behavior(self, driver):
"""模拟人类行为模式"""
# 1. 随机鼠标移动
from selenium.webdriver.common.action_chains import ActionChains
actions = ActionChains(driver)
# 生成随机移动路径
for i in range(random.randint(5, 10)):
x = random.randint(0, 1000)
y = random.randint(0, 700)
actions.move_by_offset(x, y)
time.sleep(random.uniform(0.1, 0.3))
actions.perform()
# 2. 随机滚动
scroll_script = """
window.scrollTo({
top: %d,
behavior: 'smooth'
});
""" % random.randint(0, 2000)
driver.execute_script(scroll_script)
# 3. 随机点击(非必要元素)
if random.random() < 0.3:
elements = driver.find_elements(By.TAG_NAME, 'a')
if elements:
random.choice(elements).click()
time.sleep(random.uniform(1, 3))
driver.back()
def decrypt_api_params(self, url_pattern):
"""解密API参数生成逻辑"""
# 常见参数加密方式:
# 1. timestamp + token
# 2. sign = md5(参数排序+密钥)
# 3. 动态cookie
import hashlib
import time
timestamp = int(time.time() * 1000)
# 生成signature(需逆向分析JS)
params = {
'page': 1,
'size': 20,
't': timestamp
}
# 模拟sign生成(需根据实际JS调整)
param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
secret = '网站密钥(需逆向)'
sign = hashlib.md5((param_str + secret).encode()).hexdigest()
params['sign'] = sign
return params
# 使用专用库
def use_undetected_chromedriver():
"""使用undetected-chromedriver绕过检测"""
import undetected_chromedriver as uc
options = uc.ChromeOptions()
options.add_argument('--disable-blink-features=AutomationControlled')
driver = uc.Chrome(options=options)
driver.get('https://nowsecure.nl') # 著名的反爬测试网站
# 检查是否成功绕过
if "you are not a bot" in driver.page_source:
print("成功绕过检测!")
return driver
6、验证码处理
- 简单字符验证码:OCR识别(ddddocr > tesseract)
- 滑块验证码:计算缺口位置 + 人类轨迹模拟
- 点选验证码:目标检测模型(需训练)
- 无感验证:全面模拟浏览器环境和人类行为
- 终极方案:第三方打码平台(成本换时间)
class CaptchaSolver:
"""验证码解决方案集合"""
def __init__(self):
self.ocr_cache = {} # 缓存识别结果
def solve_simple_captcha(self, image_url_or_bytes):
"""解决简单字符验证码"""
# 方法1:使用ddddocr(推荐)
try:
import ddddocr
ocr = ddddocr.DdddOcr()
if isinstance(image_url_or_bytes, str):
# 下载图片
response = requests.get(image_url_or_bytes)
img_bytes = response.content
else:
img_bytes = image_url_or_bytes
result = ocr.classification(img_bytes)
return result
except ImportError:
pass
# 方法2:使用tesseract(需训练)
try:
import pytesseract
from PIL import Image, ImageFilter
# 图像预处理
img = Image.open(io.BytesIO(img_bytes))
img = img.convert('L') # 灰度化
img = img.filter(ImageFilter.MedianFilter()) # 中值滤波
# 二值化
threshold = 150
img = img.point(lambda x: 0 if x < threshold else 255)
result = pytesseract.image_to_string(img,
config='--psm 7 --oem 3')
return result.strip()
except:
pass
# 方法3:使用打码平台API
return self.use_captcha_service(img_bytes)
def solve_slide_captcha(self, bg_url, slide_url):
"""解决滑块验证码"""
# 步骤1:下载背景图和滑块图
bg_img = self.download_image(bg_url)
slide_img = self.download_image(slide_url)
# 步骤2:计算滑动距离
distance = self.calculate_slide_distance(bg_img, slide_img)
# 步骤3:生成人类滑动轨迹
track = self.generate_human_track(distance)
return track
def calculate_slide_distance(self, bg_img, slide_img):
"""计算滑块需要滑动的距离"""
# 方法1:模板匹配
import cv2
import numpy as np
# 转换图像
bg_gray = cv2.cvtColor(bg_img, cv2.COLOR_BGR2GRAY)
slide_gray = cv2.cvtColor(slide_img, cv2.COLOR_BGR2GRAY)
# 模板匹配
result = cv2.matchTemplate(bg_gray, slide_gray,
cv2.TM_CCOEFF_NORMED)
# 获取最佳匹配位置
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
# 返回x坐标(距离)
return max_loc[0]
def generate_human_track(self, distance):
"""生成人类滑动轨迹"""
# 人类滑动特征:
# 1. 先快后慢
# 2. 有微小抖动
# 3. 最后可能回拉一点
track = []
current = 0
t = 0
# 分段滑动
segments = [
(0.3, 0.6), # 初始加速段
(0.4, 0.3), # 匀速段
(0.3, 0.1), # 减速段
]
for segment_ratio, speed_ratio in segments:
segment_distance = distance * segment_ratio
segment_time = random.uniform(0.5, 1.0)
while current < distance * sum(s[0] for s in segments[:segments.index((segment_ratio, speed_ratio)) + 1]):
# 计算这一步的位移
step = random.uniform(1, 3) * speed_ratio
# 添加随机抖动
if random.random() < 0.1:
step *= random.uniform(0.5, 1.5)
current += step
t += random.uniform(0.01, 0.05)
track.append({
'x': int(current),
'y': random.randint(-2, 2), # Y轴微小偏移
't': int(t * 1000) # 毫秒
})
# 最后可能有一点过冲和回拉
if random.random() < 0.3:
overshoot = random.randint(3, 10)
track.append({'x': int(current + overshoot), 'y': 0, 't': int(t * 1000 + 50)})
track.append({'x': int(current), 'y': 0, 't': int(t * 1000 + 100)})
return track
def solve_click_captcha(self, image_url, prompt):
"""解决点选验证码"""
# 提示如:"点击图中的公交车"
# 方法1:使用目标检测模型
try:
import torch
from PIL import Image
# 加载预训练模型(需提前训练)
model = torch.hub.load('ultralytics/yolov5', 'custom',
path='click_captcha_model.pt')
img = Image.open(io.BytesIO(requests.get(image_url).content))
results = model(img)
# 解析结果,返回坐标
points = []
for *box, conf, cls in results.xyxy[0]:
if conf > 0.5:
x_center = (box[0] + box[2]) / 2
y_center = (box[1] + box[3]) / 2
points.append((x_center, y_center))
return points[:4] # 通常点选4个位置
except:
pass
# 方法2:使用打码平台
return self.use_click_captcha_service(image_url, prompt)
def use_captcha_service(self, image_bytes, captcha_type='common'):
"""使用第三方打码平台"""
# 平台列表(按推荐度排序)
platforms = {
'超级鹰': 'http://www.chaojiying.com/api/',
'图鉴': 'http://www.ttshitu.com/',
'联众': 'http://www.jsdati.com/',
'2captcha': 'https://2captcha.com/',
}
# 以超级鹰为例
def chaojiying_solve(img_bytes, soft_id='123456'):
url = 'http://upload.chaojiying.net/Upload/Processing.php'
data = {
'user': 'your_username',
'pass': 'your_password',
'softid': soft_id,
'codetype': '1004', # 验证码类型代码
}
files = {'userfile': ('captcha.jpg', img_bytes)}
response = requests.post(url, data=data, files=files)
result = response.json()
if result['err_no'] == 0:
return result['pic_str']
else:
print(f"打码失败: {result['err_str']}")
return None
return chaojiying_solve(image_bytes)
def bypass_captcha(self, session, url):
"""尝试绕过验证码"""
strategies = [
self.try_cookie_reuse, # 复用已有cookie
self.try_header_bypass, # 修改请求头
self.try_js_bypass, # 执行JS绕过
self.try_alternative_api, # 找不需要验证码的API
]
for strategy in strategies:
result = strategy(session, url)
if result:
return result
return None
def try_cookie_reuse(self, session, url):
"""尝试复用cookie"""
# 检查是否有历史cookie
if session.cookies.get('login_token'):
# 直接访问,看是否需要验证码
response = session.get(url)
if 'captcha' not in response.text:
return response
return None
def try_alternative_api(self, session, url):
"""寻找替代接口"""
# 有些网站有多个接口
# 主站可能需要验证码,但API不需要
# 尝试常见API端点
endpoints = [
url.replace('www.', 'api.'),
url + '/api/data',
url + '/json',
url + '/v1/query',
]
for endpoint in endpoints:
try:
response = session.get(endpoint, timeout=5)
if response.status_code == 200:
return response
except:
continue
return None
# 高级:机器学习方案
class ML_Captcha_Solver:
"""基于机器学习的验证码识别"""
def __init__(self):
self.models = {}
def train_char_model(self, dataset_path):
"""训练字符验证码识别模型"""
# 使用CNN网络
import tensorflow as tf
from tensorflow import keras
# 数据加载和预处理
# ...(实际需要大量标注数据)
model = keras.Sequential([
keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(60, 160, 3)),
keras.layers.MaxPooling2D((2, 2)),
keras.layers.Conv2D(64, (3, 3), activation='relu'),
keras.layers.MaxPooling2D((2, 2)),
keras.layers.Flatten(),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(4 * 36, activation='softmax') # 4个字符,36种可能
])
model.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
# 训练(略)
return model
def solve_with_ai(self, image_bytes):
"""使用AI模型识别"""
# 这里简化,实际需调用训练好的模型
pass
# 实战:综合验证码处理流程
def handle_captcha_in_workflow():
"""在爬虫流程中处理验证码"""
session = requests.Session()
captcha_solver = CaptchaSolver()
for attempt in range(3): # 最多尝试3次
# 1. 先尝试无验证码访问
response = session.get('https://example.com/login')
# 2. 检查是否有验证码
if 'captcha' in response.text:
# 提取验证码图片
soup = BeautifulSoup(response.text, 'html.parser')
captcha_img = soup.find('img', {'id': 'captcha_image'})
if captcha_img:
img_url = captcha_img['src']
# 3. 识别验证码
captcha_text = captcha_solver.solve_simple_captcha(img_url)
if captcha_text:
# 4. 提交表单
login_data = {
'username': 'user',
'password': 'pass',
'captcha': captcha_text
}
response = session.post('https://example.com/login',
data=login_data)
if '登录成功' in response.text:
print(f"第{attempt+1}次尝试:验证码识别成功")
return session
else:
print(f"第{attempt+1}次尝试:验证码错误")
else:
print("验证码识别失败")
else:
# 可能是滑块或点选验证码
print("检测到复杂验证码,使用Selenium处理")
return handle_complex_captcha_with_selenium()
time.sleep(2)
print("验证码处理失败")
return None
更多推荐



所有评论(0)