Python 在数据采集与预处理(ETL)领域的应用与实践
本文系统介绍了Python在ETL(数据提取-转换-加载)流程中的应用实践。文章重点阐述了数据采集阶段的多种来源(关系型数据库、REST API、消息队列等)及对应Python工具(如psycopg2、aiohttp、kafka-python),详细讲解了数据清洗转换的核心操作(类型转换、去重、缺失值处理等)及pandas实现方法,并提供了写入优化的具体策略。通过完整示例演示了从API获取数据到写
引言:
随着数据驱动决策成为主流,企业每天都要处理海量、多源、格式各异的数据。要把这些原始数据变成能被分析与建模使用的“干净数据”,就离不开 ETL(Extract-Transform-Load,提取—转换—加载)流程。Python 因其语法简洁、生态丰富以及与数据科学工具无缝衔接,成为 ETL 工作的首选语言之一。本文面向工程实践与初中级开发者,系统介绍 Python 在数据采集与预处理领域的常用方法、工具链、设计要点与工程示例,并提供可执行代码片段,帮助你把理论落到实处。
1、ETL 的任务与挑战
- 数据源多样:数据库、API、日志、消息队列、第三方文件等;
- 数据质量参差:缺失值、格式不一致、重复与异常值;
- 实时性与吞吐:有的场景需要低延迟(近实时),有的则做批量离线处理;
- 可扩展性和可靠性:如何在数据量增大时保持稳定;
- 可重复性与可观测性:任务可重跑、版本化、监控和告警。
2、Python 在 ETL 中的角色与生态概览
- 丰富的库:requests、aiohttp(网络请求)、pandas(数据处理)、pyarrow(高效 I/O)、sqlalchemy(数据库)、kafka-python、airflow(调度)等;
- 社区与文档:大量案例和工程实践参考;
- 可扩展性:C/Java 后端(pyarrow、numpy、pyspark)弥补性能瓶颈。
常见工具链(按职责)
- 数据采集:requests、aiohttp、Scrapy、tweepy、kafka-python
- 数据处理:pandas、numpy、pyarrow、polars、dask
- 分布式计算:PySpark、Dask、Ray
- 数据存储与格式:Parquet、ORC、CSV、JSON、Feather
- 调度与工程化:Airflow、Prefect、Dagster
- 序列化/传输:protobuf、avro、msgpack
- 监控与日志:Prometheus、Grafana、ELK
3、数据采集(Extract):常见源与方法
3.1 关系型数据库
通过 ODBC、JDBC(通过 SQLAlchemy)或数据库专用驱动(psycopg2、pymysql)提取。注意使用分批(pagination)或流式游标以避免内存暴涨。
示例:从 PostgreSQL 分批读取(psycopg2)
import psycopg2
conn = psycopg2.connect(dbname="db", user="u", password="p", host="host")
cur = conn.cursor(name='stream_cursor') # server-side cursor,避免一次读太多
cur.execute("SELECT id, ts, value FROM events WHERE ts >= %s AND ts < %s", (start, end))
batch_size = 10000
while True:
rows = cur.fetchmany(batch_size)
if not rows:
break
for r in rows:
process(r)
cur.close()
conn.close()
3.2 REST API 与爬虫
常遇到带限速的 API、分页、或 HTML 页面。requests 是同步工具,aiohttp 支持并发异步请求。Scrapy 更适合复杂的爬虫场景。
示例:使用 aiohttp 并发请求带分页 API
import asyncio
import aiohttp
async def fetch_page(session, url, params):
async with session.get(url, params=params) as resp:
resp.raise_for_status()
return await resp.json()
async def fetch_all(base_url, pages):
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, base_url, {'page': p}) for p in range(1, pages+1)]
return await asyncio.gather(*tasks)
data = asyncio.run(fetch_all("https://api.example.com/items", 10))
3.3 流式来源(Kafka、日志、消息队列)
对实时 ETL,常用 kafka-python 或 confluent-kafka 消费消息。注意消费幂等、commit 策略与失败重试。
示例:使用 confluent-kafka 读取并批量写入
from confluent_kafka import Consumer
conf = {
'bootstrap.servers': 'broker:9092',
'group.id': 'etl_group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe(['events'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error:", msg.error())
continue
process(msg.value())
finally:
consumer.close()
3.4 文件系统与云存储(S3、HDFS)
大文件读取优先使用流式 I/O 或列式文件格式(Parquet)以节省内存和提高读取效率。pyarrow、s3fs、boto3 是常用工具。
示例:使用 pyarrow 读取 S3 上的 parquet
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3://mybucket/path/', filesystem=fs)
table = dataset.read()
df = table.to_pandas()
4、数据清洗与转换(Transform):核心操作与技巧
4.1 数据规范化与类型转换
- 统一字段命名、时区归一化、数据类型映射(字符串->时间、数值->浮点/整数)。
- 对时间戳使用明确的时区与格式:推荐使用 UTC 存储,按需展示时转换。
示例:pandas 时间转换与缺失值处理
import pandas as pd
df['ts'] = pd.to_datetime(df['ts'], utc=True) # 解析并转为 UTC
df['value'] = pd.to_numeric(df['value'], errors='coerce') # 非数字设为 NaN
df = df.dropna(subset=['value']) # 删除关键字段缺失
4.2 去重与主键合并
在分布式采集时重复数据常见。可通过主键 dedup(保留最新/最早)或合并逻辑(如按事件时间)。
示例:pandas groupby 保留最新记录
df = df.sort_values('ts').drop_duplicates(subset=['id'], keep='last')
4.3 缺失值与异常值处理
- 缺失值:删除、填充(均值、中位数、前向/后向填充)或插值;
- 异常值:基于 Domain Rule、Z-score、IQR、或更复杂的模型检测并处理(丢弃、截断或单独标记)。
示例:IQR 异常值检测
q1 = df['value'].quantile(0.25)
q3 = df['value'].quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5*iqr, q3 + 1.5*iqr
df = df[(df['value'] >= lower) & (df['value'] <= upper)]
4.4 字段派生与特征工程(ETL 与建模衔接)
- 时间窗口特征(小时、星期几、是否工作日)、滚动统计(过去 N 天均值)、类别编码(one-hot、label、频次编码)等。
- 对高基数类别,可使用 target encoding、hashing trick 或 embedding 表示。
示例:基于 pandas 的滑动窗口平均(按分组)
df = df.sort_values('ts')
df['rolling_mean_1h'] = df.groupby('user_id')['value'].transform(lambda x: x.rolling('1h', on=df['ts']).mean())
4.5 批量/分块处理以控制内存
对于大数据集使用分块读取(pandas read_csv chunksize),或直接使用更高性能的工具(Dask、PySpark、Polars)。
示例:pandas 分块读取 CSV 并处理
import pandas as pd
chunks = pd.read_csv('big.csv', chunksize=100000)
for chunk in chunks:
process(chunk)
5、数据加载(Load):目标存储与性能优化
5.1 目标存储选择
- 数据仓库(Snowflake、Redshift、BigQuery):适合分析查询,通常配合批量加载或专用 connector。
- 数据湖(S3 + Parquet/Delta Lake):用于大规模存储与离线计算,支持列式压缩与分区。
- OLTP 数据库(Postgres、MySQL):用于在线应用,写入需考虑事务与索引代价。
- 时序数据库(InfluxDB、ClickHouse):高效的时序/分析型写入与查询场景。
5.2 写入优化
- 批量写入而非行写(使用 COPY、bulk insert、parquet 分块上传)。
- 按时间/字段分区(partitioning)以加速后续查询。
- 合理设置文件大小(parquet chunk)以避免大量小文件问题。
- 使用幂等写入或事务机制防止重复(upsert / merge 策略)。
示例:使用 pandas 将分块结果写入 Parquet 并上传 S3
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
for i, chunk in enumerate(pd.read_csv('big.csv', chunksize=100000)):
table = pa.Table.from_pandas(chunk)
with fs.open(f's3://mybucket/part-{i}.parquet', 'wb') as f:
pq.write_table(table, f)
6、流式 ETL 与批处理的对比
- 批处理(Batch):高吞吐、相对简单、适合离线训练和报表;延迟通常较大(分钟到小时)。
- 流式(Streaming):低延迟、持续处理(near real-time);实现更复杂(乱序、状态管理、容错)。
Python 在流式场景常借助 Flink(PyFlink)、Kafka Streams(via Faust)、Beam 等框架,或在微批上用 Spark Structured Streaming。
7、架构模式与工程实践要点
7.1 可重复性与幂等性
- ETL 作业应可重跑而不产生重复或冲突:使用幂等写入(如 upsert、使用事务或版本化输出路径)。
7.2 良好的监控与可观测性 - 采集失败率、延迟、数据质量指标(缺失率、异常值率)、处理时长等需暴露并告警。
7.3 数据契约与 Schema 管理 - 使用 Schema Registry 或定义严格契约(字段、类型、nullability),避免下游突发中断。
7.4 任务调度与依赖 - 使用 Airflow/Prefect 做 DAG 调度与依赖管理,并结合 retries、告警与 SLA。
7.5 数据治理与血缘 - 记录数据血缘(从源到目标的变换链路)便于问题定位与审计。
8、代码示例(完整小型 ETL 演示)
下面给出一个较完整的示例:从一个分页 REST API 并发获取事件,进行清洗(时间解析、去重、缺失处理),并把结果批量写入本地 Parquet 文件。示例采用 aiohttp + pandas + pyarrow。
说明:
- API 模拟返回带分页的 JSON,每页若干事件,字段:id, timestamp (ISO), value(字符串可能为空)
- 我们会并发拉取页面,合并为 DataFrame,清洗并去重,然后分块写入 Parquet。
完整代码:
# etl_demo.py
import asyncio
import aiohttp
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timezone
import os
API_BASE = "https://api.mockservice.com/events" # 假定
PAGES = 20
CONCURRENCY = 8
OUT_DIR = "out_parquet"
os.makedirs(OUT_DIR, exist_ok=True)
async def fetch_page(session, page):
params = {'page': page, 'page_size': 1000}
async with session.get(API_BASE, params=params) as resp:
resp.raise_for_status()
return await resp.json()
async def bounded_fetch(sem, session, page):
async with sem:
return await fetch_page(session, page)
async def fetch_all(pages):
sem = asyncio.Semaphore(CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [bounded_fetch(sem, session, p) for p in range(1, pages+1)]
results = await asyncio.gather(*tasks)
# Flatten list of pages
events = []
for page in results:
events.extend(page.get('items', []))
return events
def clean_and_transform(events):
# 转为 DataFrame
df = pd.DataFrame(events)
# 解析时间,统一为 UTC
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True, errors='coerce')
# value 转为数值
df['value'] = pd.to_numeric(df['value'], errors='coerce')
# 删除 timestamp 或 value 解析失败的行
df = df.dropna(subset=['timestamp', 'value'])
# 去重:保留最新一条(按 timestamp)
df = df.sort_values('timestamp').drop_duplicates(subset=['id'], keep='last')
return df
def write_parquet_in_chunks(df, out_dir, base_name='events', chunk_size=200000):
total = len(df)
for i, start in enumerate(range(0, total, chunk_size)):
chunk = df.iloc[start:start+chunk_size]
table = pa.Table.from_pandas(chunk)
out_path = os.path.join(out_dir, f"{base_name}-{i}.parquet")
pq.write_table(table, out_path)
print(f"Wrote {len(chunk)} rows to {out_path}")
def main():
events = asyncio.run(fetch_all(PAGES))
print(f"Fetched {len(events)} events")
df = clean_and_transform(events)
print(f"After cleaning: {len(df)} rows")
write_parquet_in_chunks(df, OUT_DIR)
if __name__ == "__main__":
main()
说明与可扩展点:
- 在实际生产场景,请替换 API 地址并增加错误重试(如使用 tenacity)与限速/backoff。
- 若数据量非常大,使用 pandas 可能内存受限,建议改为 Dask/Polars 或直接写入中间队列(如 Kafka)后用 Spark 做大规模处理。
- 写入目标可以替换为 S3(使用 s3fs)或数据库(使用 SQLAlchemy 的 bulk insert / COPY)。
9、总结与进阶方向
本文介绍了 Python 在 ETL(数据采集与预处理)领域的主流技术路径、常见问题与应对策略,并给出实用代码示例。归纳几条实践建议:
- 优先使用结构化、列式存储(Parquet)与 schema 来提升后续计算性能;
- 对实时性强的场景,评估是否需要流式框架(Flink/Beam)而非简单的 Python 脚本;
- 对高吞吐与大数据量,采用分布式解决方案(PySpark、Dask、Ray)或把 Python 与更高性能的后端(pyarrow、C++)结合;
- 做好可观测性、幂等性与数据血缘,这些工程细节往往决定系统能否长期稳定运行。
更多推荐
所有评论(0)