# 06-Hadoop生态系统组件(3)
摘要:本文详细介绍了Apache Airflow工作流调度组件的核心实现,主要包括:基本枚举类型,定义了任务状态(TaskState)、DAG状态(DagState)、触发规则(TriggerRule)、调度间隔(ScheduleInterval)和执行器类型(ExecutorType);核心数据结构,包括TaskInstance(任务运行实例,包含执行状态、时间戳、重试次数等信息)以及Task、DagRun、Dag等定义;以及一个内存模拟的调度器AirflowScheduler,演示DAG的注册、触发与任务实例管理。此外,本文还涉及生态系统集成与最佳实践、性能优化、监控运维,以及Apache Ranger安全组件。
## 7. 工作流调度组件
### 7.1 Apache Airflow详解
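Apache Airflow使用Python代码定义DAG(有向无环图)来编排任务:调度器(Scheduler)负责解析DAG、按调度间隔生成DAG Run,并把任务实例交给执行器(Executor)运行。下面先给出一个真实Airflow DAG文件的简化示意(按Airflow 2.x的常见写法,任务逻辑仅为占位,具体API以所用版本文档为准):

```python
# 示意:一个每日运行的数据处理DAG(Airflow 2.x写法,仅供参考)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def transform_data_function(**context):
    # 占位的转换逻辑,实际项目中在这里实现数据转换
    print("transforming data for", context["ds"])


with DAG(
    dag_id="data_processing_pipeline",
    description="Daily data processing pipeline",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    tags=["data", "etl", "daily"],
) as dag:
    extract = BashOperator(
        task_id="extract_data",
        bash_command='echo "Extracting data from source systems..."',
    )
    transform = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data_function,
    )
    load = BashOperator(
        task_id="load_data",
        bash_command='echo "Loading data into the target table..."',
    )
    validate = BashOperator(
        task_id="validate_data",
        bash_command='echo "Validating data quality..."',
    )

    # 用位移运算符声明依赖:extract -> transform -> load -> validate
    extract >> transform >> load >> validate
```

为了便于理解调度器内部的工作方式,下面用纯Python数据结构模拟Airflow的核心概念(任务状态、DAG、任务实例、调度器等),与真实Airflow的行为相比有所简化:

```python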
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import copy
class TaskState(Enum):
"""任务状态"""
NONE = "无状态"
SCHEDULED = "已调度"
QUEUED = "队列中"
RUNNING = "运行中"
SUCCESS = "成功"
FAILED = "失败"
UP_FOR_RETRY = "等待重试"
UP_FOR_RESCHEDULE = "等待重新调度"
UPSTREAM_FAILED = "上游失败"
SKIPPED = "跳过"
REMOVED = "已移除"
class DagState(Enum):
"""DAG状态"""
RUNNING = "运行中"
SUCCESS = "成功"
FAILED = "失败"
PAUSED = "暂停"
QUEUED = "队列中"
class TriggerRule(Enum):
"""触发规则"""
ALL_SUCCESS = "all_success" # 所有上游任务成功
ALL_FAILED = "all_failed" # 所有上游任务失败
ALL_DONE = "all_done" # 所有上游任务完成
ONE_SUCCESS = "one_success" # 至少一个上游任务成功
ONE_FAILED = "one_failed" # 至少一个上游任务失败
NONE_FAILED = "none_failed" # 没有上游任务失败
NONE_SKIPPED = "none_skipped" # 没有上游任务跳过
DUMMY = "dummy" # 总是触发
class ScheduleInterval(Enum):
"""调度间隔"""
ONCE = "@once" # 只执行一次
HOURLY = "@hourly" # 每小时
DAILY = "@daily" # 每天
WEEKLY = "@weekly" # 每周
MONTHLY = "@monthly" # 每月
YEARLY = "@yearly" # 每年
NONE = None # 不调度
class ExecutorType(Enum):
"""执行器类型"""
SEQUENTIAL = "SequentialExecutor"
LOCAL = "LocalExecutor"
CELERY = "CeleryExecutor"
KUBERNETES = "KubernetesExecutor"
DEBUG = "DebugExecutor"
@dataclass
class TaskInstance:
"""任务实例"""
task_id: str
dag_id: str
execution_date: datetime
state: TaskState = TaskState.NONE
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
duration: Optional[float] = None
try_number: int = 1
max_tries: int = 1
hostname: str = ""
unixname: str = ""
job_id: Optional[int] = None
pool: str = "default_pool"
queue: str = "default"
priority_weight: int = 1
operator: str = ""
queued_dttm: Optional[datetime] = None
pid: Optional[int] = None
executor_config: Dict[str, Any] = field(default_factory=dict)
log_url: str = ""
@dataclass
class Task:
"""任务定义"""
task_id: str
dag_id: str
operator_class: str
operator_kwargs: Dict[str, Any] = field(default_factory=dict)
upstream_task_ids: List[str] = field(default_factory=list)
downstream_task_ids: List[str] = field(default_factory=list)
trigger_rule: TriggerRule = TriggerRule.ALL_SUCCESS
depends_on_past: bool = False
wait_for_downstream: bool = False
retries: int = 0
retry_delay: timedelta = timedelta(minutes=5)
retry_exponential_backoff: bool = False
max_retry_delay: Optional[timedelta] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
schedule_interval: Optional[str] = None
pool: str = "default_pool"
queue: str = "default"
priority_weight: int = 1
weight_rule: str = "downstream" # downstream, upstream, absolute
sla: Optional[timedelta] = None
execution_timeout: Optional[timedelta] = None
on_failure_callback: Optional[Callable] = None
on_success_callback: Optional[Callable] = None
on_retry_callback: Optional[Callable] = None
email_on_failure: bool = True
email_on_retry: bool = True
email: List[str] = field(default_factory=list)
@dataclass
class DagRun:
"""DAG运行实例"""
dag_id: str
run_id: str
execution_date: datetime
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
state: DagState = DagState.RUNNING
run_type: str = "scheduled" # scheduled, manual, backfill
external_trigger: bool = False
conf: Dict[str, Any] = field(default_factory=dict)
data_interval_start: Optional[datetime] = None
data_interval_end: Optional[datetime] = None
last_scheduling_decision: Optional[datetime] = None
dag_hash: Optional[str] = None
creating_job_id: Optional[int] = None
queued_at: Optional[datetime] = None
@dataclass
class Dag:
"""DAG定义"""
dag_id: str
description: str = ""
schedule_interval: Optional[str] = ScheduleInterval.DAILY.value
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
catchup: bool = True
max_active_runs: int = 16
max_active_tasks: int = 16
default_args: Dict[str, Any] = field(default_factory=dict)
params: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
tasks: Dict[str, Task] = field(default_factory=dict)
is_paused: bool = False
is_subdag: bool = False
fileloc: str = ""
template_searchpath: Optional[List[str]] = None
template_undefined: str = "strict" # strict, jinja2.Undefined
user_defined_macros: Optional[Dict[str, Any]] = None
user_defined_filters: Optional[Dict[str, Any]] = None
default_view: str = "tree" # tree, graph, duration, gantt, landing_times
orientation: str = "LR" # LR, TB, RL, BT
concurrency: int = 16
max_active_runs_per_dag: int = 16
dagrun_timeout: Optional[timedelta] = None
sla_miss_callback: Optional[Callable] = None
access_control: Dict[str, Any] = field(default_factory=dict)
doc_md: Optional[str] = None
@dataclass
class Connection:
"""连接配置"""
conn_id: str
conn_type: str
description: str = ""
host: Optional[str] = None
login: Optional[str] = None
password: Optional[str] = None
schema: Optional[str] = None
port: Optional[int] = None
extra: Dict[str, Any] = field(default_factory=dict)
uri: Optional[str] = None
is_encrypted: bool = False
is_extra_encrypted: bool = False
@dataclass
class Variable:
"""变量"""
key: str
val: str
description: str = ""
is_encrypted: bool = False
@dataclass
class Pool:
"""资源池"""
pool: str
slots: int
    description: str = ""
include_deferred: bool = True
class AirflowScheduler:
"""
Airflow调度器
"""
def __init__(self, scheduler_id: str = "airflow-scheduler-1"):
self.scheduler_id = scheduler_id
self.dags = {} # dag_id -> Dag
self.dag_runs = {} # run_id -> DagRun
self.task_instances = {} # (dag_id, task_id, execution_date) -> TaskInstance
self.connections = {} # conn_id -> Connection
self.variables = {} # key -> Variable
self.pools = {} # pool_name -> Pool
# 调度器状态
self.is_running = False
self.executor_type = ExecutorType.LOCAL
self.max_threads = 4
self.heartbeat_interval = 5 # 秒
self.dag_dir_list_interval = 300 # 秒
self.child_process_timeout = 60 # 秒
self.zombie_task_threshold = 300 # 秒
# 线程锁
self.dag_lock = threading.Lock()
self.task_lock = threading.Lock()
self.run_lock = threading.Lock()
# 调度器线程
self.scheduler_thread = None
self.heartbeat_thread = None
# 统计信息
self.stats = {
'total_dags': 0,
'active_dags': 0,
'paused_dags': 0,
'total_dag_runs': 0,
'running_dag_runs': 0,
'total_task_instances': 0,
'running_task_instances': 0,
'successful_task_instances': 0,
'failed_task_instances': 0
}
# 初始化默认数据
self._initialize_default_data()
def _initialize_default_data(self):
"""初始化默认数据"""
# 创建默认资源池
default_pool = Pool(
pool="default_pool",
slots=128,
description="Default pool"
)
self.pools["default_pool"] = default_pool
# 创建默认连接
local_mysql = Connection(
conn_id="mysql_default",
conn_type="mysql",
description="Default MySQL connection",
host="localhost",
login="airflow",
password="airflow",
schema="airflow",
port=3306
)
self.connections["mysql_default"] = local_mysql
# 创建示例DAG
self._create_example_dags()
def _create_example_dags(self):
"""创建示例DAG"""
# 数据处理DAG
data_processing_dag = Dag(
dag_id="data_processing_pipeline",
description="Daily data processing pipeline",
schedule_interval=ScheduleInterval.DAILY.value,
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
tags=["data", "etl", "daily"]
)
# 添加任务
extract_task = Task(
task_id="extract_data",
dag_id="data_processing_pipeline",
operator_class="BashOperator",
operator_kwargs={
'bash_command': 'echo "Extracting data from source systems..."'
},
retries=2,
retry_delay=timedelta(minutes=5)
)
transform_task = Task(
task_id="transform_data",
dag_id="data_processing_pipeline",
operator_class="PythonOperator",
operator_kwargs={
'python_callable': 'transform_data_function'
},
upstream_task_ids=["extract_data"],
retries=1,
retry_delay=timedelta(minutes=3)
)
load_task = Task(
task_id="load_data",
dag_id="data_processing_pipeline",
operator_class="SqlOperator",
operator_kwargs={
'sql': 'INSERT INTO target_table SELECT * FROM staging_table',
'conn_id': 'mysql_default'
},
upstream_task_ids=["transform_data"],
retries=3,
retry_delay=timedelta(minutes=2)
)
validate_task = Task(
task_id="validate_data",
dag_id="data_processing_pipeline",
operator_class="PythonOperator",
operator_kwargs={
'python_callable': 'validate_data_quality'
},
upstream_task_ids=["load_data"]
)
        # 设置下游依赖(上游依赖已在各Task构造时通过upstream_task_ids指定)
        extract_task.downstream_task_ids = ["transform_data"]
        transform_task.downstream_task_ids = ["load_data"]
        load_task.downstream_task_ids = ["validate_data"]
data_processing_dag.tasks = {
"extract_data": extract_task,
"transform_data": transform_task,
"load_data": load_task,
"validate_data": validate_task
}
self.dags["data_processing_pipeline"] = data_processing_dag
# 机器学习训练DAG
ml_training_dag = Dag(
dag_id="ml_model_training",
description="Machine learning model training pipeline",
schedule_interval=ScheduleInterval.WEEKLY.value,
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
tags=["ml", "training", "weekly"]
)
# 添加ML任务
prepare_data_task = Task(
task_id="prepare_training_data",
dag_id="ml_model_training",
operator_class="PythonOperator",
operator_kwargs={
'python_callable': 'prepare_training_data'
}
)
train_model_task = Task(
task_id="train_model",
dag_id="ml_model_training",
operator_class="PythonOperator",
operator_kwargs={
'python_callable': 'train_ml_model'
},
upstream_task_ids=["prepare_training_data"],
execution_timeout=timedelta(hours=2)
)
evaluate_model_task = Task(
task_id="evaluate_model",
dag_id="ml_model_training",
operator_class="PythonOperator",
operator_kwargs={
'python_callable': 'evaluate_model_performance'
},
upstream_task_ids=["train_model"]
)
deploy_model_task = Task(
task_id="deploy_model",
dag_id="ml_model_training",
operator_class="BashOperator",
operator_kwargs={
'bash_command': 'echo "Deploying model to production..."'
},
upstream_task_ids=["evaluate_model"],
trigger_rule=TriggerRule.ALL_SUCCESS
)
        # 设置下游依赖(上游依赖已在各Task构造时通过upstream_task_ids指定)
        prepare_data_task.downstream_task_ids = ["train_model"]
        train_model_task.downstream_task_ids = ["evaluate_model"]
        evaluate_model_task.downstream_task_ids = ["deploy_model"]
ml_training_dag.tasks = {
"prepare_training_data": prepare_data_task,
"train_model": train_model_task,
"evaluate_model": evaluate_model_task,
"deploy_model": deploy_model_task
}
self.dags["ml_model_training"] = ml_training_dag
def add_dag(self, dag: Dag) -> Dict[str, Any]:
"""
添加DAG
Args:
dag: DAG对象
Returns:
Dict[str, Any]: 添加结果
"""
if dag.dag_id in self.dags:
return {'status': 'error', 'message': f'DAG {dag.dag_id} already exists'}
with self.dag_lock:
self.dags[dag.dag_id] = dag
self.stats['total_dags'] += 1
if not dag.is_paused:
self.stats['active_dags'] += 1
else:
self.stats['paused_dags'] += 1
return {
'status': 'success',
'dag': {
'dag_id': dag.dag_id,
'description': dag.description,
'schedule_interval': dag.schedule_interval,
'is_paused': dag.is_paused,
'task_count': len(dag.tasks)
}
}
def get_dag(self, dag_id: str) -> Dict[str, Any]:
"""
获取DAG信息
Args:
dag_id: DAG ID
Returns:
Dict[str, Any]: DAG信息
"""
if dag_id not in self.dags:
return {'status': 'error', 'message': f'DAG {dag_id} not found'}
dag = self.dags[dag_id]
# 获取任务信息
tasks_info = []
for task_id, task in dag.tasks.items():
tasks_info.append({
'task_id': task.task_id,
'operator_class': task.operator_class,
'upstream_task_ids': task.upstream_task_ids,
'downstream_task_ids': task.downstream_task_ids,
'trigger_rule': task.trigger_rule.value,
'retries': task.retries,
'pool': task.pool,
'queue': task.queue
})
return {
'status': 'success',
'dag': {
'dag_id': dag.dag_id,
'description': dag.description,
'schedule_interval': dag.schedule_interval,
'start_date': dag.start_date.isoformat() if dag.start_date else None,
'end_date': dag.end_date.isoformat() if dag.end_date else None,
'catchup': dag.catchup,
'max_active_runs': dag.max_active_runs,
'max_active_tasks': dag.max_active_tasks,
'is_paused': dag.is_paused,
'tags': dag.tags,
'tasks': tasks_info,
'task_count': len(dag.tasks)
}
}
def trigger_dag(self, dag_id: str, run_id: Optional[str] = None,
conf: Optional[Dict[str, Any]] = None,
execution_date: Optional[datetime] = None) -> Dict[str, Any]:
"""
触发DAG运行
Args:
dag_id: DAG ID
run_id: 运行ID
conf: 配置参数
execution_date: 执行日期
Returns:
Dict[str, Any]: 触发结果
"""
if dag_id not in self.dags:
return {'status': 'error', 'message': f'DAG {dag_id} not found'}
dag = self.dags[dag_id]
if dag.is_paused:
return {'status': 'error', 'message': f'DAG {dag_id} is paused'}
if execution_date is None:
execution_date = datetime.now()
if run_id is None:
run_id = f"manual__{execution_date.strftime('%Y-%m-%dT%H:%M:%S')}"
# 检查是否已存在相同的运行
if run_id in self.dag_runs:
return {'status': 'error', 'message': f'DagRun {run_id} already exists'}
# 创建DAG运行实例
dag_run = DagRun(
dag_id=dag_id,
run_id=run_id,
execution_date=execution_date,
start_date=datetime.now(),
state=DagState.RUNNING,
run_type="manual",
external_trigger=True,
conf=conf or {},
data_interval_start=execution_date,
data_interval_end=execution_date + timedelta(days=1)
)
with self.run_lock:
self.dag_runs[run_id] = dag_run
self.stats['total_dag_runs'] += 1
self.stats['running_dag_runs'] += 1
# 创建任务实例
self._create_task_instances(dag, dag_run)
return {
'status': 'success',
'dag_run': {
'dag_id': dag_id,
'run_id': run_id,
'execution_date': execution_date.isoformat(),
'state': dag_run.state.value,
'run_type': dag_run.run_type
}
}
def _create_task_instances(self, dag: Dag, dag_run: DagRun):
"""
为DAG运行创建任务实例
Args:
dag: DAG对象
dag_run: DAG运行实例
"""
with self.task_lock:
for task_id, task in dag.tasks.items():
task_instance = TaskInstance(
task_id=task_id,
dag_id=dag.dag_id,
execution_date=dag_run.execution_date,
state=TaskState.SCHEDULED,
max_tries=task.retries + 1,
pool=task.pool,
queue=task.queue,
priority_weight=task.priority_weight,
operator=task.operator_class
)
key = (dag.dag_id, task_id, dag_run.execution_date)
self.task_instances[key] = task_instance
self.stats['total_task_instances'] += 1
def get_dag_runs(self, dag_id: Optional[str] = None,
state: Optional[DagState] = None,
limit: int = 100) -> Dict[str, Any]:
"""
获取DAG运行列表
Args:
dag_id: DAG ID过滤
state: 状态过滤
limit: 限制数量
Returns:
Dict[str, Any]: DAG运行列表
"""
dag_runs_info = []
count = 0
for run_id, dag_run in self.dag_runs.items():
if count >= limit:
break
# 应用过滤条件
if dag_id is not None and dag_run.dag_id != dag_id:
continue
if state is not None and dag_run.state != state:
continue
dag_runs_info.append({
'dag_id': dag_run.dag_id,
'run_id': dag_run.run_id,
'execution_date': dag_run.execution_date.isoformat(),
'start_date': dag_run.start_date.isoformat() if dag_run.start_date else None,
'end_date': dag_run.end_date.isoformat() if dag_run.end_date else None,
'state': dag_run.state.value,
'run_type': dag_run.run_type,
'external_trigger': dag_run.external_trigger,
'conf': dag_run.conf
})
count += 1
return {
'status': 'success',
'dag_runs': dag_runs_info,
'total': len(dag_runs_info)
}
def get_task_instances(self, dag_id: str, run_id: str) -> Dict[str, Any]:
"""
获取任务实例列表
Args:
dag_id: DAG ID
run_id: 运行ID
Returns:
Dict[str, Any]: 任务实例列表
"""
if run_id not in self.dag_runs:
return {'status': 'error', 'message': f'DagRun {run_id} not found'}
dag_run = self.dag_runs[run_id]
task_instances_info = []
for key, task_instance in self.task_instances.items():
ti_dag_id, ti_task_id, ti_execution_date = key
if (ti_dag_id == dag_id and
ti_execution_date == dag_run.execution_date):
task_instances_info.append({
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
'execution_date': task_instance.execution_date.isoformat(),
'state': task_instance.state.value,
'start_date': task_instance.start_date.isoformat() if task_instance.start_date else None,
'end_date': task_instance.end_date.isoformat() if task_instance.end_date else None,
'duration': task_instance.duration,
'try_number': task_instance.try_number,
'max_tries': task_instance.max_tries,
'operator': task_instance.operator,
'pool': task_instance.pool,
'queue': task_instance.queue,
'priority_weight': task_instance.priority_weight
})
return {
'status': 'success',
'task_instances': task_instances_info,
'total': len(task_instances_info)
}
def run_task_instance(self, dag_id: str, task_id: str,
execution_date: datetime) -> Dict[str, Any]:
"""
运行任务实例
Args:
dag_id: DAG ID
task_id: 任务ID
execution_date: 执行日期
Returns:
Dict[str, Any]: 运行结果
"""
key = (dag_id, task_id, execution_date)
if key not in self.task_instances:
return {'status': 'error', 'message': f'TaskInstance {key} not found'}
task_instance = self.task_instances[key]
if task_instance.state not in [TaskState.SCHEDULED, TaskState.QUEUED, TaskState.UP_FOR_RETRY]:
return {'status': 'error', 'message': f'TaskInstance {key} is not in a runnable state'}
# 模拟任务执行
with self.task_lock:
task_instance.state = TaskState.RUNNING
task_instance.start_date = datetime.now()
task_instance.hostname = "airflow-worker-1"
task_instance.pid = random.randint(1000, 9999)
self.stats['running_task_instances'] += 1
# 模拟任务执行时间
execution_time = random.uniform(1, 10) # 1-10秒
time.sleep(execution_time)
# 模拟任务结果(90%成功率)
success = random.random() > 0.1
with self.task_lock:
task_instance.end_date = datetime.now()
task_instance.duration = execution_time
if success:
task_instance.state = TaskState.SUCCESS
self.stats['successful_task_instances'] += 1
else:
if task_instance.try_number < task_instance.max_tries:
task_instance.state = TaskState.UP_FOR_RETRY
task_instance.try_number += 1
else:
task_instance.state = TaskState.FAILED
self.stats['failed_task_instances'] += 1
self.stats['running_task_instances'] -= 1
return {
'status': 'success',
'task_instance': {
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
'execution_date': task_instance.execution_date.isoformat(),
'state': task_instance.state.value,
'duration': task_instance.duration,
'try_number': task_instance.try_number
}
}
def pause_dag(self, dag_id: str) -> Dict[str, Any]:
"""
暂停DAG
Args:
dag_id: DAG ID
Returns:
Dict[str, Any]: 暂停结果
"""
if dag_id not in self.dags:
return {'status': 'error', 'message': f'DAG {dag_id} not found'}
dag = self.dags[dag_id]
if dag.is_paused:
return {'status': 'error', 'message': f'DAG {dag_id} is already paused'}
with self.dag_lock:
dag.is_paused = True
self.stats['active_dags'] -= 1
self.stats['paused_dags'] += 1
return {
'status': 'success',
'message': f'DAG {dag_id} paused successfully',
'dag_id': dag_id,
'is_paused': True
}
def unpause_dag(self, dag_id: str) -> Dict[str, Any]:
"""
取消暂停DAG
Args:
dag_id: DAG ID
Returns:
Dict[str, Any]: 取消暂停结果
"""
if dag_id not in self.dags:
return {'status': 'error', 'message': f'DAG {dag_id} not found'}
dag = self.dags[dag_id]
if not dag.is_paused:
return {'status': 'error', 'message': f'DAG {dag_id} is not paused'}
with self.dag_lock:
dag.is_paused = False
self.stats['paused_dags'] -= 1
self.stats['active_dags'] += 1
return {
'status': 'success',
'message': f'DAG {dag_id} unpaused successfully',
'dag_id': dag_id,
'is_paused': False
}
def add_connection(self, connection: Connection) -> Dict[str, Any]:
"""
添加连接
Args:
connection: 连接对象
Returns:
Dict[str, Any]: 添加结果
"""
if connection.conn_id in self.connections:
return {'status': 'error', 'message': f'Connection {connection.conn_id} already exists'}
self.connections[connection.conn_id] = connection
return {
'status': 'success',
'connection': {
'conn_id': connection.conn_id,
'conn_type': connection.conn_type,
'host': connection.host,
'port': connection.port,
'schema': connection.schema
}
}
def get_connections(self) -> Dict[str, Any]:
"""
获取连接列表
Returns:
Dict[str, Any]: 连接列表
"""
connections_info = []
for conn_id, connection in self.connections.items():
connections_info.append({
'conn_id': connection.conn_id,
'conn_type': connection.conn_type,
'description': connection.description,
'host': connection.host,
'login': connection.login,
'schema': connection.schema,
'port': connection.port,
'is_encrypted': connection.is_encrypted
})
return {
'status': 'success',
'connections': connections_info,
'total': len(connections_info)
}
def set_variable(self, key: str, value: str, description: str = "") -> Dict[str, Any]:
"""
设置变量
Args:
key: 变量键
value: 变量值
description: 描述
Returns:
Dict[str, Any]: 设置结果
"""
variable = Variable(
key=key,
val=value,
description=description
)
self.variables[key] = variable
return {
'status': 'success',
'variable': {
'key': key,
'value': value,
'description': description
}
}
def get_variable(self, key: str) -> Dict[str, Any]:
"""
获取变量
Args:
key: 变量键
Returns:
Dict[str, Any]: 变量信息
"""
if key not in self.variables:
return {'status': 'error', 'message': f'Variable {key} not found'}
variable = self.variables[key]
return {
'status': 'success',
'variable': {
'key': variable.key,
'value': variable.val,
'description': variable.description,
'is_encrypted': variable.is_encrypted
}
}
def get_scheduler_status(self) -> Dict[str, Any]:
"""
获取调度器状态
Returns:
Dict[str, Any]: 调度器状态
"""
return {
'scheduler_id': self.scheduler_id,
'is_running': self.is_running,
'executor_type': self.executor_type.value,
'max_threads': self.max_threads,
'heartbeat_interval': self.heartbeat_interval,
'stats': self.stats,
'config': {
'dag_dir_list_interval': self.dag_dir_list_interval,
'child_process_timeout': self.child_process_timeout,
'zombie_task_threshold': self.zombie_task_threshold
},
'pools': {
pool_name: {
'slots': pool.slots,
'description': pool.description
} for pool_name, pool in self.pools.items()
},
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
# 创建Airflow调度器
scheduler = AirflowScheduler("prod-scheduler")
print("=== Apache Airflow调度器示例 ===")
# 获取DAG信息
print("\n=== DAG信息 ===")
dag_info = scheduler.get_dag("data_processing_pipeline")
if dag_info['status'] == 'success':
dag = dag_info['dag']
print(f"DAG ID: {dag['dag_id']}")
print(f"描述: {dag['description']}")
print(f"调度间隔: {dag['schedule_interval']}")
print(f"任务数量: {dag['task_count']}")
print(f"标签: {', '.join(dag['tags'])}")
print("任务列表:")
for task in dag['tasks']:
print(f" - {task['task_id']} ({task['operator_class']})")
if task['upstream_task_ids']:
print(f" 上游任务: {', '.join(task['upstream_task_ids'])}")
# 触发DAG运行
print("\n=== 触发DAG运行 ===")
trigger_result = scheduler.trigger_dag(
dag_id="data_processing_pipeline",
conf={'env': 'production', 'batch_size': 1000}
)
print(f"触发结果: {trigger_result['status']}")
if trigger_result['status'] == 'success':
dag_run = trigger_result['dag_run']
print(f"运行ID: {dag_run['run_id']}")
print(f"执行日期: {dag_run['execution_date']}")
print(f"状态: {dag_run['state']}")
# 获取任务实例
print("\n=== 任务实例 ===")
if trigger_result['status'] == 'success':
run_id = trigger_result['dag_run']['run_id']
task_instances = scheduler.get_task_instances("data_processing_pipeline", run_id)
print(f"任务实例总数: {task_instances['total']}")
for ti in task_instances['task_instances']:
print(f" - {ti['task_id']}: {ti['state']} (尝试次数: {ti['try_number']}/{ti['max_tries']})")
# 运行任务实例
print("\n=== 运行任务实例 ===")
if trigger_result['status'] == 'success':
execution_date = datetime.fromisoformat(trigger_result['dag_run']['execution_date'])
# 运行extract_data任务
run_result = scheduler.run_task_instance(
dag_id="data_processing_pipeline",
task_id="extract_data",
execution_date=execution_date
)
print(f"运行extract_data任务: {run_result['status']}")
if run_result['status'] == 'success':
ti = run_result['task_instance']
print(f" 状态: {ti['state']}")
print(f" 执行时间: {ti['duration']:.2f}秒")
# 添加连接
print("\n=== 添加连接 ===")
postgres_conn = Connection(
conn_id="postgres_default",
conn_type="postgres",
description="Default PostgreSQL connection",
host="localhost",
login="postgres",
password="postgres",
schema="airflow",
port=5432
)
conn_result = scheduler.add_connection(postgres_conn)
print(f"添加连接结果: {conn_result}")
# 设置变量
print("\n=== 设置变量 ===")
var_result = scheduler.set_variable(
key="data_source_url",
value="https://api.example.com/data",
description="Data source API URL"
)
print(f"设置变量结果: {var_result}")
# 暂停DAG
print("\n=== 暂停DAG ===")
pause_result = scheduler.pause_dag("ml_model_training")
print(f"暂停DAG结果: {pause_result}")
# 获取DAG运行列表
print("\n=== DAG运行列表 ===")
dag_runs = scheduler.get_dag_runs(dag_id="data_processing_pipeline")
print(f"DAG运行总数: {dag_runs['total']}")
for run in dag_runs['dag_runs']:
print(f" - {run['run_id']}: {run['state']} ({run['run_type']})")
# 获取调度器状态
print("\n=== 调度器状态 ===")
status = scheduler.get_scheduler_status()
print(f"调度器ID: {status['scheduler_id']}")
print(f"运行状态: {status['is_running']}")
print(f"执行器类型: {status['executor_type']}")
print(f"最大线程数: {status['max_threads']}")
print("统计信息:")
for key, value in status['stats'].items():
print(f" {key}: {value}")
```
## 10. 生态系统集成与最佳实践
### 10.1 组件集成架构
```python
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
class IntegrationType(Enum):
"""集成类型"""
DATA_PIPELINE = "data_pipeline"
REAL_TIME_ANALYTICS = "real_time_analytics"
BATCH_PROCESSING = "batch_processing"
MACHINE_LEARNING = "machine_learning"
DATA_WAREHOUSE = "data_warehouse"
class ComponentRole(Enum):
"""组件角色"""
DATA_SOURCE = "data_source"
DATA_INGESTION = "data_ingestion"
DATA_PROCESSING = "data_processing"
DATA_STORAGE = "data_storage"
DATA_QUERY = "data_query"
DATA_VISUALIZATION = "data_visualization"
WORKFLOW_ORCHESTRATION = "workflow_orchestration"
MONITORING = "monitoring"
@dataclass
class ComponentIntegration:
"""组件集成配置"""
component_name: str
role: ComponentRole
dependencies: List[str]
configuration: Dict[str, Any]
health_check_url: Optional[str] = None
metrics_endpoint: Optional[str] = None
@dataclass
class DataPipeline:
"""数据管道"""
pipeline_id: str
name: str
integration_type: IntegrationType
components: List[ComponentIntegration]
data_flow: List[Dict[str, str]]
schedule: Optional[str] = None
sla_minutes: Optional[int] = None
    created_at: Optional[datetime] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
class HadoopEcosystemIntegrator:
"""Hadoop生态系统集成器"""
def __init__(self):
self.pipelines: Dict[str, DataPipeline] = {}
self.component_registry: Dict[str, ComponentIntegration] = {}
self.integration_templates = self._initialize_templates()
def _initialize_templates(self) -> Dict[str, Dict[str, Any]]:
"""初始化集成模板"""
return {
"real_time_analytics": {
"components": [
"kafka", "storm", "hbase", "elasticsearch", "grafana"
],
"data_flow": [
{"from": "kafka", "to": "storm", "type": "stream"},
{"from": "storm", "to": "hbase", "type": "write"},
{"from": "hbase", "to": "elasticsearch", "type": "index"},
{"from": "elasticsearch", "to": "grafana", "type": "query"}
]
},
"batch_processing": {
"components": [
"hdfs", "yarn", "mapreduce", "hive", "oozie"
],
"data_flow": [
{"from": "hdfs", "to": "mapreduce", "type": "read"},
{"from": "mapreduce", "to": "hive", "type": "transform"},
{"from": "hive", "to": "hdfs", "type": "write"}
]
},
"data_warehouse": {
"components": [
"sqoop", "hdfs", "hive", "impala", "tableau"
],
"data_flow": [
{"from": "sqoop", "to": "hdfs", "type": "import"},
{"from": "hdfs", "to": "hive", "type": "load"},
{"from": "hive", "to": "impala", "type": "query"},
{"from": "impala", "to": "tableau", "type": "visualize"}
]
}
}
def register_component(self, component: ComponentIntegration) -> Dict[str, Any]:
"""注册组件"""
try:
# 验证依赖关系
for dep in component.dependencies:
if dep not in self.component_registry:
return {
'status': 'error',
'message': f'Dependency {dep} not found'
}
self.component_registry[component.component_name] = component
return {
'status': 'success',
'component_name': component.component_name,
'role': component.role.value,
'dependencies': component.dependencies
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to register component: {str(e)}'
}
def create_pipeline(self, pipeline_config: Dict[str, Any]) -> Dict[str, Any]:
"""创建数据管道"""
try:
pipeline_id = pipeline_config['pipeline_id']
integration_type = IntegrationType(pipeline_config['integration_type'])
# 获取模板
template = self.integration_templates.get(integration_type.value)
if not template:
return {
'status': 'error',
'message': f'No template found for {integration_type.value}'
}
# 构建组件列表
components = []
for comp_name in template['components']:
if comp_name in self.component_registry:
components.append(self.component_registry[comp_name])
else:
# 创建默认组件配置
default_component = ComponentIntegration(
component_name=comp_name,
role=self._get_default_role(comp_name),
dependencies=[],
configuration={}
)
components.append(default_component)
# 创建管道
pipeline = DataPipeline(
pipeline_id=pipeline_id,
name=pipeline_config.get('name', f'Pipeline-{pipeline_id}'),
integration_type=integration_type,
components=components,
data_flow=template['data_flow'],
schedule=pipeline_config.get('schedule'),
sla_minutes=pipeline_config.get('sla_minutes')
)
self.pipelines[pipeline_id] = pipeline
return {
'status': 'success',
'pipeline_id': pipeline_id,
'integration_type': integration_type.value,
'components_count': len(components),
'data_flow_steps': len(template['data_flow'])
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to create pipeline: {str(e)}'
}
def _get_default_role(self, component_name: str) -> ComponentRole:
"""获取组件默认角色"""
role_mapping = {
'kafka': ComponentRole.DATA_INGESTION,
'storm': ComponentRole.DATA_PROCESSING,
'spark': ComponentRole.DATA_PROCESSING,
'hdfs': ComponentRole.DATA_STORAGE,
'hbase': ComponentRole.DATA_STORAGE,
'hive': ComponentRole.DATA_QUERY,
'impala': ComponentRole.DATA_QUERY,
'elasticsearch': ComponentRole.DATA_STORAGE,
'grafana': ComponentRole.DATA_VISUALIZATION,
'sqoop': ComponentRole.DATA_INGESTION,
'flume': ComponentRole.DATA_INGESTION,
'oozie': ComponentRole.WORKFLOW_ORCHESTRATION,
'airflow': ComponentRole.WORKFLOW_ORCHESTRATION,
'ranger': ComponentRole.MONITORING,
'knox': ComponentRole.MONITORING
}
return role_mapping.get(component_name, ComponentRole.DATA_PROCESSING)
def validate_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
"""验证管道配置"""
if pipeline_id not in self.pipelines:
return {
'status': 'error',
'message': f'Pipeline {pipeline_id} not found'
}
pipeline = self.pipelines[pipeline_id]
validation_results = []
# 验证组件依赖
for component in pipeline.components:
for dep in component.dependencies:
dep_exists = any(c.component_name == dep for c in pipeline.components)
if not dep_exists:
validation_results.append({
'type': 'dependency_missing',
'component': component.component_name,
'missing_dependency': dep
})
# 验证数据流
component_names = {c.component_name for c in pipeline.components}
for flow in pipeline.data_flow:
if flow['from'] not in component_names:
validation_results.append({
'type': 'flow_source_missing',
'flow': flow,
'missing_component': flow['from']
})
if flow['to'] not in component_names:
validation_results.append({
'type': 'flow_target_missing',
'flow': flow,
'missing_component': flow['to']
})
return {
'status': 'success' if not validation_results else 'warning',
'pipeline_id': pipeline_id,
'validation_results': validation_results,
'is_valid': len(validation_results) == 0
}
def get_pipeline_status(self, pipeline_id: str) -> Dict[str, Any]:
"""获取管道状态"""
if pipeline_id not in self.pipelines:
return {
'status': 'error',
'message': f'Pipeline {pipeline_id} not found'
}
pipeline = self.pipelines[pipeline_id]
# 模拟组件健康检查
component_health = []
for component in pipeline.components:
health_status = 'healthy' if component.component_name in ['hdfs', 'yarn', 'hive'] else 'unknown'
component_health.append({
'component': component.component_name,
'role': component.role.value,
'status': health_status,
'last_check': datetime.now().isoformat()
})
return {
'status': 'success',
'pipeline_id': pipeline_id,
'name': pipeline.name,
'integration_type': pipeline.integration_type.value,
'created_at': pipeline.created_at.isoformat(),
'components_count': len(pipeline.components),
'component_health': component_health,
'data_flow_steps': len(pipeline.data_flow)
}
def list_pipelines(self) -> Dict[str, Any]:
"""列出所有管道"""
pipelines_info = []
for pipeline_id, pipeline in self.pipelines.items():
pipelines_info.append({
'pipeline_id': pipeline_id,
'name': pipeline.name,
'integration_type': pipeline.integration_type.value,
'components_count': len(pipeline.components),
'created_at': pipeline.created_at.isoformat()
})
return {
'status': 'success',
'total_pipelines': len(pipelines_info),
'pipelines': pipelines_info
}
def get_integration_recommendations(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
"""获取集成建议"""
data_volume = requirements.get('data_volume', 'medium')
latency_requirement = requirements.get('latency', 'batch')
data_types = requirements.get('data_types', ['structured'])
recommendations = []
# 基于需求推荐架构
if latency_requirement == 'real_time':
if 'streaming' in data_types:
recommendations.append({
'architecture': 'Lambda Architecture',
'components': ['kafka', 'storm', 'hbase', 'hdfs', 'spark'],
'description': '实时流处理架构,支持低延迟数据处理'
})
recommendations.append({
'architecture': 'Kappa Architecture',
'components': ['kafka', 'spark_streaming', 'elasticsearch'],
'description': '简化的流处理架构,统一批处理和流处理'
})
elif latency_requirement == 'batch':
recommendations.append({
'architecture': 'Traditional Batch Processing',
'components': ['hdfs', 'yarn', 'mapreduce', 'hive', 'oozie'],
'description': '传统批处理架构,适合大数据量离线分析'
})
# 基于数据量推荐
if data_volume == 'large':
recommendations.append({
'architecture': 'Distributed Data Warehouse',
'components': ['hdfs', 'hive', 'impala', 'kudu', 'spark'],
'description': '分布式数据仓库,支持大规模数据存储和查询'
})
return {
'status': 'success',
'requirements': requirements,
'recommendations': recommendations,
'total_recommendations': len(recommendations)
}
```
### 10.2 性能优化最佳实践
```python
from typing import Dict, List, Any

class PerformanceOptimizer:
"""性能优化器"""
def __init__(self):
self.optimization_rules = self._initialize_rules()
def _initialize_rules(self) -> Dict[str, Dict[str, Any]]:
"""初始化优化规则"""
return {
'hdfs': {
'block_size': {
'small_files': '64MB',
'large_files': '256MB',
'very_large_files': '512MB'
},
'replication_factor': {
'development': 1,
'testing': 2,
'production': 3
},
'compression': {
'text_files': 'gzip',
'sequence_files': 'snappy',
'parquet_files': 'snappy'
}
},
'yarn': {
'memory_allocation': {
'container_min_mb': 1024,
'container_max_mb': 8192,
'am_memory_mb': 1024
},
'cpu_allocation': {
'container_min_vcores': 1,
'container_max_vcores': 4
}
},
'mapreduce': {
'map_tasks': {
'memory_mb': 2048,
'java_opts': '-Xmx1638m',
'io_sort_mb': 512
},
'reduce_tasks': {
'memory_mb': 4096,
'java_opts': '-Xmx3276m',
'parallel_copies': 10
}
},
'hive': {
'execution_engine': 'tez',
'vectorization': True,
'cost_based_optimizer': True,
'compression': {
'intermediate': 'snappy',
'final': 'gzip'
}
},
'spark': {
'executor_memory': '4g',
'executor_cores': 2,
'driver_memory': '2g',
'serializer': 'org.apache.spark.serializer.KryoSerializer',
'sql_adaptive_enabled': True
}
}
def analyze_performance(self, component: str, metrics: Dict[str, Any]) -> Dict[str, Any]:
"""分析性能指标"""
if component not in self.optimization_rules:
return {
'status': 'error',
'message': f'No optimization rules for component: {component}'
}
recommendations = []
# 基于组件类型分析
if component == 'hdfs':
recommendations.extend(self._analyze_hdfs_performance(metrics))
elif component == 'yarn':
recommendations.extend(self._analyze_yarn_performance(metrics))
elif component == 'mapreduce':
recommendations.extend(self._analyze_mapreduce_performance(metrics))
elif component == 'hive':
recommendations.extend(self._analyze_hive_performance(metrics))
elif component == 'spark':
recommendations.extend(self._analyze_spark_performance(metrics))
return {
'status': 'success',
'component': component,
'recommendations': recommendations,
'total_recommendations': len(recommendations)
}
def _analyze_hdfs_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""分析HDFS性能"""
recommendations = []
# 检查小文件问题
if metrics.get('avg_file_size_mb', 0) < 64:
recommendations.append({
'type': 'small_files',
'priority': 'high',
'description': '存在大量小文件,建议合并文件或增加块大小',
'action': 'Merge small files or increase block size to 128MB'
})
# 检查磁盘使用率
if metrics.get('disk_usage_percent', 0) > 80:
recommendations.append({
'type': 'disk_usage',
'priority': 'high',
'description': '磁盘使用率过高,建议清理或扩容',
'action': 'Clean up old data or add more storage capacity'
})
# 检查网络带宽
if metrics.get('network_utilization_percent', 0) > 70:
recommendations.append({
'type': 'network',
'priority': 'medium',
'description': '网络带宽使用率高,建议优化数据本地性',
'action': 'Improve data locality or upgrade network infrastructure'
})
return recommendations
def _analyze_yarn_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""分析YARN性能"""
recommendations = []
# 检查资源利用率
memory_utilization = metrics.get('memory_utilization_percent', 0)
if memory_utilization > 90:
recommendations.append({
'type': 'memory_pressure',
'priority': 'high',
'description': '内存使用率过高,建议调整容器大小或增加节点',
'action': 'Adjust container sizes or add more NodeManagers'
})
elif memory_utilization < 30:
recommendations.append({
'type': 'memory_underutilization',
'priority': 'medium',
'description': '内存利用率低,建议增加并发任务或调整资源配置',
'action': 'Increase parallelism or adjust resource allocation'
})
# 检查队列配置
if metrics.get('pending_applications', 0) > 10:
recommendations.append({
'type': 'queue_congestion',
'priority': 'medium',
'description': '队列拥堵,建议调整队列配置或增加资源',
'action': 'Adjust queue configuration or add more resources'
})
return recommendations
def _analyze_mapreduce_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""分析MapReduce性能"""
recommendations = []
# 检查任务执行时间
avg_map_time = metrics.get('avg_map_time_seconds', 0)
if avg_map_time > 300: # 5分钟
recommendations.append({
'type': 'slow_map_tasks',
'priority': 'high',
'description': 'Map任务执行时间过长,建议优化输入分片或增加内存',
'action': 'Optimize input splits or increase map memory'
})
# 检查数据倾斜
task_time_variance = metrics.get('task_time_variance', 0)
if task_time_variance > 0.5:
recommendations.append({
'type': 'data_skew',
'priority': 'high',
'description': '存在数据倾斜,建议重新分区或使用自定义分区器',
'action': 'Repartition data or use custom partitioner'
})
return recommendations
def _analyze_hive_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""分析Hive性能"""
recommendations = []
# 检查查询执行时间
avg_query_time = metrics.get('avg_query_time_seconds', 0)
if avg_query_time > 600: # 10分钟
recommendations.append({
'type': 'slow_queries',
'priority': 'high',
'description': '查询执行时间过长,建议优化SQL或创建索引',
'action': 'Optimize SQL queries or create appropriate indexes'
})
# 检查表格式
if metrics.get('orc_tables_percent', 0) < 50:
recommendations.append({
'type': 'table_format',
'priority': 'medium',
'description': '建议使用ORC格式提高查询性能',
'action': 'Convert tables to ORC format for better performance'
})
return recommendations
def _analyze_spark_performance(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""分析Spark性能"""
recommendations = []
# 检查GC时间
gc_time_percent = metrics.get('gc_time_percent', 0)
if gc_time_percent > 10:
recommendations.append({
'type': 'gc_pressure',
'priority': 'high',
'description': 'GC时间过长,建议调整内存配置或使用G1GC',
'action': 'Adjust memory settings or use G1 garbage collector'
})
# 检查数据序列化
if not metrics.get('kryo_serializer_enabled', False):
recommendations.append({
'type': 'serialization',
'priority': 'medium',
'description': '建议启用Kryo序列化器提高性能',
'action': 'Enable Kryo serializer for better performance'
})
return recommendations
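```

前面optimization_rules['spark']中建议的参数,在实际作业中通常通过SparkSession(或spark-submit的--conf)设置。下面是一个PySpark示意(假设已安装pyspark,参数取值对应上文规则,仅供参考):

```python
# 示意:按照上文optimization_rules['spark']的建议值构建SparkSession(假设已安装pyspark)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("optimized-etl-job")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    .config("spark.driver.memory", "2g")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.adaptive.enabled", "true")  # 开启自适应查询执行
    .getOrCreate()
)

# 占位的数据处理逻辑,仅用于验证会话可用
df = spark.range(1000)
print(df.count())
spark.stop()
```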
### 10.3 监控和运维
```python
from typing import Dict, Any, Optional
from datetime import datetime
import random

class EcosystemMonitor:
"""生态系统监控器"""
def __init__(self):
self.alerts = []
self.metrics_history = []
self.thresholds = self._initialize_thresholds()
def _initialize_thresholds(self) -> Dict[str, Dict[str, float]]:
"""初始化告警阈值"""
return {
'hdfs': {
'disk_usage_percent': 85.0,
'namenode_heap_usage_percent': 80.0,
'datanode_failed_volumes': 1.0
},
'yarn': {
'memory_utilization_percent': 90.0,
'nodemanager_unhealthy_percent': 10.0,
'application_failed_percent': 5.0
},
'hive': {
'metastore_connection_failures': 5.0,
'query_failure_rate_percent': 10.0,
'avg_query_time_seconds': 300.0
}
}
def collect_metrics(self, component: str) -> Dict[str, Any]:
"""收集组件指标"""
        # 模拟指标收集(使用随机数生成演示数据,random已在模块顶部导入)
if component == 'hdfs':
metrics = {
'disk_usage_percent': random.uniform(60, 95),
'namenode_heap_usage_percent': random.uniform(40, 85),
'datanode_count': random.randint(5, 20),
'total_capacity_tb': random.uniform(100, 1000),
'used_capacity_tb': random.uniform(50, 800),
'block_count': random.randint(1000000, 10000000)
}
elif component == 'yarn':
metrics = {
'memory_utilization_percent': random.uniform(30, 95),
'cpu_utilization_percent': random.uniform(20, 80),
'active_nodes': random.randint(5, 50),
'running_applications': random.randint(10, 100),
'pending_applications': random.randint(0, 20)
}
elif component == 'hive':
metrics = {
'active_sessions': random.randint(5, 50),
'avg_query_time_seconds': random.uniform(30, 600),
'query_success_rate_percent': random.uniform(85, 99),
'metastore_connections': random.randint(10, 100)
}
else:
metrics = {'status': 'unknown_component'}
# 添加时间戳
metrics['timestamp'] = datetime.now().isoformat()
metrics['component'] = component
# 保存到历史记录
self.metrics_history.append(metrics)
# 检查告警
self._check_alerts(component, metrics)
return {
'status': 'success',
'component': component,
'metrics': metrics
}
def _check_alerts(self, component: str, metrics: Dict[str, Any]):
"""检查告警条件"""
if component not in self.thresholds:
return
component_thresholds = self.thresholds[component]
for metric_name, threshold in component_thresholds.items():
if metric_name in metrics:
value = metrics[metric_name]
if value > threshold:
alert = {
'alert_id': f"{component}_{metric_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
'component': component,
'metric': metric_name,
'value': value,
'threshold': threshold,
'severity': self._get_severity(metric_name, value, threshold),
'timestamp': datetime.now().isoformat(),
'message': f"{component} {metric_name} is {value}, exceeds threshold {threshold}"
}
self.alerts.append(alert)
def _get_severity(self, metric_name: str, value: float, threshold: float) -> str:
"""获取告警严重程度"""
ratio = value / threshold
if ratio > 1.2:
return 'critical'
elif ratio > 1.1:
return 'warning'
else:
return 'info'
def get_alerts(self, severity: Optional[str] = None, limit: int = 50) -> Dict[str, Any]:
"""获取告警列表"""
filtered_alerts = self.alerts
if severity:
filtered_alerts = [a for a in self.alerts if a['severity'] == severity]
# 按时间倒序排列
filtered_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
return {
'status': 'success',
'total_alerts': len(filtered_alerts),
'alerts': filtered_alerts[:limit]
}
def get_health_summary(self) -> Dict[str, Any]:
"""获取健康状况摘要"""
# 统计各组件状态
component_status = {}
recent_metrics = {}
# 获取最近的指标
for metrics in reversed(self.metrics_history[-50:]):
component = metrics['component']
if component not in recent_metrics:
recent_metrics[component] = metrics
# 评估组件健康状态
for component, metrics in recent_metrics.items():
health_score = self._calculate_health_score(component, metrics)
component_status[component] = {
'health_score': health_score,
'status': self._get_health_status(health_score),
'last_update': metrics['timestamp']
}
# 统计告警
alert_summary = {
'critical': len([a for a in self.alerts if a['severity'] == 'critical']),
'warning': len([a for a in self.alerts if a['severity'] == 'warning']),
'info': len([a for a in self.alerts if a['severity'] == 'info'])
}
return {
'status': 'success',
'overall_health': self._calculate_overall_health(component_status),
'component_status': component_status,
'alert_summary': alert_summary,
'total_components': len(component_status)
}
def _calculate_health_score(self, component: str, metrics: Dict[str, Any]) -> float:
"""计算健康分数 (0-100)"""
if component not in self.thresholds:
return 100.0
score = 100.0
thresholds = self.thresholds[component]
for metric_name, threshold in thresholds.items():
if metric_name in metrics:
value = metrics[metric_name]
if value > threshold:
# 根据超出程度扣分
ratio = value / threshold
penalty = min(50, (ratio - 1) * 100)
score -= penalty
return max(0.0, score)
def _get_health_status(self, health_score: float) -> str:
"""根据健康分数获取状态"""
if health_score >= 90:
return 'excellent'
elif health_score >= 70:
return 'good'
elif health_score >= 50:
return 'warning'
else:
return 'critical'
def _calculate_overall_health(self, component_status: Dict[str, Dict[str, Any]]) -> str:
"""计算整体健康状态"""
if not component_status:
return 'unknown'
avg_score = sum(status['health_score'] for status in component_status.values()) / len(component_status)
return self._get_health_status(avg_score)
# 使用示例
if __name__ == "__main__":
# 创建生态系统集成器
integrator = HadoopEcosystemIntegrator()
print("=== Hadoop生态系统集成示例 ===\n")
# 注册组件
print("=== 注册组件 ===")
components = [
ComponentIntegration(
component_name="kafka",
role=ComponentRole.DATA_INGESTION,
dependencies=[],
configuration={"brokers": 3, "partitions": 12}
),
ComponentIntegration(
component_name="storm",
role=ComponentRole.DATA_PROCESSING,
dependencies=["kafka"],
configuration={"workers": 4, "executors": 16}
),
ComponentIntegration(
component_name="hbase",
role=ComponentRole.DATA_STORAGE,
dependencies=["hdfs"],
configuration={"regions": 100, "replication": 3}
)
]
for component in components:
result = integrator.register_component(component)
print(f"注册 {component.component_name}: {result['status']}")
# 创建实时分析管道
print("\n=== 创建实时分析管道 ===")
pipeline_config = {
'pipeline_id': 'real-time-analytics-001',
'name': 'Real-time User Behavior Analytics',
'integration_type': 'real_time_analytics',
'schedule': '0 */6 * * *',
'sla_minutes': 30
}
pipeline_result = integrator.create_pipeline(pipeline_config)
print(f"管道创建结果: {pipeline_result}")
if pipeline_result['status'] == 'success':
pipeline_id = pipeline_result['pipeline_id']
# 验证管道
print("\n=== 验证管道配置 ===")
validation_result = integrator.validate_pipeline(pipeline_id)
print(f"验证结果: {validation_result['is_valid']}")
if validation_result['validation_results']:
for issue in validation_result['validation_results']:
print(f" 问题: {issue}")
# 获取管道状态
print("\n=== 管道状态 ===")
status_result = integrator.get_pipeline_status(pipeline_id)
if status_result['status'] == 'success':
print(f"管道ID: {status_result['pipeline_id']}")
print(f"名称: {status_result['name']}")
print(f"集成类型: {status_result['integration_type']}")
print(f"组件数量: {status_result['components_count']}")
print("组件健康状态:")
for health in status_result['component_health']:
print(f" - {health['component']} ({health['role']}): {health['status']}")
# 获取集成建议
print("\n=== 集成建议 ===")
requirements = {
'data_volume': 'large',
'latency': 'real_time',
'data_types': ['streaming', 'structured']
}
recommendations = integrator.get_integration_recommendations(requirements)
if recommendations['status'] == 'success':
print(f"基于需求的建议 ({recommendations['total_recommendations']}个):")
for rec in recommendations['recommendations']:
print(f" 架构: {rec['architecture']}")
print(f" 组件: {', '.join(rec['components'])}")
print(f" 描述: {rec['description']}\n")
# 性能优化分析
print("=== 性能优化分析 ===")
optimizer = PerformanceOptimizer()
# 模拟HDFS性能指标
hdfs_metrics = {
'avg_file_size_mb': 32, # 小文件问题
'disk_usage_percent': 85, # 磁盘使用率高
'network_utilization_percent': 45
}
hdfs_analysis = optimizer.analyze_performance('hdfs', hdfs_metrics)
if hdfs_analysis['status'] == 'success':
print(f"HDFS性能分析 ({hdfs_analysis['total_recommendations']}个建议):")
for rec in hdfs_analysis['recommendations']:
print(f" 类型: {rec['type']} (优先级: {rec['priority']})")
print(f" 描述: {rec['description']}")
print(f" 建议: {rec['action']}\n")
# 监控示例
print("=== 系统监控 ===")
monitor = EcosystemMonitor()
# 收集各组件指标
components_to_monitor = ['hdfs', 'yarn', 'hive']
for component in components_to_monitor:
metrics_result = monitor.collect_metrics(component)
if metrics_result['status'] == 'success':
metrics = metrics_result['metrics']
print(f"{component.upper()}指标:")
for key, value in metrics.items():
if key not in ['timestamp', 'component']:
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
print()
# 获取告警
print("=== 告警信息 ===")
alerts_result = monitor.get_alerts(limit=5)
if alerts_result['status'] == 'success':
print(f"总告警数: {alerts_result['total_alerts']}")
for alert in alerts_result['alerts']:
print(f" [{alert['severity'].upper()}] {alert['message']}")
# 获取健康摘要
print("\n=== 健康状况摘要 ===")
health_summary = monitor.get_health_summary()
if health_summary['status'] == 'success':
print(f"整体健康状态: {health_summary['overall_health']}")
print(f"监控组件数: {health_summary['total_components']}")
print("组件状态:")
for component, status in health_summary['component_status'].items():
print(f" {component}: {status['status']} (分数: {status['health_score']:.1f})")
alert_summary = health_summary['alert_summary']
print(f"告警统计: 严重 {alert_summary['critical']}, 警告 {alert_summary['warning']}, 信息 {alert_summary['info']}")
## 11. 总结与展望
### 11.1 Hadoop生态系统总结
Hadoop生态系统是一个庞大而复杂的大数据处理平台,包含了从数据存储、处理、查询到监控、安全等各个方面的组件。通过本教程的学习,我们深入了解了:
1. **核心存储组件**: HDFS提供分布式文件存储,HBase提供NoSQL数据库功能
2. **数据处理组件**: MapReduce、Spark、Storm等提供批处理和流处理能力
3. **数据查询组件**: Hive、Impala、Drill等提供SQL查询接口
4. **数据流组件**: Kafka、Pulsar提供消息队列和流处理平台
5. **监控管理组件**: Ambari、Cloudera Manager提供集群管理和监控
6. **工作流调度组件**: Airflow、Oozie提供任务调度和工作流管理
7. **数据传输组件**: Sqoop、Flume、NiFi提供数据导入导出和流式传输
8. **安全组件**: Ranger、Knox提供权限管理和安全网关
### 11.2 架构设计原则
在设计Hadoop生态系统架构时,应遵循以下原则:
1. **可扩展性**: 系统应能够水平扩展以处理不断增长的数据量
2. **容错性**: 系统应能够处理硬件故障和网络问题
3. **性能优化**: 通过合理的配置和优化提高系统性能
4. **安全性**: 实施适当的安全措施保护数据和系统
5. **可维护性**: 系统应易于监控、管理和维护
6. **成本效益**: 在满足需求的前提下控制成本
### 11.3 技术发展趋势
1. **云原生化**: 越来越多的Hadoop组件支持容器化部署和云原生架构
2. **实时处理**: 流处理技术的发展使得实时数据处理成为主流
3. **机器学习集成**: 大数据平台与机器学习框架的深度集成
4. **自动化运维**: 智能化的监控、调优和故障处理
5. **多云支持**: 支持跨云平台的数据处理和管理
### 11.4 学习建议
1. **实践为主**: 通过搭建实际环境来学习和理解各组件
2. **循序渐进**: 从核心组件开始,逐步学习扩展组件
3. **关注社区**: 跟踪开源社区的发展和最新技术
4. **项目实战**: 通过实际项目来应用所学知识
5. **持续学习**: 大数据技术发展迅速,需要持续学习新技术
### 11.5 未来展望
Hadoop生态系统将继续演进,主要发展方向包括:
1. **更好的用户体验**: 简化部署、配置和使用流程
2. **更强的性能**: 通过新的算法和优化技术提高处理性能
3. **更广的应用场景**: 支持更多类型的数据和应用场景
4. **更深的集成**: 与其他技术栈的更深度集成
5. **更智能的管理**: 基于AI的自动化管理和优化
通过掌握Hadoop生态系统,您将能够构建强大的大数据处理平台,为企业的数字化转型提供技术支撑。
---
**希望通过学习本教程,您能够深入理解Hadoop生态系统的各个组件,并能够在实际项目中应用这些知识。以下补充第9章安全组件的详细内容。**
## 9. 安全组件
### 9.1 Apache Ranger详解
Apache Ranger是一个用于在Hadoop平台上启用、监控和管理全面数据安全的框架。
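在真实环境中,Ranger策略通常通过Ranger Admin的Web UI或REST API管理。下面是一个通过REST API创建HDFS路径访问策略的示意(假设Ranger Admin运行在http://localhost:6080,接口路径与字段名参考Ranger 2.x的public v2 API,具体以所用版本文档为准):

```python
# 示意:调用Ranger Admin REST API创建一条HDFS访问策略(字段为常见形式,非权威定义)
import requests

RANGER_URL = "http://localhost:6080"  # 假设的Ranger Admin地址
AUTH = ("admin", "admin123")          # 示例凭据,生产环境应使用安全的凭据管理方式

policy = {
    "service": "hadoop-hdfs",         # 与Ranger中注册的服务名一致
    "name": "etl_staging_read_write",
    "description": "Allow the etl group to read/write the staging directory",
    "isEnabled": True,
    "isAuditEnabled": True,
    "resources": {
        "path": {"values": ["/data/staging"], "isRecursive": True}
    },
    "policyItems": [
        {
            "groups": ["etl"],
            "accesses": [
                {"type": "read", "isAllowed": True},
                {"type": "write", "isAllowed": True},
                {"type": "execute", "isAllowed": True},
            ],
        }
    ],
}

resp = requests.post(f"{RANGER_URL}/service/public/v2/api/policy", json=policy, auth=AUTH, timeout=30)
print(resp.status_code, resp.text)
```

下面的Python实现则用内存数据结构模拟Ranger Admin的服务、策略、用户与审计模型,便于理解其权限评估流程: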
```python
from typing import Dict, List, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import json
import uuid
class PolicyType(Enum):
"""策略类型"""
ACCESS = "access"
MASKING = "masking"
ROW_FILTER = "row_filter"
TAG_BASED = "tag_based"
class PermissionType(Enum):
"""权限类型"""
READ = "read"
WRITE = "write"
CREATE = "create"
DELETE = "delete"
ADMIN = "admin"
SELECT = "select"
UPDATE = "update"
DROP = "drop"
ALTER = "alter"
INDEX = "index"
LOCK = "lock"
ALL = "all"
class ServiceType(Enum):
"""服务类型"""
HDFS = "hdfs"
HIVE = "hive"
HBASE = "hbase"
YARN = "yarn"
KAFKA = "kafka"
STORM = "storm"
KNOX = "knox"
SOLR = "solr"
ATLAS = "atlas"
class AuditAction(Enum):
"""审计动作"""
ACCESS = "access"
CREATE = "create"
UPDATE = "update"
DELETE = "delete"
LOGIN = "login"
LOGOUT = "logout"
POLICY_CHANGE = "policy_change"
ADMIN_ACTION = "admin_action"
class AccessResult(Enum):
"""访问结果"""
ALLOWED = "allowed"
DENIED = "denied"
NOT_DETERMINED = "not_determined"
@dataclass
class RangerResource:
"""Ranger资源"""
service_type: ServiceType
resource_type: str # database, table, column, path等
resource_name: str
resource_path: Optional[str] = None
is_recursive: bool = False
is_excludes: bool = False
@dataclass
class PolicyItem:
"""策略项"""
users: List[str] = field(default_factory=list)
groups: List[str] = field(default_factory=list)
roles: List[str] = field(default_factory=list)
permissions: List[PermissionType] = field(default_factory=list)
delegate_admin: bool = False
conditions: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RangerPolicy:
"""Ranger策略"""
id: str
name: str
service_name: str
service_type: ServiceType
policy_type: PolicyType
description: str
is_enabled: bool
is_audit_enabled: bool
resources: Dict[str, RangerResource]
policy_items: List[PolicyItem]
deny_policy_items: List[PolicyItem] = field(default_factory=list)
allow_exceptions: List[PolicyItem] = field(default_factory=list)
deny_exceptions: List[PolicyItem] = field(default_factory=list)
created_by: str = "admin"
updated_by: str = "admin"
create_time: datetime = field(default_factory=datetime.now)
update_time: datetime = field(default_factory=datetime.now)
version: int = 1
@dataclass
class AuditEvent:
"""审计事件"""
id: str
event_time: datetime
user: str
service_name: str
service_type: ServiceType
resource_path: str
resource_type: str
action: AuditAction
access_type: str
result: AccessResult
policy_id: Optional[str] = None
client_ip: Optional[str] = None
session_id: Optional[str] = None
request_data: Optional[str] = None
additional_info: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RangerService:
"""Ranger服务"""
id: str
name: str
type: ServiceType
description: str
is_enabled: bool
configs: Dict[str, Any]
created_by: str = "admin"
updated_by: str = "admin"
create_time: datetime = field(default_factory=datetime.now)
update_time: datetime = field(default_factory=datetime.now)
version: int = 1
@dataclass
class RangerUser:
"""Ranger用户"""
id: str
name: str
first_name: str
last_name: str
email_address: str
password: str
user_source: int = 0 # 0: Internal, 1: External
is_visible: int = 1
user_role_list: List[str] = field(default_factory=list)
group_id_list: List[str] = field(default_factory=list)
status: int = 1 # 1: Active, 0: Inactive
created_by: str = "admin"
updated_by: str = "admin"
create_time: datetime = field(default_factory=datetime.now)
update_time: datetime = field(default_factory=datetime.now)
@dataclass
class RangerGroup:
"""Ranger组"""
id: str
name: str
description: str
group_type: int = 1 # 1: Internal, 0: External
group_source: int = 0
is_visible: int = 1
created_by: str = "admin"
updated_by: str = "admin"
create_time: datetime = field(default_factory=datetime.now)
update_time: datetime = field(default_factory=datetime.now)
class RangerAdmin:
"""
Apache Ranger管理服务器
提供策略管理、用户管理、审计等功能
"""
def __init__(self, admin_url: str = "http://localhost:6080"):
self.admin_url = admin_url
self.services: Dict[str, RangerService] = {}
self.policies: Dict[str, RangerPolicy] = {}
self.users: Dict[str, RangerUser] = {}
self.groups: Dict[str, RangerGroup] = {}
self.audit_events: List[AuditEvent] = []
self.is_running = True
self.version = "2.4.0"
# 初始化默认数据
self._initialize_default_data()
def _initialize_default_data(self):
"""初始化默认数据"""
# 创建默认用户
admin_user = RangerUser(
id="user_1",
name="admin",
first_name="Admin",
last_name="User",
email_address="admin@example.com",
password="admin123",
user_role_list=["ROLE_SYS_ADMIN"]
)
self.users[admin_user.id] = admin_user
# 创建默认组
admin_group = RangerGroup(
id="group_1",
name="admin",
description="Administrator group"
)
self.groups[admin_group.id] = admin_group
# 创建默认服务
hdfs_service = RangerService(
id="service_1",
name="hadoop-hdfs",
type=ServiceType.HDFS,
description="HDFS Service",
is_enabled=True,
configs={
"username": "hdfs",
"password": "hdfs",
"fs.default.name": "hdfs://localhost:9000",
"hadoop.security.authorization": "true"
}
)
self.services[hdfs_service.id] = hdfs_service
def create_service(self, name: str, service_type: ServiceType,
description: str, configs: Dict[str, Any]) -> Dict[str, Any]:
"""
创建服务
Args:
name: 服务名称
service_type: 服务类型
description: 描述
configs: 配置
Returns:
Dict[str, Any]: 创建结果
"""
try:
service_id = f"service_{len(self.services) + 1}"
service = RangerService(
id=service_id,
name=name,
type=service_type,
description=description,
is_enabled=True,
configs=configs
)
self.services[service_id] = service
# 记录审计事件
self._log_audit_event(
user="admin",
service_name=name,
service_type=service_type,
resource_path=f"/service/{name}",
resource_type="service",
action=AuditAction.CREATE,
access_type="create",
result=AccessResult.ALLOWED
)
return {
'status': 'success',
'service_id': service_id,
'message': f'Service {name} created successfully'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to create service: {str(e)}'
}
def create_policy(self, policy_name: str, service_name: str,
policy_type: PolicyType, resources: Dict[str, Any],
policy_items: List[Dict[str, Any]],
description: str = "") -> Dict[str, Any]:
"""
创建策略
Args:
policy_name: 策略名称
service_name: 服务名称
policy_type: 策略类型
resources: 资源定义
policy_items: 策略项
description: 描述
Returns:
Dict[str, Any]: 创建结果
"""
try:
# 查找服务
service = None
for svc in self.services.values():
if svc.name == service_name:
service = svc
break
if not service:
return {
'status': 'error',
'message': f'Service {service_name} not found'
}
policy_id = f"policy_{len(self.policies) + 1}"
# 转换资源
ranger_resources = {}
for res_type, res_values in resources.items():
if isinstance(res_values, str):
res_values = [res_values]
ranger_resources[res_type] = RangerResource(
service_type=service.type,
resource_type=res_type,
resource_name=",".join(res_values) if isinstance(res_values, list) else res_values
)
# 转换策略项
ranger_policy_items = []
for item in policy_items:
policy_item = PolicyItem(
users=item.get('users', []),
groups=item.get('groups', []),
roles=item.get('roles', []),
permissions=[PermissionType(p) for p in item.get('permissions', [])],
delegate_admin=item.get('delegate_admin', False)
)
ranger_policy_items.append(policy_item)
policy = RangerPolicy(
id=policy_id,
name=policy_name,
service_name=service_name,
service_type=service.type,
policy_type=policy_type,
description=description,
is_enabled=True,
is_audit_enabled=True,
resources=ranger_resources,
policy_items=ranger_policy_items
)
self.policies[policy_id] = policy
# 记录审计事件
self._log_audit_event(
user="admin",
service_name=service_name,
service_type=service.type,
resource_path=f"/policy/{policy_name}",
resource_type="policy",
action=AuditAction.CREATE,
access_type="create",
result=AccessResult.ALLOWED,
policy_id=policy_id
)
return {
'status': 'success',
'policy_id': policy_id,
'message': f'Policy {policy_name} created successfully'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to create policy: {str(e)}'
}
def check_access(self, user: str, resource_path: str,
access_type: str, service_name: str) -> Dict[str, Any]:
"""
检查访问权限
Args:
user: 用户名
resource_path: 资源路径
access_type: 访问类型
service_name: 服务名称
Returns:
Dict[str, Any]: 访问检查结果
"""
try:
# 查找服务
service = None
for svc in self.services.values():
if svc.name == service_name:
service = svc
break
if not service:
return {
'status': 'error',
'message': f'Service {service_name} not found'
}
# 检查策略
access_result = AccessResult.DENIED
matched_policy = None
for policy in self.policies.values():
if (policy.service_name == service_name and
policy.is_enabled and
self._matches_resource(policy, resource_path)):
# 检查策略项
for item in policy.policy_items:
if (user in item.users and
any(perm.value == access_type or perm == PermissionType.ALL
for perm in item.permissions)):
access_result = AccessResult.ALLOWED
matched_policy = policy
break
if access_result == AccessResult.ALLOWED:
break
# 记录审计事件
self._log_audit_event(
user=user,
service_name=service_name,
service_type=service.type,
resource_path=resource_path,
resource_type="file",
action=AuditAction.ACCESS,
access_type=access_type,
result=access_result,
policy_id=matched_policy.id if matched_policy else None
)
return {
'status': 'success',
'access_result': access_result.value,
'policy_id': matched_policy.id if matched_policy else None,
'policy_name': matched_policy.name if matched_policy else None
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to check access: {str(e)}'
}
def _matches_resource(self, policy: RangerPolicy, resource_path: str) -> bool:
"""
检查资源是否匹配策略
Args:
policy: 策略
resource_path: 资源路径
Returns:
bool: 是否匹配
"""
# Simplified resource matching: handles comma-separated values and a trailing "*" wildcard,
# so patterns like "/user/data/*" actually match paths under /user/data/ in the demo below
for resource in policy.resources.values():
    for pattern in resource.resource_name.split(","):
        pattern = pattern.strip()
        if not pattern:
            continue
        if pattern.endswith("*"):
            if resource_path.startswith(pattern[:-1]):
                return True
        elif resource_path == pattern or resource_path.startswith(pattern):
            return True
return False
def _log_audit_event(self, user: str, service_name: str, service_type: ServiceType,
resource_path: str, resource_type: str, action: AuditAction,
access_type: str, result: AccessResult, policy_id: Optional[str] = None):
"""
记录审计事件
Args:
user: 用户
service_name: 服务名称
service_type: 服务类型
resource_path: 资源路径
resource_type: 资源类型
action: 动作
access_type: 访问类型
result: 结果
policy_id: 策略ID
"""
event = AuditEvent(
id=f"audit_{len(self.audit_events) + 1}",
event_time=datetime.now(),
user=user,
service_name=service_name,
service_type=service_type,
resource_path=resource_path,
resource_type=resource_type,
action=action,
access_type=access_type,
result=result,
policy_id=policy_id,
client_ip="192.168.1.100",
session_id=f"session_{uuid.uuid4().hex[:8]}"
)
self.audit_events.append(event)
# 保持最近1000条审计记录
if len(self.audit_events) > 1000:
self.audit_events = self.audit_events[-1000:]
def get_policies(self, service_name: Optional[str] = None) -> Dict[str, Any]:
"""
获取策略列表
Args:
service_name: 服务名称(可选)
Returns:
Dict[str, Any]: 策略列表
"""
policies_info = []
for policy in self.policies.values():
if service_name is None or policy.service_name == service_name:
policies_info.append({
'id': policy.id,
'name': policy.name,
'service_name': policy.service_name,
'service_type': policy.service_type.value,
'policy_type': policy.policy_type.value,
'description': policy.description,
'is_enabled': policy.is_enabled,
'is_audit_enabled': policy.is_audit_enabled,
'created_by': policy.created_by,
'create_time': policy.create_time.isoformat(),
'version': policy.version
})
return {
'status': 'success',
'policies': policies_info,
'total': len(policies_info)
}
def get_audit_events(self, user: Optional[str] = None,
service_name: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 100) -> Dict[str, Any]:
"""
获取审计事件
Args:
user: 用户名(可选)
service_name: 服务名称(可选)
start_time: 开始时间(可选)
end_time: 结束时间(可选)
limit: 限制数量
Returns:
Dict[str, Any]: 审计事件列表
"""
filtered_events = []
for event in self.audit_events:
# 应用过滤条件
if user and event.user != user:
continue
if service_name and event.service_name != service_name:
continue
if start_time and event.event_time < start_time:
continue
if end_time and event.event_time > end_time:
continue
filtered_events.append({
'id': event.id,
'event_time': event.event_time.isoformat(),
'user': event.user,
'service_name': event.service_name,
'service_type': event.service_type.value,
'resource_path': event.resource_path,
'resource_type': event.resource_type,
'action': event.action.value,
'access_type': event.access_type,
'result': event.result.value,
'policy_id': event.policy_id,
'client_ip': event.client_ip,
'session_id': event.session_id
})
# 按时间倒序排列并限制数量
filtered_events.sort(key=lambda x: x['event_time'], reverse=True)
filtered_events = filtered_events[:limit]
return {
'status': 'success',
'events': filtered_events,
'total': len(filtered_events)
}
def get_services(self) -> Dict[str, Any]:
"""
获取服务列表
Returns:
Dict[str, Any]: 服务列表
"""
services_info = []
for service in self.services.values():
services_info.append({
'id': service.id,
'name': service.name,
'type': service.type.value,
'description': service.description,
'is_enabled': service.is_enabled,
'created_by': service.created_by,
'create_time': service.create_time.isoformat(),
'version': service.version
})
return {
'status': 'success',
'services': services_info,
'total': len(services_info)
}
def get_admin_status(self) -> Dict[str, Any]:
"""
获取管理服务器状态
Returns:
Dict[str, Any]: 服务器状态
"""
return {
'admin_url': self.admin_url,
'is_running': self.is_running,
'version': self.version,
'stats': {
'total_services': len(self.services),
'total_policies': len(self.policies),
'total_users': len(self.users),
'total_groups': len(self.groups),
'total_audit_events': len(self.audit_events),
'enabled_services': len([s for s in self.services.values() if s.is_enabled]),
'enabled_policies': len([p for p in self.policies.values() if p.is_enabled])
},
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
# 创建Ranger管理服务器
ranger = RangerAdmin()
print("=== Apache Ranger安全管理示例 ===")
# 创建Hive服务
print("\n=== 创建Hive服务 ===")
hive_service_result = ranger.create_service(
name="hadoop-hive",
service_type=ServiceType.HIVE,
description="Hive Service for data warehouse",
configs={
"username": "hive",
"password": "hive123",
"jdbc.driverClassName": "org.apache.hive.jdbc.HiveDriver",
"jdbc.url": "jdbc:hive2://localhost:10000"
}
)
print(f"Hive服务创建结果: {hive_service_result}")
# 创建HDFS访问策略
print("\n=== 创建HDFS访问策略 ===")
hdfs_policy_result = ranger.create_policy(
policy_name="hdfs-data-access",
service_name="hadoop-hdfs",
policy_type=PolicyType.ACCESS,
resources={
"path": ["/user/data/*"]
},
policy_items=[
{
"users": ["datauser", "analyst"],
"groups": ["data-team"],
"permissions": ["read", "write"]
}
],
description="Allow data team to access /user/data directory"
)
print(f"HDFS策略创建结果: {hdfs_policy_result}")
# 创建Hive数据库策略
print("\n=== 创建Hive数据库策略 ===")
hive_policy_result = ranger.create_policy(
policy_name="hive-sales-db-access",
service_name="hadoop-hive",
policy_type=PolicyType.ACCESS,
resources={
"database": ["sales"],
"table": ["*"],
"column": ["*"]
},
policy_items=[
{
"users": ["analyst"],
"groups": ["analytics-team"],
"permissions": ["select"]
},
{
"users": ["dataadmin"],
"permissions": ["all"]
}
],
description="Access control for sales database"
)
print(f"Hive策略创建结果: {hive_policy_result}")
# 检查访问权限
print("\n=== 检查访问权限 ===")
# 检查HDFS访问
hdfs_access_check = ranger.check_access(
user="datauser",
resource_path="/user/data/sales.csv",
access_type="read",
service_name="hadoop-hdfs"
)
print(f"datauser访问/user/data/sales.csv (read): {hdfs_access_check}")
# 检查Hive访问
hive_access_check = ranger.check_access(
user="analyst",
resource_path="sales.customers",
access_type="select",
service_name="hadoop-hive"
)
print(f"analyst访问sales.customers (select): {hive_access_check}")
# 检查未授权访问
unauthorized_check = ranger.check_access(
user="guest",
resource_path="/user/data/confidential.csv",
access_type="read",
service_name="hadoop-hdfs"
)
print(f"guest访问/user/data/confidential.csv (read): {unauthorized_check}")
# 获取策略列表
print("\n=== 策略列表 ===")
policies = ranger.get_policies()
if policies['status'] == 'success':
print(f"总策略数: {policies['total']}")
for policy in policies['policies']:
print(f" - {policy['name']} ({policy['service_name']}) - {policy['policy_type']}")
# 获取审计事件
print("\n=== 审计事件 ===")
audit_events = ranger.get_audit_events(limit=10)
if audit_events['status'] == 'success':
print(f"审计事件数: {audit_events['total']}")
for event in audit_events['events'][:5]: # 显示前5个事件
print(f" - {event['event_time']}: {event['user']} {event['action']} {event['resource_path']} -> {event['result']}")
# 获取服务列表
print("\n=== 服务列表 ===")
services = ranger.get_services()
if services['status'] == 'success':
print(f"总服务数: {services['total']}")
for service in services['services']:
print(f" - {service['name']} ({service['type']}) - {'启用' if service['is_enabled'] else '禁用'}")
# 获取管理服务器状态
print("\n=== 管理服务器状态 ===")
status = ranger.get_admin_status()
print(f"服务器URL: {status['admin_url']}")
print(f"运行状态: {status['is_running']}")
print(f"版本: {status['version']}")
print("统计信息:")
for key, value in status['stats'].items():
print(f" {key}: {value}")
9.2 Apache Knox详解
Apache Knox是一个为Hadoop集群提供安全访问的网关服务。
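Before walking through the simulated gateway below, it helps to see what a client call through a real Knox deployment looks like: every backend service is exposed under /gateway/{topology}/{service}. The sketch below uses the requests library; the host, topology name, credentials, and self-signed-certificate handling are assumptions for illustration only.
import requests  # third-party HTTP client, assumed to be installed

def list_hdfs_dir_via_knox(path: str = "/user") -> dict:
    """Sketch: list an HDFS directory through Knox's WebHDFS proxy endpoint."""
    knox_url = "https://localhost:8443"      # hypothetical gateway address
    topology = "sandbox"                     # hypothetical topology name
    resp = requests.get(
        f"{knox_url}/gateway/{topology}/webhdfs/v1{path}",
        params={"op": "LISTSTATUS"},
        auth=("admin", "admin123"),          # hypothetical credentials
        verify=False,                        # demo gateways often use self-signed certificates
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()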
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import base64
import hashlib
import jwt
import uuid
class AuthenticationType(Enum):
"""认证类型"""
BASIC = "basic"
KERBEROS = "kerberos"
JWT = "jwt"
LDAP = "ldap"
PAM = "pam"
class ServiceRole(Enum):
"""服务角色"""
NAMENODE = "NAMENODE"
DATANODE = "DATANODE"
RESOURCEMANAGER = "RESOURCEMANAGER"
NODEMANAGER = "NODEMANAGER"
HIVESERVER2 = "HIVESERVER2"
WEBHCAT = "WEBHCAT"
OOZIE = "OOZIE"
WEBHBASE = "WEBHBASE"
STORM = "STORM"
KAFKA = "KAFKA"
class TopologyStatus(Enum):
"""拓扑状态"""
ACTIVE = "active"
INACTIVE = "inactive"
DEPLOYING = "deploying"
ERROR = "error"
@dataclass
class KnoxService:
"""Knox服务定义"""
name: str
role: ServiceRole
url: str
version: str = "1.0.0"
params: Dict[str, Any] = field(default_factory=dict)
@dataclass
class KnoxProvider:
"""Knox提供者"""
name: str
enabled: bool
role: str # authentication, authorization, identity-assertion等
params: Dict[str, Any] = field(default_factory=dict)
@dataclass
class KnoxTopology:
"""Knox拓扑"""
name: str
description: str
services: List[KnoxService]
providers: List[KnoxProvider]
status: TopologyStatus = TopologyStatus.INACTIVE
created_time: datetime = field(default_factory=datetime.now)
updated_time: datetime = field(default_factory=datetime.now)
version: str = "1.0.0"
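# In a real Knox installation, a topology like the one modeled above is an XML descriptor
# placed under conf/topologies/. A simplified example of what such a file contains; element
# names follow the Knox topology format, while the service URLs are illustrative assumptions.
SANDBOX_TOPOLOGY_XML = """
<topology>
    <gateway>
        <provider>
            <role>authentication</role>
            <name>ShiroProvider</name>
            <enabled>true</enabled>
            <param><name>sessionTimeout</name><value>30</value></param>
        </provider>
        <provider>
            <role>authorization</role>
            <name>AclsAuthz</name>
            <enabled>true</enabled>
        </provider>
    </gateway>
    <service>
        <role>WEBHDFS</role>
        <url>http://localhost:9870/webhdfs</url>
    </service>
    <service>
        <role>RESOURCEMANAGER</role>
        <url>http://localhost:8088/ws</url>
    </service>
</topology>
"""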
@dataclass
class KnoxUser:
"""Knox用户"""
username: str
password_hash: str
roles: List[str] = field(default_factory=list)
groups: List[str] = field(default_factory=list)
is_active: bool = True
created_time: datetime = field(default_factory=datetime.now)
last_login: Optional[datetime] = None
@dataclass
class KnoxSession:
"""Knox会话"""
session_id: str
username: str
client_ip: str
user_agent: str
created_time: datetime
last_access_time: datetime
expires_at: datetime
is_active: bool = True
jwt_token: Optional[str] = None
@dataclass
class AccessLog:
"""访问日志"""
timestamp: datetime
client_ip: str
username: str
method: str
url: str
status_code: int
response_size: int
user_agent: str
session_id: str
topology_name: str
service_name: str
response_time_ms: int
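# The gateway below stores plain SHA-256 digests, which is fine for a simulation but not for
# production; real deployments delegate to LDAP/Kerberos or at least use a salted key-derivation
# function. A minimal sketch with the standard library (iteration count and salt size are
# illustrative assumptions, not Knox defaults):
import os

def hash_password_pbkdf2(password: str, salt: Optional[bytes] = None,
                         iterations: int = 100_000) -> str:
    """Return 'salt_hex$digest_hex' using PBKDF2-HMAC-SHA256."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{salt.hex()}${digest.hex()}"

def verify_password_pbkdf2(password: str, stored: str, iterations: int = 100_000) -> bool:
    """Check a password against a value produced by hash_password_pbkdf2()."""
    salt_hex, digest_hex = stored.split("$", 1)
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(),
                                    bytes.fromhex(salt_hex), iterations)
    return candidate.hex() == digest_hex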
class KnoxGateway:
"""
Apache Knox网关
提供Hadoop集群的安全访问入口
"""
def __init__(self, gateway_host: str = "localhost", gateway_port: int = 8443):
self.gateway_host = gateway_host
self.gateway_port = gateway_port
self.gateway_url = f"https://{gateway_host}:{gateway_port}"
self.topologies: Dict[str, KnoxTopology] = {}
self.users: Dict[str, KnoxUser] = {}
self.sessions: Dict[str, KnoxSession] = {}
self.access_logs: List[AccessLog] = []
self.is_running = True
self.version = "1.6.1"
self.jwt_secret = "knox-secret-key"
# 初始化默认数据
self._initialize_default_data()
def _initialize_default_data(self):
"""初始化默认数据"""
# 创建默认用户
admin_user = KnoxUser(
username="admin",
password_hash=self._hash_password("admin123"),
roles=["admin"],
groups=["administrators"]
)
self.users["admin"] = admin_user
guest_user = KnoxUser(
username="guest",
password_hash=self._hash_password("guest123"),
roles=["user"],
groups=["users"]
)
self.users["guest"] = guest_user
# 创建默认拓扑
self._create_default_topology()
def _create_default_topology(self):
"""创建默认拓扑"""
# 定义服务
services = [
KnoxService(
name="namenode",
role=ServiceRole.NAMENODE,
url="hdfs://localhost:9000"
),
KnoxService(
name="resourcemanager",
role=ServiceRole.RESOURCEMANAGER,
url="http://localhost:8088"
),
KnoxService(
name="hive",
role=ServiceRole.HIVESERVER2,
url="http://localhost:10001"
)
]
# 定义提供者
providers = [
KnoxProvider(
name="ShiroProvider",
enabled=True,
role="authentication",
params={
"sessionTimeout": "30",
"main.ldapRealm": "org.apache.knox.gateway.shirorealm.KnoxLdapRealm",
"main.ldapContextFactory": "org.apache.knox.gateway.shirorealm.KnoxLdapContextFactory",
"main.ldapRealm.contextFactory": "$ldapContextFactory"
}
),
KnoxProvider(
name="AclsAuthz",
enabled=True,
role="authorization",
params={
"webhdfs.acl": "admin;*;*",
"webhcat.acl": "admin;*;*"
}
),
KnoxProvider(
name="Default",
enabled=True,
role="identity-assertion",
params={
"name": "Default"
}
)
]
default_topology = KnoxTopology(
name="sandbox",
description="Default sandbox topology",
services=services,
providers=providers,
status=TopologyStatus.ACTIVE
)
self.topologies["sandbox"] = default_topology
def _hash_password(self, password: str) -> str:
"""密码哈希"""
return hashlib.sha256(password.encode()).hexdigest()
def authenticate(self, username: str, password: str,
auth_type: AuthenticationType = AuthenticationType.BASIC) -> Dict[str, Any]:
"""
用户认证
Args:
username: 用户名
password: 密码
auth_type: 认证类型
Returns:
Dict[str, Any]: 认证结果
"""
try:
if username not in self.users:
return {
'status': 'error',
'message': 'User not found'
}
user = self.users[username]
if not user.is_active:
return {
'status': 'error',
'message': 'User account is disabled'
}
# 验证密码
if user.password_hash != self._hash_password(password):
return {
'status': 'error',
'message': 'Invalid credentials'
}
# 创建会话
session_id = str(uuid.uuid4())
expires_at = datetime.now() + timedelta(hours=8)
# 生成JWT令牌
jwt_payload = {
'username': username,
'roles': user.roles,
'groups': user.groups,
'session_id': session_id,
'exp': expires_at.timestamp(),
'iat': datetime.now().timestamp()
}
jwt_token = jwt.encode(jwt_payload, self.jwt_secret, algorithm='HS256')
session = KnoxSession(
session_id=session_id,
username=username,
client_ip="192.168.1.100",
user_agent="Knox-Client/1.0",
created_time=datetime.now(),
last_access_time=datetime.now(),
expires_at=expires_at,
jwt_token=jwt_token
)
self.sessions[session_id] = session
# 更新用户最后登录时间
user.last_login = datetime.now()
return {
'status': 'success',
'session_id': session_id,
'jwt_token': jwt_token,
'expires_at': expires_at.isoformat(),
'user_info': {
'username': username,
'roles': user.roles,
'groups': user.groups
}
}
except Exception as e:
return {
'status': 'error',
'message': f'Authentication failed: {str(e)}'
}
def create_topology(self, name: str, description: str,
services: List[Dict[str, Any]],
providers: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
创建拓扑
Args:
name: 拓扑名称
description: 描述
services: 服务列表
providers: 提供者列表
Returns:
Dict[str, Any]: 创建结果
"""
try:
if name in self.topologies:
return {
'status': 'error',
'message': f'Topology {name} already exists'
}
# 转换服务
knox_services = []
for svc in services:
knox_service = KnoxService(
name=svc['name'],
role=ServiceRole(svc['role']),
url=svc['url'],
version=svc.get('version', '1.0.0'),
params=svc.get('params', {})
)
knox_services.append(knox_service)
# 转换提供者
knox_providers = []
for prov in providers:
knox_provider = KnoxProvider(
name=prov['name'],
enabled=prov.get('enabled', True),
role=prov['role'],
params=prov.get('params', {})
)
knox_providers.append(knox_provider)
topology = KnoxTopology(
name=name,
description=description,
services=knox_services,
providers=knox_providers,
status=TopologyStatus.DEPLOYING
)
self.topologies[name] = topology
# 模拟部署过程
import time
time.sleep(1)
topology.status = TopologyStatus.ACTIVE
return {
'status': 'success',
'topology_name': name,
'message': f'Topology {name} created and deployed successfully'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to create topology: {str(e)}'
}
def proxy_request(self, topology_name: str, service_name: str,
path: str, method: str = "GET",
session_id: Optional[str] = None) -> Dict[str, Any]:
"""
代理请求到后端服务
Args:
topology_name: 拓扑名称
service_name: 服务名称
path: 请求路径
method: HTTP方法
session_id: 会话ID
Returns:
Dict[str, Any]: 代理结果
"""
try:
# 检查拓扑
if topology_name not in self.topologies:
return {
'status': 'error',
'status_code': 404,
'message': f'Topology {topology_name} not found'
}
topology = self.topologies[topology_name]
if topology.status != TopologyStatus.ACTIVE:
return {
'status': 'error',
'status_code': 503,
'message': f'Topology {topology_name} is not active'
}
# 检查会话(如果提供)
username = "anonymous"
if session_id:
if session_id not in self.sessions:
return {
'status': 'error',
'status_code': 401,
'message': 'Invalid session'
}
session = self.sessions[session_id]
if not session.is_active or datetime.now() > session.expires_at:
return {
'status': 'error',
'status_code': 401,
'message': 'Session expired'
}
username = session.username
session.last_access_time = datetime.now()
# 查找服务
target_service = None
for service in topology.services:
if service.name == service_name:
target_service = service
break
if not target_service:
return {
'status': 'error',
'status_code': 404,
'message': f'Service {service_name} not found in topology {topology_name}'
}
# 模拟代理请求
start_time = datetime.now()
# 构建目标URL
target_url = f"{target_service.url}{path}"
# 模拟响应
if service_name == "namenode" and "webhdfs" in path:
response_data = {
"FileStatuses": {
"FileStatus": [
{
"accessTime": 1640995200000,
"blockSize": 134217728,
"group": "supergroup",
"length": 1024,
"modificationTime": 1640995200000,
"owner": "hdfs",
"pathSuffix": "data.txt",
"permission": "644",
"replication": 3,
"type": "FILE"
}
]
}
}
status_code = 200
elif service_name == "resourcemanager":
response_data = {
"clusterInfo": {
"id": 1640995200000,
"startedOn": 1640995200000,
"state": "STARTED",
"haState": "ACTIVE",
"resourceManagerVersion": "3.3.4",
"resourceManagerBuildVersion": "3.3.4",
"hadoopVersion": "3.3.4"
}
}
status_code = 200
else:
response_data = {"message": "Service response", "path": path}
status_code = 200
end_time = datetime.now()
response_time_ms = int((end_time - start_time).total_seconds() * 1000)
# 记录访问日志
access_log = AccessLog(
timestamp=start_time,
client_ip="192.168.1.100",
username=username,
method=method,
url=f"/gateway/{topology_name}/{service_name}{path}",
status_code=status_code,
response_size=len(str(response_data)),
user_agent="Knox-Client/1.0",
session_id=session_id or "anonymous",
topology_name=topology_name,
service_name=service_name,
response_time_ms=response_time_ms
)
self.access_logs.append(access_log)
# 保持最近1000条访问日志
if len(self.access_logs) > 1000:
self.access_logs = self.access_logs[-1000:]
return {
'status': 'success',
'status_code': status_code,
'target_url': target_url,
'response_data': response_data,
'response_time_ms': response_time_ms
}
except Exception as e:
return {
'status': 'error',
'status_code': 500,
'message': f'Proxy request failed: {str(e)}'
}
def get_topologies(self) -> Dict[str, Any]:
"""
获取拓扑列表
Returns:
Dict[str, Any]: 拓扑列表
"""
topologies_info = []
for topology in self.topologies.values():
topologies_info.append({
'name': topology.name,
'description': topology.description,
'status': topology.status.value,
'services_count': len(topology.services),
'providers_count': len(topology.providers),
'created_time': topology.created_time.isoformat(),
'updated_time': topology.updated_time.isoformat(),
'version': topology.version
})
return {
'status': 'success',
'topologies': topologies_info,
'total': len(topologies_info)
}
def get_access_logs(self, topology_name: Optional[str] = None,
username: Optional[str] = None,
limit: int = 100) -> Dict[str, Any]:
"""
获取访问日志
Args:
topology_name: 拓扑名称(可选)
username: 用户名(可选)
limit: 限制数量
Returns:
Dict[str, Any]: 访问日志列表
"""
filtered_logs = []
for log in self.access_logs:
if topology_name and log.topology_name != topology_name:
continue
if username and log.username != username:
continue
filtered_logs.append({
'timestamp': log.timestamp.isoformat(),
'client_ip': log.client_ip,
'username': log.username,
'method': log.method,
'url': log.url,
'status_code': log.status_code,
'response_size': log.response_size,
'user_agent': log.user_agent,
'session_id': log.session_id,
'topology_name': log.topology_name,
'service_name': log.service_name,
'response_time_ms': log.response_time_ms
})
# 按时间倒序排列并限制数量
filtered_logs.sort(key=lambda x: x['timestamp'], reverse=True)
filtered_logs = filtered_logs[:limit]
return {
'status': 'success',
'logs': filtered_logs,
'total': len(filtered_logs)
}
def get_gateway_status(self) -> Dict[str, Any]:
"""
获取网关状态
Returns:
Dict[str, Any]: 网关状态
"""
active_sessions = len([s for s in self.sessions.values()
if s.is_active and datetime.now() <= s.expires_at])
return {
'gateway_url': self.gateway_url,
'is_running': self.is_running,
'version': self.version,
'stats': {
'total_topologies': len(self.topologies),
'active_topologies': len([t for t in self.topologies.values()
if t.status == TopologyStatus.ACTIVE]),
'total_users': len(self.users),
'active_sessions': active_sessions,
'total_access_logs': len(self.access_logs)
},
'timestamp': datetime.now().isoformat()
}
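# The jwt_token issued by authenticate() can be validated on the client side with PyJWT.
# A minimal sketch; the default secret mirrors the simulated gateway above and is purely
# illustrative -- a real Knox deployment would never share its signing secret like this.
def verify_knox_jwt(token: str, secret: str = "knox-secret-key") -> Dict[str, Any]:
    """Decode and validate a gateway-issued HS256 token, returning its claims or an error."""
    try:
        return jwt.decode(token, secret, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        return {"error": "token expired"}
    except jwt.InvalidTokenError as exc:
        return {"error": f"invalid token: {exc}"}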
# 使用示例
if __name__ == "__main__":
# 创建Knox网关
knox = KnoxGateway()
print("=== Apache Knox网关示例 ===")
# 用户认证
print("\n=== 用户认证 ===")
auth_result = knox.authenticate("admin", "admin123")
print(f"认证结果: {auth_result}")
if auth_result['status'] == 'success':
session_id = auth_result['session_id']
jwt_token = auth_result['jwt_token']
# 创建自定义拓扑
print("\n=== 创建自定义拓扑 ===")
topology_result = knox.create_topology(
name="production",
description="Production environment topology",
services=[
{
"name": "namenode",
"role": "NAMENODE",
"url": "hdfs://prod-namenode:9000"
},
{
"name": "resourcemanager",
"role": "RESOURCEMANAGER",
"url": "http://prod-rm:8088"
},
{
"name": "hive",
"role": "HIVESERVER2",
"url": "http://prod-hive:10001"
}
],
providers=[
{
"name": "ShiroProvider",
"role": "authentication",
"enabled": True,
"params": {
"sessionTimeout": "60"
}
},
{
"name": "AclsAuthz",
"role": "authorization",
"enabled": True,
"params": {
"webhdfs.acl": "admin;*;*"
}
}
]
)
print(f"拓扑创建结果: {topology_result}")
# 代理请求
print("\n=== 代理请求 ===")
# HDFS WebHDFS请求
hdfs_request = knox.proxy_request(
topology_name="sandbox",
service_name="namenode",
path="/webhdfs/v1/user?op=LISTSTATUS",
method="GET",
session_id=session_id
)
print(f"HDFS请求结果: {hdfs_request}")
# YARN ResourceManager请求
yarn_request = knox.proxy_request(
topology_name="sandbox",
service_name="resourcemanager",
path="/ws/v1/cluster/info",
method="GET",
session_id=session_id
)
print(f"YARN请求结果: {yarn_request}")
# 未授权请求
unauthorized_request = knox.proxy_request(
topology_name="sandbox",
service_name="namenode",
path="/webhdfs/v1/admin?op=LISTSTATUS",
method="GET"
)
print(f"未授权请求结果: {unauthorized_request}")
# 获取拓扑列表
print("\n=== 拓扑列表 ===")
topologies = knox.get_topologies()
if topologies['status'] == 'success':
print(f"总拓扑数: {topologies['total']}")
for topology in topologies['topologies']:
print(f" - {topology['name']}: {topology['description']} ({topology['status']})")
# 获取访问日志
print("\n=== 访问日志 ===")
access_logs = knox.get_access_logs(limit=5)
if access_logs['status'] == 'success':
print(f"访问日志数: {access_logs['total']}")
for log in access_logs['logs']:
print(f" - {log['timestamp']}: {log['username']} {log['method']} {log['url']} -> {log['status_code']} ({log['response_time_ms']}ms)")
# 获取网关状态
print("\n=== 网关状态 ===")
status = knox.get_gateway_status()
print(f"网关URL: {status['gateway_url']}")
print(f"运行状态: {status['is_running']}")
print(f"版本: {status['version']}")
print("统计信息:")
for key, value in status['stats'].items():
print(f" {key}: {value}")
print("资源池:")
for pool_name, pool_info in status['pools'].items():
print(f" {pool_name}: {pool_info['slots']} slots")
7.2 Apache Oozie详解
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import xml.etree.ElementTree as ET
class WorkflowStatus(Enum):
"""工作流状态"""
PREP = "PREP" # 准备中
RUNNING = "RUNNING" # 运行中
SUCCEEDED = "SUCCEEDED" # 成功
FAILED = "FAILED" # 失败
KILLED = "KILLED" # 已终止
SUSPENDED = "SUSPENDED" # 暂停
class ActionStatus(Enum):
"""动作状态"""
PREP = "PREP" # 准备中
RUNNING = "RUNNING" # 运行中
OK = "OK" # 成功
ERROR = "ERROR" # 错误
KILLED = "KILLED" # 已终止
START_RETRY = "START_RETRY" # 开始重试
START_MANUAL = "START_MANUAL" # 手动开始
DONE = "DONE" # 完成
END_RETRY = "END_RETRY" # 结束重试
END_MANUAL = "END_MANUAL" # 手动结束
USER_RETRY = "USER_RETRY" # 用户重试
class CoordinatorStatus(Enum):
"""协调器状态"""
PREP = "PREP" # 准备中
RUNNING = "RUNNING" # 运行中
SUCCEEDED = "SUCCEEDED" # 成功
FAILED = "FAILED" # 失败
KILLED = "KILLED" # 已终止
SUSPENDED = "SUSPENDED" # 暂停
DONEWITHERROR = "DONEWITHERROR" # 完成但有错误
RUNNINGWITHERROR = "RUNNINGWITHERROR" # 运行中但有错误
class ActionType(Enum):
"""动作类型"""
MAP_REDUCE = "map-reduce"
PIG = "pig"
HIVE = "hive"
SQOOP = "sqoop"
SHELL = "shell"
JAVA = "java"
FS = "fs"
EMAIL = "email"
SSH = "ssh"
SUB_WORKFLOW = "sub-workflow"
SPARK = "spark"
DISTCP = "distcp"
class BundleStatus(Enum):
"""Bundle状态"""
PREP = "PREP" # 准备中
RUNNING = "RUNNING" # 运行中
SUCCEEDED = "SUCCEEDED" # 成功
FAILED = "FAILED" # 失败
KILLED = "KILLED" # 已终止
SUSPENDED = "SUSPENDED" # 暂停
DONEWITHERROR = "DONEWITHERROR" # 完成但有错误
RUNNINGWITHERROR = "RUNNINGWITHERROR" # 运行中但有错误
@dataclass
class WorkflowAction:
"""工作流动作"""
name: str
action_type: ActionType
status: ActionStatus = ActionStatus.PREP
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
transition: str = "" # 成功时的下一个动作
error_transition: str = "" # 失败时的下一个动作
retry_max: int = 3
retry_interval: int = 10 # 分钟
configuration: Dict[str, Any] = field(default_factory=dict)
external_id: str = ""
external_status: str = ""
external_child_ids: str = ""
error_code: str = ""
error_message: str = ""
data: str = ""
stats: str = ""
console_url: str = ""
user_retry_count: int = 0
user_retry_max: int = 3
user_retry_interval: int = 10
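# A real Oozie workflow is declared in a workflow.xml ("workflow-app") document stored in HDFS
# next to the job; the WorkflowAction fields above mirror parts of that definition. Below is a
# simplified sketch parsed with the xml.etree.ElementTree module imported above. Element names
# follow the Oozie workflow schema; the job properties and transitions are illustrative.
EXAMPLE_WORKFLOW_XML = """
<workflow-app name="data-processing-workflow" xmlns="uri:oozie:workflow:0.5">
    <start to="data-processing"/>
    <action name="data-processing">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed</message>
    </kill>
    <end name="end"/>
</workflow-app>
"""

def parse_workflow_transitions(xml_text: str = EXAMPLE_WORKFLOW_XML) -> Dict[str, Tuple[str, str]]:
    """Return {action_name: (ok_to, error_to)} extracted from a workflow-app document."""
    ns = {"wf": "uri:oozie:workflow:0.5"}
    root = ET.fromstring(xml_text)
    transitions = {}
    for action in root.findall("wf:action", ns):
        ok = action.find("wf:ok", ns)
        error = action.find("wf:error", ns)
        transitions[action.attrib["name"]] = (
            ok.attrib["to"] if ok is not None else "",
            error.attrib["to"] if error is not None else "",
        )
    return transitions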
@dataclass
class WorkflowJob:
"""工作流作业"""
id: str
app_name: str
app_path: str
status: WorkflowStatus = WorkflowStatus.PREP
user: str = ""
group: str = ""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
created_time: datetime = field(default_factory=datetime.now)
last_modified_time: datetime = field(default_factory=datetime.now)
run: int = 0
console_url: str = ""
actions: Dict[str, WorkflowAction] = field(default_factory=dict)
conf: str = ""
parent_id: str = ""
external_id: str = ""
log_token: str = ""
@dataclass
class CoordinatorAction:
"""协调器动作"""
id: str
job_id: str
status: ActionStatus = ActionStatus.PREP
external_id: str = ""
external_status: str = ""
nominal_time: Optional[datetime] = None
created_time: datetime = field(default_factory=datetime.now)
last_modified_time: datetime = field(default_factory=datetime.now)
missing_dependencies: str = ""
push_missing_dependencies: str = ""
timeout: int = 120 # 分钟
error_code: str = ""
error_message: str = ""
console_url: str = ""
@dataclass
class CoordinatorJob:
"""协调器作业"""
id: str
app_name: str
app_path: str
status: CoordinatorStatus = CoordinatorStatus.PREP
user: str = ""
group: str = ""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
pause_time: Optional[datetime] = None
created_time: datetime = field(default_factory=datetime.now)
last_modified_time: datetime = field(default_factory=datetime.now)
next_materialized_time: Optional[datetime] = None
frequency: str = "" # 频率表达式
time_zone: str = "UTC"
concurrency: int = 1
execution: str = "FIFO" # FIFO, LIFO, LAST_ONLY
timeout: int = 120 # 分钟
actions: Dict[str, CoordinatorAction] = field(default_factory=dict)
conf: str = ""
mat_throttling: int = 12
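# The frequency/concurrency/execution fields above come from a coordinator-app definition.
# A simplified example of such a coordinator.xml: the EL expression ${coord:days(1)} is Oozie's
# way of expressing a daily frequency; the namespace, dates and paths are illustrative assumptions.
EXAMPLE_COORDINATOR_XML = """
<coordinator-app name="daily-data-processing" frequency="${coord:days(1)}"
                 start="2024-01-01T00:00Z" end="2024-12-31T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
    <controls>
        <timeout>120</timeout>
        <concurrency>1</concurrency>
        <execution>FIFO</execution>
    </controls>
    <action>
        <workflow>
            <app-path>hdfs://localhost:9000/user/oozie/workflows/daily-processing</app-path>
        </workflow>
    </action>
</coordinator-app>
"""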
@dataclass
class BundleJob:
"""Bundle作业"""
id: str
app_name: str
app_path: str
status: BundleStatus = BundleStatus.PREP
user: str = ""
group: str = ""
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
pause_time: Optional[datetime] = None
created_time: datetime = field(default_factory=datetime.now)
last_modified_time: datetime = field(default_factory=datetime.now)
kick_off_time: Optional[datetime] = None
time_zone: str = "UTC"
coordinators: List[str] = field(default_factory=list)
conf: str = ""
class OozieServer:
"""
Oozie工作流引擎服务器
"""
def __init__(self, server_url: str = "http://localhost:11000/oozie"):
self.server_url = server_url
self.workflow_jobs = {} # job_id -> WorkflowJob
self.coordinator_jobs = {} # job_id -> CoordinatorJob
self.bundle_jobs = {} # job_id -> BundleJob
# 服务器状态
self.is_running = True
self.build_version = "5.2.1"
self.system_mode = "NORMAL" # NORMAL, NOWEBSERVICE, SAFEMODE
self.java_version = "1.8.0_281"
self.os_name = "Linux"
self.os_version = "3.10.0"
# 配置
self.config = {
'oozie.service.WorkflowStoreService.workflow.callable.limit': 100,
'oozie.service.coord.normal.default.timeout': 120,
'oozie.service.coord.push.check.requeue.interval': 30000,
'oozie.service.CallableQueueService.queue.size': 10000,
'oozie.service.CallableQueueService.threads': 10,
'oozie.service.coord.materialization.window': 3600000,
'oozie.service.coord.lookup.trigger.window': 10800000,
'oozie.service.bundle.ForkedActionExecutor.buffer.size': 1000
}
# 线程锁
self.workflow_lock = threading.Lock()
self.coordinator_lock = threading.Lock()
self.bundle_lock = threading.Lock()
# 统计信息
self.stats = {
'total_workflows': 0,
'running_workflows': 0,
'succeeded_workflows': 0,
'failed_workflows': 0,
'total_coordinators': 0,
'running_coordinators': 0,
'total_bundles': 0,
'running_bundles': 0
}
# 初始化示例作业
self._create_example_jobs()
def _create_example_jobs(self):
"""创建示例作业"""
# 创建示例工作流
workflow_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-W"
# 创建工作流动作
start_action = WorkflowAction(
name="start",
action_type=ActionType.FS,
status=ActionStatus.OK,
transition="data-processing"
)
data_processing_action = WorkflowAction(
name="data-processing",
action_type=ActionType.MAP_REDUCE,
status=ActionStatus.PREP,
transition="data-validation",
error_transition="fail",
configuration={
'mapred.job.queue.name': 'default',
'mapred.mapper.class': 'org.example.DataMapper',
'mapred.reducer.class': 'org.example.DataReducer',
'mapred.input.dir': '/user/input',
'mapred.output.dir': '/user/output'
}
)
data_validation_action = WorkflowAction(
name="data-validation",
action_type=ActionType.HIVE,
status=ActionStatus.PREP,
transition="end",
error_transition="fail",
configuration={
'hive.script': 'validate_data.hql',
'hive.params': 'INPUT=/user/output,OUTPUT=/user/validated'
}
)
end_action = WorkflowAction(
name="end",
action_type=ActionType.FS,
status=ActionStatus.PREP
)
fail_action = WorkflowAction(
name="fail",
action_type=ActionType.FS,
status=ActionStatus.PREP
)
workflow_job = WorkflowJob(
id=workflow_id,
app_name="data-processing-workflow",
app_path="/user/oozie/workflows/data-processing",
user="hadoop",
group="hadoop",
actions={
"start": start_action,
"data-processing": data_processing_action,
"data-validation": data_validation_action,
"end": end_action,
"fail": fail_action
},
conf='<configuration><property><name>jobTracker</name><value>localhost:8032</value></property></configuration>'
)
self.workflow_jobs[workflow_id] = workflow_job
self.stats['total_workflows'] += 1
# 创建示例协调器
coordinator_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-C"
coordinator_job = CoordinatorJob(
id=coordinator_id,
app_name="daily-data-processing",
app_path="/user/oozie/coordinators/daily-processing",
user="hadoop",
group="hadoop",
start_time=datetime.now(),
end_time=datetime.now() + timedelta(days=30),
frequency="${coord:days(1)}",
concurrency=1,
execution="FIFO",
timeout=120
)
self.coordinator_jobs[coordinator_id] = coordinator_job
self.stats['total_coordinators'] += 1
# 创建示例Bundle
bundle_id = f"0000001-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-B"
bundle_job = BundleJob(
id=bundle_id,
app_name="data-pipeline-bundle",
app_path="/user/oozie/bundles/data-pipeline",
user="hadoop",
group="hadoop",
start_time=datetime.now(),
end_time=datetime.now() + timedelta(days=90),
coordinators=[coordinator_id]
)
self.bundle_jobs[bundle_id] = bundle_job
self.stats['total_bundles'] += 1
def submit_workflow(self, app_path: str, config: Dict[str, Any],
user: str = "hadoop") -> Dict[str, Any]:
"""
提交工作流
Args:
app_path: 应用路径
config: 配置参数
user: 用户名
Returns:
Dict[str, Any]: 提交结果
"""
job_id = f"{len(self.workflow_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-W"
# 解析应用名称
app_name = app_path.split('/')[-1] if app_path else "workflow"
# 创建工作流作业
workflow_job = WorkflowJob(
id=job_id,
app_name=app_name,
app_path=app_path,
user=user,
group=user,
conf=json.dumps(config)
)
# 创建基本动作
start_action = WorkflowAction(
name="start",
action_type=ActionType.FS,
status=ActionStatus.OK,
transition="main-action"
)
main_action = WorkflowAction(
name="main-action",
action_type=ActionType.MAP_REDUCE,
status=ActionStatus.PREP,
transition="end",
error_transition="fail",
configuration=config
)
end_action = WorkflowAction(
name="end",
action_type=ActionType.FS,
status=ActionStatus.PREP
)
fail_action = WorkflowAction(
name="fail",
action_type=ActionType.FS,
status=ActionStatus.PREP
)
workflow_job.actions = {
"start": start_action,
"main-action": main_action,
"end": end_action,
"fail": fail_action
}
with self.workflow_lock:
self.workflow_jobs[job_id] = workflow_job
self.stats['total_workflows'] += 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Workflow submitted successfully'
}
def start_workflow(self, job_id: str) -> Dict[str, Any]:
"""
启动工作流
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 启动结果
"""
if job_id not in self.workflow_jobs:
return {'status': 'error', 'message': f'Workflow {job_id} not found'}
workflow_job = self.workflow_jobs[job_id]
if workflow_job.status != WorkflowStatus.PREP:
return {'status': 'error', 'message': f'Workflow {job_id} is not in PREP state'}
with self.workflow_lock:
workflow_job.status = WorkflowStatus.RUNNING
workflow_job.start_time = datetime.now()
workflow_job.run += 1
self.stats['running_workflows'] += 1
# 启动第一个动作
self._execute_workflow_action(job_id, "start")
return {
'status': 'success',
'job_id': job_id,
'message': f'Workflow started successfully'
}
def _execute_workflow_action(self, job_id: str, action_name: str):
"""
执行工作流动作
Args:
job_id: 作业ID
action_name: 动作名称
"""
workflow_job = self.workflow_jobs[job_id]
action = workflow_job.actions.get(action_name)
if not action:
return
# 模拟动作执行
action.status = ActionStatus.RUNNING
action.start_time = datetime.now()
# 模拟执行时间
execution_time = random.uniform(1, 5)
time.sleep(execution_time)
# 模拟执行结果(90%成功率)
success = random.random() > 0.1
action.end_time = datetime.now()
if success:
action.status = ActionStatus.OK
# 执行下一个动作
if action.transition and action.transition != "end":
self._execute_workflow_action(job_id, action.transition)
elif action.transition == "end":
self._complete_workflow(job_id, WorkflowStatus.SUCCEEDED)
else:
action.status = ActionStatus.ERROR
action.error_code = "E0001"
action.error_message = "Action execution failed"
# 执行错误转换
if action.error_transition:
if action.error_transition == "fail":
self._complete_workflow(job_id, WorkflowStatus.FAILED)
else:
self._execute_workflow_action(job_id, action.error_transition)
def _complete_workflow(self, job_id: str, status: WorkflowStatus):
"""
完成工作流
Args:
job_id: 作业ID
status: 最终状态
"""
workflow_job = self.workflow_jobs[job_id]
with self.workflow_lock:
workflow_job.status = status
workflow_job.end_time = datetime.now()
self.stats['running_workflows'] -= 1
if status == WorkflowStatus.SUCCEEDED:
self.stats['succeeded_workflows'] += 1
elif status == WorkflowStatus.FAILED:
self.stats['failed_workflows'] += 1
def get_workflow_job(self, job_id: str) -> Dict[str, Any]:
"""
获取工作流作业信息
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业信息
"""
if job_id not in self.workflow_jobs:
return {'status': 'error', 'message': f'Workflow {job_id} not found'}
workflow_job = self.workflow_jobs[job_id]
# 获取动作信息
actions_info = []
for action_name, action in workflow_job.actions.items():
actions_info.append({
'name': action.name,
'type': action.action_type.value,
'status': action.status.value,
'start_time': action.start_time.isoformat() if action.start_time else None,
'end_time': action.end_time.isoformat() if action.end_time else None,
'transition': action.transition,
'error_transition': action.error_transition,
'retry_max': action.retry_max,
'error_code': action.error_code,
'error_message': action.error_message
})
return {
'status': 'success',
'workflow': {
'id': workflow_job.id,
'app_name': workflow_job.app_name,
'app_path': workflow_job.app_path,
'status': workflow_job.status.value,
'user': workflow_job.user,
'group': workflow_job.group,
'start_time': workflow_job.start_time.isoformat() if workflow_job.start_time else None,
'end_time': workflow_job.end_time.isoformat() if workflow_job.end_time else None,
'created_time': workflow_job.created_time.isoformat(),
'last_modified_time': workflow_job.last_modified_time.isoformat(),
'run': workflow_job.run,
'actions': actions_info,
'parent_id': workflow_job.parent_id,
'external_id': workflow_job.external_id
}
}
def kill_workflow(self, job_id: str) -> Dict[str, Any]:
"""
终止工作流
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 终止结果
"""
if job_id not in self.workflow_jobs:
return {'status': 'error', 'message': f'Workflow {job_id} not found'}
workflow_job = self.workflow_jobs[job_id]
if workflow_job.status not in [WorkflowStatus.RUNNING, WorkflowStatus.SUSPENDED]:
return {'status': 'error', 'message': f'Workflow {job_id} is not in a killable state'}
with self.workflow_lock:
    # Remember whether the job was running before overwriting its status; the original check
    # happened after the status was set to KILLED, so the running counter was never decremented.
    was_running = workflow_job.status == WorkflowStatus.RUNNING
    workflow_job.status = WorkflowStatus.KILLED
    workflow_job.end_time = datetime.now()
    # Kill all actions that are still running
    for action in workflow_job.actions.values():
        if action.status == ActionStatus.RUNNING:
            action.status = ActionStatus.KILLED
            action.end_time = datetime.now()
    if was_running:
        self.stats['running_workflows'] -= 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Workflow killed successfully'
}
def suspend_workflow(self, job_id: str) -> Dict[str, Any]:
"""
暂停工作流
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 暂停结果
"""
if job_id not in self.workflow_jobs:
return {'status': 'error', 'message': f'Workflow {job_id} not found'}
workflow_job = self.workflow_jobs[job_id]
if workflow_job.status != WorkflowStatus.RUNNING:
return {'status': 'error', 'message': f'Workflow {job_id} is not running'}
with self.workflow_lock:
workflow_job.status = WorkflowStatus.SUSPENDED
workflow_job.last_modified_time = datetime.now()
return {
'status': 'success',
'job_id': job_id,
'message': f'Workflow suspended successfully'
}
def resume_workflow(self, job_id: str) -> Dict[str, Any]:
"""
恢复工作流
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 恢复结果
"""
if job_id not in self.workflow_jobs:
return {'status': 'error', 'message': f'Workflow {job_id} not found'}
workflow_job = self.workflow_jobs[job_id]
if workflow_job.status != WorkflowStatus.SUSPENDED:
return {'status': 'error', 'message': f'Workflow {job_id} is not suspended'}
with self.workflow_lock:
workflow_job.status = WorkflowStatus.RUNNING
workflow_job.last_modified_time = datetime.now()
return {
'status': 'success',
'job_id': job_id,
'message': f'Workflow resumed successfully'
}
def submit_coordinator(self, app_path: str, config: Dict[str, Any],
user: str = "hadoop") -> Dict[str, Any]:
"""
提交协调器
Args:
app_path: 应用路径
config: 配置参数
user: 用户名
Returns:
Dict[str, Any]: 提交结果
"""
job_id = f"{len(self.coordinator_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-C"
# 解析应用名称
app_name = app_path.split('/')[-1] if app_path else "coordinator"
# 创建协调器作业
coordinator_job = CoordinatorJob(
id=job_id,
app_name=app_name,
app_path=app_path,
user=user,
group=user,
start_time=datetime.now(),
end_time=datetime.now() + timedelta(days=30),
frequency=config.get('frequency', '${coord:days(1)}'),
concurrency=config.get('concurrency', 1),
execution=config.get('execution', 'FIFO'),
timeout=config.get('timeout', 120),
conf=json.dumps(config)
)
with self.coordinator_lock:
self.coordinator_jobs[job_id] = coordinator_job
self.stats['total_coordinators'] += 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Coordinator submitted successfully'
}
def start_coordinator(self, job_id: str) -> Dict[str, Any]:
"""
启动协调器
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 启动结果
"""
if job_id not in self.coordinator_jobs:
return {'status': 'error', 'message': f'Coordinator {job_id} not found'}
coordinator_job = self.coordinator_jobs[job_id]
if coordinator_job.status != CoordinatorStatus.PREP:
return {'status': 'error', 'message': f'Coordinator {job_id} is not in PREP state'}
with self.coordinator_lock:
coordinator_job.status = CoordinatorStatus.RUNNING
coordinator_job.last_modified_time = datetime.now()
coordinator_job.next_materialized_time = datetime.now() + timedelta(days=1)
self.stats['running_coordinators'] += 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Coordinator started successfully'
}
def get_coordinator_job(self, job_id: str) -> Dict[str, Any]:
"""
获取协调器作业信息
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业信息
"""
if job_id not in self.coordinator_jobs:
return {'status': 'error', 'message': f'Coordinator {job_id} not found'}
coordinator_job = self.coordinator_jobs[job_id]
# 获取动作信息
actions_info = []
for action_id, action in coordinator_job.actions.items():
actions_info.append({
'id': action.id,
'job_id': action.job_id,
'status': action.status.value,
'external_id': action.external_id,
'nominal_time': action.nominal_time.isoformat() if action.nominal_time else None,
'created_time': action.created_time.isoformat(),
'missing_dependencies': action.missing_dependencies,
'timeout': action.timeout,
'error_code': action.error_code,
'error_message': action.error_message
})
return {
'status': 'success',
'coordinator': {
'id': coordinator_job.id,
'app_name': coordinator_job.app_name,
'app_path': coordinator_job.app_path,
'status': coordinator_job.status.value,
'user': coordinator_job.user,
'group': coordinator_job.group,
'start_time': coordinator_job.start_time.isoformat() if coordinator_job.start_time else None,
'end_time': coordinator_job.end_time.isoformat() if coordinator_job.end_time else None,
'pause_time': coordinator_job.pause_time.isoformat() if coordinator_job.pause_time else None,
'created_time': coordinator_job.created_time.isoformat(),
'last_modified_time': coordinator_job.last_modified_time.isoformat(),
'next_materialized_time': coordinator_job.next_materialized_time.isoformat() if coordinator_job.next_materialized_time else None,
'frequency': coordinator_job.frequency,
'time_zone': coordinator_job.time_zone,
'concurrency': coordinator_job.concurrency,
'execution': coordinator_job.execution,
'timeout': coordinator_job.timeout,
'actions': actions_info,
'mat_throttling': coordinator_job.mat_throttling
}
}
def submit_bundle(self, app_path: str, config: Dict[str, Any],
user: str = "hadoop") -> Dict[str, Any]:
"""
提交Bundle
Args:
app_path: 应用路径
config: 配置参数
user: 用户名
Returns:
Dict[str, Any]: 提交结果
"""
job_id = f"{len(self.bundle_jobs)+1:07d}-{datetime.now().strftime('%Y%m%d%H%M%S')}-oozie-B"
# 解析应用名称
app_name = app_path.split('/')[-1] if app_path else "bundle"
# 创建Bundle作业
bundle_job = BundleJob(
id=job_id,
app_name=app_name,
app_path=app_path,
user=user,
group=user,
start_time=datetime.now(),
end_time=datetime.now() + timedelta(days=90),
coordinators=config.get('coordinators', []),
conf=json.dumps(config)
)
with self.bundle_lock:
self.bundle_jobs[job_id] = bundle_job
self.stats['total_bundles'] += 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Bundle submitted successfully'
}
def start_bundle(self, job_id: str) -> Dict[str, Any]:
"""
启动Bundle
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 启动结果
"""
if job_id not in self.bundle_jobs:
return {'status': 'error', 'message': f'Bundle {job_id} not found'}
bundle_job = self.bundle_jobs[job_id]
if bundle_job.status != BundleStatus.PREP:
return {'status': 'error', 'message': f'Bundle {job_id} is not in PREP state'}
with self.bundle_lock:
bundle_job.status = BundleStatus.RUNNING
bundle_job.kick_off_time = datetime.now()
self.stats['running_bundles'] += 1
return {
'status': 'success',
'job_id': job_id,
'message': f'Bundle started successfully'
}
def get_bundle_job(self, job_id: str) -> Dict[str, Any]:
"""
获取Bundle作业信息
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业信息
"""
if job_id not in self.bundle_jobs:
return {'status': 'error', 'message': f'Bundle {job_id} not found'}
bundle_job = self.bundle_jobs[job_id]
return {
'status': 'success',
'bundle': {
'id': bundle_job.id,
'app_name': bundle_job.app_name,
'app_path': bundle_job.app_path,
'status': bundle_job.status.value,
'user': bundle_job.user,
'group': bundle_job.group,
'start_time': bundle_job.start_time.isoformat() if bundle_job.start_time else None,
'end_time': bundle_job.end_time.isoformat() if bundle_job.end_time else None,
'pause_time': bundle_job.pause_time.isoformat() if bundle_job.pause_time else None,
'created_time': bundle_job.created_time.isoformat(),
'last_modified_time': bundle_job.last_modified_time.isoformat(),
'kick_off_time': bundle_job.kick_off_time.isoformat() if bundle_job.kick_off_time else None,
'time_zone': bundle_job.time_zone,
'coordinators': bundle_job.coordinators
}
}
def get_jobs(self, job_type: str = "workflow", status: Optional[str] = None,
user: Optional[str] = None, limit: int = 50) -> Dict[str, Any]:
"""
获取作业列表
Args:
job_type: 作业类型 (workflow, coordinator, bundle)
status: 状态过滤
user: 用户过滤
limit: 限制数量
Returns:
Dict[str, Any]: 作业列表
"""
jobs_info = []
count = 0
if job_type == "workflow":
for job_id, job in self.workflow_jobs.items():
if count >= limit:
break
# 应用过滤条件
if status and job.status.value != status:
continue
if user and job.user != user:
continue
jobs_info.append({
'id': job.id,
'app_name': job.app_name,
'status': job.status.value,
'user': job.user,
'group': job.group,
'start_time': job.start_time.isoformat() if job.start_time else None,
'end_time': job.end_time.isoformat() if job.end_time else None,
'created_time': job.created_time.isoformat(),
'run': job.run
})
count += 1
elif job_type == "coordinator":
for job_id, job in self.coordinator_jobs.items():
if count >= limit:
break
# 应用过滤条件
if status and job.status.value != status:
continue
if user and job.user != user:
continue
jobs_info.append({
'id': job.id,
'app_name': job.app_name,
'status': job.status.value,
'user': job.user,
'group': job.group,
'start_time': job.start_time.isoformat() if job.start_time else None,
'end_time': job.end_time.isoformat() if job.end_time else None,
'created_time': job.created_time.isoformat(),
'frequency': job.frequency,
'concurrency': job.concurrency
})
count += 1
elif job_type == "bundle":
for job_id, job in self.bundle_jobs.items():
if count >= limit:
break
# 应用过滤条件
if status and job.status.value != status:
continue
if user and job.user != user:
continue
jobs_info.append({
'id': job.id,
'app_name': job.app_name,
'status': job.status.value,
'user': job.user,
'group': job.group,
'start_time': job.start_time.isoformat() if job.start_time else None,
'end_time': job.end_time.isoformat() if job.end_time else None,
'created_time': job.created_time.isoformat(),
'coordinators_count': len(job.coordinators)
})
count += 1
return {
'status': 'success',
'jobs': jobs_info,
'total': len(jobs_info),
'job_type': job_type
}
def get_server_status(self) -> Dict[str, Any]:
"""
获取服务器状态
Returns:
Dict[str, Any]: 服务器状态
"""
return {
'server_url': self.server_url,
'is_running': self.is_running,
'build_version': self.build_version,
'system_mode': self.system_mode,
'java_version': self.java_version,
'os_name': self.os_name,
'os_version': self.os_version,
'stats': self.stats,
'config': self.config,
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
# 创建Oozie服务器
oozie = OozieServer("http://localhost:11000/oozie")
print("=== Apache Oozie工作流引擎示例 ===")
# 提交工作流
print("\n=== 提交工作流 ===")
workflow_config = {
'jobTracker': 'localhost:8032',
'nameNode': 'hdfs://localhost:9000',
'queueName': 'default',
'inputDir': '/user/input',
'outputDir': '/user/output'
}
submit_result = oozie.submit_workflow(
app_path="/user/oozie/workflows/data-processing",
config=workflow_config,
user="hadoop"
)
print(f"提交结果: {submit_result}")
if submit_result['status'] == 'success':
workflow_id = submit_result['job_id']
# 启动工作流
print("\n=== 启动工作流 ===")
start_result = oozie.start_workflow(workflow_id)
print(f"启动结果: {start_result}")
# 等待一段时间
time.sleep(2)
# 获取工作流信息
print("\n=== 工作流信息 ===")
workflow_info = oozie.get_workflow_job(workflow_id)
if workflow_info['status'] == 'success':
workflow = workflow_info['workflow']
print(f"工作流ID: {workflow['id']}")
print(f"应用名称: {workflow['app_name']}")
print(f"状态: {workflow['status']}")
print(f"用户: {workflow['user']}")
print(f"开始时间: {workflow['start_time']}")
print(f"运行次数: {workflow['run']}")
print("动作列表:")
for action in workflow['actions']:
print(f" - {action['name']}: {action['status']} ({action['type']})")
if action['transition']:
print(f" 成功转换: {action['transition']}")
if action['error_transition']:
print(f" 错误转换: {action['error_transition']}")
# 提交协调器
print("\n=== 提交协调器 ===")
coordinator_config = {
'frequency': '${coord:days(1)}',
'concurrency': 1,
'execution': 'FIFO',
'timeout': 120,
'workflowPath': '/user/oozie/workflows/daily-processing'
}
coord_submit_result = oozie.submit_coordinator(
app_path="/user/oozie/coordinators/daily-processing",
config=coordinator_config,
user="hadoop"
)
print(f"协调器提交结果: {coord_submit_result}")
if coord_submit_result['status'] == 'success':
coordinator_id = coord_submit_result['job_id']
# 启动协调器
coord_start_result = oozie.start_coordinator(coordinator_id)
print(f"协调器启动结果: {coord_start_result}")
# 获取协调器信息
print("\n=== 协调器信息 ===")
coord_info = oozie.get_coordinator_job(coordinator_id)
if coord_info['status'] == 'success':
coordinator = coord_info['coordinator']
print(f"协调器ID: {coordinator['id']}")
print(f"应用名称: {coordinator['app_name']}")
print(f"状态: {coordinator['status']}")
print(f"频率: {coordinator['frequency']}")
print(f"并发数: {coordinator['concurrency']}")
print(f"执行策略: {coordinator['execution']}")
print(f"下次物化时间: {coordinator['next_materialized_time']}")
# 提交Bundle
print("\n=== 提交Bundle ===")
bundle_config = {
'coordinators': [coordinator_id] if coord_submit_result['status'] == 'success' else [],
'kickOffTime': datetime.now().isoformat()
}
bundle_submit_result = oozie.submit_bundle(
app_path="/user/oozie/bundles/data-pipeline",
config=bundle_config,
user="hadoop"
)
print(f"Bundle提交结果: {bundle_submit_result}")
if bundle_submit_result['status'] == 'success':
bundle_id = bundle_submit_result['job_id']
# 启动Bundle
bundle_start_result = oozie.start_bundle(bundle_id)
print(f"Bundle启动结果: {bundle_start_result}")
# 获取Bundle信息
print("\n=== Bundle信息 ===")
bundle_info = oozie.get_bundle_job(bundle_id)
if bundle_info['status'] == 'success':
bundle = bundle_info['bundle']
print(f"Bundle ID: {bundle['id']}")
print(f"应用名称: {bundle['app_name']}")
print(f"状态: {bundle['status']}")
print(f"协调器数量: {len(bundle['coordinators'])}")
print(f"启动时间: {bundle['kick_off_time']}")
# 获取作业列表
print("\n=== 工作流作业列表 ===")
workflows = oozie.get_jobs(job_type="workflow", limit=10)
print(f"工作流总数: {workflows['total']}")
for job in workflows['jobs']:
print(f" - {job['id']}: {job['app_name']} ({job['status']})")
print("\n=== 协调器作业列表 ===")
coordinators = oozie.get_jobs(job_type="coordinator", limit=10)
print(f"协调器总数: {coordinators['total']}")
for job in coordinators['jobs']:
print(f" - {job['id']}: {job['app_name']} ({job['status']}) - {job['frequency']}")
# 获取服务器状态
print("\n=== 服务器状态 ===")
status = oozie.get_server_status()
print(f"服务器URL: {status['server_url']}")
print(f"运行状态: {status['is_running']}")
print(f"版本: {status['build_version']}")
print(f"系统模式: {status['system_mode']}")
print(f"Java版本: {status['java_version']}")
print("统计信息:")
for key, value in status['stats'].items():
print(f" {key}: {value}")
8.2 Apache Flume详解
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import re
class FlumeComponentType(Enum):
"""Flume组件类型"""
SOURCE = "source"
SINK = "sink"
CHANNEL = "channel"
INTERCEPTOR = "interceptor"
SERIALIZER = "serializer"
SINK_PROCESSOR = "sink_processor"
class FlumeComponentStatus(Enum):
"""Flume组件状态"""
IDLE = "IDLE" # 空闲
START = "START" # 启动中
STOP = "STOP" # 停止中
ERROR = "ERROR" # 错误
class SourceType(Enum):
"""Source类型"""
SPOOLDIR = "spooldir" # 目录监控
TAILDIR = "taildir" # 文件尾部监控
KAFKA = "kafka" # Kafka消费者
HTTP = "http" # HTTP接收器
NETCAT = "netcat" # Netcat
EXEC = "exec" # 命令执行
JMS = "jms" # JMS消息
AVRO = "avro" # Avro RPC
THRIFT = "thrift" # Thrift RPC
SEQUENCE_GENERATOR = "seq" # 序列生成器
class SinkType(Enum):
"""Sink类型"""
HDFS = "hdfs" # HDFS写入
HBASE = "hbase" # HBase写入
KAFKA = "kafka" # Kafka生产者
ELASTICSEARCH = "elasticsearch" # Elasticsearch
SOLR = "solr" # Solr搜索
LOGGER = "logger" # 日志输出
NULL = "null" # 空输出
AVRO = "avro" # Avro RPC
THRIFT = "thrift" # Thrift RPC
FILE_ROLL = "file_roll" # 文件滚动
HIVE = "hive" # Hive写入
class ChannelType(Enum):
"""Channel类型"""
MEMORY = "memory" # 内存通道
FILE = "file" # 文件通道
SPILLABLE_MEMORY = "spillablememory" # 可溢出内存通道
JDBC = "jdbc" # JDBC通道
KAFKA = "kafka" # Kafka通道
PSEUDO_TRANSACTION = "pseudotxn" # 伪事务通道
class EventStatus(Enum):
"""事件状态"""
READY = "READY" # 就绪
INFLIGHT = "INFLIGHT" # 传输中
TAKE = "TAKE" # 已取出
COMMIT = "COMMIT" # 已提交
ROLLBACK = "ROLLBACK" # 已回滚
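# The source/sink/channel types above correspond to what a real Flume agent wires together in
# its properties file as source -> channel -> sink. A minimal example configuration; the agent
# name, file paths and HDFS URL are illustrative assumptions.
EXAMPLE_FLUME_CONF = """
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = taildir
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*\\.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/events/%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.channel = c1
"""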
@dataclass
class FlumeEvent:
"""Flume事件"""
event_id: str
headers: Dict[str, str] = field(default_factory=dict)
body: bytes = b""
timestamp: datetime = field(default_factory=datetime.now)
status: EventStatus = EventStatus.READY
source_name: str = ""
channel_name: str = ""
sink_name: str = ""
retry_count: int = 0
max_retries: int = 3
def get_header(self, key: str, default: str = "") -> str:
"""获取头部信息"""
return self.headers.get(key, default)
def set_header(self, key: str, value: str):
"""设置头部信息"""
self.headers[key] = value
def get_body_as_string(self, encoding: str = 'utf-8') -> str:
"""获取事件体字符串"""
try:
return self.body.decode(encoding)
except UnicodeDecodeError:
return self.body.decode('utf-8', errors='ignore')
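# Tiny usage sketch of the FlumeEvent helpers above; the header names and values are illustrative.
def _demo_flume_event() -> None:
    event = FlumeEvent(event_id=str(uuid.uuid4()), body=b'{"level": "INFO", "msg": "ok"}')
    event.set_header("timestamp", str(int(time.time() * 1000)))
    event.set_header("host", "web-01")
    print(event.get_header("host"), event.get_body_as_string())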
@dataclass
class FlumeTransaction:
"""Flume事务"""
transaction_id: str
channel_name: str
events: List[FlumeEvent] = field(default_factory=list)
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
is_committed: bool = False
is_rolled_back: bool = False
def add_event(self, event: FlumeEvent):
"""添加事件到事务"""
self.events.append(event)
def commit(self):
"""提交事务"""
self.is_committed = True
self.end_time = datetime.now()
for event in self.events:
event.status = EventStatus.COMMIT
def rollback(self):
"""回滚事务"""
self.is_rolled_back = True
self.end_time = datetime.now()
for event in self.events:
event.status = EventStatus.ROLLBACK
@dataclass
class FlumeSource:
"""Flume Source"""
name: str
source_type: SourceType
channels: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
status: FlumeComponentStatus = FlumeComponentStatus.IDLE
events_received: int = 0
events_accepted: int = 0
events_rejected: int = 0
last_activity: Optional[datetime] = None
error_count: int = 0
error_message: str = ""
def add_channel(self, channel_name: str):
"""添加通道"""
if channel_name not in self.channels:
self.channels.append(channel_name)
def remove_channel(self, channel_name: str):
"""移除通道"""
if channel_name in self.channels:
self.channels.remove(channel_name)
@dataclass
class FlumeSink:
"""Flume Sink"""
name: str
sink_type: SinkType
channel: str
config: Dict[str, Any] = field(default_factory=dict)
status: FlumeComponentStatus = FlumeComponentStatus.IDLE
events_drained: int = 0
events_failed: int = 0
connection_failures: int = 0
batch_size: int = 100
batch_timeout: int = 3000 # 毫秒
last_activity: Optional[datetime] = None
error_count: int = 0
error_message: str = ""
@dataclass
class FlumeChannel:
"""Flume Channel"""
name: str
channel_type: ChannelType
config: Dict[str, Any] = field(default_factory=dict)
status: FlumeComponentStatus = FlumeComponentStatus.IDLE
capacity: int = 10000
transaction_capacity: int = 1000
events: deque = field(default_factory=deque)
transactions: Dict[str, FlumeTransaction] = field(default_factory=dict)
events_put: int = 0
events_take: int = 0
channel_size: int = 0
channel_fill_percentage: float = 0.0
last_activity: Optional[datetime] = None
def put_event(self, event: FlumeEvent, transaction_id: str) -> bool:
"""放入事件"""
if len(self.events) >= self.capacity:
return False
if transaction_id not in self.transactions:
self.transactions[transaction_id] = FlumeTransaction(
transaction_id=transaction_id,
channel_name=self.name
)
transaction = self.transactions[transaction_id]
if len(transaction.events) >= self.transaction_capacity:
return False
event.status = EventStatus.INFLIGHT
event.channel_name = self.name
transaction.add_event(event)
self.events_put += 1
self.last_activity = datetime.now()
return True
def take_event(self, transaction_id: str) -> Optional[FlumeEvent]:
"""取出事件"""
if not self.events:
return None
if transaction_id not in self.transactions:
self.transactions[transaction_id] = FlumeTransaction(
transaction_id=transaction_id,
channel_name=self.name
)
event = self.events.popleft()
event.status = EventStatus.TAKE
transaction = self.transactions[transaction_id]
transaction.add_event(event)
self.events_take += 1
self.channel_size = len(self.events)
self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
self.last_activity = datetime.now()
return event
def commit_transaction(self, transaction_id: str) -> bool:
"""提交事务"""
if transaction_id not in self.transactions:
return False
transaction = self.transactions[transaction_id]
transaction.commit()
# 对于put操作,将事件添加到通道
for event in transaction.events:
if event.status == EventStatus.INFLIGHT:
self.events.append(event)
self.channel_size = len(self.events)
self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
del self.transactions[transaction_id]
return True
def rollback_transaction(self, transaction_id: str) -> bool:
"""回滚事务"""
if transaction_id not in self.transactions:
return False
transaction = self.transactions[transaction_id]
transaction.rollback()
# 对于take操作,将事件放回通道
for event in transaction.events:
if event.status == EventStatus.TAKE:
self.events.appendleft(event)
self.channel_size = len(self.events)
self.channel_fill_percentage = (self.channel_size / self.capacity) * 100
del self.transactions[transaction_id]
return True
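# 补充示例(假设性演示,_demo_channel_transaction 为本文虚构的辅助函数,模块导入时不会自动执行):
# 演示上文 FlumeChannel 的事务语义——put 之后只有 commit 才真正入队,take 之后 rollback 会把事件放回队首。
def _demo_channel_transaction():
    """FlumeChannel 事务语义演示(仅作示意)"""
    channel = FlumeChannel(name="demo-channel", channel_type=ChannelType.MEMORY, capacity=100)
    event = FlumeEvent(event_id="demo-event", body=b"hello flume")
    # put + commit:事件进入通道队列
    assert channel.put_event(event, transaction_id="txn-put-1")
    assert channel.commit_transaction("txn-put-1")
    assert channel.channel_size == 1
    # take + rollback:事件被取出后又被放回队列
    taken = channel.take_event("txn-take-1")
    assert channel.rollback_transaction("txn-take-1")
    assert taken is not None and channel.channel_size == 1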
@dataclass
class FlumeAgent:
"""Flume Agent"""
name: str
host: str = "localhost"
port: int = 41414
sources: Dict[str, FlumeSource] = field(default_factory=dict)
sinks: Dict[str, FlumeSink] = field(default_factory=dict)
channels: Dict[str, FlumeChannel] = field(default_factory=dict)
is_running: bool = False
start_time: Optional[datetime] = None
config_file: str = ""
jvm_metrics: Dict[str, Any] = field(default_factory=dict)
class FlumeCluster:
"""
Apache Flume数据收集集群
"""
def __init__(self, cluster_name: str = "flume-cluster"):
self.cluster_name = cluster_name
# 数据存储
self.agents = {} # agent_name -> FlumeAgent
# 集群状态
self.is_running = True
self.version = "1.11.0"
self.java_version = "1.8.0_281"
# 配置
self.config = {
'flume.monitoring.type': 'http',
'flume.monitoring.port': 34545,
'flume.root.logger': 'INFO,console',
'flume.log.dir': './logs',
'flume.log.file': 'flume.log'
}
# 线程锁
self.agents_lock = threading.Lock()
# 统计信息
self.stats = {
'total_agents': 0,
'running_agents': 0,
'total_sources': 0,
'total_sinks': 0,
'total_channels': 0,
'total_events_processed': 0,
'total_events_failed': 0,
'average_throughput': 0.0
}
# 初始化示例代理
self._create_example_agents()
def _create_example_agents(self):
"""创建示例代理"""
# 创建示例代理
agent1 = FlumeAgent(
name="web-log-agent",
host="node1.example.com",
port=41414
)
agent2 = FlumeAgent(
name="app-log-agent",
host="node2.example.com",
port=41414
)
with self.agents_lock:
self.agents[agent1.name] = agent1
self.agents[agent2.name] = agent2
self._update_stats()
def _update_stats(self):
"""更新统计信息"""
self.stats['total_agents'] = len(self.agents)
self.stats['running_agents'] = len([a for a in self.agents.values() if a.is_running])
self.stats['total_sources'] = sum(len(a.sources) for a in self.agents.values())
self.stats['total_sinks'] = sum(len(a.sinks) for a in self.agents.values())
self.stats['total_channels'] = sum(len(a.channels) for a in self.agents.values())
def create_agent(self, agent_name: str, host: str = "localhost", port: int = 41414) -> Dict[str, Any]:
"""
创建Flume代理
Args:
agent_name: 代理名称
host: 主机地址
port: 端口号
Returns:
Dict[str, Any]: 创建结果
"""
if agent_name in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} already exists'}
agent = FlumeAgent(
name=agent_name,
host=host,
port=port
)
with self.agents_lock:
self.agents[agent_name] = agent
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'host': host,
'port': port,
'message': f'Agent {agent_name} created successfully'
}
def start_agent(self, agent_name: str) -> Dict[str, Any]:
"""
启动代理
Args:
agent_name: 代理名称
Returns:
Dict[str, Any]: 启动结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if agent.is_running:
return {'status': 'error', 'message': f'Agent {agent_name} is already running'}
agent.is_running = True
agent.start_time = datetime.now()
# 启动所有组件
for source in agent.sources.values():
source.status = FlumeComponentStatus.START
for sink in agent.sinks.values():
sink.status = FlumeComponentStatus.START
for channel in agent.channels.values():
channel.status = FlumeComponentStatus.START
with self.agents_lock:
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'start_time': agent.start_time.isoformat(),
'message': f'Agent {agent_name} started successfully'
}
def stop_agent(self, agent_name: str) -> Dict[str, Any]:
"""
停止代理
Args:
agent_name: 代理名称
Returns:
Dict[str, Any]: 停止结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if not agent.is_running:
return {'status': 'error', 'message': f'Agent {agent_name} is not running'}
agent.is_running = False
# 停止所有组件
for source in agent.sources.values():
source.status = FlumeComponentStatus.STOP
for sink in agent.sinks.values():
sink.status = FlumeComponentStatus.STOP
for channel in agent.channels.values():
channel.status = FlumeComponentStatus.STOP
with self.agents_lock:
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'message': f'Agent {agent_name} stopped successfully'
}
def add_source(self, agent_name: str, source_name: str, source_type: SourceType,
channels: List[str], config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
添加Source
Args:
agent_name: 代理名称
source_name: Source名称
source_type: Source类型
channels: 通道列表
config: 配置
Returns:
Dict[str, Any]: 添加结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if source_name in agent.sources:
return {'status': 'error', 'message': f'Source {source_name} already exists'}
source = FlumeSource(
name=source_name,
source_type=source_type,
channels=channels.copy(),
config=config or {}
)
agent.sources[source_name] = source
with self.agents_lock:
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'source_name': source_name,
'source_type': source_type.value,
'channels': channels,
'message': f'Source {source_name} added successfully'
}
def add_sink(self, agent_name: str, sink_name: str, sink_type: SinkType,
channel: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
添加Sink
Args:
agent_name: 代理名称
sink_name: Sink名称
sink_type: Sink类型
channel: 通道名称
config: 配置
Returns:
Dict[str, Any]: 添加结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if sink_name in agent.sinks:
return {'status': 'error', 'message': f'Sink {sink_name} already exists'}
sink = FlumeSink(
name=sink_name,
sink_type=sink_type,
channel=channel,
config=config or {}
)
agent.sinks[sink_name] = sink
with self.agents_lock:
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'sink_name': sink_name,
'sink_type': sink_type.value,
'channel': channel,
'message': f'Sink {sink_name} added successfully'
}
def add_channel(self, agent_name: str, channel_name: str, channel_type: ChannelType,
config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
添加Channel
Args:
agent_name: 代理名称
channel_name: Channel名称
channel_type: Channel类型
config: 配置
Returns:
Dict[str, Any]: 添加结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if channel_name in agent.channels:
return {'status': 'error', 'message': f'Channel {channel_name} already exists'}
channel_config = config or {}
capacity = channel_config.get('capacity', 10000)
transaction_capacity = channel_config.get('transactionCapacity', 1000)
channel = FlumeChannel(
name=channel_name,
channel_type=channel_type,
config=channel_config,
capacity=capacity,
transaction_capacity=transaction_capacity
)
agent.channels[channel_name] = channel
with self.agents_lock:
self._update_stats()
return {
'status': 'success',
'agent_name': agent_name,
'channel_name': channel_name,
'channel_type': channel_type.value,
'capacity': capacity,
'message': f'Channel {channel_name} added successfully'
}
def send_event(self, agent_name: str, source_name: str, headers: Dict[str, str],
body: Union[str, bytes]) -> Dict[str, Any]:
"""
发送事件
Args:
agent_name: 代理名称
source_name: Source名称
headers: 事件头部
body: 事件体
Returns:
Dict[str, Any]: 发送结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if source_name not in agent.sources:
return {'status': 'error', 'message': f'Source {source_name} not found'}
source = agent.sources[source_name]
if source.status != FlumeComponentStatus.START:
return {'status': 'error', 'message': f'Source {source_name} is not running'}
# 创建事件
event_id = f"event-{uuid.uuid4().hex[:8]}"
event_body = body.encode('utf-8') if isinstance(body, str) else body
event = FlumeEvent(
event_id=event_id,
headers=headers.copy(),
body=event_body,
source_name=source_name
)
# 发送到所有配置的通道(为每个通道创建独立的事件副本,避免多个通道共享同一个可变事件对象)
success_channels = []
failed_channels = []
for channel_name in source.channels:
if channel_name not in agent.channels:
failed_channels.append(channel_name)
continue
channel = agent.channels[channel_name]
transaction_id = f"txn-{uuid.uuid4().hex[:8]}"
channel_event = FlumeEvent(
event_id=event.event_id,
headers=event.headers.copy(),
body=event.body,
timestamp=event.timestamp,
source_name=source_name
)
if channel.put_event(channel_event, transaction_id):
if channel.commit_transaction(transaction_id):
success_channels.append(channel_name)
else:
channel.rollback_transaction(transaction_id)
failed_channels.append(channel_name)
else:
failed_channels.append(channel_name)
# 更新Source统计
source.events_received += 1
source.last_activity = datetime.now()
if success_channels:
source.events_accepted += 1
else:
source.events_rejected += 1
return {
'status': 'success' if success_channels else 'error',
'event_id': event_id,
'success_channels': success_channels,
'failed_channels': failed_channels,
'message': f'Event sent to {len(success_channels)} channels'
}
def process_events(self, agent_name: str, sink_name: str, batch_size: int = 100) -> Dict[str, Any]:
"""
处理事件(Sink从Channel取出事件)
Args:
agent_name: 代理名称
sink_name: Sink名称
batch_size: 批次大小
Returns:
Dict[str, Any]: 处理结果
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
if sink_name not in agent.sinks:
return {'status': 'error', 'message': f'Sink {sink_name} not found'}
sink = agent.sinks[sink_name]
if sink.status != FlumeComponentStatus.START:
return {'status': 'error', 'message': f'Sink {sink_name} is not running'}
if sink.channel not in agent.channels:
return {'status': 'error', 'message': f'Channel {sink.channel} not found'}
channel = agent.channels[sink.channel]
transaction_id = f"txn-{uuid.uuid4().hex[:8]}"
# 取出事件批次
events = []
for _ in range(min(batch_size, sink.batch_size)):
event = channel.take_event(transaction_id)
if event is None:
break
events.append(event)
if not events:
return {
'status': 'success',
'events_processed': 0,
'message': 'No events to process'
}
# 模拟事件处理(90%成功率)
success = random.random() > 0.1
if success:
# 提交事务
channel.commit_transaction(transaction_id)
sink.events_drained += len(events)
sink.last_activity = datetime.now()
return {
'status': 'success',
'events_processed': len(events),
'sink_type': sink.sink_type.value,
'message': f'Successfully processed {len(events)} events'
}
else:
# 回滚事务
channel.rollback_transaction(transaction_id)
sink.events_failed += len(events)
sink.error_count += 1
sink.error_message = "Failed to write events to destination"
return {
'status': 'error',
'events_processed': 0,
'events_failed': len(events),
'message': 'Failed to process events'
}
def get_agent_status(self, agent_name: str) -> Dict[str, Any]:
"""
获取代理状态
Args:
agent_name: 代理名称
Returns:
Dict[str, Any]: 代理状态
"""
if agent_name not in self.agents:
return {'status': 'error', 'message': f'Agent {agent_name} not found'}
agent = self.agents[agent_name]
# 收集Source信息
sources_info = []
for source_name, source in agent.sources.items():
sources_info.append({
'name': source.name,
'type': source.source_type.value,
'status': source.status.value,
'channels': source.channels,
'events_received': source.events_received,
'events_accepted': source.events_accepted,
'events_rejected': source.events_rejected,
'last_activity': source.last_activity.isoformat() if source.last_activity else None
})
# 收集Sink信息
sinks_info = []
for sink_name, sink in agent.sinks.items():
sinks_info.append({
'name': sink.name,
'type': sink.sink_type.value,
'status': sink.status.value,
'channel': sink.channel,
'events_drained': sink.events_drained,
'events_failed': sink.events_failed,
'connection_failures': sink.connection_failures,
'last_activity': sink.last_activity.isoformat() if sink.last_activity else None
})
# 收集Channel信息
channels_info = []
for channel_name, channel in agent.channels.items():
channels_info.append({
'name': channel.name,
'type': channel.channel_type.value,
'status': channel.status.value,
'capacity': channel.capacity,
'channel_size': channel.channel_size,
'channel_fill_percentage': channel.channel_fill_percentage,
'events_put': channel.events_put,
'events_take': channel.events_take,
'active_transactions': len(channel.transactions),
'last_activity': channel.last_activity.isoformat() if channel.last_activity else None
})
return {
'status': 'success',
'agent': {
'name': agent.name,
'host': agent.host,
'port': agent.port,
'is_running': agent.is_running,
'start_time': agent.start_time.isoformat() if agent.start_time else None,
'sources': sources_info,
'sinks': sinks_info,
'channels': channels_info
}
}
def list_agents(self) -> Dict[str, Any]:
"""
列出所有代理
Returns:
Dict[str, Any]: 代理列表
"""
agents_info = []
for agent_name, agent in self.agents.items():
agents_info.append({
'name': agent.name,
'host': agent.host,
'port': agent.port,
'is_running': agent.is_running,
'start_time': agent.start_time.isoformat() if agent.start_time else None,
'sources_count': len(agent.sources),
'sinks_count': len(agent.sinks),
'channels_count': len(agent.channels)
})
return {
'status': 'success',
'agents': agents_info,
'total': len(agents_info)
}
def get_cluster_status(self) -> Dict[str, Any]:
"""
获取集群状态
Returns:
Dict[str, Any]: 集群状态
"""
# 计算总体统计
total_events_processed = 0
total_events_failed = 0
for agent in self.agents.values():
for source in agent.sources.values():
total_events_processed += source.events_accepted
for sink in agent.sinks.values():
total_events_processed += sink.events_drained
total_events_failed += sink.events_failed
self.stats['total_events_processed'] = total_events_processed
self.stats['total_events_failed'] = total_events_failed
return {
'cluster_name': self.cluster_name,
'is_running': self.is_running,
'version': self.version,
'java_version': self.java_version,
'stats': self.stats,
'config': self.config,
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
# 创建Flume集群
flume = FlumeCluster("production-flume")
print("=== Apache Flume数据收集示例 ===")
# 创建代理
print("\n=== 创建Flume代理 ===")
agent_result = flume.create_agent(
agent_name="web-server-agent",
host="web-server-01",
port=41414
)
print(f"代理创建结果: {agent_result}")
if agent_result['status'] == 'success':
agent_name = agent_result['agent_name']
# 添加Channel
print("\n=== 添加Channel ===")
channel_result = flume.add_channel(
agent_name=agent_name,
channel_name="memory-channel",
channel_type=ChannelType.MEMORY,
config={
'capacity': 10000,
'transactionCapacity': 1000
}
)
print(f"Channel添加结果: {channel_result}")
# 添加Source
print("\n=== 添加Source ===")
source_result = flume.add_source(
agent_name=agent_name,
source_name="spooldir-source",
source_type=SourceType.SPOOLDIR,
channels=["memory-channel"],
config={
'spoolDir': '/var/log/web/spool',
'channels': 'memory-channel'
}
)
print(f"Source添加结果: {source_result}")
# 添加Sink
print("\n=== 添加Sink ===")
sink_result = flume.add_sink(
agent_name=agent_name,
sink_name="hdfs-sink",
sink_type=SinkType.HDFS,
channel="memory-channel",
config={
'hdfs.path': '/flume/events/%Y/%m/%d',
'hdfs.fileType': 'DataStream',
'hdfs.writeFormat': 'Text',
'hdfs.batchSize': 1000,
'hdfs.rollInterval': 600,
'hdfs.rollSize': 268435456
}
)
print(f"Sink添加结果: {sink_result}")
# 启动代理
print("\n=== 启动代理 ===")
start_result = flume.start_agent(agent_name)
print(f"代理启动结果: {start_result}")
if start_result['status'] == 'success':
# 发送事件
print("\n=== 发送事件 ===")
for i in range(10):
event_result = flume.send_event(
agent_name=agent_name,
source_name="spooldir-source",
headers={
'timestamp': str(int(time.time())),
'host': 'web-server-01',
'source': 'access.log'
},
body=f"192.168.1.{100+i} - - [25/Dec/2023:10:00:{i:02d} +0000] \"GET /api/users HTTP/1.1\" 200 1234"
)
if i < 3: # 只打印前3个结果
print(f"事件 {i+1} 发送结果: {event_result}")
# 处理事件
print("\n=== 处理事件 ===")
for i in range(3):
process_result = flume.process_events(
agent_name=agent_name,
sink_name="hdfs-sink",
batch_size=5
)
print(f"批次 {i+1} 处理结果: {process_result}")
time.sleep(1)
# 获取代理状态
print("\n=== 代理状态 ===")
agent_status = flume.get_agent_status(agent_name)
if agent_status['status'] == 'success':
agent_info = agent_status['agent']
print(f"代理名称: {agent_info['name']}")
print(f"主机: {agent_info['host']}:{agent_info['port']}")
print(f"运行状态: {agent_info['is_running']}")
print(f"启动时间: {agent_info['start_time']}")
print("\nSources:")
for source in agent_info['sources']:
print(f" - {source['name']} ({source['type']}): {source['events_received']} received, {source['events_accepted']} accepted")
print("\nChannels:")
for channel in agent_info['channels']:
print(f" - {channel['name']} ({channel['type']}): {channel['channel_size']}/{channel['capacity']} ({channel['channel_fill_percentage']:.1f}%)")
print("\nSinks:")
for sink in agent_info['sinks']:
print(f" - {sink['name']} ({sink['type']}): {sink['events_drained']} drained, {sink['events_failed']} failed")
# 列出所有代理
print("\n=== 代理列表 ===")
agents_list = flume.list_agents()
if agents_list['status'] == 'success':
print(f"总代理数: {agents_list['total']}")
for agent in agents_list['agents']:
print(f" - {agent['name']} @ {agent['host']}:{agent['port']} - 运行: {agent['is_running']}")
# 获取集群状态
print("\n=== 集群状态 ===")
cluster_status = flume.get_cluster_status()
print(f"集群名称: {cluster_status['cluster_name']}")
print(f"版本: {cluster_status['version']}")
print(f"运行状态: {cluster_status['is_running']}")
print("统计信息:")
for key, value in cluster_status['stats'].items():
print(f" {key}: {value}")
8.3 Apache NiFi详解
from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import threading
import time
import random
import json
import uuid
from collections import defaultdict, deque
import re
from urllib.parse import urlparse
class ProcessorType(Enum):
"""处理器类型"""
GET_FILE = "GetFile" # 获取文件
PUT_FILE = "PutFile" # 写入文件
GET_HTTP = "GetHTTP" # HTTP获取
POST_HTTP = "PostHTTP" # HTTP发送
GET_KAFKA = "GetKafka" # Kafka消费
PUT_KAFKA = "PutKafka" # Kafka生产
GET_HDFS = "GetHDFS" # HDFS读取
PUT_HDFS = "PutHDFS" # HDFS写入
EXTRACT_TEXT = "ExtractText" # 文本提取
REPLACE_TEXT = "ReplaceText" # 文本替换
ROUTE_ON_ATTRIBUTE = "RouteOnAttribute" # 属性路由
UPDATE_ATTRIBUTE = "UpdateAttribute" # 更新属性
SPLIT_TEXT = "SplitText" # 文本分割
MERGE_CONTENT = "MergeContent" # 内容合并
COMPRESS_CONTENT = "CompressContent" # 内容压缩
DECOMPRESS_CONTENT = "DecompressContent" # 内容解压
CONVERT_RECORD = "ConvertRecord" # 记录转换
QUERY_RECORD = "QueryRecord" # 记录查询
EXECUTE_SQL = "ExecuteSQL" # SQL执行
PUT_SQL = "PutSQL" # SQL写入
GENERATE_FLOWFILE = "GenerateFlowFile" # 生成FlowFile
LOG_ATTRIBUTE = "LogAttribute" # 日志属性
class ProcessorState(Enum):
"""处理器状态"""
STOPPED = "STOPPED" # 停止
RUNNING = "RUNNING" # 运行
DISABLED = "DISABLED" # 禁用
INVALID = "INVALID" # 无效
VALIDATING = "VALIDATING" # 验证中
class FlowFileState(Enum):
"""FlowFile状态"""
ACTIVE = "ACTIVE" # 活跃
QUEUED = "QUEUED" # 排队
PROCESSING = "PROCESSING" # 处理中
COMPLETED = "COMPLETED" # 完成
FAILED = "FAILED" # 失败
PENALIZED = "PENALIZED" # 惩罚
EXPIRED = "EXPIRED" # 过期
class RelationshipType(Enum):
"""关系类型"""
SUCCESS = "success" # 成功
FAILURE = "failure" # 失败
ORIGINAL = "original" # 原始
MATCHED = "matched" # 匹配
UNMATCHED = "unmatched" # 不匹配
RETRY = "retry" # 重试
INVALID = "invalid" # 无效
SPLIT = "split" # 分割
MERGED = "merged" # 合并
ROUTED = "routed" # 路由
class ConnectionType(Enum):
"""连接类型"""
STANDARD = "standard" # 标准连接
LOAD_BALANCE = "load_balance" # 负载均衡
FUNNEL = "funnel" # 漏斗
SELF_LOOP = "self_loop" # 自循环
class QueuePriority(Enum):
"""队列优先级"""
FIFO = "FirstInFirstOut" # 先进先出
LIFO = "LastInFirstOut" # 后进先出
OLDEST_FIRST = "OldestFlowFileFirst" # 最旧优先
NEWEST_FIRST = "NewestFlowFileFirst" # 最新优先
LARGEST_FIRST = "LargestFlowFileFirst" # 最大优先
SMALLEST_FIRST = "SmallestFlowFileFirst" # 最小优先
@dataclass
class FlowFile:
"""NiFi FlowFile"""
uuid: str
attributes: Dict[str, str] = field(default_factory=dict)
content: bytes = b""
size: int = 0
entry_date: datetime = field(default_factory=datetime.now)
lineage_start_date: datetime = field(default_factory=datetime.now)
last_queued_date: datetime = field(default_factory=datetime.now)
queue_date_index: int = 0
state: FlowFileState = FlowFileState.ACTIVE
penalty_expiration: Optional[datetime] = None
def __post_init__(self):
if self.size == 0 and self.content:
self.size = len(self.content)
def get_attribute(self, key: str, default: str = "") -> str:
"""获取属性"""
return self.attributes.get(key, default)
def put_attribute(self, key: str, value: str):
"""设置属性"""
self.attributes[key] = value
def remove_attribute(self, key: str) -> bool:
"""移除属性"""
if key in self.attributes:
del self.attributes[key]
return True
return False
def clone(self) -> 'FlowFile':
"""克隆FlowFile"""
return FlowFile(
uuid=f"clone-{uuid.uuid4().hex[:8]}",
attributes=self.attributes.copy(),
content=self.content,
size=self.size,
entry_date=self.entry_date,
lineage_start_date=self.lineage_start_date
)
@dataclass
class Relationship:
"""处理器关系"""
name: str
description: str = ""
auto_terminate: bool = False
@dataclass
class Connection:
"""连接"""
id: str
name: str
source_id: str
destination_id: str
relationships: List[str] = field(default_factory=list)
connection_type: ConnectionType = ConnectionType.STANDARD
queue_priority: QueuePriority = QueuePriority.FIFO
flow_file_expiration: timedelta = field(default_factory=lambda: timedelta(seconds=0))
back_pressure_object_threshold: int = 10000
back_pressure_data_size_threshold: str = "1 GB"
load_balance_strategy: str = "DO_NOT_LOAD_BALANCE"
load_balance_partition_attribute: str = ""
queue: deque = field(default_factory=deque)
queued_count: int = 0
queued_size: int = 0
def enqueue(self, flow_file: FlowFile):
"""入队"""
flow_file.state = FlowFileState.QUEUED
flow_file.last_queued_date = datetime.now()
self.queue.append(flow_file)
self.queued_count += 1
self.queued_size += flow_file.size
def dequeue(self) -> Optional[FlowFile]:
"""出队"""
if not self.queue:
return None
flow_file = self.queue.popleft()
flow_file.state = FlowFileState.PROCESSING
self.queued_count -= 1
self.queued_size -= flow_file.size
return flow_file
def is_back_pressure_enabled(self) -> bool:
"""检查是否启用背压"""
return (self.queued_count >= self.back_pressure_object_threshold or
self.queued_size >= self._parse_data_size(self.back_pressure_data_size_threshold))
def _parse_data_size(self, size_str: str) -> int:
"""解析数据大小字符串"""
size_str = size_str.upper().strip()
if size_str.endswith('KB'):
return int(float(size_str[:-2]) * 1024)
elif size_str.endswith('MB'):
return int(float(size_str[:-2]) * 1024 * 1024)
elif size_str.endswith('GB'):
return int(float(size_str[:-2]) * 1024 * 1024 * 1024)
elif size_str.endswith('TB'):
return int(float(size_str[:-2]) * 1024 * 1024 * 1024 * 1024)
else:
return int(float(size_str))
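# 补充示例(假设性演示,_demo_connection_back_pressure 为本文虚构的辅助函数,模块导入时不会自动执行):
# 演示上文 Connection 的入队/出队与背压判断逻辑。
def _demo_connection_back_pressure():
    """Connection 背压语义演示(仅作示意)"""
    conn = Connection(
        id="demo-conn",
        name="demo",
        source_id="p1",
        destination_id="p2",
        relationships=["success"],
        back_pressure_object_threshold=2,
        back_pressure_data_size_threshold="1 KB"
    )
    for i in range(2):
        conn.enqueue(FlowFile(uuid=f"ff-{i}", content=b"0123456789"))
    # 队列中对象数达到阈值 2,背压生效
    assert conn.is_back_pressure_enabled()
    flow_file = conn.dequeue()
    assert flow_file is not None and conn.queued_count == 1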
@dataclass
class Processor:
"""NiFi处理器"""
id: str
name: str
processor_type: ProcessorType
state: ProcessorState = ProcessorState.STOPPED
properties: Dict[str, str] = field(default_factory=dict)
relationships: Dict[str, Relationship] = field(default_factory=dict)
incoming_connections: List[str] = field(default_factory=list)
outgoing_connections: List[str] = field(default_factory=list)
run_duration_millis: int = 0
scheduling_period: str = "0 sec"
scheduling_strategy: str = "TIMER_DRIVEN"
execution_node: str = "ALL"
penalty_duration: str = "30 sec"
yield_duration: str = "1 sec"
bulletin_level: str = "WARN"
run_schedule: Optional[str] = None
concurrent_tasks: int = 1
# 统计信息
bytes_read: int = 0
bytes_written: int = 0
bytes_transferred: int = 0
flow_files_received: int = 0
flow_files_sent: int = 0
flow_files_removed: int = 0
invocations: int = 0
processing_nanos: int = 0
def add_relationship(self, name: str, description: str = "", auto_terminate: bool = False):
"""添加关系"""
self.relationships[name] = Relationship(
name=name,
description=description,
auto_terminate=auto_terminate
)
def get_property(self, key: str, default: str = "") -> str:
"""获取属性"""
return self.properties.get(key, default)
def set_property(self, key: str, value: str):
"""设置属性"""
self.properties[key] = value
@dataclass
class ProcessGroup:
"""处理组"""
id: str
name: str
parent_group_id: Optional[str] = None
processors: Dict[str, Processor] = field(default_factory=dict)
connections: Dict[str, Connection] = field(default_factory=dict)
process_groups: Dict[str, 'ProcessGroup'] = field(default_factory=dict)
input_ports: Dict[str, Any] = field(default_factory=dict)
output_ports: Dict[str, Any] = field(default_factory=dict)
labels: Dict[str, Any] = field(default_factory=dict)
variables: Dict[str, str] = field(default_factory=dict)
parameter_context_id: Optional[str] = None
flow_file_concurrency: str = "UNBOUNDED"
flow_file_outbound_policy: str = "STREAM_WHEN_AVAILABLE"
default_flow_file_expiration: str = "0 sec"
default_back_pressure_object_threshold: int = 10000
default_back_pressure_data_size_threshold: str = "1 GB"
def add_processor(self, processor: Processor):
"""添加处理器"""
self.processors[processor.id] = processor
def add_connection(self, connection: Connection):
"""添加连接"""
self.connections[connection.id] = connection
def get_variable(self, name: str, default: str = "") -> str:
"""获取变量"""
return self.variables.get(name, default)
def set_variable(self, name: str, value: str):
"""设置变量"""
self.variables[name] = value
@dataclass
class FlowRegistry:
"""流注册表"""
id: str
name: str
url: str
description: str = ""
@dataclass
class VersionedFlow:
"""版本化流"""
registry_id: str
bucket_id: str
flow_id: str
version: int
flow_name: str
description: str = ""
comments: str = ""
class NiFiCluster:
"""
Apache NiFi数据流处理集群
"""
def __init__(self, cluster_name: str = "nifi-cluster"):
self.cluster_name = cluster_name
# 数据存储
self.process_groups = {} # group_id -> ProcessGroup
self.processors = {} # processor_id -> Processor
self.connections = {} # connection_id -> Connection
self.flow_registries = {} # registry_id -> FlowRegistry
self.versioned_flows = {} # flow_id -> VersionedFlow
# 集群状态
self.is_running = True
self.version = "1.18.0"
self.java_version = "11.0.16"
self.cluster_coordinator = "node1.example.com:11443"
# 配置
self.config = {
'nifi.web.http.port': 8080,
'nifi.web.https.port': 8443,
'nifi.cluster.is.node': True,
'nifi.cluster.node.address': 'localhost',
'nifi.cluster.node.protocol.port': 11443,
'nifi.zookeeper.connect.string': 'localhost:2181',
'nifi.state.management.embedded.zookeeper.start': True,
'nifi.flowfile.repository.implementation': 'org.apache.nifi.controller.repository.WriteAheadFlowFileRepository',
'nifi.content.repository.implementation': 'org.apache.nifi.controller.repository.FileSystemRepository',
'nifi.provenance.repository.implementation': 'org.apache.nifi.provenance.WriteAheadProvenanceRepository'
}
# 线程锁
self.cluster_lock = threading.Lock()
# 统计信息
self.stats = {
'total_processors': 0,
'running_processors': 0,
'stopped_processors': 0,
'invalid_processors': 0,
'total_connections': 0,
'total_process_groups': 0,
'active_threads': 0,
'queued_flow_files': 0,
'queued_content_size': 0,
'bytes_read_5_min': 0,
'bytes_written_5_min': 0,
'bytes_transferred_5_min': 0,
'flow_files_received_5_min': 0,
'flow_files_sent_5_min': 0
}
# 初始化根处理组
self._create_root_process_group()
# 创建示例流
self._create_example_flow()
def _create_root_process_group(self):
"""创建根处理组"""
root_group = ProcessGroup(
id="root",
name="NiFi Flow"
)
with self.cluster_lock:
self.process_groups["root"] = root_group
self._update_stats()
def _create_example_flow(self):
"""创建示例数据流"""
root_group = self.process_groups["root"]
# 创建示例处理器
generate_processor = Processor(
id="generate-001",
name="GenerateFlowFile",
processor_type=ProcessorType.GENERATE_FLOWFILE
)
generate_processor.add_relationship("success", "成功生成FlowFile")
generate_processor.set_property("File Size", "1KB")
generate_processor.set_property("Batch Size", "1")
generate_processor.set_property("Data Format", "Text")
log_processor = Processor(
id="log-001",
name="LogAttribute",
processor_type=ProcessorType.LOG_ATTRIBUTE
)
log_processor.add_relationship("success", "成功记录属性")
log_processor.set_property("Log Level", "info")
log_processor.set_property("Log Payload", "true")
# 创建连接
connection = Connection(
id="conn-001",
name="GenerateFlowFile to LogAttribute",
source_id="generate-001",
destination_id="log-001",
relationships=["success"]
)
# 添加到根组
root_group.add_processor(generate_processor)
root_group.add_processor(log_processor)
root_group.add_connection(connection)
# 更新全局索引
with self.cluster_lock:
self.processors[generate_processor.id] = generate_processor
self.processors[log_processor.id] = log_processor
self.connections[connection.id] = connection
self._update_stats()
def _update_stats(self):
"""更新统计信息"""
self.stats['total_processors'] = len(self.processors)
self.stats['running_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.RUNNING])
self.stats['stopped_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.STOPPED])
self.stats['invalid_processors'] = len([p for p in self.processors.values() if p.state == ProcessorState.INVALID])
self.stats['total_connections'] = len(self.connections)
self.stats['total_process_groups'] = len(self.process_groups)
# 计算队列统计
total_queued = 0
total_queued_size = 0
for connection in self.connections.values():
total_queued += connection.queued_count
total_queued_size += connection.queued_size
self.stats['queued_flow_files'] = total_queued
self.stats['queued_content_size'] = total_queued_size
def create_processor(self, group_id: str, processor_name: str, processor_type: ProcessorType,
properties: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
创建处理器
Args:
group_id: 处理组ID
processor_name: 处理器名称
processor_type: 处理器类型
properties: 处理器属性
Returns:
Dict[str, Any]: 创建结果
"""
if group_id not in self.process_groups:
return {'status': 'error', 'message': f'Process group {group_id} not found'}
processor_id = f"proc-{uuid.uuid4().hex[:8]}"
processor = Processor(
id=processor_id,
name=processor_name,
processor_type=processor_type,
properties=properties or {}
)
# 根据处理器类型添加默认关系
if processor_type in [ProcessorType.GET_FILE, ProcessorType.GET_HTTP, ProcessorType.GET_KAFKA]:
processor.add_relationship("success", "成功获取数据")
processor.add_relationship("failure", "获取数据失败")
elif processor_type in [ProcessorType.PUT_FILE, ProcessorType.PUT_HDFS, ProcessorType.PUT_KAFKA]:
processor.add_relationship("success", "成功写入数据")
processor.add_relationship("failure", "写入数据失败")
elif processor_type == ProcessorType.ROUTE_ON_ATTRIBUTE:
processor.add_relationship("matched", "属性匹配")
processor.add_relationship("unmatched", "属性不匹配")
elif processor_type == ProcessorType.SPLIT_TEXT:
processor.add_relationship("splits", "分割结果")
processor.add_relationship("original", "原始文件")
processor.add_relationship("failure", "分割失败")
else:
processor.add_relationship("success", "处理成功")
processor.add_relationship("failure", "处理失败")
group = self.process_groups[group_id]
group.add_processor(processor)
with self.cluster_lock:
self.processors[processor_id] = processor
self._update_stats()
return {
'status': 'success',
'processor_id': processor_id,
'processor_name': processor_name,
'processor_type': processor_type.value,
'group_id': group_id,
'message': f'Processor {processor_name} created successfully'
}
def create_connection(self, source_id: str, destination_id: str, relationships: List[str],
name: Optional[str] = None) -> Dict[str, Any]:
"""
创建连接
Args:
source_id: 源处理器ID
destination_id: 目标处理器ID
relationships: 关系列表
name: 连接名称
Returns:
Dict[str, Any]: 创建结果
"""
if source_id not in self.processors:
return {'status': 'error', 'message': f'Source processor {source_id} not found'}
if destination_id not in self.processors:
return {'status': 'error', 'message': f'Destination processor {destination_id} not found'}
source_processor = self.processors[source_id]
destination_processor = self.processors[destination_id]
# 验证关系是否存在
for rel in relationships:
if rel not in source_processor.relationships:
return {'status': 'error', 'message': f'Relationship {rel} not found in source processor'}
connection_id = f"conn-{uuid.uuid4().hex[:8]}"
connection_name = name or f"{source_processor.name} to {destination_processor.name}"
connection = Connection(
id=connection_id,
name=connection_name,
source_id=source_id,
destination_id=destination_id,
relationships=relationships.copy()
)
# 更新处理器连接信息
source_processor.outgoing_connections.append(connection_id)
destination_processor.incoming_connections.append(connection_id)
# 找到处理器所在的组并添加连接
for group in self.process_groups.values():
if source_id in group.processors:
group.add_connection(connection)
break
with self.cluster_lock:
self.connections[connection_id] = connection
self._update_stats()
return {
'status': 'success',
'connection_id': connection_id,
'connection_name': connection_name,
'source_id': source_id,
'destination_id': destination_id,
'relationships': relationships,
'message': f'Connection {connection_name} created successfully'
}
def start_processor(self, processor_id: str) -> Dict[str, Any]:
"""
启动处理器
Args:
processor_id: 处理器ID
Returns:
Dict[str, Any]: 启动结果
"""
if processor_id not in self.processors:
return {'status': 'error', 'message': f'Processor {processor_id} not found'}
processor = self.processors[processor_id]
if processor.state == ProcessorState.RUNNING:
return {'status': 'error', 'message': f'Processor {processor.name} is already running'}
if processor.state == ProcessorState.INVALID:
return {'status': 'error', 'message': f'Processor {processor.name} is invalid and cannot be started'}
processor.state = ProcessorState.RUNNING
with self.cluster_lock:
self._update_stats()
return {
'status': 'success',
'processor_id': processor_id,
'processor_name': processor.name,
'message': f'Processor {processor.name} started successfully'
}
def stop_processor(self, processor_id: str) -> Dict[str, Any]:
"""
停止处理器
Args:
processor_id: 处理器ID
Returns:
Dict[str, Any]: 停止结果
"""
if processor_id not in self.processors:
return {'status': 'error', 'message': f'Processor {processor_id} not found'}
processor = self.processors[processor_id]
if processor.state == ProcessorState.STOPPED:
return {'status': 'error', 'message': f'Processor {processor.name} is already stopped'}
processor.state = ProcessorState.STOPPED
with self.cluster_lock:
self._update_stats()
return {
'status': 'success',
'processor_id': processor_id,
'processor_name': processor.name,
'message': f'Processor {processor.name} stopped successfully'
}
def create_flow_file(self, processor_id: str, content: Union[str, bytes],
attributes: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
创建FlowFile
Args:
processor_id: 处理器ID
content: 内容
attributes: 属性
Returns:
Dict[str, Any]: 创建结果
"""
if processor_id not in self.processors:
return {'status': 'error', 'message': f'Processor {processor_id} not found'}
processor = self.processors[processor_id]
if processor.state != ProcessorState.RUNNING:
return {'status': 'error', 'message': f'Processor {processor.name} is not running'}
# 创建FlowFile
flow_file_uuid = f"ff-{uuid.uuid4().hex}"
content_bytes = content.encode('utf-8') if isinstance(content, str) else content
flow_file = FlowFile(
uuid=flow_file_uuid,
attributes=attributes or {},
content=content_bytes,
size=len(content_bytes)
)
# 设置默认属性
flow_file.put_attribute("filename", f"flowfile-{flow_file_uuid[:8]}")
flow_file.put_attribute("path", "/")
flow_file.put_attribute("uuid", flow_file_uuid)
# 更新处理器统计
processor.flow_files_received += 1
processor.bytes_read += flow_file.size
processor.invocations += 1
return {
'status': 'success',
'flow_file_uuid': flow_file_uuid,
'size': flow_file.size,
'attributes': flow_file.attributes,
'message': f'FlowFile created successfully'
}
def transfer_flow_file(self, processor_id: str, flow_file_uuid: str,
relationship: str) -> Dict[str, Any]:
"""
传输FlowFile
Args:
processor_id: 处理器ID
flow_file_uuid: FlowFile UUID
relationship: 关系名称
Returns:
Dict[str, Any]: 传输结果
"""
if processor_id not in self.processors:
return {'status': 'error', 'message': f'Processor {processor_id} not found'}
processor = self.processors[processor_id]
if relationship not in processor.relationships:
return {'status': 'error', 'message': f'Relationship {relationship} not found'}
# 查找目标连接
target_connections = []
for conn_id in processor.outgoing_connections:
connection = self.connections[conn_id]
if relationship in connection.relationships:
target_connections.append(connection)
if not target_connections:
# 检查是否自动终止
if processor.relationships[relationship].auto_terminate:
processor.flow_files_removed += 1
return {
'status': 'success',
'action': 'auto_terminated',
'message': f'FlowFile auto-terminated on relationship {relationship}'
}
else:
return {'status': 'error', 'message': f'No connection found for relationship {relationship}'}
# 创建模拟FlowFile(实际应该从会话中获取)
flow_file = FlowFile(
uuid=flow_file_uuid,
attributes={'filename': f'flowfile-{flow_file_uuid[:8]}'},
content=b'sample content',
size=14
)
# 传输到所有目标连接
transferred_count = 0
for connection in target_connections:
connection.enqueue(flow_file.clone())
transferred_count += 1
# 更新处理器统计
processor.flow_files_sent += transferred_count
processor.bytes_written += flow_file.size * transferred_count
with self.cluster_lock:
self._update_stats()
return {
'status': 'success',
'flow_file_uuid': flow_file_uuid,
'relationship': relationship,
'transferred_count': transferred_count,
'target_connections': [conn.id for conn in target_connections],
'message': f'FlowFile transferred to {transferred_count} connections'
}
def get_processor_status(self, processor_id: str) -> Dict[str, Any]:
"""
获取处理器状态
Args:
processor_id: 处理器ID
Returns:
Dict[str, Any]: 处理器状态
"""
if processor_id not in self.processors:
return {'status': 'error', 'message': f'Processor {processor_id} not found'}
processor = self.processors[processor_id]
# 计算连接队列信息
incoming_queued = 0
outgoing_queued = 0
for conn_id in processor.incoming_connections:
if conn_id in self.connections:
incoming_queued += self.connections[conn_id].queued_count
for conn_id in processor.outgoing_connections:
if conn_id in self.connections:
outgoing_queued += self.connections[conn_id].queued_count
return {
'status': 'success',
'processor': {
'id': processor.id,
'name': processor.name,
'type': processor.processor_type.value,
'state': processor.state.value,
'properties': processor.properties,
'relationships': {name: rel.description for name, rel in processor.relationships.items()},
'incoming_connections': processor.incoming_connections,
'outgoing_connections': processor.outgoing_connections,
'concurrent_tasks': processor.concurrent_tasks,
'scheduling_period': processor.scheduling_period,
'scheduling_strategy': processor.scheduling_strategy,
'stats': {
'bytes_read': processor.bytes_read,
'bytes_written': processor.bytes_written,
'bytes_transferred': processor.bytes_transferred,
'flow_files_received': processor.flow_files_received,
'flow_files_sent': processor.flow_files_sent,
'flow_files_removed': processor.flow_files_removed,
'invocations': processor.invocations,
'processing_nanos': processor.processing_nanos,
'incoming_queued': incoming_queued,
'outgoing_queued': outgoing_queued
}
}
}
def get_connection_status(self, connection_id: str) -> Dict[str, Any]:
"""
获取连接状态
Args:
connection_id: 连接ID
Returns:
Dict[str, Any]: 连接状态
"""
if connection_id not in self.connections:
return {'status': 'error', 'message': f'Connection {connection_id} not found'}
connection = self.connections[connection_id]
return {
'status': 'success',
'connection': {
'id': connection.id,
'name': connection.name,
'source_id': connection.source_id,
'destination_id': connection.destination_id,
'relationships': connection.relationships,
'connection_type': connection.connection_type.value,
'queue_priority': connection.queue_priority.value,
'back_pressure_object_threshold': connection.back_pressure_object_threshold,
'back_pressure_data_size_threshold': connection.back_pressure_data_size_threshold,
'load_balance_strategy': connection.load_balance_strategy,
'stats': {
'queued_count': connection.queued_count,
'queued_size': connection.queued_size,
'is_back_pressure_enabled': connection.is_back_pressure_enabled()
}
}
}
def list_processors(self, group_id: Optional[str] = None) -> Dict[str, Any]:
"""
列出处理器
Args:
group_id: 处理组ID(可选)
Returns:
Dict[str, Any]: 处理器列表
"""
processors_info = []
if group_id:
if group_id not in self.process_groups:
return {'status': 'error', 'message': f'Process group {group_id} not found'}
group = self.process_groups[group_id]
target_processors = group.processors
else:
target_processors = self.processors
for processor_id, processor in target_processors.items():
processors_info.append({
'id': processor.id,
'name': processor.name,
'type': processor.processor_type.value,
'state': processor.state.value,
'group_id': group_id or 'root',
'concurrent_tasks': processor.concurrent_tasks,
'flow_files_received': processor.flow_files_received,
'flow_files_sent': processor.flow_files_sent
})
return {
'status': 'success',
'processors': processors_info,
'total': len(processors_info)
}
def get_cluster_status(self) -> Dict[str, Any]:
"""
获取集群状态
Returns:
Dict[str, Any]: 集群状态
"""
# 计算5分钟统计(模拟)
total_bytes_read = sum(p.bytes_read for p in self.processors.values())
total_bytes_written = sum(p.bytes_written for p in self.processors.values())
total_flow_files_received = sum(p.flow_files_received for p in self.processors.values())
total_flow_files_sent = sum(p.flow_files_sent for p in self.processors.values())
self.stats['bytes_read_5_min'] = total_bytes_read
self.stats['bytes_written_5_min'] = total_bytes_written
self.stats['bytes_transferred_5_min'] = total_bytes_read + total_bytes_written
self.stats['flow_files_received_5_min'] = total_flow_files_received
self.stats['flow_files_sent_5_min'] = total_flow_files_sent
self.stats['active_threads'] = len([p for p in self.processors.values() if p.state == ProcessorState.RUNNING]) * 2
return {
'cluster_name': self.cluster_name,
'is_running': self.is_running,
'version': self.version,
'java_version': self.java_version,
'cluster_coordinator': self.cluster_coordinator,
'stats': self.stats,
'config': self.config,
'timestamp': datetime.now().isoformat()
}
# 使用示例
if __name__ == "__main__":
# 创建NiFi集群
nifi = NiFiCluster("production-nifi")
print("=== Apache NiFi数据流处理示例 ===")
# 创建处理器
print("\n=== 创建处理器 ===")
# 创建文件获取处理器
get_file_result = nifi.create_processor(
group_id="root",
processor_name="GetFile-WebLogs",
processor_type=ProcessorType.GET_FILE,
properties={
'Input Directory': '/var/log/web',
'File Filter': '.*\.log$',
'Keep Source File': 'false',
'Minimum File Age': '0 sec',
'Polling Interval': '10 sec',
'Batch Size': '10'
}
)
print(f"GetFile处理器创建结果: {get_file_result}")
# 创建文本替换处理器
replace_text_result = nifi.create_processor(
group_id="root",
processor_name="ReplaceText-Anonymize",
processor_type=ProcessorType.REPLACE_TEXT,
properties={
'Search Value': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
'Replacement Value': 'XXX.XXX.XXX.XXX',
'Character Set': 'UTF-8',
'Maximum Buffer Size': '1 MB',
'Replacement Strategy': 'Regex Replace'
}
)
print(f"ReplaceText处理器创建结果: {replace_text_result}")
# 创建HDFS写入处理器
put_hdfs_result = nifi.create_processor(
group_id="root",
processor_name="PutHDFS-Archive",
processor_type=ProcessorType.PUT_HDFS,
properties={
'Hadoop Configuration Resources': '/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml',
'Directory': '/data/logs/web/${now():format("yyyy/MM/dd")}',
'Conflict Resolution Strategy': 'replace',
'Block Size': '128 MB',
'Replication': '3',
'Permissions umask': '022'
}
)
print(f"PutHDFS处理器创建结果: {put_hdfs_result}")
if (get_file_result['status'] == 'success' and
replace_text_result['status'] == 'success' and
put_hdfs_result['status'] == 'success'):
get_file_id = get_file_result['processor_id']
replace_text_id = replace_text_result['processor_id']
put_hdfs_id = put_hdfs_result['processor_id']
# 创建连接
print("\n=== 创建连接 ===")
# GetFile -> ReplaceText
conn1_result = nifi.create_connection(
source_id=get_file_id,
destination_id=replace_text_id,
relationships=["success"],
name="GetFile to ReplaceText"
)
print(f"连接1创建结果: {conn1_result}")
# ReplaceText -> PutHDFS
conn2_result = nifi.create_connection(
source_id=replace_text_id,
destination_id=put_hdfs_id,
relationships=["success"],
name="ReplaceText to PutHDFS"
)
print(f"连接2创建结果: {conn2_result}")
# 启动处理器
print("\n=== 启动处理器 ===")
start_results = []
for processor_id in [get_file_id, replace_text_id, put_hdfs_id]:
result = nifi.start_processor(processor_id)
start_results.append(result)
print(f"处理器 {processor_id} 启动结果: {result}")
if all(r['status'] == 'success' for r in start_results):
# 模拟FlowFile处理
print("\n=== 模拟FlowFile处理 ===")
# 创建FlowFile
for i in range(5):
flow_file_result = nifi.create_flow_file(
processor_id=get_file_id,
content=f"192.168.1.{100+i} - - [25/Dec/2023:10:00:{i:02d} +0000] \"GET /api/data HTTP/1.1\" 200 1234\n",
attributes={
'filename': f'access_{i+1}.log',
'path': '/var/log/web/',
'file.size': '78'
}
)
if i < 3: # 只打印前3个结果
print(f"FlowFile {i+1} 创建结果: {flow_file_result}")
if flow_file_result['status'] == 'success':
# 传输FlowFile
transfer_result = nifi.transfer_flow_file(
processor_id=get_file_id,
flow_file_uuid=flow_file_result['flow_file_uuid'],
relationship="success"
)
if i < 3:
print(f"FlowFile {i+1} 传输结果: {transfer_result}")
# 获取处理器状态
print("\n=== 处理器状态 ===")
for processor_id, name in [(get_file_id, "GetFile"), (replace_text_id, "ReplaceText"), (put_hdfs_id, "PutHDFS")]:
status = nifi.get_processor_status(processor_id)
if status['status'] == 'success':
proc_info = status['processor']
stats = proc_info['stats']
print(f"{name} 处理器:")
print(f" 状态: {proc_info['state']}")
print(f" 接收FlowFiles: {stats['flow_files_received']}")
print(f" 发送FlowFiles: {stats['flow_files_sent']}")
print(f" 读取字节: {stats['bytes_read']}")
print(f" 写入字节: {stats['bytes_written']}")
print(f" 传入队列: {stats['incoming_queued']}")
print(f" 传出队列: {stats['outgoing_queued']}")
# 获取连接状态
print("\n=== 连接状态 ===")
if conn1_result['status'] == 'success':
conn_status = nifi.get_connection_status(conn1_result['connection_id'])
if conn_status['status'] == 'success':
conn_info = conn_status['connection']
stats = conn_info['stats']
print(f"连接: {conn_info['name']}")
print(f" 队列数量: {stats['queued_count']}")
print(f" 队列大小: {stats['queued_size']} bytes")
print(f" 背压启用: {stats['is_back_pressure_enabled']}")
# 列出所有处理器
print("\n=== 处理器列表 ===")
processors_list = nifi.list_processors()
if processors_list['status'] == 'success':
print(f"总处理器数: {processors_list['total']}")
for proc in processors_list['processors']:
print(f" - {proc['name']} ({proc['type']}) - 状态: {proc['state']}")
# 获取集群状态
print("\n=== 集群状态 ===")
cluster_status = nifi.get_cluster_status()
print(f"集群名称: {cluster_status['cluster_name']}")
print(f"版本: {cluster_status['version']}")
print(f"运行状态: {cluster_status['is_running']}")
print(f"集群协调器: {cluster_status['cluster_coordinator']}")
print("统计信息:")
for key, value in cluster_status['stats'].items():
print(f" {key}: {value}")