
Python's Role in Data Engineering: Airflow and Pandas in Practice

1. Data Engineering Overview and Python's Position

1.1 The Evolution and Importance of Data Engineering

Data engineering, a key pillar of the data science ecosystem, has evolved from traditional ETL (extract, transform, load) processes into complex data pipeline management. According to a 2024 state-of-data-engineering industry report, global data volume is projected to reach 175 ZB, and the need to manage and process this data effectively continues to drive rapid advances in data engineering technology.

Core challenges of modern data engineering:

  • Exponential growth in data volume
  • Increasing demand for real-time processing
  • Higher requirements for data quality and consistency
  • Integration of complex, heterogeneous data sources
  • Compliance and security considerations

1.2 Python's Advantages in Data Engineering

Thanks to its rich ecosystem and concise syntax, Python has become the dominant language in data engineering:

class PythonDataEngineeringAdvantages:
    """Analysis of Python's advantages in data engineering"""
    
    def __init__(self):
        self.advantages = {
            "Rich library ecosystem": ["Pandas", "PySpark", "Airflow", "Dask"],
            "Easy to learn and use": ["Concise syntax", "Extensive documentation", "Strong community"],
            "Integration with other stacks": ["Cloud services", "Databases", "Message queues"],
            "Performance optimization options": ["C extensions", "Parallel processing", "Memory optimization"]
        }
    
    def calculate_adoption_rate(self, year):
        """Look up Python's adoption rate in data engineering for a given year"""
        # Trend data based on industry reports
        adoption_rates = {
            2020: 0.65,
            2021: 0.72,
            2022: 0.78,
            2023: 0.83,
            2024: 0.87
        }
        return adoption_rates.get(year, 0.90)
    
    def analyze_skill_demand(self):
        """Analyze skill demand trends"""
        skills_demand = {
            "Pandas": {"demand": 9.2, "growth_rate": 0.15},
            "Airflow": {"demand": 8.8, "growth_rate": 0.22},
            "PySpark": {"demand": 8.5, "growth_rate": 0.18},
            "SQL": {"demand": 9.5, "growth_rate": 0.08},
            "Docker": {"demand": 8.0, "growth_rate": 0.25}
        }
        return skills_demand

# Example: advantage analysis
analyzer = PythonDataEngineeringAdvantages()
print("=== Python adoption rate in data engineering ===")
for year in range(2020, 2025):
    rate = analyzer.calculate_adoption_rate(year)
    print(f"{year}: {rate:.1%}")

skills_demand = analyzer.analyze_skill_demand()
print("\n=== Skill demand analysis ===")
for skill, metrics in skills_demand.items():
    print(f"{skill}: demand {metrics['demand']}/10, annual growth {metrics['growth_rate']:.1%}")

2. Pandas: The Workhorse of Data Processing

2.1 Pandas Core Data Structures and Operations

Pandas provides two core data structures, Series and DataFrame, which are the building blocks of all data processing.
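
Before the larger example below, here is a minimal sketch of both structures (the values are purely illustrative):

import pandas as pd

# A Series is a labeled 1-D array; a DataFrame is a table of aligned Series
s = pd.Series([120.5, 98.0, 143.2],
              index=['2024-01-01', '2024-01-02', '2024-01-03'],
              name='sales')
df = pd.DataFrame({'sales': s, 'region': ['North', 'South', 'North']})

print(s.mean())                              # aggregation on a Series
print(df.groupby('region')['sales'].sum())   # split-apply-combine on a DataFrame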

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class AdvancedDataProcessor:
    """Advanced data processing utility class"""
    
    def __init__(self):
        self.data_quality_metrics = {}
    
    def create_sample_dataset(self, num_records=10000):
        """Create a simulated dataset for demonstration"""
        np.random.seed(42)
        
        dates = pd.date_range(start='2024-01-01', periods=num_records, freq='H')
        
        data = {
            'timestamp': dates,
            'user_id': np.random.randint(1000, 9999, num_records),
            'product_category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home', 'Sports'], num_records),
            'sales_amount': np.random.exponential(100, num_records),
            'quantity': np.random.poisson(3, num_records),
            'region': np.random.choice(['North', 'South', 'East', 'West'], num_records),
            'is_returned': np.random.choice([True, False], num_records, p=[0.05, 0.95])
        }
        
        # Deliberately inject some missing values and outliers
        df = pd.DataFrame(data)
        df.loc[df.sample(frac=0.05).index, 'sales_amount'] = np.nan
        df.loc[df.sample(frac=0.02).index, 'quantity'] = 1000  # outliers
        
        return df
    
    def comprehensive_data_cleaning(self, df):
        """End-to-end data cleaning workflow"""
        print("Starting data cleaning...")
        print(f"Original data shape: {df.shape}")
        
        # 1. Handle missing values
        missing_report = df.isnull().sum()
        print(f"\nMissing value counts:\n{missing_report}")
        
        # Fill numeric columns with the median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df[col].isnull().sum() > 0:
                df[col] = df[col].fillna(df[col].median())
        
        # 2. Handle outliers
        def cap_outliers(series, lower_quantile=0.01, upper_quantile=0.99):
            lower_bound = series.quantile(lower_quantile)
            upper_bound = series.quantile(upper_quantile)
            return series.clip(lower=lower_bound, upper=upper_bound)
        
        df['sales_amount'] = cap_outliers(df['sales_amount'])
        df['quantity'] = cap_outliers(df['quantity'])
        
        # 3. Optimize data types
        df['user_id'] = df['user_id'].astype('category')
        df['product_category'] = df['product_category'].astype('category')
        df['region'] = df['region'].astype('category')
        
        print(f"Cleaned data shape: {df.shape}")
        return df
    
    def advanced_analytics(self, df):
        """Advanced data analysis"""
        print("\n=== Advanced data analysis ===")
        
        # 1. Time series features
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.day_name()
        
        # 2. Sales trend analysis
        hourly_sales = df.groupby('hour')['sales_amount'].agg(['mean', 'sum', 'count'])
        
        # 3. Category analysis
        category_analysis = df.groupby('product_category').agg({
            'sales_amount': ['sum', 'mean', 'count'],
            'quantity': 'sum',
            'user_id': 'nunique'
        }).round(2)
        
        # 4. Regional performance analysis
        region_performance = df.groupby('region').agg({
            'sales_amount': ['sum', 'mean'],
            'user_id': 'nunique'
        })
        
        return {
            'hourly_sales': hourly_sales,
            'category_analysis': category_analysis,
            'region_performance': region_performance
        }

# Demonstrate Pandas data processing
processor = AdvancedDataProcessor()
df = processor.create_sample_dataset(5000)
print("=== Sample of raw dataset ===")
print(df.head())
print("\nDataset info:")
df.info()  # info() prints directly and returns None, so don't wrap it in print()

# Data cleaning
cleaned_df = processor.comprehensive_data_cleaning(df)

# Advanced analysis
analytics_results = processor.advanced_analytics(cleaned_df)

print("\n=== Hourly sales trend ===")
print(analytics_results['hourly_sales'].head())

print("\n=== Product category analysis ===")
print(analytics_results['category_analysis'])

2.2 Performance Optimization and Memory Management

When working with large datasets, performance optimization is essential:

class PandasPerformanceOptimizer:
    """Pandas performance optimization utilities"""
    
    def __init__(self):
        self.memory_usage_log = []
    
    def analyze_memory_usage(self, df):
        """Analyze memory usage"""
        memory_usage = df.memory_usage(deep=True)
        total_memory = memory_usage.sum() / 1024**2  # convert to MB
        
        print(f"Total memory usage: {total_memory:.2f} MB")
        print("\nMemory usage by column:")
        for col in df.columns:
            col_memory = df[col].memory_usage(deep=True) / 1024**2
            print(f"  {col}: {col_memory:.2f} MB")
        
        return total_memory
    
    def optimize_dataframe(self, df):
        """Optimize DataFrame memory usage"""
        print("\nStarting memory optimization...")
        initial_memory = self.analyze_memory_usage(df)
        
        # Downcast numeric columns to the smallest sufficient dtype
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df[col].dtype == 'float64':
                df[col] = pd.to_numeric(df[col], downcast='float')
            elif df[col].dtype == 'int64':
                df[col] = pd.to_numeric(df[col], downcast='integer')
        
        # Convert low-cardinality object columns to category
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].nunique() / len(df) < 0.5:  # fewer than 50% unique values
                df[col] = df[col].astype('category')
        
        optimized_memory = self.analyze_memory_usage(df)
        savings = initial_memory - optimized_memory
        print(f"\nMemory saved: {savings:.2f} MB ({savings/initial_memory*100:.1f}%)")
        
        return df
    
    def efficient_groupby_operations(self, df):
        """Efficient groupby operations"""
        print("\n=== Efficient groupby demonstration ===")
        
        # Approach 1: standard groupby (slower)
        import time
        start_time = time.time()
        standard_result = df.groupby(['product_category', 'region']).agg({
            'sales_amount': ['sum', 'mean'],
            'quantity': 'sum'
        })
        standard_time = time.time() - start_time
        
        # Approach 2: optimized groupby (faster)
        start_time = time.time()
        
        # Pre-filter to only the columns we need
        relevant_cols = ['product_category', 'region', 'sales_amount', 'quantity']
        optimized_df = df[relevant_cols].copy()
        
        # Use named aggregation
        optimized_result = optimized_df.groupby(['product_category', 'region']).agg(
            total_sales=('sales_amount', 'sum'),
            avg_sales=('sales_amount', 'mean'),
            total_quantity=('quantity', 'sum')
        )
        optimized_time = time.time() - start_time
        
        print(f"Standard approach: {standard_time:.4f}s")
        print(f"Optimized approach: {optimized_time:.4f}s")
        print(f"Improvement: {(standard_time-optimized_time)/standard_time*100:.1f}%")
        
        return optimized_result

# Performance optimization demo
optimizer = PandasPerformanceOptimizer()
optimized_df = optimizer.optimize_dataframe(cleaned_df)
groupby_results = optimizer.efficient_groupby_operations(optimized_df)
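
The examples above assume the data fits in memory. For files that do not, Pandas can stream the input in chunks via read_csv's chunksize parameter; the sketch below is a minimal illustration (the file name and columns are hypothetical):

import pandas as pd

# Process a large CSV incrementally instead of loading it all at once.
# 'large_sales.csv' is a hypothetical file with 'category' and 'sales_amount' columns.
chunk_totals = []
for chunk in pd.read_csv('large_sales.csv', chunksize=100_000):
    # Aggregate each chunk independently...
    chunk_totals.append(chunk.groupby('category')['sales_amount'].sum())

# ...then combine the partial results into the final answer.
total_sales_by_category = pd.concat(chunk_totals).groupby(level=0).sum()
print(total_sales_by_category)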

3. Apache Airflow: A Workflow Orchestration Engine

3.1 Airflow Core Concepts and Architecture

Apache Airflow is a platform for orchestrating complex computational workflows and data processing pipelines. A pipeline is declared as a DAG (directed acyclic graph) of tasks with explicit dependencies, which Airflow then schedules and monitors.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator  # renamed EmptyOperator in Airflow >= 2.4
from airflow.providers.postgres.operators.postgres import PostgresOperator  # example provider import
from airflow.utils.dates import days_ago
import logging

class DataPipelineDesign:
    """Data pipeline design patterns"""
    
    def __init__(self):
        self.default_args = {
            'owner': 'data_engineering',
            'depends_on_past': False,
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 3,
            'retry_delay': timedelta(minutes=5)
        }
    
    def create_etl_dag(self):
        """Create an ETL pipeline DAG"""
        
        def extract_data():
            """Extraction task"""
            logging.info("Starting data extraction...")
            # Simulate data extraction
            import time
            time.sleep(2)
            logging.info("Data extraction complete")
            return "extract_success"
        
        def transform_data():
            """Transformation task"""
            logging.info("Starting data transformation...")
            # Simulate data transformation (reuses the processor from section 2)
            processor = AdvancedDataProcessor()
            df = processor.create_sample_dataset(1000)
            transformed_df = processor.comprehensive_data_cleaning(df)
            logging.info(f"Transformation complete, processed {len(transformed_df)} records")
            return "transform_success"
        
        def load_data():
            """Load task"""
            logging.info("Starting data load...")
            # Simulate data load
            import time
            time.sleep(1)
            logging.info("Data load complete")
            return "load_success"
        
        def data_quality_check():
            """Data quality check"""
            logging.info("Running data quality checks...")
            # Simulate a quality check
            import random
            quality_score = random.uniform(0.8, 1.0)
            logging.info(f"Data quality score: {quality_score:.2f}")
            if quality_score < 0.9:
                raise ValueError(f"Insufficient data quality: {quality_score:.2f}")
            return f"quality_check_passed_{quality_score:.2f}"
        
        # Define the DAG
        with DAG(
            'comprehensive_etl_pipeline',
            default_args=self.default_args,
            description='Complete ETL data pipeline',
            schedule_interval=timedelta(hours=1),
            start_date=days_ago(1),
            tags=['data_engineering', 'etl']
        ) as dag:
            
            start = DummyOperator(task_id='start')
            
            extract = PythonOperator(
                task_id='extract_data',
                python_callable=extract_data
            )
            
            transform = PythonOperator(
                task_id='transform_data',
                python_callable=transform_data
            )
            
            load = PythonOperator(
                task_id='load_data',
                python_callable=load_data
            )
            
            quality_check = PythonOperator(
                task_id='data_quality_check',
                python_callable=data_quality_check
            )
            
            end = DummyOperator(task_id='end')
            
            # Define task dependencies
            start >> extract >> transform >> load >> quality_check >> end
            
            return dag

# Example Airflow DAG configuration
pipeline_design = DataPipelineDesign()
etl_dag = pipeline_design.create_etl_dag()

print("=== Airflow ETL pipeline definition ===")
print(f"DAG ID: {etl_dag.dag_id}")
print(f"Schedule interval: {etl_dag.schedule_interval}")
print(f"Number of tasks: {len(etl_dag.tasks)}")

3.2 Advanced Airflow Features and Practices

class AdvancedAirflowFeatures:
    """Demonstration of advanced Airflow features"""
    
    def __init__(self):
        # Shared defaults for the DAG below (the original omitted this attribute)
        self.default_args = {
            'owner': 'data_engineering',
            'depends_on_past': False,
            'retries': 3,
            'retry_delay': timedelta(minutes=5)
        }
    
    def create_data_quality_dag(self):
        """Create a DAG with data quality monitoring"""
        
        def generate_sales_report(**context):
            """Generate the sales report"""
            execution_date = context['execution_date']
            logging.info(f"Generating sales report for {execution_date}")
            
            # Simulate report generation
            report_data = {
                'date': execution_date.strftime('%Y-%m-%d'),
                'total_sales': 150000,
                'total_orders': 1200,
                'top_category': 'Electronics',
                'generated_at': datetime.now().isoformat()
            }
            
            return report_data
        
        def send_alert(**context):
            """Send an alert"""
            ti = context['ti']
            # May be None when the report branch was skipped
            report_data = ti.xcom_pull(task_ids='generate_sales_report')
            
            logging.info(f"Sending data quality alert, report: {report_data}")
            # Email, Slack, or other notification integrations go here
            
        def backup_database(**context):
            """Database backup task"""
            logging.info("Running database backup...")
            # Simulate a backup
            return "backup_completed"
        
        with DAG(
            'data_quality_monitoring',
            default_args=self.default_args,
            description='Data quality monitoring pipeline',
            schedule_interval=timedelta(days=1),
            start_date=days_ago(1),
            catchup=False
        ) as dag:
            
            # Use a branch operator for conditional execution
            from airflow.operators.python import BranchPythonOperator
            
            def check_data_quality(**context):
                """Check data quality and choose the execution path"""
                import random
                quality_score = random.uniform(0.7, 1.0)
                
                if quality_score >= 0.9:
                    return 'generate_sales_report'
                else:
                    return 'send_data_quality_alert'
            
            check_quality = BranchPythonOperator(
                task_id='check_data_quality',
                python_callable=check_data_quality
            )
            
            generate_report = PythonOperator(
                task_id='generate_sales_report',
                python_callable=generate_sales_report
            )
            
            send_quality_alert = PythonOperator(
                task_id='send_data_quality_alert',
                python_callable=send_alert
            )
            
            backup_task = PythonOperator(
                task_id='backup_database',
                python_callable=backup_database,
                trigger_rule='none_failed_min_one_success'
            )
            
            # Define the workflow
            check_quality >> [generate_report, send_quality_alert]
            [generate_report, send_quality_alert] >> backup_task
            
            return dag

# Advanced feature demo
advanced_features = AdvancedAirflowFeatures()
quality_dag = advanced_features.create_data_quality_dag()

print("\n=== Advanced Airflow features ===")
print("Features demonstrated:")
print("- Conditional branching (BranchPythonOperator)")
print("- Passing data between tasks (XCom)")
print("- Flexible trigger rules")
print("- Error handling and retries")

4. A Complete Case Study: An E-commerce Data Pipeline

4.1 End-to-End Pipeline Implementation

#!/usr/bin/env python3
"""
ecommerce_data_pipeline.py
Complete implementation of an e-commerce data pipeline,
integrating Pandas data processing with Airflow workflow orchestration.
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from typing import Any, Dict, List, Optional
import json

class EcommerceDataPipeline:
    """Core class of the e-commerce data pipeline"""
    
    def __init__(self):
        self.setup_logging()
        self.data_quality_threshold = 0.85
    
    def setup_logging(self):
        """Configure logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    
    def extract_multiple_sources(self) -> Dict[str, pd.DataFrame]:
        """Extract data from multiple sources"""
        self.logger.info("Extracting data from multiple sources...")
        
        # Simulate several data sources
        sources = {}
        
        # 1. Sales data
        sales_data = self._generate_sales_data(5000)
        sources['sales'] = sales_data
        
        # 2. User data
        user_data = self._generate_user_data(2000)
        sources['users'] = user_data
        
        # 3. Product data
        product_data = self._generate_product_data(500)
        sources['products'] = product_data
        
        self.logger.info(f"Successfully extracted {len(sources)} data sources")
        return sources
    
    def _generate_sales_data(self, num_records: int) -> pd.DataFrame:
        """Generate simulated sales data"""
        np.random.seed(42)
        
        dates = pd.date_range(
            start='2024-01-01', 
            end=datetime.now(), 
            periods=num_records
        )
        
        data = {
            'order_id': [f'ORD{10000 + i}' for i in range(num_records)],
            'user_id': np.random.randint(1000, 3000, num_records),
            'product_id': np.random.randint(1, 501, num_records),
            'order_date': dates,
            'amount': np.random.exponential(150, num_records),
            'quantity': np.random.poisson(2, num_records) + 1,
            'category': np.random.choice(
                ['Electronics', 'Clothing', 'Books', 'Home', 'Sports'], 
                num_records,
                p=[0.3, 0.25, 0.15, 0.2, 0.1]
            ),
            'payment_method': np.random.choice(
                ['Credit Card', 'PayPal', 'Bank Transfer', 'Cash'], 
                num_records
            ),
            'status': np.random.choice(
                ['Completed', 'Pending', 'Cancelled'], 
                num_records,
                p=[0.85, 0.1, 0.05]
            )
        }
        
        df = pd.DataFrame(data)
        
        # Inject some data quality problems
        df.loc[df.sample(frac=0.03).index, 'amount'] = np.nan
        df.loc[df.sample(frac=0.02).index, 'quantity'] = -1
        
        return df
    
    def _generate_user_data(self, num_users: int) -> pd.DataFrame:
        """Generate simulated user data"""
        np.random.seed(42)
        
        regions = ['North', 'South', 'East', 'West', 'Central']
        
        data = {
            'user_id': range(1000, 1000 + num_users),
            'join_date': pd.date_range(
                start='2020-01-01', 
                periods=num_users, 
                freq='D'
            ),
            'region': np.random.choice(regions, num_users),
            'age_group': np.random.choice(
                ['18-25', '26-35', '36-45', '46-55', '55+'], 
                num_users,
                p=[0.2, 0.3, 0.25, 0.15, 0.1]
            ),
            'loyalty_tier': np.random.choice(
                ['Bronze', 'Silver', 'Gold', 'Platinum'], 
                num_users,
                p=[0.4, 0.3, 0.2, 0.1]
            )
        }
        
        return pd.DataFrame(data)
    
    def _generate_product_data(self, num_products: int) -> pd.DataFrame:
        """Generate simulated product data"""
        np.random.seed(42)
        
        categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports']
        
        data = {
            'product_id': range(1, num_products + 1),
            'product_name': [f'Product_{i}' for i in range(1, num_products + 1)],
            'category': np.random.choice(categories, num_products),
            'price': np.random.uniform(10, 1000, num_products),
            'cost': np.random.uniform(5, 500, num_products),
            'stock_quantity': np.random.randint(0, 1000, num_products),
            'supplier': np.random.choice(
                ['Supplier_A', 'Supplier_B', 'Supplier_C', 'Supplier_D'], 
                num_products
            )
        }
        
        return pd.DataFrame(data)
    
    def transform_and_enrich_data(self, sources: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        """Transform and enrich the data"""
        self.logger.info("Starting data transformation and enrichment...")
        
        transformed_data = {}
        
        # 1. Transform sales data
        sales_df = sources['sales'].copy()
        sales_df = self._clean_sales_data(sales_df)
        sales_df = self._enrich_sales_data(sales_df, sources['users'], sources['products'])
        transformed_data['sales'] = sales_df
        
        # 2. Transform user data
        users_df = sources['users'].copy()
        users_df = self._enrich_user_data(users_df, sales_df)
        transformed_data['users'] = users_df
        
        # 3. Transform product data
        products_df = sources['products'].copy()
        products_df = self._enrich_product_data(products_df, sales_df)
        transformed_data['products'] = products_df
        
        # 4. Build aggregated datasets
        aggregated_data = self._create_aggregated_datasets(sales_df, users_df, products_df)
        transformed_data.update(aggregated_data)
        
        self.logger.info("Data transformation and enrichment complete")
        return transformed_data
    
    def _clean_sales_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean the sales data"""
        # Handle missing values
        df['amount'] = df['amount'].fillna(df['amount'].median())
        
        # Handle outliers
        df = df[df['quantity'] > 0]
        df['amount'] = df['amount'].clip(
            lower=df['amount'].quantile(0.01),
            upper=df['amount'].quantile(0.99)
        )
        
        # Optimize data types
        df['user_id'] = df['user_id'].astype('int32')
        df['product_id'] = df['product_id'].astype('int32')
        df['category'] = df['category'].astype('category')
        df['payment_method'] = df['payment_method'].astype('category')
        df['status'] = df['status'].astype('category')
        
        return df
    
    def _enrich_sales_data(self, sales_df: pd.DataFrame, users_df: pd.DataFrame, products_df: pd.DataFrame) -> pd.DataFrame:
        """Enrich the sales data"""
        # Join user attributes
        enriched_df = sales_df.merge(
            users_df[['user_id', 'region', 'age_group', 'loyalty_tier']],
            on='user_id',
            how='left'
        )
        
        # Join product attributes
        enriched_df = enriched_df.merge(
            products_df[['product_id', 'price', 'cost', 'supplier']],
            on='product_id',
            how='left'
        )
        
        # Derived features
        enriched_df['profit'] = enriched_df['amount'] - enriched_df['cost']
        enriched_df['hour_of_day'] = enriched_df['order_date'].dt.hour
        enriched_df['day_of_week'] = enriched_df['order_date'].dt.day_name()
        enriched_df['is_weekend'] = enriched_df['order_date'].dt.dayofweek >= 5
        
        return enriched_df
    
    def _enrich_user_data(self, users_df: pd.DataFrame, sales_df: pd.DataFrame) -> pd.DataFrame:
        """Enrich the user data"""
        user_metrics = sales_df.groupby('user_id').agg({
            'order_id': 'count',
            'amount': ['sum', 'mean'],
            'order_date': ['min', 'max']
        }).round(2)
        
        # Flatten the MultiIndex columns
        user_metrics.columns = ['total_orders', 'total_spent', 'avg_order_value', 'first_order', 'last_order']
        
        user_metrics['customer_lifetime_days'] = (
            user_metrics['last_order'] - user_metrics['first_order']
        ).dt.days
        
        enriched_users = users_df.merge(
            user_metrics,
            left_on='user_id',
            right_index=True,
            how='left'
        )
        
        # Handle new users (no order history)
        enriched_users['total_orders'] = enriched_users['total_orders'].fillna(0)
        enriched_users['total_spent'] = enriched_users['total_spent'].fillna(0)
        enriched_users['avg_order_value'] = enriched_users['avg_order_value'].fillna(0)
        
        return enriched_users
    
    def _enrich_product_data(self, products_df: pd.DataFrame, sales_df: pd.DataFrame) -> pd.DataFrame:
        """Enrich the product data"""
        product_metrics = sales_df.groupby('product_id').agg({
            'order_id': 'count',
            'quantity': 'sum',
            'amount': ['sum', 'mean'],
            'profit': 'sum'
        }).round(2)
        
        product_metrics.columns = [
            'times_ordered', 'total_quantity_sold', 'total_revenue', 
            'avg_sale_price', 'total_profit'
        ]
        
        enriched_products = products_df.merge(
            product_metrics,
            on='product_id',
            how='left'
        )
        
        # Handle new products (no sales history)
        for col in ['times_ordered', 'total_quantity_sold', 'total_revenue', 'avg_sale_price', 'total_profit']:
            enriched_products[col] = enriched_products[col].fillna(0)
        
        # Profit margin
        enriched_products['profit_margin'] = (
            enriched_products['total_profit'] / enriched_products['total_revenue']
        ).replace([np.inf, -np.inf], 0).fillna(0)
        
        return enriched_products
    
    def _create_aggregated_datasets(self, sales_df: pd.DataFrame, users_df: pd.DataFrame, products_df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """Build aggregated datasets"""
        aggregated_data = {}
        
        # 1. Daily sales aggregates
        daily_sales = sales_df.groupby(sales_df['order_date'].dt.date).agg({
            'order_id': 'count',
            'amount': 'sum',
            'quantity': 'sum',
            'profit': 'sum'
        }).rename(columns={
            'order_id': 'daily_orders',
            'amount': 'daily_revenue',
            'quantity': 'daily_quantity',
            'profit': 'daily_profit'
        })
        
        # 7-day moving averages
        daily_sales['revenue_7d_ma'] = daily_sales['daily_revenue'].rolling(7).mean()
        daily_sales['orders_7d_ma'] = daily_sales['daily_orders'].rolling(7).mean()
        
        aggregated_data['daily_sales'] = daily_sales
        
        # 2. Category performance analysis
        category_performance = sales_df.groupby('category').agg({
            'order_id': 'count',
            'amount': 'sum',
            'profit': 'sum',
            'user_id': 'nunique'
        }).rename(columns={
            'order_id': 'total_orders',
            'amount': 'total_revenue',
            'profit': 'total_profit',
            'user_id': 'unique_customers'
        })
        
        category_performance['avg_order_value'] = (
            category_performance['total_revenue'] / category_performance['total_orders']
        )
        category_performance['profit_margin'] = (
            category_performance['total_profit'] / category_performance['total_revenue']
        )
        
        aggregated_data['category_performance'] = category_performance
        
        # 3. Regional analysis
        region_analysis = sales_df.groupby('region').agg({
            'order_id': 'count',
            'amount': 'sum',
            'user_id': 'nunique'
        }).rename(columns={
            'order_id': 'total_orders',
            'amount': 'total_revenue',
            'user_id': 'unique_customers'
        })
        
        aggregated_data['region_analysis'] = region_analysis
        
        return aggregated_data
    
    def perform_data_quality_checks(self, data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """Run data quality checks"""
        self.logger.info("Running data quality checks...")
        
        quality_scores = {}
        
        for dataset_name, df in data.items():
            score = self._calculate_data_quality_score(df, dataset_name)
            quality_scores[dataset_name] = score
            self.logger.info(f"{dataset_name} data quality score: {score:.3f}")
        
        overall_score = np.mean(list(quality_scores.values()))
        quality_scores['overall'] = overall_score
        
        self.logger.info(f"Overall data quality score: {overall_score:.3f}")
        
        return quality_scores
    
    def _calculate_data_quality_score(self, df: pd.DataFrame, dataset_name: str) -> float:
        """Compute a quality score for a single dataset"""
        checks = []
        
        # 1. Completeness check
        completeness = 1 - (df.isnull().sum().sum() / (df.shape[0] * df.shape[1]))
        checks.append(completeness)
        
        # 2. Uniqueness check (for ID columns)
        id_columns = [col for col in df.columns if 'id' in col.lower()]
        uniqueness_scores = []
        for col in id_columns:
            uniqueness = df[col].nunique() / len(df)
            uniqueness_scores.append(uniqueness)
        
        uniqueness_score = np.mean(uniqueness_scores) if uniqueness_scores else 1.0
        checks.append(uniqueness_score)
        
        # 3. Validity check (for numeric columns)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        validity_scores = []
        for col in numeric_cols:
            if df[col].dtype in [np.int64, np.float64]:
                # Check that values fall within a reasonable (quantile-based) range
                q1 = df[col].quantile(0.01)
                q99 = df[col].quantile(0.99)
                valid_count = ((df[col] >= q1) & (df[col] <= q99)).sum()
                validity = valid_count / len(df)
                validity_scores.append(validity)
        
        validity_score = np.mean(validity_scores) if validity_scores else 1.0
        checks.append(validity_score)
        
        # 4. Consistency check (for categorical columns)
        categorical_cols = df.select_dtypes(include=['category', 'object']).columns
        consistency_scores = []
        for col in categorical_cols:
            if df[col].nunique() > 0:
                # Penalize a single dominant category
                top_category_ratio = df[col].value_counts().iloc[0] / len(df)
                consistency = min(1.0, 1.5 - top_category_ratio)
                consistency_scores.append(consistency)
        
        consistency_score = np.mean(consistency_scores) if consistency_scores else 1.0
        checks.append(consistency_score)
        
        return np.mean(checks)
    
    def generate_business_insights(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Generate business insights"""
        self.logger.info("Generating business insights...")
        
        insights = {}
        
        sales_df = data['sales']
        daily_sales = data['daily_sales']
        category_perf = data['category_performance']
        
        # 1. Key metrics
        insights['key_metrics'] = {
            'total_revenue': sales_df['amount'].sum(),
            'total_orders': sales_df['order_id'].nunique(),
            'total_customers': sales_df['user_id'].nunique(),
            'avg_order_value': sales_df['amount'].mean(),
            # Orders per registered user (a rough engagement proxy)
            'orders_per_user': len(sales_df) / data['users']['user_id'].nunique()
        }
        
        # 2. Sales trends
        recent_sales = daily_sales.tail(30)
        insights['sales_trends'] = {
            'revenue_growth': (
                recent_sales['daily_revenue'].iloc[-1] - recent_sales['daily_revenue'].iloc[0]
            ) / recent_sales['daily_revenue'].iloc[0] if recent_sales['daily_revenue'].iloc[0] > 0 else 0,
            'best_selling_category': category_perf['total_revenue'].idxmax(),
            'most_profitable_category': category_perf['total_profit'].idxmax()
        }
        
        # 3. Customer insights
        user_metrics = data['users']
        insights['customer_insights'] = {
            'avg_customer_lifetime': user_metrics['customer_lifetime_days'].mean(),
            'repeat_customer_rate': (user_metrics['total_orders'] > 1).mean(),
            'top_region': user_metrics['region'].mode().iloc[0] if len(user_metrics['region'].mode()) > 0 else 'N/A'
        }
        
        return insights
    
    def run_complete_pipeline(self) -> Dict[str, Any]:
        """Run the complete data pipeline"""
        self.logger.info("Starting the full e-commerce data pipeline...")
        
        try:
            # 1. Extract
            sources = self.extract_multiple_sources()
            self.logger.info("Extraction stage complete")
            
            # 2. Transform
            transformed_data = self.transform_and_enrich_data(sources)
            self.logger.info("Transformation stage complete")
            
            # 3. Data quality checks
            quality_scores = self.perform_data_quality_checks(transformed_data)
            
            if quality_scores['overall'] < self.data_quality_threshold:
                raise ValueError(f"Insufficient data quality: {quality_scores['overall']:.3f}")
            
            self.logger.info("Data quality checks passed")
            
            # 4. Business insights
            insights = self.generate_business_insights(transformed_data)
            self.logger.info("Business insights generated")
            
            # 5. Summarize results
            pipeline_result = {
                'success': True,
                'timestamp': datetime.now().isoformat(),
                'data_quality_scores': quality_scores,
                'business_insights': insights,
                'dataset_sizes': {name: len(df) for name, df in transformed_data.items()},
                'processing_summary': {
                    'total_records_processed': sum(len(df) for df in transformed_data.values()),
                    'total_datasets': len(transformed_data)
                }
            }
            
            self.logger.info("Data pipeline completed successfully!")
            return pipeline_result
            
        except Exception as e:
            self.logger.error(f"Data pipeline failed: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

def main():
    """Entry point: demonstrate the complete data pipeline"""
    pipeline = EcommerceDataPipeline()
    
    print("=== E-commerce data pipeline demo ===")
    print("Running the complete pipeline...")
    
    start_time = datetime.now()
    result = pipeline.run_complete_pipeline()
    end_time = datetime.now()
    
    execution_time = (end_time - start_time).total_seconds()
    
    print(f"\nExecution time: {execution_time:.2f}s")
    print(f"Result: {'success' if result['success'] else 'failure'}")
    
    if result['success']:
        print("\n=== Data quality report ===")
        for dataset, score in result['data_quality_scores'].items():
            print(f"{dataset}: {score:.3f}")
        
        print("\n=== Key business metrics ===")
        metrics = result['business_insights']['key_metrics']
        for metric, value in metrics.items():
            if isinstance(value, float):
                print(f"{metric}: {value:,.2f}")
            else:
                print(f"{metric}: {value:,}")
        
        print("\n=== Processing summary ===")
        summary = result['processing_summary']
        print(f"Total records processed: {summary['total_records_processed']:,}")
        print(f"Datasets produced: {summary['total_datasets']}")

if __name__ == "__main__":
    main()

4.2 Airflow Integration and Production Deployment

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
import json

class ProductionDataPipeline:
    """Production-grade data pipeline"""
    
    def __init__(self):
        self.default_args = {
            'owner': 'data_engineering_team',
            'depends_on_past': False,
            'start_date': datetime(2024, 1, 1),
            'email': ['data-team@company.com'],
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 3,
            'retry_delay': timedelta(minutes=10)
        }
    
    def create_production_dag(self):
        """Create the production DAG"""
        
        def run_data_pipeline(**context):
            """Run the data pipeline"""
            pipeline = EcommerceDataPipeline()
            result = pipeline.run_complete_pipeline()
            
            # Push the result to XCom for downstream tasks
            context['ti'].xcom_push(key='pipeline_result', value=result)
            return result['success']
        
        def generate_daily_report(**context):
            """Generate the daily report"""
            ti = context['ti']
            result = ti.xcom_pull(task_ids='run_data_pipeline', key='pipeline_result')
            
            if result and result['success']:
                insights = result['business_insights']
                
                report = f"""
Daily data pipeline report - {datetime.now().strftime('%Y-%m-%d')}

📊 Key metrics:
• Total revenue: ${insights['key_metrics']['total_revenue']:,.2f}
• Orders: {insights['key_metrics']['total_orders']:,}
• Customers: {insights['key_metrics']['total_customers']:,}
• Average order value: ${insights['key_metrics']['avg_order_value']:.2f}

📈 Sales trends:
• Revenue growth: {insights['sales_trends']['revenue_growth']:.1%}
• Best-selling category: {insights['sales_trends']['best_selling_category']}
• Most profitable category: {insights['sales_trends']['most_profitable_category']}

👥 Customer insights:
• Average customer lifetime: {insights['customer_insights']['avg_customer_lifetime']:.1f} days
• Repeat purchase rate: {insights['customer_insights']['repeat_customer_rate']:.1%}

Data quality score: {result['data_quality_scores']['overall']:.3f}
                """
                return report
            else:
                return "Data pipeline failed; no report generated"
        
        def handle_pipeline_success(**context):
            """Handle pipeline success"""
            ti = context['ti']
            report = ti.xcom_pull(task_ids='generate_daily_report')
            
            success_alert = f"""
✅ Data pipeline succeeded!

{report}

Execution date: {context['execution_date']}
            """
            return success_alert
        
        def handle_pipeline_failure(**context):
            """Handle pipeline failure"""
            error_message = f"""
❌ Data pipeline failed!

Task: {context['task_instance'].task_id}
Execution date: {context['execution_date']}
Error: {context.get('exception', 'Unknown error')}

Check the logs for details.
            """
            return error_message
        
        with DAG(
            'ecommerce_production_pipeline',
            default_args=self.default_args,
            description='E-commerce production data pipeline',
            schedule_interval=timedelta(hours=6),  # run every 6 hours
            catchup=False,
            tags=['production', 'ecommerce', 'data-pipeline']
        ) as dag:
            
            # Core tasks (in Airflow 2 the context is passed to callables
            # automatically, so provide_context=True is no longer needed)
            run_pipeline = PythonOperator(
                task_id='run_data_pipeline',
                python_callable=run_data_pipeline
            )
            
            generate_report = PythonOperator(
                task_id='generate_daily_report',
                python_callable=generate_daily_report
            )
            
            # Notification tasks
            success_notification = SlackWebhookOperator(
                task_id='success_notification',
                slack_webhook_conn_id='slack_webhook',
                message="""✅ E-commerce data pipeline succeeded!
The daily report has been generated; see the reporting system.""",
                trigger_rule='all_success'
            )
            
            failure_notification = SlackWebhookOperator(
                task_id='failure_notification',
                slack_webhook_conn_id='slack_webhook',
                message="""❌ E-commerce data pipeline failed!
Check system status and logs immediately.""",
                trigger_rule='one_failed'
            )
            
            # Task dependencies
            run_pipeline >> generate_report
            generate_report >> success_notification
            run_pipeline >> failure_notification
            
            return dag

# Example production configuration
production_pipeline = ProductionDataPipeline()
production_dag = production_pipeline.create_production_dag()

print("=== Production data pipeline configuration ===")
print(f"DAG ID: {production_dag.dag_id}")
print(f"Schedule interval: {production_dag.schedule_interval}")
print(f"Owner: {production_dag.default_args['owner']}")
print(f"Retry policy: {production_dag.default_args['retries']} retries")

5. Performance Optimization and Best Practices

5.1 Pandas Performance Optimization Techniques

class PerformanceOptimizer:
    """Performance optimization utilities"""
    
    @staticmethod
    def optimize_pandas_operations():
        """Demonstrate Pandas operation optimization"""
        import time
        
        # Create a large dataset
        large_df = pd.DataFrame({
            'A': np.random.rand(1000000),
            'B': np.random.rand(1000000),
            'C': np.random.choice(['X', 'Y', 'Z'], 1000000),
            'D': np.random.randint(1, 100, 1000000)
        })
        
        print("=== Pandas performance comparison ===")
        
        # Approach 1: row-by-row apply (slow)
        start_time = time.time()
        result_slow = large_df.apply(
            lambda row: row['A'] * row['B'] if row['C'] == 'X' else row['A'] + row['B'], 
            axis=1
        )
        slow_time = time.time() - start_time
        
        # Approach 2: vectorized operations (fast)
        start_time = time.time()
        mask = large_df['C'] == 'X'
        result_fast = np.where(mask, large_df['A'] * large_df['B'], large_df['A'] + large_df['B'])
        fast_time = time.time() - start_time
        
        print(f"Row-by-row time: {slow_time:.4f}s")
        print(f"Vectorized time: {fast_time:.4f}s")
        print(f"Speedup: {slow_time/fast_time:.1f}x")
        
        return slow_time, fast_time
    
    @staticmethod
    def memory_optimization_techniques():
        """Memory optimization techniques"""
        print("\n=== Memory optimization techniques ===")
        
        # Create sample data
        df = pd.DataFrame({
            'int_col': np.random.randint(1, 1000, 10000),
            'float_col': np.random.rand(10000),
            'category_col': np.random.choice(['A', 'B', 'C', 'D'], 10000),
            'string_col': ['text_' + str(i) for i in range(10000)]
        })
        
        print("Memory usage before optimization:")
        df.info(memory_usage='deep')
        
        # Optimize dtypes
        df_optimized = df.copy()
        df_optimized['int_col'] = pd.to_numeric(df_optimized['int_col'], downcast='integer')
        df_optimized['float_col'] = pd.to_numeric(df_optimized['float_col'], downcast='float')
        df_optimized['category_col'] = df_optimized['category_col'].astype('category')
        
        print("\nMemory usage after optimization:")
        df_optimized.info(memory_usage='deep')
        
        # Compute the savings
        original_memory = df.memory_usage(deep=True).sum() / 1024**2
        optimized_memory = df_optimized.memory_usage(deep=True).sum() / 1024**2
        savings = original_memory - optimized_memory
        
        print(f"\nMemory saved: {savings:.2f} MB ({savings/original_memory*100:.1f}%)")

# Performance optimization demo
optimizer = PerformanceOptimizer()
slow_time, fast_time = optimizer.optimize_pandas_operations()
optimizer.memory_optimization_techniques()

5.2 Data Pipeline Monitoring and Alerting

The original post includes a flowchart here. Its flow: data sources feed an Airflow DAG that runs extraction → cleaning → transformation → quality check; if the check passes, data is loaded, a report is generated, and a success notification is sent; if it fails, an alert notification fires and error handling triggers the retry mechanism.
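
In Airflow, most of this flow maps onto retries, callbacks, and SLAs configured on the tasks themselves. Below is a hedged sketch of task-level alerting via an on_failure_callback; the notification hook is hypothetical, so wire it to email, Slack, or your own channel:

import logging
from datetime import timedelta

def alert_on_failure(context):
    """on_failure_callback: Airflow invokes this with the task context after the final retry fails."""
    ti = context['task_instance']
    logging.error(
        f"Task {ti.task_id} in DAG {ti.dag_id} failed on try {ti.try_number}"
    )
    # send_notification(...)  # hypothetical hook: email, Slack webhook, PagerDuty, ...

monitored_default_args = {
    'retries': 3,                             # the retry mechanism in the flow above
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': alert_on_failure,  # the alert-notification branch
    'sla': timedelta(hours=1),                # SLA misses surface in the Airflow UI
}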

6. Summary and Outlook

6.1 Key Takeaways

The case studies in this article demonstrate Python's strengths in data engineering:

  1. Pandas data processing

    • Efficient data cleaning and transformation
    • Memory optimization and performance-tuning techniques
    • Complex analysis and aggregation
  2. Airflow workflow management

    • Reliable scheduling and dependency management
    • Error handling and retry mechanisms
    • Production monitoring and alerting
  3. End-to-end pipelines

    • Multi-source data integration
    • Data quality assurance
    • Delivery of business value

6.2 Future Trends

Data engineering is evolving rapidly; the following trends are worth watching:

  • Real-time data processing: mainstream adoption of stream processing
  • MLOps integration: convergence of machine learning pipelines and data engineering
  • Data mesh: the rise of distributed data architectures
  • Cloud-native technology: adoption of containerized and serverless architectures

6.3 Recommended Best Practices

Based on hands-on experience, we suggest the following:

class BestPracticesChecklist:
    """Data engineering best practices checklist"""
    
    @staticmethod
    def get_checklist():
        """Return the best practices checklist"""
        return {
            "Code quality": [
                "Write clear docstrings",
                "Use type annotations",
                "Follow PEP 8",
                "Write unit tests"
            ],
            "Data processing": [
                "Validate and clean incoming data",
                "Handle missing values and outliers",
                "Optimize memory usage",
                "Record data lineage"
            ],
            "Workflow management": [
                "Set sensible retry policies",
                "Implement monitoring and alerting",
                "Log every run",
                "Maintain and tune regularly"
            ],
            "Production deployment": [
                "Use configuration management",
                "Implement error handling",
                "Set up performance monitoring",
                "Back up data regularly"
            ]
        }
    
    @staticmethod
    def validate_pipeline(pipeline_config):
        """Validate a pipeline configuration"""
        checks = {
            "has_error_handling": pipeline_config.get('retries', 0) > 0,
            "has_monitoring": pipeline_config.get('email_on_failure', False),
            "has_documentation": bool(pipeline_config.get('description')),
            "has_testing": pipeline_config.get('test_coverage', 0) > 0.7
        }
        
        return checks

# Best practices validation
checklist = BestPracticesChecklist()
practices = checklist.get_checklist()

print("=== Data engineering best practices checklist ===")
for category, items in practices.items():
    print(f"\n{category}:")
    for item in items:
        print(f"  ✓ {item}")

# Validate a sample configuration
sample_config = {
    'retries': 3,
    'email_on_failure': True,
    'description': 'E-commerce data pipeline',
    'test_coverage': 0.8
}

validation = checklist.validate_pipeline(sample_config)
print(f"\nPipeline configuration check: {sum(validation.values())}/{len(validation)} passed")

Python's position in data engineering keeps growing in importance. By mastering core tools such as Pandas and Airflow, data engineers can build powerful, reliable data processing systems. As the field evolves, continuous learning and practice remain the key to staying competitive.


This article has walked through Python's role in data engineering with complete, hands-on examples, from data processing to workflow orchestration. We hope these practices help you succeed in your own data engineering projects.
