## 7. 工作流调度组件

### 7.1 Apache Airflow详解

```python
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import copy

class TaskState(Enum):
    """任务状态"""
    NONE = "无状态"
    SCHEDULED = "已调度"
    QUEUED = "队列中"
    RUNNING = "运行中"
    SUCCESS = "成功"
    FAILED = "失败"
    UP_FOR_RETRY = "等待重试"
    UP_FOR_RESCHEDULE = "等待重新调度"
    UPSTREAM_FAILED = "上游失败"
    SKIPPED = "跳过"
    REMOVED = "已移除"

class DagState(Enum):
    """DAG状态"""
    RUNNING = "运行中"
    SUCCESS = "成功"
    FAILED = "失败"
    PAUSED = "暂停"
    QUEUED = "队列中"

class TriggerRule(Enum):
    """触发规则"""
    ALL_SUCCESS = "all_success"  # 所有上游任务成功
    ALL_FAILED = "all_failed"    # 所有上游任务失败
    ALL_DONE = "all_done"        # 所有上游任务完成
    ONE_SUCCESS = "one_success"  # 至少一个上游任务成功
    ONE_FAILED = "one_failed"    # 至少一个上游任务失败
    NONE_FAILED = "none_failed"  # 没有上游任务失败
    NONE_SKIPPED = "none_skipped" # 没有上游任务跳过
    DUMMY = "dummy"              # 总是触发

class ScheduleInterval(Enum):
    """调度间隔"""
    ONCE = "@once"               # 只执行一次
    HOURLY = "@hourly"           # 每小时
    DAILY = "@daily"             # 每天
    WEEKLY = "@weekly"           # 每周
    MONTHLY = "@monthly"         # 每月
    YEARLY = "@yearly"           # 每年
    NONE = None                  # 不调度

class ExecutorType(Enum):
    """执行器类型"""
    SEQUENTIAL = "SequentialExecutor"
    LOCAL = "LocalExecutor"
    CELERY = "CeleryExecutor"
    KUBERNETES = "KubernetesExecutor"
    DEBUG = "DebugExecutor"

@dataclass
class TaskInstance:
    """任务实例"""
    task_id: str
    dag_id: str
    execution_date: datetime
    state: TaskState = TaskState.NONE
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    duration: Optional[float] = None
    try_number: int = 1
    max_tries: int = 1
    hostname: str = ""
    unixname: str = ""
    job_id: Optional[int] = None
    pool: str = "default_pool"
    queue: str = "default"
    priority_weight: int = 1
    operator: str = ""
    queued_dttm: Optional[datetime] = None
    pid: Optional[int] = None
    executor_config: Dict[str, Any] = field(default_factory=dict)
    log_url: str = ""
    
@dataclass
class Task:
    """任务定义"""
    task_id: str
    dag_id: str
    operator_class: str
    operator_kwargs: Dict[str, Any] = field(default_factory=dict)
    upstream_task_ids: List[str] = field(default_factory=list)
    downstream_task_ids: List[str] = field(default_factory=list)
    trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS
    depends_on_past: bool = False
    wait_for_downstream: bool = False
    retries: int = 0
    retry_delay: timedelta = timedelta(minutes=5)
    retry_exponential_backoff: bool = False
    max_retry_delay: Optional[timedelta] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    schedule_interval: Optional[str] = None
    pool: str = "default_pool"
    queue: str = "default"
    priority_weight: int = 1
    weight_rule: str = "downstream"  # downstream, upstream, absolute
    sla: Optional[timedelta] = None
    execution_timeout: Optional[timedelta] = None
    on_failure_callback: Optional[Callable] = None
    on_success_callback: Optional[Callable] = None
    on_retry_callback: Optional[Callable] = None
    email_on_failure: bool = True
    email_on_retry: bool = True
    email: List[str] = field(default_factory=list)
    
@dataclass
class DagRun:
    """DAG运行实例"""
    dag_id: str
    run_id: str
    execution_date: datetime
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    state: DagState = DagState.RUNNING
    run_type: str = "scheduled"  # scheduled, manual, backfill
    external_trigger: bool = False
    conf: Dict[str, Any] = field(default_factory=dict)
    data_interval_start: Optional[datetime] = None
    data_interval_end: Optional[datetime] = None
    last_scheduling_decision: Optional[datetime] = None
    dag_hash: Optional[str] = None
    creating_job_id: Optional[int] = None
    queued_at: Optional[datetime] = None
    
@dataclass
class Dag:
    """DAG定义"""
    dag_id: str
    description: str = ""
    schedule_interval: Optional[str] = ScheduleInterval.DAILY.value
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    catchup: bool = True
    max_active_runs: int = 16
    max_active_tasks: int = 16
    default_args: Dict[str, Any] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    tasks: Dict[str, Task] = field(default_factory=dict)
    is_paused: bool = False
    is_subdag: bool = False
    fileloc: str = ""
    template_searchpath: Optional[List[str]] = None
    template_undefined: str = "strict"  # strict, jinja2.Undefined
    user_defined_macros: Optional[Dict[str, Any]] = None
    user_defined_filters: Optional[Dict[str, Any]] = None
    default_view: str = "tree"  # tree, graph, duration, gantt, landing_times
    orientation: str = "LR"  # LR, TB, RL, BT
    concurrency: int = 16
    max_active_runs_per_dag: int = 16
    dagrun_timeout: Optional[timedelta] = None
    sla_miss_callback: Optional[Callable] = None
    access_control: Dict[str, Any] = field(default_factory=dict)
    doc_md: Optional[str] = None
    
@dataclass
class Connection:
    """连接配置"""
    conn_id: str
    conn_type: str
    description: str = ""
    host: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None
    schema: Optional[str] = None
    port: Optional[int] = None
    extra: Dict[str, Any] = field(default_factory=dict)
    uri: Optional[str] = None
    is_encrypted: bool = False
    is_extra_encrypted: bool = False
    
@dataclass
class Variable:
    """变量"""
    key: str
    val: str
    description: str = ""
    is_encrypted: bool = False
    
@dataclass
class Pool:
    """资源池"""
    pool: str
    slots: int
    description: str = ""
    include_deferred: bool = True
    
class AirflowScheduler:
    """
    Airflow调度器
    """
    
    def __init__(self, scheduler_id: str = "airflow-scheduler-1"):
        self.scheduler_id = scheduler_id
        self.dags = {}  # dag_id -> Dag
        self.dag_runs = {}  # run_id -> DagRun
        self.task_instances = {}  # (dag_id, task_id, execution_date) -> TaskInstance
        self.connections = {}  # conn_id -> Connection
        self.variables = {}  # key -> Variable
        self.pools = {}  # pool_name -> Pool
        
        # 调度器状态
        self.is_running = False
        self.executor_type = ExecutorType.LOCAL
        self.max_threads = 4
        self.heartbeat_interval = 5  # 秒
        self.dag_dir_list_interval = 300  # 秒
        self.child_process_timeout = 60  # 秒
        self.zombie_task_threshold = 300  # 秒
        
        # 线程锁
        self.dag_lock = threading.Lock()
        self.task_lock = threading.Lock()
        self.run_lock = threading.Lock()
        
        # 调度器线程
        self.scheduler_thread = None
        self.heartbeat_thread = None
        
        # 统计信息
        self.stats = {
            'total_dags': 0,
            'active_dags': 0,
            'paused_dags': 0,
            'total_dag_runs': 0,
            'running_dag_runs': 0,
            'total_task_instances': 0,
            'running_task_instances': 0,
            'successful_task_instances': 0,
            'failed_task_instances': 0
        }
        
        # 初始化默认数据
        self._initialize_default_data()
    
    def _initialize_default_data(self):
        """初始化默认数据"""
        # 创建默认资源池
        default_pool = Pool(
            pool="default_pool",
            slots=128,
            description="Default pool"
        )
        self.pools["default_pool"] = default_pool
        
        # 创建默认连接
        local_mysql = Connection(
            conn_id="mysql_default",
            conn_type="mysql",
            description="Default MySQL connection",
            host="localhost",
            login="airflow",
            password="airflow",
            schema="airflow",
            port=3306
        )
        self.connections["mysql_default"] = local_mysql
        
        # 创建示例DAG
        self._create_example_dags()
    
    def _create_example_dags(self):
        """创建示例DAG"""
        # 数据处理DAG
        data_processing_dag = Dag(
            dag_id="data_processing_pipeline",
            description="Daily data processing pipeline",
            schedule_interval=ScheduleInterval.DAILY.value,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            max_active_runs=1,
            tags=["data", "etl", "daily"]
        )
        
        # 添加任务
        extract_task = Task(
            task_id="extract_data",
            dag_id="data_processing_pipeline",
            operator_class="BashOperator",
            operator_kwargs={
                'bash_command': 'echo "Extracting data from source systems..."'
            },
            retries=2,
            retry_delay=timedelta(minutes=5)
        )
        
        transform_task = Task(
            task_id="transform_data",
            dag_id="data_processing_pipeline",
            operator_class="PythonOperator",
            operator_kwargs={
                'python_callable': 'transform_data_function'
            },
            upstream_task_ids=["extract_data"],
            retries=1,
            retry_delay=timedelta(minutes=3)
        )
        
        load_task = Task(
            task_id="load_data",
            dag_id="data_processing_pipeline",
            operator_class="SqlOperator",
            operator_kwargs={
                'sql': 'INSERT INTO target_table SELECT * FROM staging_table',
                'conn_id': 'mysql_default'
            },
            upstream_task_ids=["transform_data"],
            retries=3,
            retry_delay=timedelta(minutes=2)
        )
        
        validate_task = Task(
            task_id="validate_data",
            dag_id="data_processing_pipeline",
            operator_class="PythonOperator",
            operator_kwargs={
                'python_callable': 'validate_data_quality'
            },
            upstream_task_ids=["load_data"]
        )
        
        # 设置任务依赖关系
        transform_task.upstream_task_ids = ["extract_data"]
        load_task.upstream_task_ids = ["transform_data"]
        validate_task.upstream_task_ids = ["load_data"]
        
        extract_task.downstream_task_ids = ["transform_data"]
        transform_task.downstream_task_ids = ["load_data"]
        load_task.downstream_task_ids = ["validate_data"]
        
        data_processing_dag.tasks = {
            "extract_data": extract_task,
            "transform_data": transform_task,
            "load_data": load_task,
            "validate_data": validate_task
        }
        
        self.dags["data_processing_pipeline"] = data_processing_dag
        
        # 机器学习训练DAG
        ml_training_dag = Dag(
            dag_id="ml_model_training",
            description="Machine learning model training pipeline",
            schedule_interval=ScheduleInterval.WEEKLY.value,
            start_date=datetime(2024, 1, 1),
            catchup=False,
            max_active_runs=1,
            tags=["ml", "training", "weekly"]
        )
        
        # 添加ML任务
        prepare_data_task = Task(
            task_id="prepare_training_data",
            dag_id="ml_model_training",
            operator_class="PythonOperator",
            operator_kwargs={
                'python_callable': 'prepare_training_data'
            }
        )
        
        train_model_task = Task(
            task_id="train_model",
            dag_id="ml_model_training",
            operator_class="PythonOperator",
            operator_kwargs={
                'python_callable': 'train_ml_model'
            },
            upstream_task_ids=["prepare_training_data"],
            execution_timeout=timedelta(hours=2)
        )
        
        evaluate_model_task = Task(
            task_id="evaluate_model",
            dag_id="ml_model_training",
            operator_class="PythonOperator",
            operator_kwargs={
                'python_callable': 'evaluate_model_performance'
            },
            upstream_task_ids=["train_model"]
        )
        
        deploy_model_task = Task(
            task_id="deploy_model",
            dag_id="ml_model_training",
            operator_class="BashOperator",
            operator_kwargs={
                'bash_command': 'echo "Deploying model to production..."'
            },
            upstream_task_ids=["evaluate_model"],
            trigger_rule=TriggerRule.ALL_SUCCESS
        )
        
        # 设置任务依赖关系
        train_model_task.upstream_task_ids = ["prepare_training_data"]
        evaluate_model_task.upstream_task_ids = ["train_model"]
        deploy_model_task.upstream_task_ids = ["evaluate_model"]
        
        prepare_data_task.downstream_task_ids = ["train_model"]
        train_model_task.downstream_task_ids = ["evaluate_model"]
        evaluate_model_task.downstream_task_ids = ["deploy_model"]
        
        ml_training_dag.tasks = {
            "prepare_training_data": prepare_data_task,
            "train_model": train_model_task,
            "evaluate_model": evaluate_model_task,
            "deploy_model": deploy_model_task
        }
        
        self.dags["ml_model_training"] = ml_training_dag
    
    def add_dag(self, dag: Dag) -> Dict[str, Any]:
        """
        添加DAG
        
        Args:
            dag: DAG对象
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        if dag.dag_id in self.dags:
            return {'status': 'error', 'message': f'DAG {dag.dag_id} already exists'}
        
        with self.dag_lock:
            self.dags[dag.dag_id] = dag
            self.stats['total_dags'] += 1
            if not dag.is_paused:
                self.stats['active_dags'] += 1
            else:
                self.stats['paused_dags'] += 1
        
        return {
            'status': 'success',
            'dag': {
                'dag_id': dag.dag_id,
                'description': dag.description,
                'schedule_interval': dag.schedule_interval,
                'is_paused': dag.is_paused,
                'task_count': len(dag.tasks)
            }
        }
    
    def get_dag(self, dag_id: str) -> Dict[str, Any]:
        """
        获取DAG信息
        
        Args:
            dag_id: DAG ID
            
        Returns:
            Dict[str, Any]: DAG信息
        """
        if dag_id not in self.dags:
            return {'status': 'error', 'message': f'DAG {dag_id} not found'}
        
        dag = self.dags[dag_id]
        
        # 获取任务信息
        tasks_info = []
        for task_id, task in dag.tasks.items():
            tasks_info.append({
                'task_id': task.task_id,
                'operator_class': task.operator_class,
                'upstream_task_ids': task.upstream_task_ids,
                'downstream_task_ids': task.downstream_task_ids,
                'trigger_rule': task.trigger_rule.value,
                'retries': task.retries,
                'pool': task.pool,
                'queue': task.queue
            })
        
        return {
            'status': 'success',
            'dag': {
                'dag_id': dag.dag_id,
                'description': dag.description,
                'schedule_interval': dag.schedule_interval,
                'start_date': dag.start_date.isoformat() if dag.start_date else None,
                'end_date': dag.end_date.isoformat() if dag.end_date else None,
                'catchup': dag.catchup,
                'max_active_runs': dag.max_active_runs,
                'max_active_tasks': dag.max_active_tasks,
                'is_paused': dag.is_paused,
                'tags': dag.tags,
                'tasks': tasks_info,
                'task_count': len(dag.tasks)
            }
        }
    
    def trigger_dag(self, dag_id: str, run_id: Optional[str] = None,
                   conf: Optional[Dict[str, Any]] = None,
                   execution_date: Optional[datetime] = None) -> Dict[str, Any]:
        """
        触发DAG运行
        
        Args:
            dag_id: DAG ID
            run_id: 运行ID
            conf: 配置参数
            execution_date: 执行日期
            
        Returns:
            Dict[str, Any]: 触发结果
        """
        if dag_id not in self.dags:
            return {'status': 'error', 'message': f'DAG {dag_id} not found'}
        
        dag = self.dags[dag_id]
        
        if dag.is_paused:
            return {'status': 'error', 'message': f'DAG {dag_id} is paused'}
        
        if execution_date is None:
            execution_date = datetime.now()
        
        if run_id is None:
            run_id = f"manual__{execution_date.strftime('%Y-%m-%dT%H:%M:%S')}"
        
        # 检查是否已存在相同的运行
        if run_id in self.dag_runs:
            return {'status': 'error', 'message': f'DagRun {run_id} already exists'}
        
        # 创建DAG运行实例
        dag_run = DagRun(
            dag_id=dag_id,
            run_id=run_id,
            execution_date=execution_date,
            start_date=datetime.now(),
            state=DagState.RUNNING,
            run_type="manual",
            external_trigger=True,
            conf=conf or {},
            data_interval_start=execution_date,
            data_interval_end=execution_date + timedelta(days=1)
        )
        
        with self.run_lock:
            self.dag_runs[run_id] = dag_run
            self.stats['total_dag_runs'] += 1
            self.stats['running_dag_runs'] += 1
        
        # 创建任务实例
        self._create_task_instances(dag, dag_run)
        
        return {
            'status': 'success',
            'dag_run': {
                'dag_id': dag_id,
                'run_id': run_id,
                'execution_date': execution_date.isoformat(),
                'state': dag_run.state.value,
                'run_type': dag_run.run_type
            }
        }
    
    def _create_task_instances(self, dag: Dag, dag_run: DagRun):
        """
        为DAG运行创建任务实例
        
        Args:
            dag: DAG对象
            dag_run: DAG运行实例
        """
        with self.task_lock:
            for task_id, task in dag.tasks.items():
                task_instance = TaskInstance(
                    task_id=task_id,
                    dag_id=dag.dag_id,
                    execution_date=dag_run.execution_date,
                    state=TaskState.SCHEDULED,
                    max_tries=task.retries + 1,
                    pool=task.pool,
                    queue=task.queue,
                    priority_weight=task.priority_weight,
                    operator=task.operator_class
                )
                
                key = (dag.dag_id, task_id, dag_run.execution_date)
                self.task_instances[key] = task_instance
                self.stats['total_task_instances'] += 1
    
    def get_dag_runs(self, dag_id: Optional[str] = None,
                    state: Optional[DagState] = None,
                    limit: int = 100) -> Dict[str, Any]:
        """
        获取DAG运行列表
        
        Args:
            dag_id: DAG ID过滤
            state: 状态过滤
            limit: 限制数量
            
        Returns:
            Dict[str, Any]: DAG运行列表
        """
        dag_runs_info = []
        count = 0
        
        for run_id, dag_run in self.dag_runs.items():
            if count >= limit:
                break
                
            # 应用过滤条件
            if dag_id is not None and dag_run.dag_id != dag_id:
                continue
            if state is not None and dag_run.state != state:
                continue
            
            dag_runs_info.append({
                'dag_id': dag_run.dag_id,
                'run_id': dag_run.run_id,
                'execution_date': dag_run.execution_date.isoformat(),
                'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
                'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
                'state': dag_run.state.value,
                'run_type': dag_run.run_type,
                'external_trigger': dag_run.external_trigger,
                'conf': dag_run.conf
            })
            count += 1
        
        return {
            'status': 'success',
            'dag_runs': dag_runs_info,
            'total': len(dag_runs_info)
        }
    
    def get_task_instances(self, dag_id: str, run_id: str) -> Dict[str, Any]:
        """
        获取任务实例列表
        
        Args:
            dag_id: DAG ID
            run_id: 运行ID
            
        Returns:
            Dict[str, Any]: 任务实例列表
        """
        if run_id not in self.dag_runs:
            return {'status': 'error', 'message': f'DagRun {run_id} not found'}
        
        dag_run = self.dag_runs[run_id]
        task_instances_info = []
        
        for key, task_instance in self.task_instances.items():
            ti_dag_id, ti_task_id, ti_execution_date = key
            
            if (ti_dag_id == dag_id and 
                ti_execution_date == dag_run.execution_date):
                
                task_instances_info.append({
                    'task_id': task_instance.task_id,
                    'dag_id': task_instance.dag_id,
                    'execution_date': task_instance.execution_date.isoformat(),
                    'state': task_instance.state.value,
                    'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
                    'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
                    'duration': task_instance.duration,
                    'try_number': task_instance.try_number,
                    'max_tries': task_instance.max_tries,
                    'operator': task_instance.operator,
                    'pool': task_instance.pool,
                    'queue': task_instance.queue,
                    'priority_weight': task_instance.priority_weight
                })
        
        return {
            'status': 'success',
            'task_instances': task_instances_info,
            'total': len(task_instances_info)
        }
    
    def run_task_instance(self, dag_id: str, task_id: str,
                         execution_date: datetime) -> Dict[str, Any]:
        """
        运行任务实例
        
        Args:
            dag_id: DAG ID
            task_id: 任务ID
            execution_date: 执行日期
            
        Returns:
            Dict[str, Any]: 运行结果
        """
        key = (dag_id, task_id, execution_date)
        
        if key not in self.task_instances:
            return {'status': 'error', 'message': f'TaskInstance {key} not found'}
        
        task_instance = self.task_instances[key]
        
        if task_instance.state not in [TaskState.SCHEDULED, TaskState.QUEUED, TaskState.UP_FOR_RETRY]:
            return {'status': 'error', 'message': f'TaskInstance {key} is not in a runnable state'}
        
        # 模拟任务执行
        with self.task_lock:
            task_instance.state = TaskState.RUNNING
            task_instance.start_date = datetime.now()
            task_instance.hostname = "airflow-worker-1"
            task_instance.pid = random.randint(1000, 9999)
            self.stats['running_task_instances'] += 1
        
        # 模拟任务执行时间
        execution_time = random.uniform(1, 10)  # 1-10秒
        time.sleep(execution_time)
        
        # 模拟任务结果(90%成功率)
        success = random.random() > 0.1
        
        with self.task_lock:
            task_instance.end_date = datetime.now()
            task_instance.duration = execution_time
            
            if success:
                task_instance.state = TaskState.SUCCESS
                self.stats['successful_task_instances'] += 1
            else:
                if task_instance.try_number < task_instance.max_tries:
                    task_instance.state = TaskState.UP_FOR_RETRY
                    task_instance.try_number += 1
                else:
                    task_instance.state = TaskState.FAILED
                    self.stats['failed_task_instances'] += 1
            
            self.stats['running_task_instances'] -= 1
        
        return {
            'status': 'success',
            'task_instance': {
                'task_id': task_instance.task_id,
                'dag_id': task_instance.dag_id,
                'execution_date': task_instance.execution_date.isoformat(),
                'state': task_instance.state.value,
                'duration': task_instance.duration,
                'try_number': task_instance.try_number
            }
        }
    
    def pause_dag(self, dag_id: str) -> Dict[str, Any]:
        """
        暂停DAG
        
        Args:
            dag_id: DAG ID
            
        Returns:
            Dict[str, Any]: 暂停结果
        """
        if dag_id not in self.dags:
            return {'status': 'error', 'message': f'DAG {dag_id} not found'}
        
        dag = self.dags[dag_id]
        
        if dag.is_paused:
            return {'status': 'error', 'message': f'DAG {dag_id} is already paused'}
        
        with self.dag_lock:
            dag.is_paused = True
            self.stats['active_dags'] -= 1
            self.stats['paused_dags'] += 1
        
        return {
            'status': 'success',
            'message': f'DAG {dag_id} paused successfully',
            'dag_id': dag_id,
            'is_paused': True
        }
    
    def unpause_dag(self, dag_id: str) -> Dict[str, Any]:
        """
        取消暂停DAG
        
        Args:
            dag_id: DAG ID
            
        Returns:
            Dict[str, Any]: 取消暂停结果
        """
        if dag_id not in self.dags:
            return {'status': 'error', 'message': f'DAG {dag_id} not found'}
        
        dag = self.dags[dag_id]
        
        if not dag.is_paused:
            return {'status': 'error', 'message': f'DAG {dag_id} is not paused'}
        
        with self.dag_lock:
            dag.is_paused = False
            self.stats['paused_dags'] -= 1
            self.stats['active_dags'] += 1
        
        return {
            'status': 'success',
            'message': f'DAG {dag_id} unpaused successfully',
            'dag_id': dag_id,
            'is_paused': False
        }
    
    def add_connection(self, connection: Connection) -> Dict[str, Any]:
        """
        添加连接
        
        Args:
            connection: 连接对象
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        if connection.conn_id in self.connections:
            return {'status': 'error', 'message': f'Connection {connection.conn_id} already exists'}
        
        self.connections[connection.conn_id] = connection
        
        return {
            'status': 'success',
            'connection': {
                'conn_id': connection.conn_id,
                'conn_type': connection.conn_type,
                'host': connection.host,
                'port': connection.port,
                'schema': connection.schema
            }
        }
    
    def get_connections(self) -> Dict[str, Any]:
        """
        获取连接列表
        
        Returns:
            Dict[str, Any]: 连接列表
        """
        connections_info = []
        
        for conn_id, connection in self.connections.items():
            connections_info.append({
                'conn_id': connection.conn_id,
                'conn_type': connection.conn_type,
                'description': connection.description,
                'host': connection.host,
                'login': connection.login,
                'schema': connection.schema,
                'port': connection.port,
                'is_encrypted': connection.is_encrypted
            })
        
        return {
            'status': 'success',
            'connections': connections_info,
            'total': len(connections_info)
        }
    
    def set_variable(self, key: str, value: str, description: str = "") -> Dict[str, Any]:
        """
        设置变量
        
        Args:
            key: 变量键
            value: 变量值
            description: 描述
            
        Returns:
            Dict[str, Any]: 设置结果
        """
        variable = Variable(
            key=key,
            val=value,
            description=description
        )
        
        self.variables[key] = variable
        
        return {
            'status': 'success',
            'variable': {
                'key': key,
                'value': value,
                'description': description
            }
        }
    
    def get_variable(self, key: str) -> Dict[str, Any]:
        """
        获取变量
        
        Args:
            key: 变量键
            
        Returns:
            Dict[str, Any]: 变量信息
        """
        if key not in self.variables:
            return {'status': 'error', 'message': f'Variable {key} not found'}
        
        variable = self.variables[key]
        
        return {
            'status': 'success',
            'variable': {
                'key': variable.key,
                'value': variable.val,
                'description': variable.description,
                'is_encrypted': variable.is_encrypted
            }
        }
    
    def get_scheduler_status(self) -> Dict[str, Any]:
        """
        获取调度器状态
        
        Returns:
            Dict[str, Any]: 调度器状态
        """
        return {
            'scheduler_id': self.scheduler_id,
            'is_running': self.is_running,
            'executor_type': self.executor_type.value,
            'max_threads': self.max_threads,
            'heartbeat_interval': self.heartbeat_interval,
            'stats': self.stats,
            'config': {
                'dag_dir_list_interval': self.dag_dir_list_interval,
                'child_process_timeout': self.child_process_timeout,
                'zombie_task_threshold': self.zombie_task_threshold
            },
            'pools': {
                pool_name: {
                    'slots': pool.slots,
                    'description': pool.description
                } for pool_name, pool in self.pools.items()
            },
            'timestamp': datetime.now().isoformat()
        }

# 使用示例
if __name__ == "__main__":
    # 创建Airflow调度器
    scheduler = AirflowScheduler("prod-scheduler")
    
    print("=== Apache Airflow调度器示例 ===")
    
    # 获取DAG信息
    print("\n=== DAG信息 ===")
    dag_info = scheduler.get_dag("data_processing_pipeline")
    if dag_info['status'] == 'success':
        dag = dag_info['dag']
        print(f"DAG ID: {dag['dag_id']}")
        print(f"描述: {dag['description']}")
        print(f"调度间隔: {dag['schedule_interval']}")
        print(f"任务数量: {dag['task_count']}")
        print(f"标签: {', '.join(dag['tags'])}")
        print("任务列表:")
        for task in dag['tasks']:
            print(f"  - {task['task_id']} ({task['operator_class']})")
            if task['upstream_task_ids']:
                print(f"    上游任务: {', '.join(task['upstream_task_ids'])}")
    
    # 触发DAG运行
    print("\n=== 触发DAG运行 ===")
    trigger_result = scheduler.trigger_dag(
        dag_id="data_processing_pipeline",
        conf={'env': 'production', 'batch_size': 1000}
    )
    print(f"触发结果: {trigger_result['status']}")
    if trigger_result['status'] == 'success':
        dag_run = trigger_result['dag_run']
        print(f"运行ID: {dag_run['run_id']}")
        print(f"执行日期: {dag_run['execution_date']}")
        print(f"状态: {dag_run['state']}")
    
    # 获取任务实例
    print("\n=== 任务实例 ===")
    if trigger_result['status'] == 'success':
        run_id = trigger_result['dag_run']['run_id']
        task_instances = scheduler.get_task_instances("data_processing_pipeline", run_id)
        print(f"任务实例总数: {task_instances['total']}")
        for ti in task_instances['task_instances']:
            print(f"  - {ti['task_id']}: {ti['state']} (尝试次数: {ti['try_number']}/{ti['max_tries']})")
    
    # 运行任务实例
    print("\n=== 运行任务实例 ===")
    if trigger_result['status'] == 'success':
        execution_date = datetime.fromisoformat(trigger_result['dag_run']['execution_date'])
        
        # 运行extract_data任务
        run_result = scheduler.run_task_instance(
            dag_id="data_processing_pipeline",
            task_id="extract_data",
            execution_date=execution_date
        )
        print(f"运行extract_data任务: {run_result['status']}")
        if run_result['status'] == 'success':
            ti = run_result['task_instance']
            print(f"  状态: {ti['state']}")
            print(f"  执行时间: {ti['duration']:.2f}秒")
    
    # 添加连接
    print("\n=== 添加连接 ===")
    postgres_conn = Connection(
        conn_id="postgres_default",
        conn_type="postgres",
        description="Default PostgreSQL connection",
        host="localhost",
        login="postgres",
        password="postgres",
        schema="airflow",
        port=5432
    )
    conn_result = scheduler.add_connection(postgres_conn)
    print(f"添加连接结果: {conn_result}")
    
    # 设置变量
    print("\n=== 设置变量 ===")
    var_result = scheduler.set_variable(
        key="data_source_url",
        value="https://api.example.com/data",
        description="Data source API URL"
    )
    print(f"设置变量结果: {var_result}")
    
    # 暂停DAG
    print("\n=== 暂停DAG ===")
    pause_result = scheduler.pause_dag("ml_model_training")
    print(f"暂停DAG结果: {pause_result}")
    
    # 获取DAG运行列表
    print("\n=== DAG运行列表 ===")
    dag_runs = scheduler.get_dag_runs(dag_id="data_processing_pipeline")
    print(f"DAG运行总数: {dag_runs['total']}")
    for run in dag_runs['dag_runs']:
        print(f"  - {run['run_id']}: {run['state']} ({run['run_type']})")
    
    # 获取调度器状态
    print("\n=== 调度器状态 ===")
    status = scheduler.get_scheduler_status()
    print(f"调度器ID: {status['scheduler_id']}")
    print(f"运行状态: {status['is_running']}")
    print(f"执行器类型: {status['executor_type']}")
    print(f"最大线程数: {status['max_threads']}")
    print("统计信息:")
    for key, value in status['stats'].items():
        print(f"  {key}: {value}")
```

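上面的 AirflowScheduler 是对 Airflow 调度器核心概念(DAG、TaskInstance、DagRun、连接、变量、资源池)的简化模拟。作为对照,下面给出一个简要示意,假设在 Airflow 2.x 环境中用原生 API 声明与 data_processing_pipeline 大致等价的 DAG;其中 transform_data_function、validate_data_quality 为假设的占位函数,SQL 加载步骤为简洁起见省略:

```python
# 简要示意:用 Airflow 2.x 原生 API 声明 DAG(假设已安装 apache-airflow,仅作对照)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def transform_data_function(**context):
    """假设的占位函数:执行数据转换逻辑"""
    print("Transforming data...")


def validate_data_quality(**context):
    """假设的占位函数:执行数据质量校验"""
    print("Validating data quality...")


with DAG(
    dag_id="data_processing_pipeline",
    description="Daily data processing pipeline",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    tags=["data", "etl", "daily"],
) as dag:
    extract_data = BashOperator(
        task_id="extract_data",
        bash_command='echo "Extracting data from source systems..."',
    )
    transform_data = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data_function,
    )
    validate_data = PythonOperator(
        task_id="validate_data",
        python_callable=validate_data_quality,
    )

    # 位移运算符声明依赖:extract_data -> transform_data -> validate_data
    extract_data >> transform_data >> validate_data
```

与前面模拟实现中手动调用 trigger_dag、run_task_instance 不同,真实 Airflow 中这些依赖由调度器按 TriggerRule 自动驱动执行。
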
## 10. 生态系统集成与最佳实践

### 10.1 组件集成架构

```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime
import json

class IntegrationType(Enum):
    """集成类型"""
    DATA_PIPELINE = "data_pipeline"
    REAL_TIME_ANALYTICS = "real_time_analytics"
    BATCH_PROCESSING = "batch_processing"
    MACHINE_LEARNING = "machine_learning"
    DATA_WAREHOUSE = "data_warehouse"

class ComponentRole(Enum):
    """组件角色"""
    DATA_SOURCE = "data_source"
    DATA_INGESTION = "data_ingestion"
    DATA_PROCESSING = "data_processing"
    DATA_STORAGE = "data_storage"
    DATA_QUERY = "data_query"
    DATA_VISUALIZATION = "data_visualization"
    WORKFLOW_ORCHESTRATION = "workflow_orchestration"
    MONITORING = "monitoring"

@dataclass
class ComponentIntegration:
    """组件集成配置"""
    component_name: str
    role: ComponentRole
    dependencies: List[str]
    configuration: Dict[str, Any]
    health_check_url: Optional[str] = None
    metrics_endpoint: Optional[str] = None

@dataclass
class DataPipeline:
    """数据管道"""
    pipeline_id: str
    name: str
    integration_type: IntegrationType
    components: List[ComponentIntegration]
    data_flow: List[Dict[str, str]]
    schedule: Optional[str] = None
    sla_minutes: Optional[int] = None
    created_at: Optional[datetime] = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()

class HadoopEcosystemIntegrator:
    """Hadoop生态系统集成器"""
    
    def __init__(self):
        self.pipelines: Dict[str, DataPipeline] = {}
        self.component_registry: Dict[str, ComponentIntegration] = {}
        self.integration_templates = self._initialize_templates()
    
    def _initialize_templates(self) -> Dict[str, Dict[str, Any]]:
        """初始化集成模板"""
        return {
            "real_time_analytics": {
                "components": [
                    "kafka", "storm", "hbase", "elasticsearch", "grafana"
                ],
                "data_flow": [
                    {"from": "kafka", "to": "storm", "type": "stream"},
                    {"from": "storm", "to": "hbase", "type": "write"},
                    {"from": "hbase", "to": "elasticsearch", "type": "index"},
                    {"from": "elasticsearch", "to": "grafana", "type": "query"}
                ]
            },
            "batch_processing": {
                "components": [
                    "hdfs", "yarn", "mapreduce", "hive", "oozie"
                ],
                "data_flow": [
                    {"from": "hdfs", "to": "mapreduce", "type": "read"},
                    {"from": "mapreduce", "to": "hive", "type": "transform"},
                    {"from": "hive", "to": "hdfs", "type": "write"}
                ]
            },
            "data_warehouse": {
                "components": [
                    "sqoop", "hdfs", "hive", "impala", "tableau"
                ],
                "data_flow": [
                    {"from": "sqoop", "to": "hdfs", "type": "import"},
                    {"from": "hdfs", "to": "hive", "type": "load"},
                    {"from": "hive", "to": "impala", "type": "query"},
                    {"from": "impala", "to": "tableau", "type": "visualize"}
                ]
            }
        }
    
    def register_component(self, component: ComponentIntegration) -> Dict[str, Any]:
        """注册组件"""
        try:
            # 验证依赖关系
            for dep in component.dependencies:
                if dep not in self.component_registry:
                    return {
                        'status': 'error',
                        'message': f'Dependency {dep} not found'
                    }
            
            self.component_registry[component.component_name] = component
            
            return {
                'status': 'success',
                'component_name': component.component_name,
                'role': component.role.value,
                'dependencies': component.dependencies
            }
        
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to register component: {str(e)}'
            }
    
    def create_pipeline(self, pipeline_config: Dict[str, Any]) -> Dict[str, Any]:
        """创建数据管道"""
        try:
            pipeline_id = pipeline_config['pipeline_id']
            integration_type = IntegrationType(pipeline_config['integration_type'])
            
            # 获取模板
            template = self.integration_templates.get(integration_type.value)
            if not template:
                return {
                    'status': 'error',
                    'message': f'No template found for {integration_type.value}'
                }
            
            # 构建组件列表
            components = []
            for comp_name in template['components']:
                if comp_name in self.component_registry:
                    components.append(self.component_registry[comp_name])
                else:
                    # 创建默认组件配置
                    default_component = ComponentIntegration(
                        component_name=comp_name,
                        role=self._get_default_role(comp_name),
                        dependencies=[],
                        configuration={}
                    )
                    components.append(default_component)
            
            # 创建管道
            pipeline = DataPipeline(
                pipeline_id=pipeline_id,
                name=pipeline_config.get('name', f'Pipeline-{pipeline_id}'),
                integration_type=integration_type,
                components=components,
                data_flow=template['data_flow'],
                schedule=pipeline_config.get('schedule'),
                sla_minutes=pipeline_config.get('sla_minutes')
            )
            
            self.pipelines[pipeline_id] = pipeline
            
            return {
                'status': 'success',
                'pipeline_id': pipeline_id,
                'integration_type': integration_type.value,
                'components_count': len(components),
                'data_flow_steps': len(template['data_flow'])
            }
        
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to create pipeline: {str(e)}'
            }
    
    def _get_default_role(self, component_name: str) -> ComponentRole:
        """获取组件默认角色"""
        role_mapping = {
            'kafka': ComponentRole.DATA_INGESTION,
            'storm': ComponentRole.DATA_PROCESSING,
            'spark': ComponentRole.DATA_PROCESSING,
            'hdfs': ComponentRole.DATA_STORAGE,
            'hbase': ComponentRole.DATA_STORAGE,
            'hive': ComponentRole.DATA_QUERY,
            'impala': ComponentRole.DATA_QUERY,
            'elasticsearch': ComponentRole.DATA_STORAGE,
            'grafana': ComponentRole.DATA_VISUALIZATION,
            'sqoop': ComponentRole.DATA_INGESTION,
            'flume': ComponentRole.DATA_INGESTION,
            'oozie': ComponentRole.WORKFLOW_ORCHESTRATION,
            'airflow': ComponentRole.WORKFLOW_ORCHESTRATION,
            'ranger': ComponentRole.MONITORING,
            'knox': ComponentRole.MONITORING
        }
        return role_mapping.get(component_name, ComponentRole.DATA_PROCESSING)
    
    def validate_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
        """验证管道配置"""
        if pipeline_id not in self.pipelines:
            return {
                'status': 'error',
                'message': f'Pipeline {pipeline_id} not found'
            }
        
        pipeline = self.pipelines[pipeline_id]
        validation_results = []
        
        # 验证组件依赖
        for component in pipeline.components:
            for dep in component.dependencies:
                dep_exists = any(c.component_name == dep for c in pipeline.components)
                if not dep_exists:
                    validation_results.append({
                        'type': 'dependency_missing',
                        'component': component.component_name,
                        'missing_dependency': dep
                    })
        
        # 验证数据流
        component_names = {c.component_name for c in pipeline.components}
        for flow in pipeline.data_flow:
            if flow['from'] not in component_names:
                validation_results.append({
                    'type': 'flow_source_missing',
                    'flow': flow,
                    'missing_component': flow['from']
                })
            if flow['to'] not in component_names:
                validation_results.append({
                    'type': 'flow_target_missing',
                    'flow': flow,
                    'missing_component': flow['to']
                })
        
        return {
            'status': 'success' if not validation_results else 'warning',
            'pipeline_id': pipeline_id,
            'validation_results': validation_results,
            'is_valid': len(validation_results) == 0
        }
    
    def get_pipeline_status(self, pipeline_id: str) -> Dict[str, Any]:
        """获取管道状态"""
        if pipeline_id not in self.pipelines:
            return {
                'status': 'error',
                'message': f'Pipeline {pipeline_id} not found'
            }
        
        pipeline = self.pipelines[pipeline_id]
        
        # 模拟组件健康检查
        component_health = []
        for component in pipeline.components:
            health_status = 'healthy' if component.component_name in ['hdfs', 'yarn', 'hive'] else 'unknown'
            component_health.append({
                'component': component.component_name,
                'role': component.role.value,
                'status': health_status,
                'last_check': datetime.now().isoformat()
            })
        
        return {
            'status': 'success',
            'pipeline_id': pipeline_id,
            'name': pipeline.name,
            'integration_type': pipeline.integration_type.value,
            'created_at': pipeline.created_at.isoformat(),
            'components_count': len(pipeline.components),
            'component_health': component_health,
            'data_flow_steps': len(pipeline.data_flow)
        }
    
    def list_pipelines(self) -> Dict[str, Any]:
        """列出所有管道"""
        pipelines_info = []
        for pipeline_id, pipeline in self.pipelines.items():
            pipelines_info.append({
                'pipeline_id': pipeline_id,
                'name': pipeline.name,
                'integration_type': pipeline.integration_type.value,
                'components_count': len(pipeline.components),
                'created_at': pipeline.created_at.isoformat()
            })
        
        return {
            'status': 'success',
            'total_pipelines': len(pipelines_info),
            'pipelines': pipelines_info
        }
    
    def get_integration_recommendations(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """获取集成建议"""
        data_volume = requirements.get('data_volume', 'medium')
        latency_requirement = requirements.get('latency', 'batch')
        data_types = requirements.get('data_types', ['structured'])
        
        recommendations = []
        
        # 基于需求推荐架构
        if latency_requirement == 'real_time':
            if 'streaming' in data_types:
                recommendations.append({
                    'architecture': 'Lambda Architecture',
                    'components': ['kafka', 'storm', 'hbase', 'hdfs', 'spark'],
                    'description': '实时流处理架构,支持低延迟数据处理'
                })
            
            recommendations.append({
                'architecture': 'Kappa Architecture',
                'components': ['kafka', 'spark_streaming', 'elasticsearch'],
                'description': '简化的流处理架构,统一批处理和流处理'
            })
        
        elif latency_requirement == 'batch':
            recommendations.append({
                'architecture': 'Traditional Batch Processing',
                'components': ['hdfs', 'yarn', 'mapreduce', 'hive', 'oozie'],
                'description': '传统批处理架构,适合大数据量离线分析'
            })
        
        # 基于数据量推荐
        if data_volume == 'large':
            recommendations.append({
                'architecture': 'Distributed Data Warehouse',
                'components': ['hdfs', 'hive', 'impala', 'kudu', 'spark'],
                'description': '分布式数据仓库,支持大规模数据存储和查询'
            })
        
        return {
            'status': 'success',
            'requirements': requirements,
            'recommendations': recommendations,
            'total_recommendations': len(recommendations)
        }
```

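下面给出一个简要的使用示意(假设上面定义的 HadoopEcosystemIntegrator、ComponentIntegration、ComponentRole 已在同一模块中,管道 ID 等参数均为假设的示例值):先注册组件,再按内置的 real_time_analytics 模板创建管道,最后做校验与状态查询。

```python
# 简要使用示意:基于上文的 HadoopEcosystemIntegrator(参数均为假设的示例值)
integrator = HadoopEcosystemIntegrator()

# 注册一个自定义 Kafka 组件(无依赖)
print(integrator.register_component(ComponentIntegration(
    component_name="kafka",
    role=ComponentRole.DATA_INGESTION,
    dependencies=[],
    configuration={"bootstrap.servers": "broker1:9092"},
)))

# 按 real_time_analytics 模板创建管道,模板中未注册的组件会自动使用默认配置
print(integrator.create_pipeline({
    "pipeline_id": "rt-analytics-001",
    "name": "Realtime Analytics Pipeline",
    "integration_type": "real_time_analytics",
    "schedule": "@hourly",
    "sla_minutes": 30,
}))

# 校验管道的依赖与数据流,并查看各组件的(模拟)健康状态
print(integrator.validate_pipeline("rt-analytics-001"))
print(integrator.get_pipeline_status("rt-analytics-001"))
```
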
### 10.2 性能优化最佳实践

```python
from typing import Any, Dict, List


class PerformanceOptimizer:
    """性能优化器"""
    
    def __init__(self):
        self.optimization_rules = self._initialize_rules()
    
    def _initialize_rules(self) -> Dict[str, Dict[str, Any]]:
        """初始化优化规则"""
        return {
            'hdfs': {
                'block_size': {
                    'small_files': '64MB',
                    'large_files': '256MB',
                    'very_large_files': '512MB'
                },
                'replication_factor': {
                    'development': 1,
                    'testing': 2,
                    'production': 3
                },
                'compression': {
                    'text_files': 'gzip',
                    'sequence_files': 'snappy',
                    'parquet_files': 'snappy'
                }
            },
            'yarn': {
                'memory_allocation': {
                    'container_min_mb': 1024,
                    'container_max_mb': 8192,
                    'am_memory_mb': 1024
                },
                'cpu_allocation': {
                    'container_min_vcores': 1,
                    'container_max_vcores': 4
                }
            },
            'mapreduce': {
                'map_tasks': {
                    'memory_mb': 2048,
                    'java_opts': '-Xmx1638m',
                    'io_sort_mb': 512
                },
                'reduce_tasks': {
                    'memory_mb': 4096,
                    'java_opts': '-Xmx3276m',
                    'parallel_copies': 10
                }
            },
            'hive': {
                'execution_engine': 'tez',
                'vectorization': True,
                'cost_based_optimizer': True,
                'compression': {
                    'intermediate': 'snappy',
                    'final': 'gzip'
                }
            },
            'spark': {
                'executor_memory': '4g',
                'executor_cores': 2,
                'driver_memory': '2g',
                'serializer': 'org.apache.spark.serializer.KryoSerializer',
                'sql_adaptive_enabled': True
            }
        }
    
    def analyze_performance(self, component: str, metrics: Dict[str, Any]) -> Dict[str, Any]:
        """分析性能指标"""
        if component not in self.optimization_rules:
            return {
                'status': 'error',
                'message': f'No optimization rules for component: {component}'
            }
        
        recommendations = []
        
        # 基于组件类型分析
        if component == 'hdfs':
            recommendations.extend(self._analyze_hdfs_performance(metrics))
        elif component == 'yarn':
            recommendations.extend(self._analyze_yarn_performance(metrics))
        elif component == 'mapreduce':
            recommendations.extend(self._analyze_mapreduce_performance(metrics))
        elif component == 'hive':
            recommendations.extend(self._analyze_hive_performance(metrics))
        elif component == 'spark':
            recommendations.extend(self._analyze_spark_performance(metrics))
        
        return {
            'status': 'success',
            'component': component,
            'recommendations': recommendations,
            'total_recommendations': len(recommendations)
        }
    
    def _analyze_hdfs_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """分析HDFS性能"""
        recommendations = []
        
        # 检查小文件问题
        if metrics.get('avg_file_size_mb', 0) < 64:
            recommendations.append({
                'type': 'small_files',
                'priority': 'high',
                'description': '存在大量小文件,建议合并文件或增加块大小',
                'action': 'Merge small files or increase block size to 128MB'
            })
        
        # 检查磁盘使用率
        if metrics.get('disk_usage_percent', 0) > 80:
            recommendations.append({
                'type': 'disk_usage',
                'priority': 'high',
                'description': '磁盘使用率过高,建议清理或扩容',
                'action': 'Clean up old data or add more storage capacity'
            })
        
        # 检查网络带宽
        if metrics.get('network_utilization_percent', 0) > 70:
            recommendations.append({
                'type': 'network',
                'priority': 'medium',
                'description': '网络带宽使用率高,建议优化数据本地性',
                'action': 'Improve data locality or upgrade network infrastructure'
            })
        
        return recommendations
    
    def _analyze_yarn_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """分析YARN性能"""
        recommendations = []
        
        # 检查资源利用率
        memory_utilization = metrics.get('memory_utilization_percent', 0)
        if memory_utilization > 90:
            recommendations.append({
                'type': 'memory_pressure',
                'priority': 'high',
                'description': '内存使用率过高,建议调整容器大小或增加节点',
                'action': 'Adjust container sizes or add more NodeManagers'
            })
        elif memory_utilization < 30:
            recommendations.append({
                'type': 'memory_underutilization',
                'priority': 'medium',
                'description': '内存利用率低,建议增加并发任务或调整资源配置',
                'action': 'Increase parallelism or adjust resource allocation'
            })
        
        # 检查队列配置
        if metrics.get('pending_applications', 0) > 10:
            recommendations.append({
                'type': 'queue_congestion',
                'priority': 'medium',
                'description': '队列拥堵,建议调整队列配置或增加资源',
                'action': 'Adjust queue configuration or add more resources'
            })
        
        return recommendations
    
    def _analyze_mapreduce_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """分析MapReduce性能"""
        recommendations = []
        
        # 检查任务执行时间
        avg_map_time = metrics.get('avg_map_time_seconds', 0)
        if avg_map_time > 300:  # 5分钟
            recommendations.append({
                'type': 'slow_map_tasks',
                'priority': 'high',
                'description': 'Map任务执行时间过长,建议优化输入分片或增加内存',
                'action': 'Optimize input splits or increase map memory'
            })
        
        # 检查数据倾斜
        task_time_variance = metrics.get('task_time_variance', 0)
        if task_time_variance > 0.5:
            recommendations.append({
                'type': 'data_skew',
                'priority': 'high',
                'description': '存在数据倾斜,建议重新分区或使用自定义分区器',
                'action': 'Repartition data or use custom partitioner'
            })
        
        return recommendations
    
    def _analyze_hive_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """分析Hive性能"""
        recommendations = []
        
        # 检查查询执行时间
        avg_query_time = metrics.get('avg_query_time_seconds', 0)
        if avg_query_time > 600:  # 10分钟
            recommendations.append({
                'type': 'slow_queries',
                'priority': 'high',
                'description': '查询执行时间过长,建议优化SQL或创建索引',
                'action': 'Optimize SQL queries or create appropriate indexes'
            })
        
        # 检查表格式
        if metrics.get('orc_tables_percent', 0) < 50:
            recommendations.append({
                'type': 'table_format',
                'priority': 'medium',
                'description': '建议使用ORC格式提高查询性能',
                'action': 'Convert tables to ORC format for better performance'
            })
        
        return recommendations
    
    def _analyze_spark_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
        """分析Spark性能"""
        recommendations = []
        
        # 检查GC时间
        gc_time_percent = metrics.get('gc_time_percent', 0)
        if gc_time_percent > 10:
            recommendations.append({
                'type': 'gc_pressure',
                'priority': 'high',
                'description': 'GC时间过长,建议调整内存配置或使用G1GC',
                'action': 'Adjust memory settings or use G1 garbage collector'
            })
        
        # 检查数据序列化
        if not metrics.get('kryo_serializer_enabled', False):
            recommendations.append({
                'type': 'serialization',
                'priority': 'medium',
                'description': '建议启用Kryo序列化器提高性能',
                'action': 'Enable Kryo serializer for better performance'
            })
        
        return recommendations
```

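下面是一个简要的调用示意(假设上面的 PerformanceOptimizer 已在同一模块中,指标数值为假设的示例数据),演示如何针对 HDFS 和 Spark 的监控指标生成优化建议:

```python
# 简要使用示意:基于上文的 PerformanceOptimizer(指标为假设的示例值)
optimizer = PerformanceOptimizer()

# HDFS:平均文件过小、磁盘使用率偏高,预期触发 small_files 和 disk_usage 两条建议
hdfs_report = optimizer.analyze_performance("hdfs", {
    "avg_file_size_mb": 12,
    "disk_usage_percent": 86,
    "network_utilization_percent": 40,
})
for rec in hdfs_report["recommendations"]:
    print(f"[{rec['priority']}] {rec['type']}: {rec['action']}")

# Spark:GC 占比过高且未启用 Kryo 序列化,预期得到 2 条建议
spark_report = optimizer.analyze_performance("spark", {
    "gc_time_percent": 15,
    "kryo_serializer_enabled": False,
})
print(spark_report["total_recommendations"])
```
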
### 10.3 监控和运维

```python
from datetime import datetime
from typing import Any, Dict, Optional


class EcosystemMonitor:
    """生态系统监控器"""
    
    def __init__(self):
        self.alerts = []
        self.metrics_history = []
        self.thresholds = self._initialize_thresholds()
    
    def _initialize_thresholds(self) -> Dict[str, Dict[str, float]]:
        """初始化告警阈值"""
        return {
            'hdfs': {
                'disk_usage_percent': 85.0,
                'namenode_heap_usage_percent': 80.0,
                'datanode_failed_volumes': 1.0
            },
            'yarn': {
                'memory_utilization_percent': 90.0,
                'nodemanager_unhealthy_percent': 10.0,
                'application_failed_percent': 5.0
            },
            'hive': {
                'metastore_connection_failures': 5.0,
                'query_failure_rate_percent': 10.0,
                'avg_query_time_seconds': 300.0
            }
        }
    
    def collect_metrics(self, component: str) -> Dict[str, Any]:
        """收集组件指标"""
        # 模拟指标收集
        
        if component == 'hdfs':
            metrics = {
                'disk_usage_percent': random.uniform(60, 95),
                'namenode_heap_usage_percent': random.uniform(40, 85),
                'datanode_count': random.randint(5, 20),
                'total_capacity_tb': random.uniform(100, 1000),
                'used_capacity_tb': random.uniform(50, 800),
                'block_count': random.randint(1000000, 10000000)
            }
        elif component == 'yarn':
            metrics = {
                'memory_utilization_percent': random.uniform(30, 95),
                'cpu_utilization_percent': random.uniform(20, 80),
                'active_nodes': random.randint(5, 50),
                'running_applications': random.randint(10, 100),
                'pending_applications': random.randint(0, 20)
            }
        elif component == 'hive':
            metrics = {
                'active_sessions': random.randint(5, 50),
                'avg_query_time_seconds': random.uniform(30, 600),
                'query_success_rate_percent': random.uniform(85, 99),
                'metastore_connections': random.randint(10, 100)
            }
        else:
            metrics = {'status': 'unknown_component'}
        
        # 添加时间戳
        metrics['timestamp'] = datetime.now().isoformat()
        metrics['component'] = component
        
        # 保存到历史记录
        self.metrics_history.append(metrics)
        
        # 检查告警
        self._check_alerts(component, metrics)
        
        return {
            'status': 'success',
            'component': component,
            'metrics': metrics
        }
    
    def _check_alerts(self, component: str, metrics: Dict[str, Any]):
        """检查告警条件"""
        if component not in self.thresholds:
            return
        
        component_thresholds = self.thresholds[component]
        
        for metric_name, threshold in component_thresholds.items():
            if metric_name in metrics:
                value = metrics[metric_name]
                if value > threshold:
                    alert = {
                        'alert_id': f"{component}_{metric_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                        'component': component,
                        'metric': metric_name,
                        'value': value,
                        'threshold': threshold,
                        'severity': self._get_severity(metric_name, value, threshold),
                        'timestamp': datetime.now().isoformat(),
                        'message': f"{component} {metric_name} is {value}, exceeds threshold {threshold}"
                    }
                    self.alerts.append(alert)
    
    def _get_severity(self, metric_name: str, value: float, threshold: float) -> str:
        """获取告警严重程度"""
        ratio = value / threshold
        if ratio > 1.2:
            return 'critical'
        elif ratio > 1.1:
            return 'warning'
        else:
            return 'info'
    
    def get_alerts(self, severity: Optional[str] = None, limit: int = 50) -> Dict[str, Any]:
        """获取告警列表"""
        # 在副本上过滤和排序, 避免修改原始告警列表
        filtered_alerts = list(self.alerts)
        
        if severity:
            filtered_alerts = [a for a in self.alerts if a['severity'] == severity]
        
        # 按时间倒序排列
        filtered_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
        
        return {
            'status': 'success',
            'total_alerts': len(filtered_alerts),
            'alerts': filtered_alerts[:limit]
        }
    
    def get_health_summary(self) -> Dict[str, Any]:
        """获取健康状况摘要"""
        # 统计各组件状态
        component_status = {}
        recent_metrics = {}
        
        # 获取最近的指标
        for metrics in reversed(self.metrics_history[-50:]):
            component = metrics['component']
            if component not in recent_metrics:
                recent_metrics[component] = metrics
        
        # 评估组件健康状态
        for component, metrics in recent_metrics.items():
            health_score = self._calculate_health_score(component, metrics)
            component_status[component] = {
                'health_score': health_score,
                'status': self._get_health_status(health_score),
                'last_update': metrics['timestamp']
            }
        
        # 统计告警
        alert_summary = {
            'critical': len([a for a in self.alerts if a['severity'] == 'critical']),
            'warning': len([a for a in self.alerts if a['severity'] == 'warning']),
            'info': len([a for a in self.alerts if a['severity'] == 'info'])
        }
        
        return {
            'status': 'success',
            'overall_health': self._calculate_overall_health(component_status),
            'component_status': component_status,
            'alert_summary': alert_summary,
            'total_components': len(component_status)
        }
    
    def _calculate_health_score(self, component: str, metrics: Dict[str, Any]) -> float:
        """计算健康分数 (0-100)"""
        if component not in self.thresholds:
            return 100.0
        
        score = 100.0
        thresholds = self.thresholds[component]
        
        for metric_name, threshold in thresholds.items():
            if metric_name in metrics:
                value = metrics[metric_name]
                if value > threshold:
                    # 根据超出程度扣分
                    ratio = value / threshold
                    penalty = min(50, (ratio - 1) * 100)
                    score -= penalty
        
        return max(0.0, score)
    
    def _get_health_status(self, health_score: float) -> str:
        """根据健康分数获取状态"""
        if health_score >= 90:
            return 'excellent'
        elif health_score >= 70:
            return 'good'
        elif health_score >= 50:
            return 'warning'
        else:
            return 'critical'
    
    def _calculate_overall_health(self, component_status: Dict[str, Dict[str, Any]]) -> str:
        """计算整体健康状态"""
        if not component_status:
            return 'unknown'
        
        avg_score = sum(status['health_score'] for status in component_status.values()) / len(component_status)
        return self._get_health_status(avg_score)

# 使用示例
if __name__ == "__main__":
    # 创建生态系统集成器
    integrator = HadoopEcosystemIntegrator()
    
    print("=== Hadoop生态系统集成示例 ===\n")
    
    # 注册组件
    print("=== 注册组件 ===")
    components = [
        ComponentIntegration(
            component_name="kafka",
            role=ComponentRole.DATA_INGESTION,
            dependencies=[],
            configuration={"brokers": 3, "partitions": 12}
        ),
        ComponentIntegration(
            component_name="storm",
            role=ComponentRole.DATA_PROCESSING,
            dependencies=["kafka"],
            configuration={"workers": 4, "executors": 16}
        ),
        ComponentIntegration(
            component_name="hbase",
            role=ComponentRole.DATA_STORAGE,
            dependencies=["hdfs"],
            configuration={"regions": 100, "replication": 3}
        )
    ]
    
    for component in components:
        result = integrator.register_component(component)
        print(f"注册 {component.component_name}: {result['status']}")
    
    # 创建实时分析管道
    print("\n=== 创建实时分析管道 ===")
    pipeline_config = {
        'pipeline_id': 'real-time-analytics-001',
        'name': 'Real-time User Behavior Analytics',
        'integration_type': 'real_time_analytics',
        'schedule': '0 */6 * * *',
        'sla_minutes': 30
    }
    
    pipeline_result = integrator.create_pipeline(pipeline_config)
    print(f"管道创建结果: {pipeline_result}")
    
    if pipeline_result['status'] == 'success':
        pipeline_id = pipeline_result['pipeline_id']
        
        # 验证管道
        print("\n=== 验证管道配置 ===")
        validation_result = integrator.validate_pipeline(pipeline_id)
        print(f"验证结果: {validation_result['is_valid']}")
        if validation_result['validation_results']:
            for issue in validation_result['validation_results']:
                print(f"  问题: {issue}")
        
        # 获取管道状态
        print("\n=== 管道状态 ===")
        status_result = integrator.get_pipeline_status(pipeline_id)
        if status_result['status'] == 'success':
            print(f"管道ID: {status_result['pipeline_id']}")
            print(f"名称: {status_result['name']}")
            print(f"集成类型: {status_result['integration_type']}")
            print(f"组件数量: {status_result['components_count']}")
            print("组件健康状态:")
            for health in status_result['component_health']:
                print(f"  - {health['component']} ({health['role']}): {health['status']}")
    
    # 获取集成建议
    print("\n=== 集成建议 ===")
    requirements = {
        'data_volume': 'large',
        'latency': 'real_time',
        'data_types': ['streaming', 'structured']
    }
    
    recommendations = integrator.get_integration_recommendations(requirements)
    if recommendations['status'] == 'success':
        print(f"基于需求的建议 ({recommendations['total_recommendations']}个):")
        for rec in recommendations['recommendations']:
            print(f"  架构: {rec['architecture']}")
            print(f"  组件: {', '.join(rec['components'])}")
            print(f"  描述: {rec['description']}\n")
    
    # 性能优化分析
    print("=== 性能优化分析 ===")
    optimizer = PerformanceOptimizer()
    
    # 模拟HDFS性能指标
    hdfs_metrics = {
        'avg_file_size_mb': 32,  # 小文件问题
        'disk_usage_percent': 85,  # 磁盘使用率高
        'network_utilization_percent': 45
    }
    
    hdfs_analysis = optimizer.analyze_performance('hdfs', hdfs_metrics)
    if hdfs_analysis['status'] == 'success':
        print(f"HDFS性能分析 ({hdfs_analysis['total_recommendations']}个建议):")
        for rec in hdfs_analysis['recommendations']:
            print(f"  类型: {rec['type']} (优先级: {rec['priority']})")
            print(f"  描述: {rec['description']}")
            print(f"  建议: {rec['action']}\n")
    
    # 监控示例
    print("=== 系统监控 ===")
    monitor = EcosystemMonitor()
    
    # 收集各组件指标
    components_to_monitor = ['hdfs', 'yarn', 'hive']
    for component in components_to_monitor:
        metrics_result = monitor.collect_metrics(component)
        if metrics_result['status'] == 'success':
            metrics = metrics_result['metrics']
            print(f"{component.upper()}指标:")
            for key, value in metrics.items():
                if key not in ['timestamp', 'component']:
                    if isinstance(value, float):
                        print(f"  {key}: {value:.2f}")
                    else:
                        print(f"  {key}: {value}")
            print()
    
    # 获取告警
    print("=== 告警信息 ===")
    alerts_result = monitor.get_alerts(limit=5)
    if alerts_result['status'] == 'success':
        print(f"总告警数: {alerts_result['total_alerts']}")
        for alert in alerts_result['alerts']:
            print(f"  [{alert['severity'].upper()}] {alert['message']}")
    
    # 获取健康摘要
    print("\n=== 健康状况摘要 ===")
    health_summary = monitor.get_health_summary()
    if health_summary['status'] == 'success':
        print(f"整体健康状态: {health_summary['overall_health']}")
        print(f"监控组件数: {health_summary['total_components']}")
        print("组件状态:")
        for component, status in health_summary['component_status'].items():
            print(f"  {component}: {status['status']} (分数: {status['health_score']:.1f})")
        
        alert_summary = health_summary['alert_summary']
        print(f"告警统计: 严重 {alert_summary['critical']}, 警告 {alert_summary['warning']}, 信息 {alert_summary['info']}")

## 11. 总结与展望

### 11.1 Hadoop生态系统总结

Hadoop生态系统是一个庞大而复杂的大数据处理平台,包含了从数据存储、处理、查询到监控、安全等各个方面的组件。通过本教程的学习,我们深入了解了:

1. **核心存储组件**: HDFS提供分布式文件存储,HBase提供NoSQL数据库功能
2. **数据处理组件**: MapReduce、Spark、Storm等提供批处理和流处理能力
3. **数据查询组件**: Hive、Impala、Drill等提供SQL查询接口
4. **数据流组件**: Kafka、Pulsar提供消息队列和流处理平台
5. **监控管理组件**: Ambari、Cloudera Manager提供集群管理和监控
6. **工作流调度组件**: Airflow、Oozie提供任务调度和工作流管理
7. **数据传输组件**: Sqoop、Flume、NiFi提供数据导入导出和流式传输
8. **安全组件**: Ranger、Knox提供权限管理和安全网关

### 11.2 架构设计原则

在设计Hadoop生态系统架构时,应遵循以下原则(列表之后附有一个简单的配置自检示意):

1. **可扩展性**: 系统应能够水平扩展以处理不断增长的数据量
2. **容错性**: 系统应能够处理硬件故障和网络问题
3. **性能优化**: 通过合理的配置和优化提高系统性能
4. **安全性**: 实施适当的安全措施保护数据和系统
5. **可维护性**: 系统应易于监控、管理和维护
6. **成本效益**: 在满足需求的前提下控制成本
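
为了把这些原则落到可操作的层面,下面给出一个简化的配置自检示意。其中 `dfs.replication`、`hadoop.security.authentication` 是常见的Hadoop配置项,`monitoring.enabled` 及各阈值则是演示用的假设,实际取值应结合集群规模与业务需求:

```python
from typing import Any, Dict, List

def audit_cluster_config(config: Dict[str, Any]) -> List[str]:
    """按上述原则对集群配置做简单自检(仅为演示)"""
    findings = []

    # 容错性: HDFS副本数一般建议不低于3
    if int(config.get('dfs.replication', 3)) < 3:
        findings.append('容错性: dfs.replication 低于3, 节点故障时存在数据丢失风险')

    # 安全性: 建议启用Kerberos认证
    if config.get('hadoop.security.authentication', 'simple') != 'kerberos':
        findings.append('安全性: 未启用Kerberos认证')

    # 可维护性: 建议部署集中化监控(此配置项为示意)
    if not config.get('monitoring.enabled', False):
        findings.append('可维护性: 未启用集中监控')

    return findings

# 示例
for issue in audit_cluster_config({'dfs.replication': 2}):
    print(issue)
```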

### 11.3 技术发展趋势

1. **云原生化**: 越来越多的Hadoop组件支持容器化部署和云原生架构
2. **实时处理**: 流处理技术的发展使得实时数据处理成为主流
3. **机器学习集成**: 大数据平台与机器学习框架的深度集成
4. **自动化运维**: 智能化的监控、调优和故障处理
5. **多云支持**: 支持跨云平台的数据处理和管理

### 11.4 学习建议

1. **实践为主**: 通过搭建实际环境来学习和理解各组件
2. **循序渐进**: 从核心组件开始,逐步学习扩展组件
3. **关注社区**: 跟踪开源社区的发展和最新技术
4. **项目实战**: 通过实际项目来应用所学知识
5. **持续学习**: 大数据技术发展迅速,需要持续学习新技术

### 11.5 未来展望

Hadoop生态系统将继续演进,主要发展方向包括:

1. **更好的用户体验**: 简化部署、配置和使用流程
2. **更强的性能**: 通过新的算法和优化技术提高处理性能
3. **更广的应用场景**: 支持更多类型的数据和应用场景
4. **更深的集成**: 与其他技术栈的更深度集成
5. **更智能的管理**: 基于AI的自动化管理和优化

通过掌握Hadoop生态系统,您将能够构建强大的大数据处理平台,为企业的数字化转型提供技术支撑。

---

**本教程到此结束。希望通过学习本教程,您能够深入理解Hadoop生态系统的各个组件,并能够在实际项目中应用这些知识。**

## 9. 安全组件

### 9.1 Apache Ranger详解

Apache Ranger是一个用于在Hadoop平台上启用、监控和管理全面数据安全的框架。

```python
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import json
import uuid

class PolicyType(Enum):
    """策略类型"""
    ACCESS = "access"
    MASKING = "masking"
    ROW_FILTER = "row_filter"
    TAG_BASED = "tag_based"

class PermissionType(Enum):
    """权限类型"""
    READ = "read"
    WRITE = "write"
    CREATE = "create"
    DELETE = "delete"
    ADMIN = "admin"
    SELECT = "select"
    UPDATE = "update"
    DROP = "drop"
    ALTER = "alter"
    INDEX = "index"
    LOCK = "lock"
    ALL = "all"

class ServiceType(Enum):
    """服务类型"""
    HDFS = "hdfs"
    HIVE = "hive"
    HBASE = "hbase"
    YARN = "yarn"
    KAFKA = "kafka"
    STORM = "storm"
    KNOX = "knox"
    SOLR = "solr"
    ATLAS = "atlas"

class AuditAction(Enum):
    """审计动作"""
    ACCESS = "access"
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    LOGIN = "login"
    LOGOUT = "logout"
    POLICY_CHANGE = "policy_change"
    ADMIN_ACTION = "admin_action"

class AccessResult(Enum):
    """访问结果"""
    ALLOWED = "allowed"
    DENIED = "denied"
    NOT_DETERMINED = "not_determined"

@dataclass
class RangerResource:
    """Ranger资源"""
    service_type: ServiceType
    resource_type: str  # database, table, column, path等
    resource_name: str
    resource_path: Optional[str] = None
    is_recursive: bool = False
    is_excludes: bool = False

@dataclass
class PolicyItem:
    """策略项"""
    users: List[str] = field(default_factory=list)
    groups: List[str] = field(default_factory=list)
    roles: List[str] = field(default_factory=list)
    permissions: List[PermissionType] = field(default_factory=list)
    delegate_admin: bool = False
    conditions: Dict[str, Any] = field(default_factory=dict)

@dataclass
class RangerPolicy:
    """Ranger策略"""
    id: str
    name: str
    service_name: str
    service_type: ServiceType
    policy_type: PolicyType
    description: str
    is_enabled: bool
    is_audit_enabled: bool
    resources: Dict[str, RangerResource]
    policy_items: List[PolicyItem]
    deny_policy_items: List[PolicyItem] = field(default_factory=list)
    allow_exceptions: List[PolicyItem] = field(default_factory=list)
    deny_exceptions: List[PolicyItem] = field(default_factory=list)
    created_by: str = "admin"
    updated_by: str = "admin"
    create_time: datetime = field(default_factory=datetime.now)
    update_time: datetime = field(default_factory=datetime.now)
    version: int = 1

@dataclass
class AuditEvent:
    """审计事件"""
    id: str
    event_time: datetime
    user: str
    service_name: str
    service_type: ServiceType
    resource_path: str
    resource_type: str
    action: AuditAction
    access_type: str
    result: AccessResult
    policy_id: Optional[str] = None
    client_ip: Optional[str] = None
    session_id: Optional[str] = None
    request_data: Optional[str] = None
    additional_info: Dict[str, Any] = field(default_factory=dict)

@dataclass
class RangerService:
    """Ranger服务"""
    id: str
    name: str
    type: ServiceType
    description: str
    is_enabled: bool
    configs: Dict[str, Any]
    created_by: str = "admin"
    updated_by: str = "admin"
    create_time: datetime = field(default_factory=datetime.now)
    update_time: datetime = field(default_factory=datetime.now)
    version: int = 1

@dataclass
class RangerUser:
    """Ranger用户"""
    id: str
    name: str
    first_name: str
    last_name: str
    email_address: str
    password: str
    user_source: int = 0  # 0: Internal, 1: External
    is_visible: int = 1
    user_role_list: List[str] = field(default_factory=list)
    group_id_list: List[str] = field(default_factory=list)
    status: int = 1  # 1: Active, 0: Inactive
    created_by: str = "admin"
    updated_by: str = "admin"
    create_time: datetime = field(default_factory=datetime.now)
    update_time: datetime = field(default_factory=datetime.now)

@dataclass
class RangerGroup:
    """Ranger组"""
    id: str
    name: str
    description: str
    group_type: int = 1  # 1: Internal, 0: External
    group_source: int = 0
    is_visible: int = 1
    created_by: str = "admin"
    updated_by: str = "admin"
    create_time: datetime = field(default_factory=datetime.now)
    update_time: datetime = field(default_factory=datetime.now)

class RangerAdmin:
    """
    Apache Ranger管理服务器
    提供策略管理、用户管理、审计等功能
    """
    
    def __init__(self, admin_url: str = "http://localhost:6080"):
        self.admin_url = admin_url
        self.services: Dict[str, RangerService] = {}
        self.policies: Dict[str, RangerPolicy] = {}
        self.users: Dict[str, RangerUser] = {}
        self.groups: Dict[str, RangerGroup] = {}
        self.audit_events: List[AuditEvent] = []
        self.is_running = True
        self.version = "2.4.0"
        
        # 初始化默认数据
        self._initialize_default_data()
    
    def _initialize_default_data(self):
        """初始化默认数据"""
        # 创建默认用户
        admin_user = RangerUser(
            id="user_1",
            name="admin",
            first_name="Admin",
            last_name="User",
            email_address="admin@example.com",
            password="admin123",
            user_role_list=["ROLE_SYS_ADMIN"]
        )
        self.users[admin_user.id] = admin_user
        
        # 创建默认组
        admin_group = RangerGroup(
            id="group_1",
            name="admin",
            description="Administrator group"
        )
        self.groups[admin_group.id] = admin_group
        
        # 创建默认服务
        hdfs_service = RangerService(
            id="service_1",
            name="hadoop-hdfs",
            type=ServiceType.HDFS,
            description="HDFS Service",
            is_enabled=True,
            configs={
                "username": "hdfs",
                "password": "hdfs",
                "fs.default.name": "hdfs://localhost:9000",
                "hadoop.security.authorization": "true"
            }
        )
        self.services[hdfs_service.id] = hdfs_service
    
    def create_service(self, name: str, service_type: ServiceType, 
                      description: str, configs: Dict[str, Any]) -> Dict[str, Any]:
        """
        创建服务
        
        Args:
            name: 服务名称
            service_type: 服务类型
            description: 描述
            configs: 配置
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        try:
            service_id = f"service_{len(self.services) + 1}"
            
            service = RangerService(
                id=service_id,
                name=name,
                type=service_type,
                description=description,
                is_enabled=True,
                configs=configs
            )
            
            self.services[service_id] = service
            
            # 记录审计事件
            self._log_audit_event(
                user="admin",
                service_name=name,
                service_type=service_type,
                resource_path=f"/service/{name}",
                resource_type="service",
                action=AuditAction.CREATE,
                access_type="create",
                result=AccessResult.ALLOWED
            )
            
            return {
                'status': 'success',
                'service_id': service_id,
                'message': f'Service {name} created successfully'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to create service: {str(e)}'
            }
    
    def create_policy(self, policy_name: str, service_name: str, 
                     policy_type: PolicyType, resources: Dict[str, Any],
                     policy_items: List[Dict[str, Any]], 
                     description: str = "") -> Dict[str, Any]:
        """
        创建策略
        
        Args:
            policy_name: 策略名称
            service_name: 服务名称
            policy_type: 策略类型
            resources: 资源定义
            policy_items: 策略项
            description: 描述
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        try:
            # 查找服务
            service = None
            for svc in self.services.values():
                if svc.name == service_name:
                    service = svc
                    break
            
            if not service:
                return {
                    'status': 'error',
                    'message': f'Service {service_name} not found'
                }
            
            policy_id = f"policy_{len(self.policies) + 1}"
            
            # 转换资源
            ranger_resources = {}
            for res_type, res_values in resources.items():
                if isinstance(res_values, str):
                    res_values = [res_values]
                ranger_resources[res_type] = RangerResource(
                    service_type=service.type,
                    resource_type=res_type,
                    resource_name=",".join(res_values) if isinstance(res_values, list) else res_values
                )
            
            # 转换策略项
            ranger_policy_items = []
            for item in policy_items:
                policy_item = PolicyItem(
                    users=item.get('users', []),
                    groups=item.get('groups', []),
                    roles=item.get('roles', []),
                    permissions=[PermissionType(p) for p in item.get('permissions', [])],
                    delegate_admin=item.get('delegate_admin', False)
                )
                ranger_policy_items.append(policy_item)
            
            policy = RangerPolicy(
                id=policy_id,
                name=policy_name,
                service_name=service_name,
                service_type=service.type,
                policy_type=policy_type,
                description=description,
                is_enabled=True,
                is_audit_enabled=True,
                resources=ranger_resources,
                policy_items=ranger_policy_items
            )
            
            self.policies[policy_id] = policy
            
            # 记录审计事件
            self._log_audit_event(
                user="admin",
                service_name=service_name,
                service_type=service.type,
                resource_path=f"/policy/{policy_name}",
                resource_type="policy",
                action=AuditAction.CREATE,
                access_type="create",
                result=AccessResult.ALLOWED,
                policy_id=policy_id
            )
            
            return {
                'status': 'success',
                'policy_id': policy_id,
                'message': f'Policy {policy_name} created successfully'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to create policy: {str(e)}'
            }
    
    def check_access(self, user: str, resource_path: str, 
                    access_type: str, service_name: str) -> Dict[str, Any]:
        """
        检查访问权限
        
        Args:
            user: 用户名
            resource_path: 资源路径
            access_type: 访问类型
            service_name: 服务名称
            
        Returns:
            Dict[str, Any]: 访问检查结果
        """
        try:
            # 查找服务
            service = None
            for svc in self.services.values():
                if svc.name == service_name:
                    service = svc
                    break
            
            if not service:
                return {
                    'status': 'error',
                    'message': f'Service {service_name} not found'
                }
            
            # 检查策略
            access_result = AccessResult.DENIED
            matched_policy = None
            
            for policy in self.policies.values():
                if (policy.service_name == service_name and 
                    policy.is_enabled and 
                    self._matches_resource(policy, resource_path)):
                    
                    # 检查策略项
                    for item in policy.policy_items:
                        if (user in item.users and 
                            any(perm.value == access_type or perm == PermissionType.ALL 
                                for perm in item.permissions)):
                            access_result = AccessResult.ALLOWED
                            matched_policy = policy
                            break
                    
                    if access_result == AccessResult.ALLOWED:
                        break
            
            # 记录审计事件
            self._log_audit_event(
                user=user,
                service_name=service_name,
                service_type=service.type,
                resource_path=resource_path,
                resource_type="file",
                action=AuditAction.ACCESS,
                access_type=access_type,
                result=access_result,
                policy_id=matched_policy.id if matched_policy else None
            )
            
            return {
                'status': 'success',
                'access_result': access_result.value,
                'policy_id': matched_policy.id if matched_policy else None,
                'policy_name': matched_policy.name if matched_policy else None
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to check access: {str(e)}'
            }
    
    def _matches_resource(self, policy: RangerPolicy, resource_path: str) -> bool:
        """
        检查资源是否匹配策略
        
        Args:
            policy: 策略
            resource_path: 资源路径
            
        Returns:
            bool: 是否匹配
        """
        # 简化的资源匹配逻辑: 支持逗号分隔的多个资源名以及以"*"结尾的前缀通配符
        for resource in policy.resources.values():
            for name in resource.resource_name.split(","):
                pattern = name.strip()
                if not pattern:
                    continue
                if pattern == "*":
                    return True
                prefix = pattern.rstrip("*")
                if resource_path.startswith(prefix) or prefix in resource_path:
                    return True
        return False
    
    def _log_audit_event(self, user: str, service_name: str, service_type: ServiceType,
                        resource_path: str, resource_type: str, action: AuditAction,
                        access_type: str, result: AccessResult, policy_id: Optional[str] = None):
        """
        记录审计事件
        
        Args:
            user: 用户
            service_name: 服务名称
            service_type: 服务类型
            resource_path: 资源路径
            resource_type: 资源类型
            action: 动作
            access_type: 访问类型
            result: 结果
            policy_id: 策略ID
        """
        event = AuditEvent(
            id=f"audit_{len(self.audit_events) + 1}",
            event_time=datetime.now(),
            user=user,
            service_name=service_name,
            service_type=service_type,
            resource_path=resource_path,
            resource_type=resource_type,
            action=action,
            access_type=access_type,
            result=result,
            policy_id=policy_id,
            client_ip="192.168.1.100",
            session_id=f"session_{uuid.uuid4().hex[:8]}"
        )
        
        self.audit_events.append(event)
        
        # 保持最近1000条审计记录
        if len(self.audit_events) > 1000:
            self.audit_events = self.audit_events[-1000:]
    
    def get_policies(self, service_name: Optional[str] = None) -> Dict[str, Any]:
        """
        获取策略列表
        
        Args:
            service_name: 服务名称(可选)
            
        Returns:
            Dict[str, Any]: 策略列表
        """
        policies_info = []
        
        for policy in self.policies.values():
            if service_name is None or policy.service_name == service_name:
                policies_info.append({
                    'id': policy.id,
                    'name': policy.name,
                    'service_name': policy.service_name,
                    'service_type': policy.service_type.value,
                    'policy_type': policy.policy_type.value,
                    'description': policy.description,
                    'is_enabled': policy.is_enabled,
                    'is_audit_enabled': policy.is_audit_enabled,
                    'created_by': policy.created_by,
                    'create_time': policy.create_time.isoformat(),
                    'version': policy.version
                })
        
        return {
            'status': 'success',
            'policies': policies_info,
            'total': len(policies_info)
        }
    
    def get_audit_events(self, user: Optional[str] = None, 
                        service_name: Optional[str] = None,
                        start_time: Optional[datetime] = None,
                        end_time: Optional[datetime] = None,
                        limit: int = 100) -> Dict[str, Any]:
        """
        获取审计事件
        
        Args:
            user: 用户名(可选)
            service_name: 服务名称(可选)
            start_time: 开始时间(可选)
            end_time: 结束时间(可选)
            limit: 限制数量
            
        Returns:
            Dict[str, Any]: 审计事件列表
        """
        filtered_events = []
        
        for event in self.audit_events:
            # 应用过滤条件
            if user and event.user != user:
                continue
            if service_name and event.service_name != service_name:
                continue
            if start_time and event.event_time < start_time:
                continue
            if end_time and event.event_time > end_time:
                continue
            
            filtered_events.append({
                'id': event.id,
                'event_time': event.event_time.isoformat(),
                'user': event.user,
                'service_name': event.service_name,
                'service_type': event.service_type.value,
                'resource_path': event.resource_path,
                'resource_type': event.resource_type,
                'action': event.action.value,
                'access_type': event.access_type,
                'result': event.result.value,
                'policy_id': event.policy_id,
                'client_ip': event.client_ip,
                'session_id': event.session_id
            })
        
        # 按时间倒序排列并限制数量
        filtered_events.sort(key=lambda x: x['event_time'], reverse=True)
        filtered_events = filtered_events[:limit]
        
        return {
            'status': 'success',
            'events': filtered_events,
            'total': len(filtered_events)
        }
    
    def get_services(self) -> Dict[str, Any]:
        """
        获取服务列表
        
        Returns:
            Dict[str, Any]: 服务列表
        """
        services_info = []
        
        for service in self.services.values():
            services_info.append({
                'id': service.id,
                'name': service.name,
                'type': service.type.value,
                'description': service.description,
                'is_enabled': service.is_enabled,
                'created_by': service.created_by,
                'create_time': service.create_time.isoformat(),
                'version': service.version
            })
        
        return {
            'status': 'success',
            'services': services_info,
            'total': len(services_info)
        }
    
    def get_admin_status(self) -> Dict[str, Any]:
        """
        获取管理服务器状态
        
        Returns:
            Dict[str, Any]: 服务器状态
        """
        return {
            'admin_url': self.admin_url,
            'is_running': self.is_running,
            'version': self.version,
            'stats': {
                'total_services': len(self.services),
                'total_policies': len(self.policies),
                'total_users': len(self.users),
                'total_groups': len(self.groups),
                'total_audit_events': len(self.audit_events),
                'enabled_services': len([s for s in self.services.values() if s.is_enabled]),
                'enabled_policies': len([p for p in self.policies.values() if p.is_enabled])
            },
            'timestamp': datetime.now().isoformat()
        }

# 使用示例
if __name__ == "__main__":
    # 创建Ranger管理服务器
    ranger = RangerAdmin()
    
    print("=== Apache Ranger安全管理示例 ===")
    
    # 创建Hive服务
    print("\n=== 创建Hive服务 ===")
    hive_service_result = ranger.create_service(
        name="hadoop-hive",
        service_type=ServiceType.HIVE,
        description="Hive Service for data warehouse",
        configs={
            "username": "hive",
            "password": "hive123",
            "jdbc.driverClassName": "org.apache.hive.jdbc.HiveDriver",
            "jdbc.url": "jdbc:hive2://localhost:10000"
        }
    )
    print(f"Hive服务创建结果: {hive_service_result}")
    
    # 创建HDFS访问策略
    print("\n=== 创建HDFS访问策略 ===")
    hdfs_policy_result = ranger.create_policy(
        policy_name="hdfs-data-access",
        service_name="hadoop-hdfs",
        policy_type=PolicyType.ACCESS,
        resources={
            "path": ["/user/data/*"]
        },
        policy_items=[
            {
                "users": ["datauser", "analyst"],
                "groups": ["data-team"],
                "permissions": ["read", "write"]
            }
        ],
        description="Allow data team to access /user/data directory"
    )
    print(f"HDFS策略创建结果: {hdfs_policy_result}")
    
    # 创建Hive数据库策略
    print("\n=== 创建Hive数据库策略 ===")
    hive_policy_result = ranger.create_policy(
        policy_name="hive-sales-db-access",
        service_name="hadoop-hive",
        policy_type=PolicyType.ACCESS,
        resources={
            "database": ["sales"],
            "table": ["*"],
            "column": ["*"]
        },
        policy_items=[
            {
                "users": ["analyst"],
                "groups": ["analytics-team"],
                "permissions": ["select"]
            },
            {
                "users": ["dataadmin"],
                "permissions": ["all"]
            }
        ],
        description="Access control for sales database"
    )
    print(f"Hive策略创建结果: {hive_policy_result}")
    
    # 检查访问权限
    print("\n=== 检查访问权限 ===")
    
    # 检查HDFS访问
    hdfs_access_check = ranger.check_access(
        user="datauser",
        resource_path="/user/data/sales.csv",
        access_type="read",
        service_name="hadoop-hdfs"
    )
    print(f"datauser访问/user/data/sales.csv (read): {hdfs_access_check}")
    
    # 检查Hive访问
    hive_access_check = ranger.check_access(
        user="analyst",
        resource_path="sales.customers",
        access_type="select",
        service_name="hadoop-hive"
    )
    print(f"analyst访问sales.customers (select): {hive_access_check}")
    
    # 检查未授权访问
    unauthorized_check = ranger.check_access(
        user="guest",
        resource_path="/user/data/confidential.csv",
        access_type="read",
        service_name="hadoop-hdfs"
    )
    print(f"guest访问/user/data/confidential.csv (read): {unauthorized_check}")
    
    # 获取策略列表
    print("\n=== 策略列表 ===")
    policies = ranger.get_policies()
    if policies['status'] == 'success':
        print(f"总策略数: {policies['total']}")
        for policy in policies['policies']:
            print(f"  - {policy['name']} ({policy['service_name']}) - {policy['policy_type']}")
    
    # 获取审计事件
    print("\n=== 审计事件 ===")
    audit_events = ranger.get_audit_events(limit=10)
    if audit_events['status'] == 'success':
        print(f"审计事件数: {audit_events['total']}")
        for event in audit_events['events'][:5]:  # 显示前5个事件
            print(f"  - {event['event_time']}: {event['user']} {event['action']} {event['resource_path']} -> {event['result']}")
    
    # 获取服务列表
    print("\n=== 服务列表 ===")
    services = ranger.get_services()
    if services['status'] == 'success':
        print(f"总服务数: {services['total']}")
        for service in services['services']:
            print(f"  - {service['name']} ({service['type']}) - {'启用' if service['is_enabled'] else '禁用'}")
    
    # 获取管理服务器状态
    print("\n=== 管理服务器状态 ===")
    status = ranger.get_admin_status()
    print(f"服务器URL: {status['admin_url']}")
    print(f"运行状态: {status['is_running']}")
    print(f"版本: {status['version']}")
    print("统计信息:")
    for key, value in status['stats'].items():
        print(f"  {key}: {value}")

### 9.2 Apache Knox详解

Apache Knox是一个为Hadoop集群提供安全访问的网关服务。

```python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import base64
import hashlib
import jwt
import uuid

class AuthenticationType(Enum):
    """认证类型"""
    BASIC = "basic"
    KERBEROS = "kerberos"
    JWT = "jwt"
    LDAP = "ldap"
    PAM = "pam"

class ServiceRole(Enum):
    """服务角色"""
    NAMENODE = "NAMENODE"
    DATANODE = "DATANODE"
    RESOURCEMANAGER = "RESOURCEMANAGER"
    NODEMANAGER = "NODEMANAGER"
    HIVESERVER2 = "HIVESERVER2"
    WEBHCAT = "WEBHCAT"
    OOZIE = "OOZIE"
    WEBHBASE = "WEBHBASE"
    STORM = "STORM"
    KAFKA = "KAFKA"

class TopologyStatus(Enum):
    """拓扑状态"""
    ACTIVE = "active"
    INACTIVE = "inactive"
    DEPLOYING = "deploying"
    ERROR = "error"

@dataclass
class KnoxService:
    """Knox服务定义"""
    name: str
    role: ServiceRole
    url: str
    version: str = "1.0.0"
    params: Dict[str, Any] = field(default_factory=dict)

@dataclass
class KnoxProvider:
    """Knox提供者"""
    name: str
    enabled: bool
    role: str  # authentication, authorization, identity-assertion等
    params: Dict[str, Any] = field(default_factory=dict)

@dataclass
class KnoxTopology:
    """Knox拓扑"""
    name: str
    description: str
    services: List[KnoxService]
    providers: List[KnoxProvider]
    status: TopologyStatus = TopologyStatus.INACTIVE
    created_time: datetime = field(default_factory=datetime.now)
    updated_time: datetime = field(default_factory=datetime.now)
    version: str = "1.0.0"

@dataclass
class KnoxUser:
    """Knox用户"""
    username: str
    password_hash: str
    roles: List[str] = field(default_factory=list)
    groups: List[str] = field(default_factory=list)
    is_active: bool = True
    created_time: datetime = field(default_factory=datetime.now)
    last_login: Optional[datetime] = None

@dataclass
class KnoxSession:
    """Knox会话"""
    session_id: str
    username: str
    client_ip: str
    user_agent: str
    created_time: datetime
    last_access_time: datetime
    expires_at: datetime
    is_active: bool = True
    jwt_token: Optional[str] = None

@dataclass
class AccessLog:
    """访问日志"""
    timestamp: datetime
    client_ip: str
    username: str
    method: str
    url: str
    status_code: int
    response_size: int
    user_agent: str
    session_id: str
    topology_name: str
    service_name: str
    response_time_ms: int

class KnoxGateway:
    """
    Apache Knox网关
    提供Hadoop集群的安全访问入口
    """
    
    def __init__(self, gateway_host: str = "localhost", gateway_port: int = 8443):
        self.gateway_host = gateway_host
        self.gateway_port = gateway_port
        self.gateway_url = f"https://{gateway_host}:{gateway_port}"
        self.topologies: Dict[str, KnoxTopology] = {}
        self.users: Dict[str, KnoxUser] = {}
        self.sessions: Dict[str, KnoxSession] = {}
        self.access_logs: List[AccessLog] = []
        self.is_running = True
        self.version = "1.6.1"
        self.jwt_secret = "knox-secret-key"
        
        # 初始化默认数据
        self._initialize_default_data()
    
    def _initialize_default_data(self):
        """初始化默认数据"""
        # 创建默认用户
        admin_user = KnoxUser(
            username="admin",
            password_hash=self._hash_password("admin123"),
            roles=["admin"],
            groups=["administrators"]
        )
        self.users["admin"] = admin_user
        
        guest_user = KnoxUser(
            username="guest",
            password_hash=self._hash_password("guest123"),
            roles=["user"],
            groups=["users"]
        )
        self.users["guest"] = guest_user
        
        # 创建默认拓扑
        self._create_default_topology()
    
    def _create_default_topology(self):
        """创建默认拓扑"""
        # 定义服务
        services = [
            KnoxService(
                name="namenode",
                role=ServiceRole.NAMENODE,
                url="hdfs://localhost:9000"
            ),
            KnoxService(
                name="resourcemanager",
                role=ServiceRole.RESOURCEMANAGER,
                url="http://localhost:8088"
            ),
            KnoxService(
                name="hive",
                role=ServiceRole.HIVESERVER2,
                url="http://localhost:10001"
            )
        ]
        
        # 定义提供者
        providers = [
            KnoxProvider(
                name="ShiroProvider",
                enabled=True,
                role="authentication",
                params={
                    "sessionTimeout": "30",
                    "main.ldapRealm": "org.apache.knox.gateway.shirorealm.KnoxLdapRealm",
                    "main.ldapContextFactory": "org.apache.knox.gateway.shirorealm.KnoxLdapContextFactory",
                    "main.ldapRealm.contextFactory": "$ldapContextFactory"
                }
            ),
            KnoxProvider(
                name="AclsAuthz",
                enabled=True,
                role="authorization",
                params={
                    "webhdfs.acl": "admin;*;*",
                    "webhcat.acl": "admin;*;*"
                }
            ),
            KnoxProvider(
                name="Default",
                enabled=True,
                role="identity-assertion",
                params={
                    "name": "Default"
                }
            )
        ]
        
        default_topology = KnoxTopology(
            name="sandbox",
            description="Default sandbox topology",
            services=services,
            providers=providers,
            status=TopologyStatus.ACTIVE
        )
        
        self.topologies["sandbox"] = default_topology
    
    def _hash_password(self, password: str) -> str:
        """密码哈希"""
        return hashlib.sha256(password.encode()).hexdigest()
    
    def authenticate(self, username: str, password: str, 
                    auth_type: AuthenticationType = AuthenticationType.BASIC) -> Dict[str, Any]:
        """
        用户认证
        
        Args:
            username: 用户名
            password: 密码
            auth_type: 认证类型
            
        Returns:
            Dict[str, Any]: 认证结果
        """
        try:
            if username not in self.users:
                return {
                    'status': 'error',
                    'message': 'User not found'
                }
            
            user = self.users[username]
            
            if not user.is_active:
                return {
                    'status': 'error',
                    'message': 'User account is disabled'
                }
            
            # 验证密码
            if user.password_hash != self._hash_password(password):
                return {
                    'status': 'error',
                    'message': 'Invalid credentials'
                }
            
            # 创建会话
            session_id = str(uuid.uuid4())
            expires_at = datetime.now() + timedelta(hours=8)
            
            # 生成JWT令牌
            jwt_payload = {
                'username': username,
                'roles': user.roles,
                'groups': user.groups,
                'session_id': session_id,
                'exp': expires_at.timestamp(),
                'iat': datetime.now().timestamp()
            }
            
            jwt_token = jwt.encode(jwt_payload, self.jwt_secret, algorithm='HS256')
            
            session = KnoxSession(
                session_id=session_id,
                username=username,
                client_ip="192.168.1.100",
                user_agent="Knox-Client/1.0",
                created_time=datetime.now(),
                last_access_time=datetime.now(),
                expires_at=expires_at,
                jwt_token=jwt_token
            )
            
            self.sessions[session_id] = session
            
            # 更新用户最后登录时间
            user.last_login = datetime.now()
            
            return {
                'status': 'success',
                'session_id': session_id,
                'jwt_token': jwt_token,
                'expires_at': expires_at.isoformat(),
                'user_info': {
                    'username': username,
                    'roles': user.roles,
                    'groups': user.groups
                }
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Authentication failed: {str(e)}'
            }
    
    def create_topology(self, name: str, description: str, 
                       services: List[Dict[str, Any]], 
                       providers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        创建拓扑
        
        Args:
            name: 拓扑名称
            description: 描述
            services: 服务列表
            providers: 提供者列表
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        try:
            if name in self.topologies:
                return {
                    'status': 'error',
                    'message': f'Topology {name} already exists'
                }
            
            # 转换服务
            knox_services = []
            for svc in services:
                knox_service = KnoxService(
                    name=svc['name'],
                    role=ServiceRole(svc['role']),
                    url=svc['url'],
                    version=svc.get('version', '1.0.0'),
                    params=svc.get('params', {})
                )
                knox_services.append(knox_service)
            
            # 转换提供者
            knox_providers = []
            for prov in providers:
                knox_provider = KnoxProvider(
                    name=prov['name'],
                    enabled=prov.get('enabled', True),
                    role=prov['role'],
                    params=prov.get('params', {})
                )
                knox_providers.append(knox_provider)
            
            topology = KnoxTopology(
                name=name,
                description=description,
                services=knox_services,
                providers=knox_providers,
                status=TopologyStatus.DEPLOYING
            )
            
            self.topologies[name] = topology
            
            # 模拟部署过程
            import time
            time.sleep(1)
            topology.status = TopologyStatus.ACTIVE
            
            return {
                'status': 'success',
                'topology_name': name,
                'message': f'Topology {name} created and deployed successfully'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to create topology: {str(e)}'
            }
    
    def proxy_request(self, topology_name: str, service_name: str, 
                     path: str, method: str = "GET", 
                     session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        代理请求到后端服务
        
        Args:
            topology_name: 拓扑名称
            service_name: 服务名称
            path: 请求路径
            method: HTTP方法
            session_id: 会话ID
            
        Returns:
            Dict[str, Any]: 代理结果
        """
        try:
            # 检查拓扑
            if topology_name not in self.topologies:
                return {
                    'status': 'error',
                    'status_code': 404,
                    'message': f'Topology {topology_name} not found'
                }
            
            topology = self.topologies[topology_name]
            
            if topology.status != TopologyStatus.ACTIVE:
                return {
                    'status': 'error',
                    'status_code': 503,
                    'message': f'Topology {topology_name} is not active'
                }
            
            # 检查会话(如果提供)
            username = "anonymous"
            if session_id:
                if session_id not in self.sessions:
                    return {
                        'status': 'error',
                        'status_code': 401,
                        'message': 'Invalid session'
                    }
                
                session = self.sessions[session_id]
                if not session.is_active or datetime.now() > session.expires_at:
                    return {
                        'status': 'error',
                        'status_code': 401,
                        'message': 'Session expired'
                    }
                
                username = session.username
                session.last_access_time = datetime.now()
            
            # 查找服务
            target_service = None
            for service in topology.services:
                if service.name == service_name:
                    target_service = service
                    break
            
            if not target_service:
                return {
                    'status': 'error',
                    'status_code': 404,
                    'message': f'Service {service_name} not found in topology {topology_name}'
                }
            
            # 模拟代理请求
            start_time = datetime.now()
            
            # 构建目标URL
            target_url = f"{target_service.url}{path}"
            
            # 模拟响应
            if service_name == "namenode" and "webhdfs" in path:
                response_data = {
                    "FileStatuses": {
                        "FileStatus": [
                            {
                                "accessTime": 1640995200000,
                                "blockSize": 134217728,
                                "group": "supergroup",
                                "length": 1024,
                                "modificationTime": 1640995200000,
                                "owner": "hdfs",
                                "pathSuffix": "data.txt",
                                "permission": "644",
                                "replication": 3,
                                "type": "FILE"
                            }
                        ]
                    }
                }
                status_code = 200
            elif service_name == "resourcemanager":
                response_data = {
                    "clusterInfo": {
                        "id": 1640995200000,
                        "startedOn": 1640995200000,
                        "state": "STARTED",
                        "haState": "ACTIVE",
                        "resourceManagerVersion": "3.3.4",
                        "resourceManagerBuildVersion": "3.3.4",
                        "hadoopVersion": "3.3.4"
                    }
                }
                status_code = 200
            else:
                response_data = {"message": "Service response", "path": path}
                status_code = 200
            
            end_time = datetime.now()
            response_time_ms = int((end_time - start_time).total_seconds() * 1000)
            
            # 记录访问日志
            access_log = AccessLog(
                timestamp=start_time,
                client_ip="192.168.1.100",
                username=username,
                method=method,
                url=f"/gateway/{topology_name}/{service_name}{path}",
                status_code=status_code,
                response_size=len(str(response_data)),
                user_agent="Knox-Client/1.0",
                session_id=session_id or "anonymous",
                topology_name=topology_name,
                service_name=service_name,
                response_time_ms=response_time_ms
            )
            
            self.access_logs.append(access_log)
            
            # 保持最近1000条访问日志
            if len(self.access_logs) > 1000:
                self.access_logs = self.access_logs[-1000:]
            
            return {
                'status': 'success',
                'status_code': status_code,
                'target_url': target_url,
                'response_data': response_data,
                'response_time_ms': response_time_ms
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'status_code': 500,
                'message': f'Proxy request failed: {str(e)}'
            }
    
    def get_topologies(self) -> Dict[str, Any]:
        """
        获取拓扑列表
        
        Returns:
            Dict[str, Any]: 拓扑列表
        """
        topologies_info = []
        
        for topology in self.topologies.values():
            topologies_info.append({
                'name': topology.name,
                'description': topology.description,
                'status': topology.status.value,
                'services_count': len(topology.services),
                'providers_count': len(topology.providers),
                'created_time': topology.created_time.isoformat(),
                'updated_time': topology.updated_time.isoformat(),
                'version': topology.version
            })
        
        return {
            'status': 'success',
            'topologies': topologies_info,
            'total': len(topologies_info)
        }
    
    def get_access_logs(self, topology_name: Optional[str] = None,
                       username: Optional[str] = None,
                       limit: int = 100) -> Dict[str, Any]:
        """
        获取访问日志
        
        Args:
            topology_name: 拓扑名称(可选)
            username: 用户名(可选)
            limit: 限制数量
            
        Returns:
            Dict[str, Any]: 访问日志列表
        """
        filtered_logs = []
        
        for log in self.access_logs:
            if topology_name and log.topology_name != topology_name:
                continue
            if username and log.username != username:
                continue
            
            filtered_logs.append({
                'timestamp': log.timestamp.isoformat(),
                'client_ip': log.client_ip,
                'username': log.username,
                'method': log.method,
                'url': log.url,
                'status_code': log.status_code,
                'response_size': log.response_size,
                'user_agent': log.user_agent,
                'session_id': log.session_id,
                'topology_name': log.topology_name,
                'service_name': log.service_name,
                'response_time_ms': log.response_time_ms
            })
        
        # 按时间倒序排列并限制数量
        filtered_logs.sort(key=lambda x: x['timestamp'], reverse=True)
        filtered_logs = filtered_logs[:limit]
        
        return {
            'status': 'success',
            'logs': filtered_logs,
            'total': len(filtered_logs)
        }
    
    def get_gateway_status(self) -> Dict[str, Any]:
        """
        获取网关状态
        
        Returns:
            Dict[str, Any]: 网关状态
        """
        active_sessions = len([s for s in self.sessions.values() 
                              if s.is_active and datetime.now() <= s.expires_at])
        
        return {
            'gateway_url': self.gateway_url,
            'is_running': self.is_running,
            'version': self.version,
            'stats': {
                'total_topologies': len(self.topologies),
                'active_topologies': len([t for t in self.topologies.values() 
                                        if t.status == TopologyStatus.ACTIVE]),
                'total_users': len(self.users),
                'active_sessions': active_sessions,
                'total_access_logs': len(self.access_logs)
            },
            'timestamp': datetime.now().isoformat()
        }

# 使用示例
if __name__ == "__main__":
    # 创建Knox网关
    knox = KnoxGateway()
    
    print("=== Apache Knox网关示例 ===")
    
    # 用户认证
    print("\n=== 用户认证 ===")
    auth_result = knox.authenticate("admin", "admin123")
    print(f"认证结果: {auth_result}")
    
    if auth_result['status'] == 'success':
        session_id = auth_result['session_id']
        jwt_token = auth_result['jwt_token']
        
        # 创建自定义拓扑
        print("\n=== 创建自定义拓扑 ===")
        topology_result = knox.create_topology(
            name="production",
            description="Production environment topology",
            services=[
                {
                    "name": "namenode",
                    "role": "NAMENODE",
                    "url": "hdfs://prod-namenode:9000"
                },
                {
                    "name": "resourcemanager",
                    "role": "RESOURCEMANAGER",
                    "url": "http://prod-rm:8088"
                },
                {
                    "name": "hive",
                    "role": "HIVESERVER2",
                    "url": "http://prod-hive:10001"
                }
            ],
            providers=[
                {
                    "name": "ShiroProvider",
                    "role": "authentication",
                    "enabled": True,
                    "params": {
                        "sessionTimeout": "60"
                    }
                },
                {
                    "name": "AclsAuthz",
                    "role": "authorization",
                    "enabled": True,
                    "params": {
                        "webhdfs.acl": "admin;*;*"
                    }
                }
            ]
        )
        print(f"拓扑创建结果: {topology_result}")
        
        # 代理请求
        print("\n=== 代理请求 ===")
        
        # HDFS WebHDFS请求
        hdfs_request = knox.proxy_request(
            topology_name="sandbox",
            service_name="namenode",
            path="/webhdfs/v1/user?op=LISTSTATUS",
            method="GET",
            session_id=session_id
        )
        print(f"HDFS请求结果: {hdfs_request}")
        
        # YARN ResourceManager请求
        yarn_request = knox.proxy_request(
            topology_name="sandbox",
            service_name="resourcemanager",
            path="/ws/v1/cluster/info",
            method="GET",
            session_id=session_id
        )
        print(f"YARN请求结果: {yarn_request}")
        
        # 未授权请求
        unauthorized_request = knox.proxy_request(
            topology_name="sandbox",
            service_name="namenode",
            path="/webhdfs/v1/admin?op=LISTSTATUS",
            method="GET"
        )
        print(f"未授权请求结果: {unauthorized_request}")
    
    # 获取拓扑列表
    print("\n=== 拓扑列表 ===")
    topologies = knox.get_topologies()
    if topologies['status'] == 'success':
        print(f"总拓扑数: {topologies['total']}")
        for topology in topologies['topologies']:
            print(f"  - {topology['name']}: {topology['description']} ({topology['status']})")
    
    # 获取访问日志
    print("\n=== 访问日志 ===")
    access_logs = knox.get_access_logs(limit=5)
    if access_logs['status'] == 'success':
        print(f"访问日志数: {access_logs['total']}")
        for log in access_logs['logs']:
            print(f"  - {log['timestamp']}: {log['username']} {log['method']} {log['url']} -> {log['status_code']} ({log['response_time_ms']}ms)")
    
    # 获取网关状态
    print("\n=== 网关状态 ===")
    status = knox.get_gateway_status()
    print(f"网关URL: {status['gateway_url']}")
    print(f"运行状态: {status['is_running']}")
    print(f"版本: {status['version']}")
    print("统计信息:")
    for key, value in status['stats'].items():
        print(f"  {key}: {value}")
print("资源池:")
for pool_name, pool_info in status['pools'].items():
     print(f"  {pool_name}: {pool_info['slots']} slots")

7.2 Apache Oozie详解

from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import xml.etree.ElementTree as ET

class WorkflowStatus(Enum):
   """工作流状态"""
   PREP = "PREP"           # 准备中
   RUNNING = "RUNNING"     # 运行中
   SUCCEEDED = "SUCCEEDED" # 成功
   FAILED = "FAILED"       # 失败
   KILLED = "KILLED"       # 已终止
   SUSPENDED = "SUSPENDED" # 暂停

class ActionStatus(Enum):
   """动作状态"""
   PREP = "PREP"           # 准备中
   RUNNING = "RUNNING"     # 运行中
   OK = "OK"               # 成功
   ERROR = "ERROR"         # 错误
   KILLED = "KILLED"       # 已终止
   START_RETRY = "START_RETRY"  # 开始重试
   START_MANUAL = "START_MANUAL"  # 手动开始
   DONE = "DONE"           # 完成
   END_RETRY = "END_RETRY" # 结束重试
   END_MANUAL = "END_MANUAL"  # 手动结束
   USER_RETRY = "USER_RETRY"  # 用户重试

class CoordinatorStatus(Enum):
   """协调器状态"""
   PREP = "PREP"           # 准备中
   RUNNING = "RUNNING"     # 运行中
   SUCCEEDED = "SUCCEEDED" # 成功
   FAILED = "FAILED"       # 失败
   KILLED = "KILLED"       # 已终止
   SUSPENDED = "SUSPENDED" # 暂停
   DONEWITHERROR = "DONEWITHERROR"  # 完成但有错误
   RUNNINGWITHERROR = "RUNNINGWITHERROR"  # 运行中但有错误

class ActionType(Enum):
   """动作类型"""
   MAP_REDUCE = "map-reduce"
   PIG = "pig"
   HIVE = "hive"
   SQOOP = "sqoop"
   SHELL = "shell"
   JAVA = "java"
   FS = "fs"
   EMAIL = "email"
   SSH = "ssh"
   SUB_WORKFLOW = "sub-workflow"
   SPARK = "spark"
   DISTCP = "distcp"

class BundleStatus(Enum):
   """Bundle状态"""
   PREP = "PREP"           # 准备中
   RUNNING = "RUNNING"     # 运行中
   SUCCEEDED = "SUCCEEDED" # 成功
   FAILED = "FAILED"       # 失败
   KILLED = "KILLED"       # 已终止
   SUSPENDED = "SUSPENDED" # 暂停
   DONEWITHERROR = "DONEWITHERROR"  # 完成但有错误
   RUNNINGWITHERROR = "RUNNINGWITHERROR"  # 运行中但有错误

@dataclass
class WorkflowAction:
   """工作流动作"""
   name: str
   action_type: ActionType
   status: ActionStatus = ActionStatus.PREP
   start_time: Optional[datetime] = None
   end_time: Optional[datetime] = None
   transition: str = ""  # 成功时的下一个动作
   error_transition: str = ""  # 失败时的下一个动作
   retry_max: int = 3
   retry_interval: int = 10  # 分钟
   configuration: Dict[str, Any] = field(default_factory=dict)
   external_id: str = ""
   external_status: str = ""
   external_child_ids: str = ""
   error_code: str = ""
   error_message: str = ""
   data: str = ""
   stats: str = ""
   console_url: str = ""
   user_retry_count: int = 0
   user_retry_max: int = 3
   user_retry_interval: int = 10
   
@dataclass
class WorkflowJob:
   """工作流作业"""
   id: str
   app_name: str
   app_path: str
   status: WorkflowStatus = WorkflowStatus.PREP
   user: str = ""
   group: str = ""
   start_time: Optional[datetime] = None
   end_time: Optional[datetime] = None
   created_time: datetime = field(default_factory=datetime.now)
   last_modified_time: datetime = field(default_factory=datetime.now)
   run: int = 0
   console_url: str = ""
   actions: Dict[str, WorkflowAction] = field(default_factory=dict)
   conf: str = ""
   parent_id: str = ""
   external_id: str = ""
   log_token: str = ""
   
@dataclass
class CoordinatorAction:
   """协调器动作"""
   id: str
   job_id: str
   status: ActionStatus = ActionStatus.PREP
   external_id: str = ""
   external_status: str = ""
   nominal_time: Optional[datetime] = None
   created_time: datetime = field(default_factory=datetime.now)
   last_modified_time: datetime = field(default_factory=datetime.now)
   missing_dependencies: str = ""
   push_missing_dependencies: str = ""
   timeout: int = 120  # 分钟
   error_code: str = ""
   error_message: str = ""
   console_url: str = ""
   
@dataclass
class CoordinatorJob:
   """协调器作业"""
   id: str
   app_name: str
   app_path: str
   status: CoordinatorStatus = CoordinatorStatus.PREP
   user: str = ""
   group: str = ""
   start_time: Optional[datetime] = None
   end_time: Optional[datetime] = None
   pause_time: Optional[datetime] = None
   created_time: datetime = field(default_factory=datetime.now)
   last_modified_time: datetime = field(default_factory=datetime.now)
   next_materialized_time: Optional[datetime] = None
   frequency: str = ""  # 频率表达式
   time_zone: str = "UTC"
   concurrency: int = 1
   execution: str = "FIFO"  # FIFO, LIFO, LAST_ONLY
   timeout: int = 120  # 分钟
   actions: Dict[str, CoordinatorAction] = field(default_factory=dict)
   conf: str = ""
   mat_throttling: int = 12
   
@dataclass
class BundleJob:
   """Bundle作业"""
   id: str
   app_name: str
   app_path: str
   status: BundleStatus = BundleStatus.PREP
   user: str = ""
   group: str = ""
   start_time: Optional[datetime] = None
   end_time: Optional[datetime] = None
   pause_time: Optional[datetime] = None
   created_time: datetime = field(default_factory=datetime.now)
   last_modified_time: datetime = field(default_factory=datetime.now)
   kick_off_time: Optional[datetime] = None
   time_zone: str = "UTC"
   coordinators: List[str] = field(default_factory=list)
   conf: str = ""
   
class OozieServer:
   """
   Oozie工作流引擎服务器
   """
   
   def __init__(self, server_url: str = "http://localhost:11000/oozie"):
       self.server_url = server_url
       self.workflow_jobs = {}  # job_id -> WorkflowJob
       self.coordinator_jobs = {}  # job_id -> CoordinatorJob
       self.bundle_jobs = {}  # job_id -> BundleJob
       
       # 服务器状态
       self.is_running = True
       self.build_version = "5.2.1"
       self.system_mode = "NORMAL"  # NORMAL, NOWEBSERVICE, SAFEMODE
       self.java_version = "1.8.0_281"
       self.os_name = "Linux"
       self.os_version = "3.10.0"
       
       # 配置
       self.config = {
           'oozie.service.WorkflowStoreService.workflow.callable.limit': 100,
           'oozie.service.coord.normal.default.timeout': 120,
           'oozie.service.coord.push.check.requeue.interval': 30000,
           'oozie.service.CallableQueueService.queue.size': 10000,
           'oozie.service.CallableQueueService.threads': 10,
           'oozie.service.coord.materialization.window': 3600000,
           'oozie.service.coord.lookup.trigger.window': 10800000,
           'oozie.service.bundle.ForkedActionExecutor.buffer.size': 1000
       }
       
       # 线程锁
       self.workflow_lock = threading.Lock()
       self.coordinator_lock = threading.Lock()
       self.bundle_lock = threading.Lock()
       
       # 统计信息
       self.stats = {
           'total_workflows': 0,
           'running_workflows': 0,
           'succeeded_workflows': 0,
           'failed_workflows': 0,
           'total_coordinators': 0,
           'running_coordinators': 0,
           'total_bundles': 0,
           'running_bundles': 0
       }
       
       # 初始化示例作业
       self._create_example_jobs()
   
   def _create_example_jobs(self):
       """创建示例作业"""
       # 创建示例工作流
       workflow_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-W"
       
       # 创建工作流动作
       start_action = WorkflowAction(
           name="start",
           action_type=ActionType.FS,
           status=ActionStatus.OK,
           transition="data-processing"
       )
       
       data_processing_action = WorkflowAction(
           name="data-processing",
           action_type=ActionType.MAP_REDUCE,
           status=ActionStatus.PREP,
           transition="data-validation",
           error_transition="fail",
           configuration={
               'mapred.job.queue.name': 'default',
               'mapred.mapper.class': 'org.example.DataMapper',
               'mapred.reducer.class': 'org.example.DataReducer',
               'mapred.input.dir': '/user/input',
               'mapred.output.dir': '/user/output'
           }
       )
       
       data_validation_action = WorkflowAction(
           name="data-validation",
           action_type=ActionType.HIVE,
           status=ActionStatus.PREP,
           transition="end",
           error_transition="fail",
           configuration={
               'hive.script': 'validate_data.hql',
               'hive.params': 'INPUT=/user/output,OUTPUT=/user/validated'
           }
       )
       
       end_action = WorkflowAction(
           name="end",
           action_type=ActionType.FS,
           status=ActionStatus.PREP
       )
       
       fail_action = WorkflowAction(
           name="fail",
           action_type=ActionType.FS,
           status=ActionStatus.PREP
       )
       
       workflow_job = WorkflowJob(
           id=workflow_id,
           app_name="data-processing-workflow",
           app_path="/user/oozie/workflows/data-processing",
           user="hadoop",
           group="hadoop",
           actions={
               "start": start_action,
               "data-processing": data_processing_action,
               "data-validation": data_validation_action,
               "end": end_action,
               "fail": fail_action
           },
           conf='<configuration><property><name>jobTracker</name><value>localhost:8032</value></property></configuration>'
       )
       
       self.workflow_jobs[workflow_id] = workflow_job
       self.stats['total_workflows'] += 1
       
       # 创建示例协调器
       coordinator_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-C"
       
       coordinator_job = CoordinatorJob(
           id=coordinator_id,
           app_name="daily-data-processing",
           app_path="/user/oozie/coordinators/daily-processing",
           user="hadoop",
           group="hadoop",
           start_time=datetime.now(),
           end_time=datetime.now() + timedelta(days=30),
           frequency="${coord:days(1)}",
           concurrency=1,
           execution="FIFO",
           timeout=120
       )
       
       self.coordinator_jobs[coordinator_id] = coordinator_job
       self.stats['total_coordinators'] += 1
       
       # 创建示例Bundle
       bundle_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-B"
       
       bundle_job = BundleJob(
           id=bundle_id,
           app_name="data-pipeline-bundle",
           app_path="/user/oozie/bundles/data-pipeline",
           user="hadoop",
           group="hadoop",
           start_time=datetime.now(),
           end_time=datetime.now() + timedelta(days=90),
           coordinators=[coordinator_id]
       )
       
       self.bundle_jobs[bundle_id] = bundle_job
       self.stats['total_bundles'] += 1
   
   def submit_workflow(self, app_path: str, config: Dict[str, Any],
                      user: str = "hadoop") -> Dict[str, Any]:
       """
       提交工作流
       
       Args:
           app_path: 应用路径
           config: 配置参数
           user: 用户名
           
       Returns:
           Dict[str, Any]: 提交结果
       """
       job_id = f"{len(self.workflow_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-W"
       
       # 解析应用名称
       app_name = app_path.split('/')[-1] if app_path else "workflow"
       
       # 创建工作流作业
       workflow_job = WorkflowJob(
           id=job_id,
           app_name=app_name,
           app_path=app_path,
           user=user,
           group=user,
           conf=json.dumps(config)
       )
       
       # 创建基本动作
       start_action = WorkflowAction(
           name="start",
           action_type=ActionType.FS,
           status=ActionStatus.OK,
           transition="main-action"
       )
       
       main_action = WorkflowAction(
           name="main-action",
           action_type=ActionType.MAP_REDUCE,
           status=ActionStatus.PREP,
           transition="end",
           error_transition="fail",
           configuration=config
       )
       
       end_action = WorkflowAction(
           name="end",
           action_type=ActionType.FS,
           status=ActionStatus.PREP
       )
       
       fail_action = WorkflowAction(
           name="fail",
           action_type=ActionType.FS,
           status=ActionStatus.PREP
       )
       
       workflow_job.actions = {
           "start": start_action,
           "main-action": main_action,
           "end": end_action,
           "fail": fail_action
       }
       
       with self.workflow_lock:
           self.workflow_jobs[job_id] = workflow_job
           self.stats['total_workflows'] += 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Workflow submitted successfully'
       }
   
   def start_workflow(self, job_id: str) -> Dict[str, Any]:
       """
       启动工作流
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 启动结果
       """
       if job_id not in self.workflow_jobs:
           return {'status': 'error', 'message': f'Workflow {job_id} not found'}
       
       workflow_job = self.workflow_jobs[job_id]
       
       if workflow_job.status != WorkflowStatus.PREP:
           return {'status': 'error', 'message': f'Workflow {job_id} is not in PREP state'}
       
       with self.workflow_lock:
           workflow_job.status = WorkflowStatus.RUNNING
           workflow_job.start_time = datetime.now()
           workflow_job.run += 1
           self.stats['running_workflows'] += 1
       
       # 启动第一个动作
       self._execute_workflow_action(job_id, "start")
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Workflow started successfully'
       }
   
   def _execute_workflow_action(self, job_id: str, action_name: str):
       """
       执行工作流动作
       
       Args:
           job_id: 作业ID
           action_name: 动作名称
       """
       workflow_job = self.workflow_jobs[job_id]
       action = workflow_job.actions.get(action_name)
       
       if not action:
           return
       
       # 模拟动作执行
       action.status = ActionStatus.RUNNING
       action.start_time = datetime.now()
       
       # 模拟执行时间
       execution_time = random.uniform(1, 5)
       time.sleep(execution_time)
       
       # 模拟执行结果(90%成功率)
       success = random.random() > 0.1
       
       action.end_time = datetime.now()
       
       if success:
           action.status = ActionStatus.OK
           # 执行下一个动作
           if action.transition and action.transition != "end":
               self._execute_workflow_action(job_id, action.transition)
           elif action.transition == "end":
               self._complete_workflow(job_id, WorkflowStatus.SUCCEEDED)
       else:
           action.status = ActionStatus.ERROR
           action.error_code = "E0001"
           action.error_message = "Action execution failed"
           
           # 执行错误转换
           if action.error_transition:
               if action.error_transition == "fail":
                   self._complete_workflow(job_id, WorkflowStatus.FAILED)
               else:
                   self._execute_workflow_action(job_id, action.error_transition)
   
   def _complete_workflow(self, job_id: str, status: WorkflowStatus):
       """
       完成工作流
       
       Args:
           job_id: 作业ID
           status: 最终状态
       """
       workflow_job = self.workflow_jobs[job_id]
       
       with self.workflow_lock:
           workflow_job.status = status
           workflow_job.end_time = datetime.now()
           self.stats['running_workflows'] -= 1
           
           if status == WorkflowStatus.SUCCEEDED:
               self.stats['succeeded_workflows'] += 1
           elif status == WorkflowStatus.FAILED:
               self.stats['failed_workflows'] += 1
   
   def get_workflow_job(self, job_id: str) -> Dict[str, Any]:
       """
       获取工作流作业信息
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 作业信息
       """
       if job_id not in self.workflow_jobs:
           return {'status': 'error', 'message': f'Workflow {job_id} not found'}
       
       workflow_job = self.workflow_jobs[job_id]
       
       # 获取动作信息
       actions_info = []
       for action_name, action in workflow_job.actions.items():
           actions_info.append({
               'name': action.name,
               'type': action.action_type.value,
               'status': action.status.value,
               'start_time': action.start_time.isoformat() if action.start_time else None,
               'end_time': action.end_time.isoformat() if action.end_time else None,
               'transition': action.transition,
               'error_transition': action.error_transition,
               'retry_max': action.retry_max,
               'error_code': action.error_code,
               'error_message': action.error_message
           })
       
       return {
           'status': 'success',
           'workflow': {
               'id': workflow_job.id,
               'app_name': workflow_job.app_name,
               'app_path': workflow_job.app_path,
               'status': workflow_job.status.value,
               'user': workflow_job.user,
               'group': workflow_job.group,
               'start_time': workflow_job.start_time.isoformat() if workflow_job.start_time else None,
               'end_time': workflow_job.end_time.isoformat() if workflow_job.end_time else None,
               'created_time': workflow_job.created_time.isoformat(),
               'last_modified_time': workflow_job.last_modified_time.isoformat(),
               'run': workflow_job.run,
               'actions': actions_info,
               'parent_id': workflow_job.parent_id,
               'external_id': workflow_job.external_id
           }
       }
   
   def kill_workflow(self, job_id: str) -> Dict[str, Any]:
       """
       终止工作流
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 终止结果
       """
       if job_id not in self.workflow_jobs:
           return {'status': 'error', 'message': f'Workflow {job_id} not found'}
       
       workflow_job = self.workflow_jobs[job_id]
       
       if workflow_job.status not in [WorkflowStatus.RUNNING, WorkflowStatus.SUSPENDED]:
           return {'status': 'error', 'message': f'Workflow {job_id} is not in a killable state'}
       
       previous_status = workflow_job.status
       with self.workflow_lock:
           workflow_job.status = WorkflowStatus.KILLED
           workflow_job.end_time = datetime.now()
           
           # 终止所有运行中的动作
           for action in workflow_job.actions.values():
               if action.status == ActionStatus.RUNNING:
                   action.status = ActionStatus.KILLED
                   action.end_time = datetime.now()
           
           # 使用修改前的状态进行判断,避免先置为KILLED后计数永远不会更新
           if previous_status == WorkflowStatus.RUNNING:
               self.stats['running_workflows'] -= 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Workflow killed successfully'
       }
   
   def suspend_workflow(self, job_id: str) -> Dict[str, Any]:
       """
       暂停工作流
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 暂停结果
       """
       if job_id not in self.workflow_jobs:
           return {'status': 'error', 'message': f'Workflow {job_id} not found'}
       
       workflow_job = self.workflow_jobs[job_id]
       
       if workflow_job.status != WorkflowStatus.RUNNING:
           return {'status': 'error', 'message': f'Workflow {job_id} is not running'}
       
       with self.workflow_lock:
           workflow_job.status = WorkflowStatus.SUSPENDED
           workflow_job.last_modified_time = datetime.now()
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Workflow suspended successfully'
       }
   
   def resume_workflow(self, job_id: str) -> Dict[str, Any]:
       """
       恢复工作流
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 恢复结果
       """
       if job_id not in self.workflow_jobs:
           return {'status': 'error', 'message': f'Workflow {job_id} not found'}
       
       workflow_job = self.workflow_jobs[job_id]
       
       if workflow_job.status != WorkflowStatus.SUSPENDED:
           return {'status': 'error', 'message': f'Workflow {job_id} is not suspended'}
       
       with self.workflow_lock:
           workflow_job.status = WorkflowStatus.RUNNING
           workflow_job.last_modified_time = datetime.now()
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Workflow resumed successfully'
       }
   
   def submit_coordinator(self, app_path: str, config: Dict[str, Any],
                         user: str = "hadoop") -> Dict[str, Any]:
       """
       提交协调器
       
       Args:
           app_path: 应用路径
           config: 配置参数
           user: 用户名
           
       Returns:
           Dict[str, Any]: 提交结果
       """
       job_id = f"{len(self.coordinator_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-C"
       
       # 解析应用名称
       app_name = app_path.split('/')[-1] if app_path else "coordinator"
       
       # 创建协调器作业
       coordinator_job = CoordinatorJob(
           id=job_id,
           app_name=app_name,
           app_path=app_path,
           user=user,
           group=user,
           start_time=datetime.now(),
           end_time=datetime.now() + timedelta(days=30),
           frequency=config.get('frequency', '${coord:days(1)}'),
           concurrency=config.get('concurrency', 1),
           execution=config.get('execution', 'FIFO'),
           timeout=config.get('timeout', 120),
           conf=json.dumps(config)
       )
       
       with self.coordinator_lock:
           self.coordinator_jobs[job_id] = coordinator_job
           self.stats['total_coordinators'] += 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Coordinator submitted successfully'
       }
   
   def start_coordinator(self, job_id: str) -> Dict[str, Any]:
       """
       启动协调器
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 启动结果
       """
       if job_id not in self.coordinator_jobs:
           return {'status': 'error', 'message': f'Coordinator {job_id} not found'}
       
       coordinator_job = self.coordinator_jobs[job_id]
       
       if coordinator_job.status != CoordinatorStatus.PREP:
           return {'status': 'error', 'message': f'Coordinator {job_id} is not in PREP state'}
       
       with self.coordinator_lock:
           coordinator_job.status = CoordinatorStatus.RUNNING
           coordinator_job.last_modified_time = datetime.now()
           coordinator_job.next_materialized_time = datetime.now() + timedelta(days=1)
           self.stats['running_coordinators'] += 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Coordinator started successfully'
       }
   
   def get_coordinator_job(self, job_id: str) -> Dict[str, Any]:
       """
       获取协调器作业信息
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 作业信息
       """
       if job_id not in self.coordinator_jobs:
           return {'status': 'error', 'message': f'Coordinator {job_id} not found'}
       
       coordinator_job = self.coordinator_jobs[job_id]
       
       # 获取动作信息
       actions_info = []
       for action_id, action in coordinator_job.actions.items():
           actions_info.append({
               'id': action.id,
               'job_id': action.job_id,
               'status': action.status.value,
               'external_id': action.external_id,
               'nominal_time': action.nominal_time.isoformat() if action.nominal_time else None,
               'created_time': action.created_time.isoformat(),
               'missing_dependencies': action.missing_dependencies,
               'timeout': action.timeout,
               'error_code': action.error_code,
               'error_message': action.error_message
           })
       
       return {
           'status': 'success',
           'coordinator': {
               'id': coordinator_job.id,
               'app_name': coordinator_job.app_name,
               'app_path': coordinator_job.app_path,
               'status': coordinator_job.status.value,
               'user': coordinator_job.user,
               'group': coordinator_job.group,
               'start_time': coordinator_job.start_time.isoformat() if coordinator_job.start_time else None,
               'end_time': coordinator_job.end_time.isoformat() if coordinator_job.end_time else None,
               'pause_time': coordinator_job.pause_time.isoformat() if coordinator_job.pause_time else None,
               'created_time': coordinator_job.created_time.isoformat(),
               'last_modified_time': coordinator_job.last_modified_time.isoformat(),
               'next_materialized_time': coordinator_job.next_materialized_time.isoformat() if coordinator_job.next_materialized_time else None,
               'frequency': coordinator_job.frequency,
               'time_zone': coordinator_job.time_zone,
               'concurrency': coordinator_job.concurrency,
               'execution': coordinator_job.execution,
               'timeout': coordinator_job.timeout,
               'actions': actions_info,
               'mat_throttling': coordinator_job.mat_throttling
           }
       }
   
   def submit_bundle(self, app_path: str, config: Dict[str, Any],
                    user: str = "hadoop") -> Dict[str, Any]:
       """
       提交Bundle
       
       Args:
           app_path: 应用路径
           config: 配置参数
           user: 用户名
           
       Returns:
           Dict[str, Any]: 提交结果
       """
       job_id = f"{len(self.bundle_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-B"
       
       # 解析应用名称
       app_name = app_path.split('/')[-1] if app_path else "bundle"
       
       # 创建Bundle作业
       bundle_job = BundleJob(
           id=job_id,
           app_name=app_name,
           app_path=app_path,
           user=user,
           group=user,
           start_time=datetime.now(),
           end_time=datetime.now() + timedelta(days=90),
           coordinators=config.get('coordinators', []),
           conf=json.dumps(config)
       )
       
       with self.bundle_lock:
           self.bundle_jobs[job_id] = bundle_job
           self.stats['total_bundles'] += 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Bundle submitted successfully'
       }
   
   def start_bundle(self, job_id: str) -> Dict[str, Any]:
       """
       启动Bundle
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 启动结果
       """
       if job_id not in self.bundle_jobs:
           return {'status': 'error', 'message': f'Bundle {job_id} not found'}
       
       bundle_job = self.bundle_jobs[job_id]
       
       if bundle_job.status != BundleStatus.PREP:
           return {'status': 'error', 'message': f'Bundle {job_id} is not in PREP state'}
       
       with self.bundle_lock:
           bundle_job.status = BundleStatus.RUNNING
           bundle_job.kick_off_time = datetime.now()
           self.stats['running_bundles'] += 1
       
       return {
           'status': 'success',
           'job_id': job_id,
           'message': f'Bundle started successfully'
       }
   
   def get_bundle_job(self, job_id: str) -> Dict[str, Any]:
       """
       获取Bundle作业信息
       
       Args:
           job_id: 作业ID
           
       Returns:
           Dict[str, Any]: 作业信息
       """
       if job_id not in self.bundle_jobs:
           return {'status': 'error', 'message': f'Bundle {job_id} not found'}
       
       bundle_job = self.bundle_jobs[job_id]
       
       return {
           'status': 'success',
           'bundle': {
               'id': bundle_job.id,
               'app_name': bundle_job.app_name,
               'app_path': bundle_job.app_path,
               'status': bundle_job.status.value,
               'user': bundle_job.user,
               'group': bundle_job.group,
               'start_time': bundle_job.start_time.isoformat() if bundle_job.start_time else None,
               'end_time': bundle_job.end_time.isoformat() if bundle_job.end_time else None,
               'pause_time': bundle_job.pause_time.isoformat() if bundle_job.pause_time else None,
               'created_time': bundle_job.created_time.isoformat(),
               'last_modified_time': bundle_job.last_modified_time.isoformat(),
               'kick_off_time': bundle_job.kick_off_time.isoformat() if bundle_job.kick_off_time else None,
               'time_zone': bundle_job.time_zone,
               'coordinators': bundle_job.coordinators
           }
       }
   
   def get_jobs(self, job_type: str = "workflow", status: Optional[str] = None,
               user: Optional[str] = None, limit: int = 50) -> Dict[str, Any]:
       """
       获取作业列表
       
       Args:
           job_type: 作业类型 (workflow, coordinator, bundle)
           status: 状态过滤
           user: 用户过滤
           limit: 限制数量
           
       Returns:
           Dict[str, Any]: 作业列表
       """
       jobs_info = []
       count = 0
       
       if job_type == "workflow":
           for job_id, job in self.workflow_jobs.items():
               if count >= limit:
                   break
                   
               # 应用过滤条件
               if status and job.status.value != status:
                   continue
               if user and job.user != user:
                   continue
               
               jobs_info.append({
                   'id': job.id,
                   'app_name': job.app_name,
                   'status': job.status.value,
                   'user': job.user,
                   'group': job.group,
                   'start_time': job.start_time.isoformat() if job.start_time else None,
                   'end_time': job.end_time.isoformat() if job.end_time else None,
                   'created_time': job.created_time.isoformat(),
                   'run': job.run
               })
               count += 1
       
       elif job_type == "coordinator":
           for job_id, job in self.coordinator_jobs.items():
               if count >= limit:
                   break
                   
               # 应用过滤条件
               if status and job.status.value != status:
                   continue
               if user and job.user != user:
                   continue
               
               jobs_info.append({
                   'id': job.id,
                   'app_name': job.app_name,
                   'status': job.status.value,
                   'user': job.user,
                   'group': job.group,
                   'start_time': job.start_time.isoformat() if job.start_time else None,
                   'end_time': job.end_time.isoformat() if job.end_time else None,
                   'created_time': job.created_time.isoformat(),
                   'frequency': job.frequency,
                   'concurrency': job.concurrency
               })
               count += 1
       
       elif job_type == "bundle":
           for job_id, job in self.bundle_jobs.items():
               if count >= limit:
                   break
                   
               # 应用过滤条件
               if status and job.status.value != status:
                   continue
               if user and job.user != user:
                   continue
               
               jobs_info.append({
                   'id': job.id,
                   'app_name': job.app_name,
                   'status': job.status.value,
                   'user': job.user,
                   'group': job.group,
                   'start_time': job.start_time.isoformat() if job.start_time else None,
                   'end_time': job.end_time.isoformat() if job.end_time else None,
                   'created_time': job.created_time.isoformat(),
                   'coordinators_count': len(job.coordinators)
               })
               count += 1
       
       return {
           'status': 'success',
           'jobs': jobs_info,
           'total': len(jobs_info),
           'job_type': job_type
       }
   
   def get_server_status(self) -> Dict[str, Any]:
       """
       获取服务器状态
       
       Returns:
           Dict[str, Any]: 服务器状态
       """
       return {
           'server_url': self.server_url,
           'is_running': self.is_running,
           'build_version': self.build_version,
           'system_mode': self.system_mode,
           'java_version': self.java_version,
           'os_name': self.os_name,
           'os_version': self.os_version,
           'stats': self.stats,
           'config': self.config,
           'timestamp': datetime.now().isoformat()
       }

# 使用示例
if __name__ == "__main__":
   # 创建Oozie服务器
   oozie = OozieServer("http://localhost:11000/oozie")
   
   print("=== Apache Oozie工作流引擎示例 ===")
   
   # 提交工作流
   print("\n=== 提交工作流 ===")
   workflow_config = {
       'jobTracker': 'localhost:8032',
       'nameNode': 'hdfs://localhost:9000',
       'queueName': 'default',
       'inputDir': '/user/input',
       'outputDir': '/user/output'
   }
   
   submit_result = oozie.submit_workflow(
       app_path="/user/oozie/workflows/data-processing",
       config=workflow_config,
       user="hadoop"
   )
   print(f"提交结果: {submit_result}")
   
   if submit_result['status'] == 'success':
       workflow_id = submit_result['job_id']
       
       # 启动工作流
       print("\n=== 启动工作流 ===")
       start_result = oozie.start_workflow(workflow_id)
       print(f"启动结果: {start_result}")
       
       # 等待一段时间
       time.sleep(2)
       
       # 获取工作流信息
       print("\n=== 工作流信息 ===")
       workflow_info = oozie.get_workflow_job(workflow_id)
       if workflow_info['status'] == 'success':
           workflow = workflow_info['workflow']
           print(f"工作流ID: {workflow['id']}")
           print(f"应用名称: {workflow['app_name']}")
           print(f"状态: {workflow['status']}")
           print(f"用户: {workflow['user']}")
           print(f"开始时间: {workflow['start_time']}")
           print(f"运行次数: {workflow['run']}")
           print("动作列表:")
           for action in workflow['actions']:
               print(f"  - {action['name']}: {action['status']} ({action['type']})")
               if action['transition']:
                   print(f"    成功转换: {action['transition']}")
               if action['error_transition']:
                   print(f"    错误转换: {action['error_transition']}")
   
   # 提交协调器
   print("\n=== 提交协调器 ===")
   coordinator_config = {
       'frequency': '${coord:days(1)}',
       'concurrency': 1,
       'execution': 'FIFO',
       'timeout': 120,
       'workflowPath': '/user/oozie/workflows/daily-processing'
   }
   
   coord_submit_result = oozie.submit_coordinator(
       app_path="/user/oozie/coordinators/daily-processing",
       config=coordinator_config,
       user="hadoop"
   )
   print(f"协调器提交结果: {coord_submit_result}")
   
   if coord_submit_result['status'] == 'success':
       coordinator_id = coord_submit_result['job_id']
       
       # 启动协调器
       coord_start_result = oozie.start_coordinator(coordinator_id)
       print(f"协调器启动结果: {coord_start_result}")
       
       # 获取协调器信息
       print("\n=== 协调器信息 ===")
       coord_info = oozie.get_coordinator_job(coordinator_id)
       if coord_info['status'] == 'success':
           coordinator = coord_info['coordinator']
           print(f"协调器ID: {coordinator['id']}")
           print(f"应用名称: {coordinator['app_name']}")
           print(f"状态: {coordinator['status']}")
           print(f"频率: {coordinator['frequency']}")
           print(f"并发数: {coordinator['concurrency']}")
           print(f"执行策略: {coordinator['execution']}")
           print(f"下次物化时间: {coordinator['next_materialized_time']}")
   
   # 提交Bundle
   print("\n=== 提交Bundle ===")
   bundle_config = {
       'coordinators': [coordinator_id] if coord_submit_result['status'] == 'success' else [],
       'kickOffTime': datetime.now().isoformat()
   }
   
   bundle_submit_result = oozie.submit_bundle(
       app_path="/user/oozie/bundles/data-pipeline",
       config=bundle_config,
       user="hadoop"
   )
   print(f"Bundle提交结果: {bundle_submit_result}")
   
   if bundle_submit_result['status'] == 'success':
       bundle_id = bundle_submit_result['job_id']
       
       # 启动Bundle
       bundle_start_result = oozie.start_bundle(bundle_id)
       print(f"Bundle启动结果: {bundle_start_result}")
       
       # 获取Bundle信息
       print("\n=== Bundle信息 ===")
       bundle_info = oozie.get_bundle_job(bundle_id)
       if bundle_info['status'] == 'success':
           bundle = bundle_info['bundle']
           print(f"Bundle ID: {bundle['id']}")
           print(f"应用名称: {bundle['app_name']}")
           print(f"状态: {bundle['status']}")
           print(f"协调器数量: {len(bundle['coordinators'])}")
           print(f"启动时间: {bundle['kick_off_time']}")
   
   # 获取作业列表
   print("\n=== 工作流作业列表 ===")
   workflows = oozie.get_jobs(job_type="workflow", limit=10)
   print(f"工作流总数: {workflows['total']}")
   for job in workflows['jobs']:
       print(f"  - {job['id']}: {job['app_name']} ({job['status']})")
   
   print("\n=== 协调器作业列表 ===")
   coordinators = oozie.get_jobs(job_type="coordinator", limit=10)
   print(f"协调器总数: {coordinators['total']}")
   for job in coordinators['jobs']:
       print(f"  - {job['id']}: {job['app_name']} ({job['status']}) - {job['frequency']}")
   
   # 获取服务器状态
   print("\n=== 服务器状态 ===")
   status = oozie.get_server_status()
   print(f"服务器URL: {status['server_url']}")
   print(f"运行状态: {status['is_running']}")
   print(f"版本: {status['build_version']}")
   print(f"系统模式: {status['system_mode']}")
   print(f"Java版本: {status['java_version']}")
   print("统计信息:")
   for key, value in status['stats'].items():
        print(f"  {key}: {value}")

8.2 Apache Flume详解

from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import re

class FlumeComponentType(Enum):
    """Flume组件类型"""
    SOURCE = "source"
    SINK = "sink"
    CHANNEL = "channel"
    INTERCEPTOR = "interceptor"
    SERIALIZER = "serializer"
    SINK_PROCESSOR = "sink_processor"

class FlumeComponentStatus(Enum):
    """Flume组件状态"""
    IDLE = "IDLE"                # 空闲
    START = "START"              # 启动中
    STOP = "STOP"                # 停止中
    ERROR = "ERROR"              # 错误

class SourceType(Enum):
    """Source类型"""
    SPOOLDIR = "spooldir"        # 目录监控
    TAILDIR = "taildir"          # 文件尾部监控
    KAFKA = "kafka"              # Kafka消费者
    HTTP = "http"                # HTTP接收器
    NETCAT = "netcat"            # Netcat
    EXEC = "exec"                # 命令执行
    JMS = "jms"                  # JMS消息
    AVRO = "avro"                # Avro RPC
    THRIFT = "thrift"            # Thrift RPC
    SEQUENCE_GENERATOR = "seq"   # 序列生成器

class SinkType(Enum):
    """Sink类型"""
    HDFS = "hdfs"                # HDFS写入
    HBASE = "hbase"              # HBase写入
    KAFKA = "kafka"              # Kafka生产者
    ELASTICSEARCH = "elasticsearch"  # Elasticsearch
    SOLR = "solr"                # Solr搜索
    LOGGER = "logger"            # 日志输出
    NULL = "null"                # 空输出
    AVRO = "avro"                # Avro RPC
    THRIFT = "thrift"            # Thrift RPC
    FILE_ROLL = "file_roll"      # 文件滚动
    HIVE = "hive"                # Hive写入

class ChannelType(Enum):
    """Channel类型"""
    MEMORY = "memory"            # 内存通道
    FILE = "file"                # 文件通道
    SPILLABLE_MEMORY = "spillablememory"  # 可溢出内存通道
    JDBC = "jdbc"                # JDBC通道
    KAFKA = "kafka"              # Kafka通道
    PSEUDO_TRANSACTION = "pseudotxn"  # 伪事务通道

class EventStatus(Enum):
    """事件状态"""
    READY = "READY"              # 就绪
    INFLIGHT = "INFLIGHT"        # 传输中
    TAKE = "TAKE"                # 已取出
    COMMIT = "COMMIT"            # 已提交
    ROLLBACK = "ROLLBACK"        # 已回滚
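
# 补充示意:在真实的Flume部署中,上述Source/Sink/Channel类型通过一份.properties配置
# 文件组装成Agent。下面是常见的netcat -> memory -> logger最小配置草图
# (Agent名a1、端口44444等均为假设值,仅供参考):
EXAMPLE_FLUME_CONF = """
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
"""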

@dataclass
class FlumeEvent:
    """Flume事件"""
    event_id: str
    headers: Dict[str, str] = field(default_factory=dict)
    body: bytes = b""
    timestamp: datetime = field(default_factory=datetime.now)
    status: EventStatus = EventStatus.READY
    source_name: str = ""
    channel_name: str = ""
    sink_name: str = ""
    retry_count: int = 0
    max_retries: int = 3
    
    def get_header(self, key: str, default: str = "") -> str:
        """获取头部信息"""
        return self.headers.get(key, default)
    
    def set_header(self, key: str, value: str):
        """设置头部信息"""
        self.headers[key] = value
    
    def get_body_as_string(self, encoding: str = 'utf-8') -> str:
        """获取事件体字符串"""
        try:
            return self.body.decode(encoding)
        except UnicodeDecodeError:
            return self.body.decode('utf-8', errors='ignore')

@dataclass
class FlumeTransaction:
    """Flume事务"""
    transaction_id: str
    channel_name: str
    events: List[FlumeEvent] = field(default_factory=list)
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None
    is_committed: bool = False
    is_rolled_back: bool = False
    
    def add_event(self, event: FlumeEvent):
        """添加事件到事务"""
        self.events.append(event)
    
    def commit(self):
        """提交事务"""
        self.is_committed = True
        self.end_time = datetime.now()
        for event in self.events:
            event.status = EventStatus.COMMIT
    
    def rollback(self):
        """回滚事务"""
        self.is_rolled_back = True
        self.end_time = datetime.now()
        for event in self.events:
            event.status = EventStatus.ROLLBACK

@dataclass
class FlumeSource:
    """Flume Source"""
    name: str
    source_type: SourceType
    channels: List[str] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)
    status: FlumeComponentStatus = FlumeComponentStatus.IDLE
    events_received: int = 0
    events_accepted: int = 0
    events_rejected: int = 0
    last_activity: Optional[datetime] = None
    error_count: int = 0
    error_message: str = ""
    
    def add_channel(self, channel_name: str):
        """添加通道"""
        if channel_name not in self.channels:
            self.channels.append(channel_name)
    
    def remove_channel(self, channel_name: str):
        """移除通道"""
        if channel_name in self.channels:
            self.channels.remove(channel_name)

@dataclass
class FlumeSink:
    """Flume Sink"""
    name: str
    sink_type: SinkType
    channel: str
    config: Dict[str, Any] = field(default_factory=dict)
    status: FlumeComponentStatus = FlumeComponentStatus.IDLE
    events_drained: int = 0
    events_failed: int = 0
    connection_failures: int = 0
    batch_size: int = 100
    batch_timeout: int = 3000  # 毫秒
    last_activity: Optional[datetime] = None
    error_count: int = 0
    error_message: str = ""

@dataclass
class FlumeChannel:
    """Flume Channel"""
    name: str
    channel_type: ChannelType
    config: Dict[str, Any] = field(default_factory=dict)
    status: FlumeComponentStatus = FlumeComponentStatus.IDLE
    capacity: int = 10000
    transaction_capacity: int = 1000
    events: deque = field(default_factory=deque)
    transactions: Dict[str, FlumeTransaction] = field(default_factory=dict)
    events_put: int = 0
    events_take: int = 0
    channel_size: int = 0
    channel_fill_percentage: float = 0.0
    last_activity: Optional[datetime] = None
    
    def put_event(self, event: FlumeEvent, transaction_id: str) -> bool:
        """放入事件"""
        if len(self.events) >= self.capacity:
            return False
        
        if transaction_id not in self.transactions:
            self.transactions[transaction_id] = FlumeTransaction(
                transaction_id=transaction_id,
                channel_name=self.name
            )
        
        transaction = self.transactions[transaction_id]
        if len(transaction.events) >= self.transaction_capacity:
            return False
        
        event.status = EventStatus.INFLIGHT
        event.channel_name = self.name
        transaction.add_event(event)
        self.events_put += 1
        self.last_activity = datetime.now()
        return True
    
    def take_event(self, transaction_id: str) -> Optional[FlumeEvent]:
        """取出事件"""
        if not self.events:
            return None
        
        if transaction_id not in self.transactions:
            self.transactions[transaction_id] = FlumeTransaction(
                transaction_id=transaction_id,
                channel_name=self.name
            )
        
        event = self.events.popleft()
        event.status = EventStatus.TAKE
        transaction = self.transactions[transaction_id]
        transaction.add_event(event)
        self.events_take += 1
        self.channel_size = len(self.events)
        self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
        self.last_activity = datetime.now()
        return event
    
    def commit_transaction(self, transaction_id: str) -> bool:
        """提交事务"""
        if transaction_id not in self.transactions:
            return False
        
        transaction = self.transactions[transaction_id]
        
        # 对于put操作,先把仍处于INFLIGHT状态的事件追加到通道
        # (必须在commit()之前判断:commit()会把事件状态改为COMMIT)
        for event in transaction.events:
            if event.status == EventStatus.INFLIGHT:
                self.events.append(event)
        
        transaction.commit()
        self.channel_size = len(self.events)
        self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
        
        del self.transactions[transaction_id]
        return True
    
    def rollback_transaction(self, transaction_id: str) -> bool:
        """回滚事务"""
        if transaction_id not in self.transactions:
            return False
        
        transaction = self.transactions[transaction_id]
        
        # 对于take操作,先把已取出的事件放回通道头部
        # (同样必须在rollback()之前判断:rollback()会把事件状态改为ROLLBACK)
        for event in transaction.events:
            if event.status == EventStatus.TAKE:
                self.events.appendleft(event)
        
        transaction.rollback()
        self.channel_size = len(self.events)
        self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
        
        del self.transactions[transaction_id]
        return True
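
# 补充示意:FlumeChannel事务语义的用法草图(假设示例,未在本节其它代码中调用)。
# Source侧先put再commit,事件才真正进入通道;Sink侧take后若rollback,事件会被放回通道头部。
def _demo_channel_transaction() -> None:
    channel = FlumeChannel(name="demo-channel", channel_type=ChannelType.MEMORY, capacity=100)

    # put/commit:提交后事件进入通道
    event = FlumeEvent(event_id=str(uuid.uuid4()), body=b"hello flume")
    channel.put_event(event, transaction_id="tx-put")
    channel.commit_transaction("tx-put")
    assert channel.channel_size == 1

    # take/rollback:回滚后事件被放回,通道大小保持不变
    taken = channel.take_event(transaction_id="tx-take")
    assert taken is not None
    channel.rollback_transaction("tx-take")
    assert channel.channel_size == 1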

@dataclass
class FlumeAgent:
    """Flume Agent"""
    name: str
    host: str = "localhost"
    port: int = 41414
    sources: Dict[str, FlumeSource] = field(default_factory=dict)
    sinks: Dict[str, FlumeSink] = field(default_factory=dict)
    channels: Dict[str, FlumeChannel] = field(default_factory=dict)
    is_running: bool = False
    start_time: Optional[datetime] = None
    config_file: str = ""
    jvm_metrics: Dict[str, Any] = field(default_factory=dict)
    
class FlumeCluster:
    """
    Apache Flume数据收集集群
    """
    
    def __init__(self, cluster_name: str = "flume-cluster"):
        self.cluster_name = cluster_name
        
        # 数据存储
        self.agents = {}  # agent_name -> FlumeAgent
        
        # 集群状态
        self.is_running = True
        self.version = "1.11.0"
        self.java_version = "1.8.0_281"
        
        # 配置
        self.config = {
            'flume.monitoring.type': 'http',
            'flume.monitoring.port': 34545,
            'flume.root.logger': 'INFO,console',
            'flume.log.dir': './logs',
            'flume.log.file': 'flume.log'
        }
        
        # 线程锁
        self.agents_lock = threading.Lock()
        
        # 统计信息
        self.stats = {
            'total_agents': 0,
            'running_agents': 0,
            'total_sources': 0,
            'total_sinks': 0,
            'total_channels': 0,
            'total_events_processed': 0,
            'total_events_failed': 0,
            'average_throughput': 0.0
        }
        
        # 初始化示例代理
        self._create_example_agents()
    
    def _create_example_agents(self):
        """创建示例代理"""
        # 创建示例代理
        agent1 = FlumeAgent(
            name="web-log-agent",
            host="node1.example.com",
            port=41414
        )
        
        agent2 = FlumeAgent(
            name="app-log-agent",
            host="node2.example.com",
            port=41414
        )
        
        with self.agents_lock:
            self.agents[agent1.name] = agent1
            self.agents[agent2.name] = agent2
            self._update_stats()
    
    def _update_stats(self):
        """更新统计信息"""
        self.stats['total_agents'] = len(self.agents)
        self.stats['running_agents'] = len([a for a in self.agents.values() if a.is_running])
        self.stats['total_sources'] = sum(len(a.sources) for a in self.agents.values())
        self.stats['total_sinks'] = sum(len(a.sinks) for a in self.agents.values())
        self.stats['total_channels'] = sum(len(a.channels) for a in self.agents.values())
    
    def create_agent(self, agent_name: str, host: str = "localhost", port: int = 41414) -> Dict[str, Any]:
        """
        创建Flume代理
        
        Args:
            agent_name: 代理名称
            host: 主机地址
            port: 端口号
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        if agent_name in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} already exists'}
        
        agent = FlumeAgent(
            name=agent_name,
            host=host,
            port=port
        )
        
        with self.agents_lock:
            self.agents[agent_name] = agent
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'host': host,
            'port': port,
            'message': f'Agent {agent_name} created successfully'
        }
    
    def start_agent(self, agent_name: str) -> Dict[str, Any]:
        """
        启动代理
        
        Args:
            agent_name: 代理名称
            
        Returns:
            Dict[str, Any]: 启动结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if agent.is_running:
            return {'status': 'error', 'message': f'Agent {agent_name} is already running'}
        
        agent.is_running = True
        agent.start_time = datetime.now()
        
        # 启动所有组件
        for source in agent.sources.values():
            source.status = FlumeComponentStatus.START
        
        for sink in agent.sinks.values():
            sink.status = FlumeComponentStatus.START
        
        for channel in agent.channels.values():
            channel.status = FlumeComponentStatus.START
        
        with self.agents_lock:
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'start_time': agent.start_time.isoformat(),
            'message': f'Agent {agent_name} started successfully'
        }
    
    def stop_agent(self, agent_name: str) -> Dict[str, Any]:
        """
        停止代理
        
        Args:
            agent_name: 代理名称
            
        Returns:
            Dict[str, Any]: 停止结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if not agent.is_running:
            return {'status': 'error', 'message': f'Agent {agent_name} is not running'}
        
        agent.is_running = False
        
        # 停止所有组件
        for source in agent.sources.values():
            source.status = FlumeComponentStatus.STOP
        
        for sink in agent.sinks.values():
            sink.status = FlumeComponentStatus.STOP
        
        for channel in agent.channels.values():
            channel.status = FlumeComponentStatus.STOP
        
        with self.agents_lock:
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'message': f'Agent {agent_name} stopped successfully'
        }
    
    def add_source(self, agent_name: str, source_name: str, source_type: SourceType,
                  channels: List[str], config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        添加Source
        
        Args:
            agent_name: 代理名称
            source_name: Source名称
            source_type: Source类型
            channels: 通道列表
            config: 配置
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if source_name in agent.sources:
            return {'status': 'error', 'message': f'Source {source_name} already exists'}
        
        source = FlumeSource(
            name=source_name,
            source_type=source_type,
            channels=channels.copy(),
            config=config or {}
        )
        
        with self.agents_lock:
            agent.sources[source_name] = source
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'source_name': source_name,
            'source_type': source_type.value,
            'channels': channels,
            'message': f'Source {source_name} added successfully'
        }
    
    def add_sink(self, agent_name: str, sink_name: str, sink_type: SinkType,
                channel: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        添加Sink
        
        Args:
            agent_name: 代理名称
            sink_name: Sink名称
            sink_type: Sink类型
            channel: 通道名称
            config: 配置
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if sink_name in agent.sinks:
            return {'status': 'error', 'message': f'Sink {sink_name} already exists'}
        
        sink = FlumeSink(
            name=sink_name,
            sink_type=sink_type,
            channel=channel,
            config=config or {}
        )
        
        with self.agents_lock:
            agent.sinks[sink_name] = sink
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'sink_name': sink_name,
            'sink_type': sink_type.value,
            'channel': channel,
            'message': f'Sink {sink_name} added successfully'
        }
    
    def add_channel(self, agent_name: str, channel_name: str, channel_type: ChannelType,
                   config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        添加Channel
        
        Args:
            agent_name: 代理名称
            channel_name: Channel名称
            channel_type: Channel类型
            config: 配置
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if channel_name in agent.channels:
            return {'status': 'error', 'message': f'Channel {channel_name} already exists'}
        
        channel_config = config or {}
        capacity = channel_config.get('capacity', 10000)
        transaction_capacity = channel_config.get('transactionCapacity', 1000)
        
        channel = FlumeChannel(
            name=channel_name,
            channel_type=channel_type,
            config=channel_config,
            capacity=capacity,
            transaction_capacity=transaction_capacity
        )
        
        with self.agents_lock:
            agent.channels[channel_name] = channel
            self._update_stats()
        
        return {
            'status': 'success',
            'agent_name': agent_name,
            'channel_name': channel_name,
            'channel_type': channel_type.value,
            'capacity': capacity,
            'message': f'Channel {channel_name} added successfully'
        }
    
    def send_event(self, agent_name: str, source_name: str, headers: Dict[str, str],
                  body: Union[str, bytes]) -> Dict[str, Any]:
        """
        发送事件
        
        Args:
            agent_name: 代理名称
            source_name: Source名称
            headers: 事件头部
            body: 事件体
            
        Returns:
            Dict[str, Any]: 发送结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if source_name not in agent.sources:
            return {'status': 'error', 'message': f'Source {source_name} not found'}
        
        source = agent.sources[source_name]
        
        if source.status != FlumeComponentStatus.START:
            return {'status': 'error', 'message': f'Source {source_name} is not running'}
        
        # 创建事件
        event_id = f"event-{uuid.uuid4().hex[:8]}"
        event_body = body.encode('utf-8') if isinstance(body, str) else body
        
        event = FlumeEvent(
            event_id=event_id,
            headers=headers.copy(),
            body=event_body,
            source_name=source_name
        )
        
        # 发送到所有配置的通道
        success_channels = []
        failed_channels = []
        
        for channel_name in source.channels:
            if channel_name not in agent.channels:
                failed_channels.append(channel_name)
                continue
            
            channel = agent.channels[channel_name]
            transaction_id = f"txn-{uuid.uuid4().hex[:8]}"
            
            if channel.put_event(event, transaction_id):
                if channel.commit_transaction(transaction_id):
                    success_channels.append(channel_name)
                else:
                    channel.rollback_transaction(transaction_id)
                    failed_channels.append(channel_name)
            else:
                failed_channels.append(channel_name)
        
        # 更新Source统计
        source.events_received += 1
        source.last_activity = datetime.now()
        
        if success_channels:
            source.events_accepted += 1
        else:
            source.events_rejected += 1
        
        return {
            'status': 'success' if success_channels else 'error',
            'event_id': event_id,
            'success_channels': success_channels,
            'failed_channels': failed_channels,
            'message': f'Event sent to {len(success_channels)} channels'
        }
    
    def process_events(self, agent_name: str, sink_name: str, batch_size: int = 100) -> Dict[str, Any]:
        """
        处理事件(Sink从Channel取出事件)
        
        Args:
            agent_name: 代理名称
            sink_name: Sink名称
            batch_size: 批次大小
            
        Returns:
            Dict[str, Any]: 处理结果
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        if sink_name not in agent.sinks:
            return {'status': 'error', 'message': f'Sink {sink_name} not found'}
        
        sink = agent.sinks[sink_name]
        
        if sink.status != FlumeComponentStatus.START:
            return {'status': 'error', 'message': f'Sink {sink_name} is not running'}
        
        if sink.channel not in agent.channels:
            return {'status': 'error', 'message': f'Channel {sink.channel} not found'}
        
        channel = agent.channels[sink.channel]
        transaction_id = f"txn-{uuid.uuid4().hex[:8]}"
        
        # 取出事件批次
        events = []
        for _ in range(min(batch_size, sink.batch_size)):
            event = channel.take_event(transaction_id)
            if event is None:
                break
            events.append(event)
        
        if not events:
            return {
                'status': 'success',
                'events_processed': 0,
                'message': 'No events to process'
            }
        
        # 模拟事件处理(90%成功率)
        success = random.random() > 0.1
        
        if success:
            # 提交事务
            channel.commit_transaction(transaction_id)
            sink.events_drained += len(events)
            sink.last_activity = datetime.now()
            
            return {
                'status': 'success',
                'events_processed': len(events),
                'sink_type': sink.sink_type.value,
                'message': f'Successfully processed {len(events)} events'
            }
        else:
            # 回滚事务
            channel.rollback_transaction(transaction_id)
            sink.events_failed += len(events)
            sink.error_count += 1
            sink.error_message = "Failed to write events to destination"
            
            return {
                'status': 'error',
                'events_processed': 0,
                'events_failed': len(events),
                'message': 'Failed to process events'
            }
    
    def get_agent_status(self, agent_name: str) -> Dict[str, Any]:
        """
        获取代理状态
        
        Args:
            agent_name: 代理名称
            
        Returns:
            Dict[str, Any]: 代理状态
        """
        if agent_name not in self.agents:
            return {'status': 'error', 'message': f'Agent {agent_name} not found'}
        
        agent = self.agents[agent_name]
        
        # 收集Source信息
        sources_info = []
        for source_name, source in agent.sources.items():
            sources_info.append({
                'name': source.name,
                'type': source.source_type.value,
                'status': source.status.value,
                'channels': source.channels,
                'events_received': source.events_received,
                'events_accepted': source.events_accepted,
                'events_rejected': source.events_rejected,
                'last_activity': source.last_activity.isoformat() if source.last_activity else None
            })
        
        # 收集Sink信息
        sinks_info = []
        for sink_name, sink in agent.sinks.items():
            sinks_info.append({
                'name': sink.name,
                'type': sink.sink_type.value,
                'status': sink.status.value,
                'channel': sink.channel,
                'events_drained': sink.events_drained,
                'events_failed': sink.events_failed,
                'connection_failures': sink.connection_failures,
                'last_activity': sink.last_activity.isoformat() if sink.last_activity else None
            })
        
        # 收集Channel信息
        channels_info = []
        for channel_name, channel in agent.channels.items():
            channels_info.append({
                'name': channel.name,
                'type': channel.channel_type.value,
                'status': channel.status.value,
                'capacity': channel.capacity,
                'channel_size': channel.channel_size,
                'channel_fill_percentage': channel.channel_fill_percentage,
                'events_put': channel.events_put,
                'events_take': channel.events_take,
                'active_transactions': len(channel.transactions),
                'last_activity': channel.last_activity.isoformat() if channel.last_activity else None
            })
        
        return {
            'status': 'success',
            'agent': {
                'name': agent.name,
                'host': agent.host,
                'port': agent.port,
                'is_running': agent.is_running,
                'start_time': agent.start_time.isoformat() if agent.start_time else None,
                'sources': sources_info,
                'sinks': sinks_info,
                'channels': channels_info
            }
        }
    
    def list_agents(self) -> Dict[str, Any]:
        """
        列出所有代理
        
        Returns:
            Dict[str, Any]: 代理列表
        """
        agents_info = []
        
        for agent_name, agent in self.agents.items():
            agents_info.append({
                'name': agent.name,
                'host': agent.host,
                'port': agent.port,
                'is_running': agent.is_running,
                'start_time': agent.start_time.isoformat() if agent.start_time else None,
                'sources_count': len(agent.sources),
                'sinks_count': len(agent.sinks),
                'channels_count': len(agent.channels)
            })
        
        return {
            'status': 'success',
            'agents': agents_info,
            'total': len(agents_info)
        }
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """
        获取集群状态
        
        Returns:
            Dict[str, Any]: 集群状态
        """
        # 计算总体统计
        total_events_processed = 0
        total_events_failed = 0
        
        for agent in self.agents.values():
            for source in agent.sources.values():
                total_events_processed += source.events_accepted
            for sink in agent.sinks.values():
                total_events_processed += sink.events_drained
                total_events_failed += sink.events_failed
        
        self.stats['total_events_processed'] = total_events_processed
        self.stats['total_events_failed'] = total_events_failed
        
        return {
            'cluster_name': self.cluster_name,
            'is_running': self.is_running,
            'version': self.version,
            'java_version': self.java_version,
            'stats': self.stats,
            'config': self.config,
            'timestamp': datetime.now().isoformat()
        }

# 使用示例
if __name__ == "__main__":
    # 创建Flume集群
    flume = FlumeCluster("production-flume")
    
    print("=== Apache Flume数据收集示例 ===")
    
    # 创建代理
    print("\n=== 创建Flume代理 ===")
    agent_result = flume.create_agent(
        agent_name="web-server-agent",
        host="web-server-01",
        port=41414
    )
    print(f"代理创建结果: {agent_result}")
    
    if agent_result['status'] == 'success':
        agent_name = agent_result['agent_name']
        
        # 添加Channel
        print("\n=== 添加Channel ===")
        channel_result = flume.add_channel(
            agent_name=agent_name,
            channel_name="memory-channel",
            channel_type=ChannelType.MEMORY,
            config={
                'capacity': 10000,
                'transactionCapacity': 1000
            }
        )
        print(f"Channel添加结果: {channel_result}")
        
        # 添加Source
        print("\n=== 添加Source ===")
        source_result = flume.add_source(
            agent_name=agent_name,
            source_name="spooldir-source",
            source_type=SourceType.SPOOLDIR,
            channels=["memory-channel"],
            config={
                'spoolDir': '/var/log/web/spool',
                'channels': 'memory-channel'
            }
        )
        print(f"Source添加结果: {source_result}")
        
        # 添加Sink
        print("\n=== 添加Sink ===")
        sink_result = flume.add_sink(
            agent_name=agent_name,
            sink_name="hdfs-sink",
            sink_type=SinkType.HDFS,
            channel="memory-channel",
            config={
                'hdfs.path': '/flume/events/%Y/%m/%d',
                'hdfs.fileType': 'DataStream',
                'hdfs.writeFormat': 'Text',
                'hdfs.batchSize': 1000,
                'hdfs.rollInterval': 600,
                'hdfs.rollSize': 268435456
            }
        )
        print(f"Sink添加结果: {sink_result}")
        
        # 启动代理
        print("\n=== 启动代理 ===")
        start_result = flume.start_agent(agent_name)
        print(f"代理启动结果: {start_result}")
        
        if start_result['status'] == 'success':
            # 发送事件
            print("\n=== 发送事件 ===")
            for i in range(10):
                event_result = flume.send_event(
                    agent_name=agent_name,
                    source_name="spooldir-source",
                    headers={
                        'timestamp': str(int(time.time())),
                        'host': 'web-server-01',
                        'source': 'access.log'
                    },
                    body=f"192.168.1.{100+i} - - [25/Dec/2023:10:00:{i:02d} +0000] \"GET /api/users HTTP/1.1\" 200 1234"
                )
                if i < 3:  # 只打印前3个结果
                    print(f"事件 {i+1} 发送结果: {event_result}")
            
            # 处理事件
            print("\n=== 处理事件 ===")
            for i in range(3):
                process_result = flume.process_events(
                    agent_name=agent_name,
                    sink_name="hdfs-sink",
                    batch_size=5
                )
                print(f"批次 {i+1} 处理结果: {process_result}")
                time.sleep(1)
            
            # 获取代理状态
            print("\n=== 代理状态 ===")
            agent_status = flume.get_agent_status(agent_name)
            if agent_status['status'] == 'success':
                agent_info = agent_status['agent']
                print(f"代理名称: {agent_info['name']}")
                print(f"主机: {agent_info['host']}:{agent_info['port']}")
                print(f"运行状态: {agent_info['is_running']}")
                print(f"启动时间: {agent_info['start_time']}")
                
                print("\nSources:")
                for source in agent_info['sources']:
                    print(f"  - {source['name']} ({source['type']}): {source['events_received']} received, {source['events_accepted']} accepted")
                
                print("\nChannels:")
                for channel in agent_info['channels']:
                    print(f"  - {channel['name']} ({channel['type']}): {channel['channel_size']}/{channel['capacity']} ({channel['channel_fill_percentage']:.1f}%)")
                
                print("\nSinks:")
                for sink in agent_info['sinks']:
                    print(f"  - {sink['name']} ({sink['type']}): {sink['events_drained']} drained, {sink['events_failed']} failed")
    
    # 列出所有代理
    print("\n=== 代理列表 ===")
    agents_list = flume.list_agents()
    if agents_list['status'] == 'success':
        print(f"总代理数: {agents_list['total']}")
        for agent in agents_list['agents']:
            print(f"  - {agent['name']} @ {agent['host']}:{agent['port']} - 运行: {agent['is_running']}")
    
    # 获取集群状态
    print("\n=== 集群状态 ===")
    cluster_status = flume.get_cluster_status()
    print(f"集群名称: {cluster_status['cluster_name']}")
    print(f"版本: {cluster_status['version']}")
    print(f"运行状态: {cluster_status['is_running']}")
    print("统计信息:")
    for key, value in cluster_status['stats'].items():
         print(f"  {key}: {value}")

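作为对照,下面给出与上述模拟示例大致等价的真实 flume.conf 配置草图(代理名沿用 web-server-agent;属性键以实际使用的 Flume 版本官方文档为准):

# flume.conf —— 与上文示例对应的配置草图
web-server-agent.sources = spooldir-source
web-server-agent.channels = memory-channel
web-server-agent.sinks = hdfs-sink

web-server-agent.sources.spooldir-source.type = spooldir
web-server-agent.sources.spooldir-source.spoolDir = /var/log/web/spool
web-server-agent.sources.spooldir-source.channels = memory-channel

web-server-agent.channels.memory-channel.type = memory
web-server-agent.channels.memory-channel.capacity = 10000
web-server-agent.channels.memory-channel.transactionCapacity = 1000

web-server-agent.sinks.hdfs-sink.type = hdfs
web-server-agent.sinks.hdfs-sink.channel = memory-channel
web-server-agent.sinks.hdfs-sink.hdfs.path = /flume/events/%Y/%m/%d
web-server-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
web-server-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
web-server-agent.sinks.hdfs-sink.hdfs.batchSize = 1000
web-server-agent.sinks.hdfs-sink.hdfs.rollInterval = 600
web-server-agent.sinks.hdfs-sink.hdfs.rollSize = 268435456

# 启动命令(示意)
# flume-ng agent --conf ./conf --conf-file flume.conf --name web-server-agent -Dflume.root.logger=INFO,console

需要注意的是,hdfs.path 中的 %Y/%m/%d 转义依赖事件头中的 timestamp(上文示例已在 headers 中携带),否则需设置 hdfs.useLocalTimeStamp = true。
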
8.3 Apache NiFi详解

from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import re
from urllib.parse import urlparse

class ProcessorType(Enum):
    """处理器类型"""
    GET_FILE = "GetFile"                    # 获取文件
    PUT_FILE = "PutFile"                    # 写入文件
    GET_HTTP = "GetHTTP"                    # HTTP获取
    POST_HTTP = "PostHTTP"                  # HTTP发送
    GET_KAFKA = "GetKafka"                  # Kafka消费
    PUT_KAFKA = "PutKafka"                  # Kafka生产
    GET_HDFS = "GetHDFS"                    # HDFS读取
    PUT_HDFS = "PutHDFS"                    # HDFS写入
    EXTRACT_TEXT = "ExtractText"            # 文本提取
    REPLACE_TEXT = "ReplaceText"            # 文本替换
    ROUTE_ON_ATTRIBUTE = "RouteOnAttribute" # 属性路由
    UPDATE_ATTRIBUTE = "UpdateAttribute"    # 更新属性
    SPLIT_TEXT = "SplitText"                # 文本分割
    MERGE_CONTENT = "MergeContent"          # 内容合并
    COMPRESS_CONTENT = "CompressContent"    # 内容压缩
    DECOMPRESS_CONTENT = "DecompressContent" # 内容解压
    CONVERT_RECORD = "ConvertRecord"        # 记录转换
    QUERY_RECORD = "QueryRecord"            # 记录查询
    EXECUTE_SQL = "ExecuteSQL"              # SQL执行
    PUT_SQL = "PutSQL"                      # SQL写入
    GENERATE_FLOWFILE = "GenerateFlowFile"  # 生成FlowFile
    LOG_ATTRIBUTE = "LogAttribute"          # 日志属性

class ProcessorState(Enum):
    """处理器状态"""
    STOPPED = "STOPPED"                     # 停止
    RUNNING = "RUNNING"                     # 运行
    DISABLED = "DISABLED"                   # 禁用
    INVALID = "INVALID"                     # 无效
    VALIDATING = "VALIDATING"               # 验证中

class FlowFileState(Enum):
    """FlowFile状态"""
    ACTIVE = "ACTIVE"                       # 活跃
    QUEUED = "QUEUED"                       # 排队
    PROCESSING = "PROCESSING"               # 处理中
    COMPLETED = "COMPLETED"                 # 完成
    FAILED = "FAILED"                       # 失败
    PENALIZED = "PENALIZED"                 # 惩罚
    EXPIRED = "EXPIRED"                     # 过期

class RelationshipType(Enum):
    """关系类型"""
    SUCCESS = "success"                     # 成功
    FAILURE = "failure"                     # 失败
    ORIGINAL = "original"                   # 原始
    MATCHED = "matched"                     # 匹配
    UNMATCHED = "unmatched"                 # 不匹配
    RETRY = "retry"                         # 重试
    INVALID = "invalid"                     # 无效
    SPLIT = "split"                         # 分割
    MERGED = "merged"                       # 合并
    ROUTED = "routed"                       # 路由

class ConnectionType(Enum):
    """连接类型"""
    STANDARD = "standard"                   # 标准连接
    LOAD_BALANCE = "load_balance"           # 负载均衡
    FUNNEL = "funnel"                       # 漏斗
    SELF_LOOP = "self_loop"                 # 自循环

class QueuePriority(Enum):
    """队列优先级"""
    FIFO = "FirstInFirstOut"                # 先进先出
    LIFO = "LastInFirstOut"                 # 后进先出
    OLDEST_FIRST = "OldestFlowFileFirst"    # 最旧优先
    NEWEST_FIRST = "NewestFlowFileFirst"    # 最新优先
    LARGEST_FIRST = "LargestFlowFileFirst"  # 最大优先
    SMALLEST_FIRST = "SmallestFlowFileFirst" # 最小优先

@dataclass
class FlowFile:
    """NiFi FlowFile"""
    uuid: str
    attributes: Dict[str, str] = field(default_factory=dict)
    content: bytes = b""
    size: int = 0
    entry_date: datetime = field(default_factory=datetime.now)
    lineage_start_date: datetime = field(default_factory=datetime.now)
    last_queued_date: datetime = field(default_factory=datetime.now)
    queue_date_index: int = 0
    state: FlowFileState = FlowFileState.ACTIVE
    penalty_expiration: Optional[datetime] = None
    
    def __post_init__(self):
        if self.size == 0 and self.content:
            self.size = len(self.content)
    
    def get_attribute(self, key: str, default: str = "") -> str:
        """获取属性"""
        return self.attributes.get(key, default)
    
    def put_attribute(self, key: str, value: str):
        """设置属性"""
        self.attributes[key] = value
    
    def remove_attribute(self, key: str) -> bool:
        """移除属性"""
        if key in self.attributes:
            del self.attributes[key]
            return True
        return False
    
    def clone(self) -> 'FlowFile':
        """克隆FlowFile"""
        return FlowFile(
            uuid=f"clone-{uuid.uuid4().hex[:8]}",
            attributes=self.attributes.copy(),
            content=self.content,
            size=self.size,
            entry_date=self.entry_date,
            lineage_start_date=self.lineage_start_date
        )

@dataclass
class Relationship:
    """处理器关系"""
    name: str
    description: str = ""
    auto_terminate: bool = False
    
@dataclass
class Connection:
    """连接"""
    id: str
    name: str
    source_id: str
    destination_id: str
    relationships: List[str] = field(default_factory=list)
    connection_type: ConnectionType = ConnectionType.STANDARD
    queue_priority: QueuePriority = QueuePriority.FIFO
    flow_file_expiration: timedelta = field(default_factory=lambda: timedelta(seconds=0))
    back_pressure_object_threshold: int = 10000
    back_pressure_data_size_threshold: str = "1 GB"
    load_balance_strategy: str = "DO_NOT_LOAD_BALANCE"
    load_balance_partition_attribute: str = ""
    queue: deque = field(default_factory=deque)
    queued_count: int = 0
    queued_size: int = 0
    
    def enqueue(self, flow_file: FlowFile):
        """入队"""
        flow_file.state = FlowFileState.QUEUED
        flow_file.last_queued_date = datetime.now()
        self.queue.append(flow_file)
        self.queued_count += 1
        self.queued_size += flow_file.size
    
    def dequeue(self) -> Optional[FlowFile]:
        """出队"""
        if not self.queue:
            return None
        
        flow_file = self.queue.popleft()
        flow_file.state = FlowFileState.PROCESSING
        self.queued_count -= 1
        self.queued_size -= flow_file.size
        return flow_file
    
    def is_back_pressure_enabled(self) -> bool:
        """检查是否启用背压"""
        return (self.queued_count >= self.back_pressure_object_threshold or
                self.queued_size >= self._parse_data_size(self.back_pressure_data_size_threshold))
    
    def _parse_data_size(self, size_str: str) -> int:
        """解析数据大小字符串"""
        size_str = size_str.upper().strip()
        if size_str.endswith('KB'):
            return int(float(size_str[:-2]) * 1024)
        elif size_str.endswith('MB'):
            return int(float(size_str[:-2]) * 1024 * 1024)
        elif size_str.endswith('GB'):
            return int(float(size_str[:-2]) * 1024 * 1024 * 1024)
        elif size_str.endswith('TB'):
            return int(float(size_str[:-2]) * 1024 * 1024 * 1024 * 1024)
        else:
            return int(float(size_str))

@dataclass
class Processor:
    """NiFi处理器"""
    id: str
    name: str
    processor_type: ProcessorType
    state: ProcessorState = ProcessorState.STOPPED
    properties: Dict[str, str] = field(default_factory=dict)
    relationships: Dict[str, Relationship] = field(default_factory=dict)
    incoming_connections: List[str] = field(default_factory=list)
    outgoing_connections: List[str] = field(default_factory=list)
    run_duration_millis: int = 0
    scheduling_period: str = "0 sec"
    scheduling_strategy: str = "TIMER_DRIVEN"
    execution_node: str = "ALL"
    penalty_duration: str = "30 sec"
    yield_duration: str = "1 sec"
    bulletin_level: str = "WARN"
    run_schedule: Optional[str] = None
    concurrent_tasks: int = 1
    
    # 统计信息
    bytes_read: int = 0
    bytes_written: int = 0
    bytes_transferred: int = 0
    flow_files_received: int = 0
    flow_files_sent: int = 0
    flow_files_removed: int = 0
    invocations: int = 0
    processing_nanos: int = 0
    
    def add_relationship(self, name: str, description: str = "", auto_terminate: bool = False):
        """添加关系"""
        self.relationships[name] = Relationship(
            name=name,
            description=description,
            auto_terminate=auto_terminate
        )
    
    def get_property(self, key: str, default: str = "") -> str:
        """获取属性"""
        return self.properties.get(key, default)
    
    def set_property(self, key: str, value: str):
        """设置属性"""
        self.properties[key] = value

@dataclass
class ProcessGroup:
    """处理组"""
    id: str
    name: str
    parent_group_id: Optional[str] = None
    processors: Dict[str, Processor] = field(default_factory=dict)
    connections: Dict[str, Connection] = field(default_factory=dict)
    process_groups: Dict[str, 'ProcessGroup'] = field(default_factory=dict)
    input_ports: Dict[str, Any] = field(default_factory=dict)
    output_ports: Dict[str, Any] = field(default_factory=dict)
    labels: Dict[str, Any] = field(default_factory=dict)
    variables: Dict[str, str] = field(default_factory=dict)
    parameter_context_id: Optional[str] = None
    flow_file_concurrency: str = "UNBOUNDED"
    flow_file_outbound_policy: str = "STREAM_WHEN_AVAILABLE"
    default_flow_file_expiration: str = "0 sec"
    default_back_pressure_object_threshold: int = 10000
    default_back_pressure_data_size_threshold: str = "1 GB"
    
    def add_processor(self, processor: Processor):
        """添加处理器"""
        self.processors[processor.id] = processor
    
    def add_connection(self, connection: Connection):
        """添加连接"""
        self.connections[connection.id] = connection
    
    def get_variable(self, name: str, default: str = "") -> str:
        """获取变量"""
        return self.variables.get(name, default)
    
    def set_variable(self, name: str, value: str):
        """设置变量"""
        self.variables[name] = value

@dataclass
class FlowRegistry:
    """流注册表"""
    id: str
    name: str
    url: str
    description: str = ""
    
@dataclass
class VersionedFlow:
    """版本化流"""
    registry_id: str
    bucket_id: str
    flow_id: str
    version: int
    flow_name: str
    description: str = ""
    comments: str = ""
    
class NiFiCluster:
    """
    Apache NiFi数据流处理集群
    """
    
    def __init__(self, cluster_name: str = "nifi-cluster"):
        self.cluster_name = cluster_name
        
        # 数据存储
        self.process_groups = {}  # group_id -> ProcessGroup
        self.processors = {}      # processor_id -> Processor
        self.connections = {}     # connection_id -> Connection
        self.flow_registries = {} # registry_id -> FlowRegistry
        self.versioned_flows = {} # flow_id -> VersionedFlow
        
        # 集群状态
        self.is_running = True
        self.version = "1.18.0"
        self.java_version = "11.0.16"
        self.cluster_coordinator = "node1.example.com:11443"
        
        # 配置
        self.config = {
            'nifi.web.http.port': 8080,
            'nifi.web.https.port': 8443,
            'nifi.cluster.is.node': True,
            'nifi.cluster.node.address': 'localhost',
            'nifi.cluster.node.protocol.port': 11443,
            'nifi.zookeeper.connect.string': 'localhost:2181',
            'nifi.state.management.embedded.zookeeper.start': True,
            'nifi.flowfile.repository.implementation': 'org.apache.nifi.controller.repository.WriteAheadFlowFileRepository',
            'nifi.content.repository.implementation': 'org.apache.nifi.controller.repository.FileSystemRepository',
            'nifi.provenance.repository.implementation': 'org.apache.nifi.provenance.WriteAheadProvenanceRepository'
        }
        
        # 线程锁
        self.cluster_lock = threading.Lock()
        
        # 统计信息
        self.stats = {
            'total_processors': 0,
            'running_processors': 0,
            'stopped_processors': 0,
            'invalid_processors': 0,
            'total_connections': 0,
            'total_process_groups': 0,
            'active_threads': 0,
            'queued_flow_files': 0,
            'queued_content_size': 0,
            'bytes_read_5_min': 0,
            'bytes_written_5_min': 0,
            'bytes_transferred_5_min': 0,
            'flow_files_received_5_min': 0,
            'flow_files_sent_5_min': 0
        }
        
        # 初始化根处理组
        self._create_root_process_group()
        
        # 创建示例流
        self._create_example_flow()
    
    def _create_root_process_group(self):
        """创建根处理组"""
        root_group = ProcessGroup(
            id="root",
            name="NiFi Flow"
        )
        
        with self.cluster_lock:
            self.process_groups["root"] = root_group
            self._update_stats()
    
    def _create_example_flow(self):
        """创建示例数据流"""
        root_group = self.process_groups["root"]
        
        # 创建示例处理器
        generate_processor = Processor(
            id="generate-001",
            name="GenerateFlowFile",
            processor_type=ProcessorType.GENERATE_FLOWFILE
        )
        generate_processor.add_relationship("success", "成功生成FlowFile")
        generate_processor.set_property("File Size", "1KB")
        generate_processor.set_property("Batch Size", "1")
        generate_processor.set_property("Data Format", "Text")
        
        log_processor = Processor(
            id="log-001",
            name="LogAttribute",
            processor_type=ProcessorType.LOG_ATTRIBUTE
        )
        log_processor.add_relationship("success", "成功记录属性")
        log_processor.set_property("Log Level", "info")
        log_processor.set_property("Log Payload", "true")
        
        # 创建连接
        connection = Connection(
            id="conn-001",
            name="GenerateFlowFile to LogAttribute",
            source_id="generate-001",
            destination_id="log-001",
            relationships=["success"]
        )
        
        # 添加到根组
        root_group.add_processor(generate_processor)
        root_group.add_processor(log_processor)
        root_group.add_connection(connection)
        
        # 更新全局索引
        with self.cluster_lock:
            self.processors[generate_processor.id] = generate_processor
            self.processors[log_processor.id] = log_processor
            self.connections[connection.id] = connection
            self._update_stats()
    
    def _update_stats(self):
        """更新统计信息"""
        self.stats['total_processors'] = len(self.processors)
        self.stats['running_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.RUNNING])
        self.stats['stopped_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.STOPPED])
        self.stats['invalid_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.INVALID])
        self.stats['total_connections'] = len(self.connections)
        self.stats['total_process_groups'] = len(self.process_groups)
        
        # 计算队列统计
        total_queued = 0
        total_queued_size = 0
        for connection in self.connections.values():
            total_queued += connection.queued_count
            total_queued_size += connection.queued_size
        
        self.stats['queued_flow_files'] = total_queued
        self.stats['queued_content_size'] = total_queued_size
    
    def create_processor(self, group_id: str, processor_name: str, processor_type: ProcessorType,
                        properties: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        创建处理器
        
        Args:
            group_id: 处理组ID
            processor_name: 处理器名称
            processor_type: 处理器类型
            properties: 处理器属性
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        if group_id not in self.process_groups:
            return {'status': 'error', 'message': f'Process group {group_id} not found'}
        
        processor_id = f"proc-{uuid.uuid4().hex[:8]}"
        
        processor = Processor(
            id=processor_id,
            name=processor_name,
            processor_type=processor_type,
            properties=properties or {}
        )
        
        # 根据处理器类型添加默认关系
        if processor_type in [ProcessorType.GET_FILE, ProcessorType.GET_HTTP, ProcessorType.GET_KAFKA]:
            processor.add_relationship("success", "成功获取数据")
            processor.add_relationship("failure", "获取数据失败")
        elif processor_type in [ProcessorType.PUT_FILE, ProcessorType.PUT_HDFS, ProcessorType.PUT_KAFKA]:
            processor.add_relationship("success", "成功写入数据")
            processor.add_relationship("failure", "写入数据失败")
        elif processor_type == ProcessorType.ROUTE_ON_ATTRIBUTE:
            processor.add_relationship("matched", "属性匹配")
            processor.add_relationship("unmatched", "属性不匹配")
        elif processor_type == ProcessorType.SPLIT_TEXT:
            processor.add_relationship("splits", "分割结果")
            processor.add_relationship("original", "原始文件")
            processor.add_relationship("failure", "分割失败")
        else:
            processor.add_relationship("success", "处理成功")
            processor.add_relationship("failure", "处理失败")
        
        group = self.process_groups[group_id]
        group.add_processor(processor)
        
        with self.cluster_lock:
            self.processors[processor_id] = processor
            self._update_stats()
        
        return {
            'status': 'success',
            'processor_id': processor_id,
            'processor_name': processor_name,
            'processor_type': processor_type.value,
            'group_id': group_id,
            'message': f'Processor {processor_name} created successfully'
        }
    
    def create_connection(self, source_id: str, destination_id: str, relationships: List[str],
                         name: Optional[str] = None) -> Dict[str, Any]:
        """
        创建连接
        
        Args:
            source_id: 源处理器ID
            destination_id: 目标处理器ID
            relationships: 关系列表
            name: 连接名称
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        if source_id not in self.processors:
            return {'status': 'error', 'message': f'Source processor {source_id} not found'}
        
        if destination_id not in self.processors:
            return {'status': 'error', 'message': f'Destination processor {destination_id} not found'}
        
        source_processor = self.processors[source_id]
        destination_processor = self.processors[destination_id]
        
        # 验证关系是否存在
        for rel in relationships:
            if rel not in source_processor.relationships:
                return {'status': 'error', 'message': f'Relationship {rel} not found in source processor'}
        
        connection_id = f"conn-{uuid.uuid4().hex[:8]}"
        connection_name = name or f"{source_processor.name} to {destination_processor.name}"
        
        connection = Connection(
            id=connection_id,
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            relationships=relationships.copy()
        )
        
        # 更新处理器连接信息
        source_processor.outgoing_connections.append(connection_id)
        destination_processor.incoming_connections.append(connection_id)
        
        # 找到处理器所在的组并添加连接
        for group in self.process_groups.values():
            if source_id in group.processors:
                group.add_connection(connection)
                break
        
        with self.cluster_lock:
            self.connections[connection_id] = connection
            self._update_stats()
        
        return {
            'status': 'success',
            'connection_id': connection_id,
            'connection_name': connection_name,
            'source_id': source_id,
            'destination_id': destination_id,
            'relationships': relationships,
            'message': f'Connection {connection_name} created successfully'
        }
    
    def start_processor(self, processor_id: str) -> Dict[str, Any]:
        """
        启动处理器
        
        Args:
            processor_id: 处理器ID
            
        Returns:
            Dict[str, Any]: 启动结果
        """
        if processor_id not in self.processors:
            return {'status': 'error', 'message': f'Processor {processor_id} not found'}
        
        processor = self.processors[processor_id]
        
        if processor.state == ProcessorState.RUNNING:
            return {'status': 'error', 'message': f'Processor {processor.name} is already running'}
        
        if processor.state == ProcessorState.INVALID:
            return {'status': 'error', 'message': f'Processor {processor.name} is invalid and cannot be started'}
        
        processor.state = ProcessorState.RUNNING
        
        with self.cluster_lock:
            self._update_stats()
        
        return {
            'status': 'success',
            'processor_id': processor_id,
            'processor_name': processor.name,
            'message': f'Processor {processor.name} started successfully'
        }
    
    def stop_processor(self, processor_id: str) -> Dict[str, Any]:
        """
        停止处理器
        
        Args:
            processor_id: 处理器ID
            
        Returns:
            Dict[str, Any]: 停止结果
        """
        if processor_id not in self.processors:
            return {'status': 'error', 'message': f'Processor {processor_id} not found'}
        
        processor = self.processors[processor_id]
        
        if processor.state == ProcessorState.STOPPED:
            return {'status': 'error', 'message': f'Processor {processor.name} is already stopped'}
        
        processor.state = ProcessorState.STOPPED
        
        with self.cluster_lock:
            self._update_stats()
        
        return {
            'status': 'success',
            'processor_id': processor_id,
            'processor_name': processor.name,
            'message': f'Processor {processor.name} stopped successfully'
        }
    
    def create_flow_file(self, processor_id: str, content: Union[str, bytes],
                        attributes: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """
        创建FlowFile
        
        Args:
            processor_id: 处理器ID
            content: 内容
            attributes: 属性
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        if processor_id not in self.processors:
            return {'status': 'error', 'message': f'Processor {processor_id} not found'}
        
        processor = self.processors[processor_id]
        
        if processor.state != ProcessorState.RUNNING:
            return {'status': 'error', 'message': f'Processor {processor.name} is not running'}
        
        # 创建FlowFile
        flow_file_uuid = f"ff-{uuid.uuid4().hex}"
        content_bytes = content.encode('utf-8') if isinstance(content, str) else content
        
        flow_file = FlowFile(
            uuid=flow_file_uuid,
            attributes=attributes or {},
            content=content_bytes,
            size=len(content_bytes)
        )
        
        # 设置默认属性
        flow_file.put_attribute("filename", f"flowfile-{flow_file_uuid[:8]}")
        flow_file.put_attribute("path", "/")
        flow_file.put_attribute("uuid", flow_file_uuid)
        
        # 更新处理器统计
        processor.flow_files_received += 1
        processor.bytes_read += flow_file.size
        processor.invocations += 1
        
        return {
            'status': 'success',
            'flow_file_uuid': flow_file_uuid,
            'size': flow_file.size,
            'attributes': flow_file.attributes,
            'message': f'FlowFile created successfully'
        }
    
    def transfer_flow_file(self, processor_id: str, flow_file_uuid: str,
                          relationship: str) -> Dict[str, Any]:
        """
        传输FlowFile
        
        Args:
            processor_id: 处理器ID
            flow_file_uuid: FlowFile UUID
            relationship: 关系名称
            
        Returns:
            Dict[str, Any]: 传输结果
        """
        if processor_id not in self.processors:
            return {'status': 'error', 'message': f'Processor {processor_id} not found'}
        
        processor = self.processors[processor_id]
        
        if relationship not in processor.relationships:
            return {'status': 'error', 'message': f'Relationship {relationship} not found'}
        
        # 查找目标连接
        target_connections = []
        for conn_id in processor.outgoing_connections:
            connection = self.connections[conn_id]
            if relationship in connection.relationships:
                target_connections.append(connection)
        
        if not target_connections:
            # 检查是否自动终止
            if processor.relationships[relationship].auto_terminate:
                processor.flow_files_removed += 1
                return {
                    'status': 'success',
                    'action': 'auto_terminated',
                    'message': f'FlowFile auto-terminated on relationship {relationship}'
                }
            else:
                return {'status': 'error', 'message': f'No connection found for relationship {relationship}'}
        
        # 创建模拟FlowFile(实际应该从会话中获取)
        flow_file = FlowFile(
            uuid=flow_file_uuid,
            attributes={'filename': f'flowfile-{flow_file_uuid[:8]}'},
            content=b'sample content',
            size=14
        )
        
        # 传输到所有目标连接
        transferred_count = 0
        for connection in target_connections:
            connection.enqueue(flow_file.clone())
            transferred_count += 1
        
        # 更新处理器统计
        processor.flow_files_sent += transferred_count
        processor.bytes_written += flow_file.size * transferred_count
        
        with self.cluster_lock:
            self._update_stats()
        
        return {
            'status': 'success',
            'flow_file_uuid': flow_file_uuid,
            'relationship': relationship,
            'transferred_count': transferred_count,
            'target_connections': [conn.id for conn in target_connections],
            'message': f'FlowFile transferred to {transferred_count} connections'
        }
    
    def get_processor_status(self, processor_id: str) -> Dict[str, Any]:
        """
        获取处理器状态
        
        Args:
            processor_id: 处理器ID
            
        Returns:
            Dict[str, Any]: 处理器状态
        """
        if processor_id not in self.processors:
            return {'status': 'error', 'message': f'Processor {processor_id} not found'}
        
        processor = self.processors[processor_id]
        
        # 计算连接队列信息
        incoming_queued = 0
        outgoing_queued = 0
        
        for conn_id in processor.incoming_connections:
            if conn_id in self.connections:
                incoming_queued += self.connections[conn_id].queued_count
        
        for conn_id in processor.outgoing_connections:
            if conn_id in self.connections:
                outgoing_queued += self.connections[conn_id].queued_count
        
        return {
            'status': 'success',
            'processor': {
                'id': processor.id,
                'name': processor.name,
                'type': processor.processor_type.value,
                'state': processor.state.value,
                'properties': processor.properties,
                'relationships': {name: rel.description for name, rel in processor.relationships.items()},
                'incoming_connections': processor.incoming_connections,
                'outgoing_connections': processor.outgoing_connections,
                'concurrent_tasks': processor.concurrent_tasks,
                'scheduling_period': processor.scheduling_period,
                'scheduling_strategy': processor.scheduling_strategy,
                'stats': {
                    'bytes_read': processor.bytes_read,
                    'bytes_written': processor.bytes_written,
                    'bytes_transferred': processor.bytes_transferred,
                    'flow_files_received': processor.flow_files_received,
                    'flow_files_sent': processor.flow_files_sent,
                    'flow_files_removed': processor.flow_files_removed,
                    'invocations': processor.invocations,
                    'processing_nanos': processor.processing_nanos,
                    'incoming_queued': incoming_queued,
                    'outgoing_queued': outgoing_queued
                }
            }
        }
    
    def get_connection_status(self, connection_id: str) -> Dict[str, Any]:
        """
        获取连接状态
        
        Args:
            connection_id: 连接ID
            
        Returns:
            Dict[str, Any]: 连接状态
        """
        if connection_id not in self.connections:
            return {'status': 'error', 'message': f'Connection {connection_id} not found'}
        
        connection = self.connections[connection_id]
        
        return {
            'status': 'success',
            'connection': {
                'id': connection.id,
                'name': connection.name,
                'source_id': connection.source_id,
                'destination_id': connection.destination_id,
                'relationships': connection.relationships,
                'connection_type': connection.connection_type.value,
                'queue_priority': connection.queue_priority.value,
                'back_pressure_object_threshold': connection.back_pressure_object_threshold,
                'back_pressure_data_size_threshold': connection.back_pressure_data_size_threshold,
                'load_balance_strategy': connection.load_balance_strategy,
                'stats': {
                    'queued_count': connection.queued_count,
                    'queued_size': connection.queued_size,
                    'is_back_pressure_enabled': connection.is_back_pressure_enabled()
                }
            }
        }
    
    def list_processors(self, group_id: Optional[str] = None) -> Dict[str, Any]:
        """
        列出处理器
        
        Args:
            group_id: 处理组ID(可选)
            
        Returns:
            Dict[str, Any]: 处理器列表
        """
        processors_info = []
        
        if group_id:
            if group_id not in self.process_groups:
                return {'status': 'error', 'message': f'Process group {group_id} not found'}
            
            group = self.process_groups[group_id]
            target_processors = group.processors
        else:
            target_processors = self.processors
        
        for processor_id, processor in target_processors.items():
            processors_info.append({
                'id': processor.id,
                'name': processor.name,
                'type': processor.processor_type.value,
                'state': processor.state.value,
                'group_id': group_id or 'root',
                'concurrent_tasks': processor.concurrent_tasks,
                'flow_files_received': processor.flow_files_received,
                'flow_files_sent': processor.flow_files_sent
            })
        
        return {
            'status': 'success',
            'processors': processors_info,
            'total': len(processors_info)
        }
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """
        获取集群状态
        
        Returns:
            Dict[str, Any]: 集群状态
        """
        # 计算5分钟统计(模拟)
        total_bytes_read = sum(p.bytes_read for p in self.processors.values())
        total_bytes_written = sum(p.bytes_written for p in self.processors.values())
        total_flow_files_received = sum(p.flow_files_received for p in self.processors.values())
        total_flow_files_sent = sum(p.flow_files_sent for p in self.processors.values())
        
        self.stats['bytes_read_5_min'] = total_bytes_read
        self.stats['bytes_written_5_min'] = total_bytes_written
        self.stats['bytes_transferred_5_min'] = total_bytes_read + total_bytes_written
        self.stats['flow_files_received_5_min'] = total_flow_files_received
        self.stats['flow_files_sent_5_min'] = total_flow_files_sent
        self.stats['active_threads'] = len([p for p in self.processors.values() if p.state == ProcessorState.RUNNING]) * 2
        
        return {
            'cluster_name': self.cluster_name,
            'is_running': self.is_running,
            'version': self.version,
            'java_version': self.java_version,
            'cluster_coordinator': self.cluster_coordinator,
            'stats': self.stats,
            'config': self.config,
            'timestamp': datetime.now().isoformat()
        }

# 使用示例
if __name__ == "__main__":
    # 创建NiFi集群
    nifi = NiFiCluster("production-nifi")
    
    print("=== Apache NiFi数据流处理示例 ===")
    
    # 创建处理器
    print("\n=== 创建处理器 ===")
    
    # 创建文件获取处理器
    get_file_result = nifi.create_processor(
        group_id="root",
        processor_name="GetFile-WebLogs",
        processor_type=ProcessorType.GET_FILE,
        properties={
            'Input Directory': '/var/log/web',
            'File Filter': r'.*\.log$',
            'Keep Source File': 'false',
            'Minimum File Age': '0 sec',
            'Polling Interval': '10 sec',
            'Batch Size': '10'
        }
    )
    print(f"GetFile处理器创建结果: {get_file_result}")
    
    # 创建文本替换处理器
    replace_text_result = nifi.create_processor(
        group_id="root",
        processor_name="ReplaceText-Anonymize",
        processor_type=ProcessorType.REPLACE_TEXT,
        properties={
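            # 该正则用于匹配IPv4地址,将日志中的客户端IP替换为占位符以完成脱敏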
            'Search Value': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
            'Replacement Value': 'XXX.XXX.XXX.XXX',
            'Character Set': 'UTF-8',
            'Maximum Buffer Size': '1 MB',
            'Replacement Strategy': 'Regex Replace'
        }
    )
    print(f"ReplaceText处理器创建结果: {replace_text_result}")
    
    # 创建HDFS写入处理器
    put_hdfs_result = nifi.create_processor(
        group_id="root",
        processor_name="PutHDFS-Archive",
        processor_type=ProcessorType.PUT_HDFS,
        properties={
            'Hadoop Configuration Resources': '/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml',
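            # Directory 使用NiFi表达式语言 ${now():format("yyyy/MM/dd")} 按日期分目录归档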
            'Directory': '/data/logs/web/${now():format("yyyy/MM/dd")}',
            'Conflict Resolution Strategy': 'replace',
            'Block Size': '128 MB',
            'Replication': '3',
            'Permissions umask': '022'
        }
    )
    print(f"PutHDFS处理器创建结果: {put_hdfs_result}")
    
    if (get_file_result['status'] == 'success' and 
        replace_text_result['status'] == 'success' and 
        put_hdfs_result['status'] == 'success'):
        
        get_file_id = get_file_result['processor_id']
        replace_text_id = replace_text_result['processor_id']
        put_hdfs_id = put_hdfs_result['processor_id']
        
        # 创建连接
        print("\n=== 创建连接 ===")
        
        # GetFile -> ReplaceText
        conn1_result = nifi.create_connection(
            source_id=get_file_id,
            destination_id=replace_text_id,
            relationships=["success"],
            name="GetFile to ReplaceText"
        )
        print(f"连接1创建结果: {conn1_result}")
        
        # ReplaceText -> PutHDFS
        conn2_result = nifi.create_connection(
            source_id=replace_text_id,
            destination_id=put_hdfs_id,
            relationships=["success"],
            name="ReplaceText to PutHDFS"
        )
        print(f"连接2创建结果: {conn2_result}")
        
        # 启动处理器
        print("\n=== 启动处理器 ===")
        
        start_results = []
        for processor_id in [get_file_id, replace_text_id, put_hdfs_id]:
            result = nifi.start_processor(processor_id)
            start_results.append(result)
            print(f"处理器 {processor_id} 启动结果: {result}")
        
        if all(r['status'] == 'success' for r in start_results):
            # 模拟FlowFile处理
            print("\n=== 模拟FlowFile处理 ===")
            
            # 创建FlowFile
            for i in range(5):
                flow_file_result = nifi.create_flow_file(
                    processor_id=get_file_id,
                    content=f"192.168.1.{100+i} - - [25/Dec/2023:10:00:{i:02d} +0000] \"GET /api/data HTTP/1.1\" 200 1234\n",
                    attributes={
                        'filename': f'access_{i+1}.log',
                        'path': '/var/log/web/',
                        'file.size': '78'
                    }
                )
                if i < 3:  # 只打印前3个结果
                    print(f"FlowFile {i+1} 创建结果: {flow_file_result}")
                
                if flow_file_result['status'] == 'success':
                    # 传输FlowFile
                    transfer_result = nifi.transfer_flow_file(
                        processor_id=get_file_id,
                        flow_file_uuid=flow_file_result['flow_file_uuid'],
                        relationship="success"
                    )
                    if i < 3:
                        print(f"FlowFile {i+1} 传输结果: {transfer_result}")
            
            # 获取处理器状态
            print("\n=== 处理器状态 ===")
            for processor_id, name in [(get_file_id, "GetFile"), (replace_text_id, "ReplaceText"), (put_hdfs_id, "PutHDFS")]:
                status = nifi.get_processor_status(processor_id)
                if status['status'] == 'success':
                    proc_info = status['processor']
                    stats = proc_info['stats']
                    print(f"{name} 处理器:")
                    print(f"  状态: {proc_info['state']}")
                    print(f"  接收FlowFiles: {stats['flow_files_received']}")
                    print(f"  发送FlowFiles: {stats['flow_files_sent']}")
                    print(f"  读取字节: {stats['bytes_read']}")
                    print(f"  写入字节: {stats['bytes_written']}")
                    print(f"  传入队列: {stats['incoming_queued']}")
                    print(f"  传出队列: {stats['outgoing_queued']}")
            
            # 获取连接状态
            print("\n=== 连接状态 ===")
            if conn1_result['status'] == 'success':
                conn_status = nifi.get_connection_status(conn1_result['connection_id'])
                if conn_status['status'] == 'success':
                    conn_info = conn_status['connection']
                    stats = conn_info['stats']
                    print(f"连接: {conn_info['name']}")
                    print(f"  队列数量: {stats['queued_count']}")
                    print(f"  队列大小: {stats['queued_size']} bytes")
                    print(f"  背压启用: {stats['is_back_pressure_enabled']}")
    
    # 列出所有处理器
    print("\n=== 处理器列表 ===")
    processors_list = nifi.list_processors()
    if processors_list['status'] == 'success':
        print(f"总处理器数: {processors_list['total']}")
        for proc in processors_list['processors']:
            print(f"  - {proc['name']} ({proc['type']}) - 状态: {proc['state']}")
    
    # 获取集群状态
    print("\n=== 集群状态 ===")
    cluster_status = nifi.get_cluster_status()
    print(f"集群名称: {cluster_status['cluster_name']}")
    print(f"版本: {cluster_status['version']}")
    print(f"运行状态: {cluster_status['is_running']}")
    print(f"集群协调器: {cluster_status['cluster_coordinator']}")
    print("统计信息:")
    for key, value in cluster_status['stats'].items():
        print(f"  {key}: {value}")