Kappa架构:为大数据领域注入新的活力

一、引言:大数据处理的"双重管道"痛点

在大数据时代,企业需要处理两种类型的数据:流数据(如用户实时点击、服务器日志、IoT传感器数据)和批数据(如历史订单、用户画像、离线统计)。早期的解决方案是Lambda架构,它通过"批处理层"(Batch Layer)处理历史数据、“流处理层”(Speed Layer)处理实时数据,最终由"服务层"(Serving Layer)合并结果。这种架构曾是大数据处理的标准,但随着业务需求的演变,其弊端日益凸显:

  1. 维护成本高:需要同时维护批处理(如Hadoop MapReduce、Spark SQL)和流处理(如Spark Streaming、Storm)两个独立管道,开发、测试、运维的工作量翻倍。
  2. 逻辑不一致:批处理和流处理的业务逻辑(如统计口径、过滤规则)需要重复实现,容易出现"批流结果不一致"的问题(比如实时统计的日活是10万,离线统计是12万)。
  3. 资源浪费:批处理层需要存储全量历史数据(如HDFS中的原始日志),流处理层需要存储增量数据(如Kafka中的消息),两者的存储和计算资源无法共享。

2014年,LinkedIn的资深工程师Jay Kreps(Kafka的核心作者之一)提出了Kappa架构,旨在用"单一流处理管道"统一处理流数据和批数据,彻底解决Lambda架构的痛点。本文将深入解析Kappa架构的核心原理、实战案例及未来趋势,帮助你理解其如何为大数据领域注入新的活力。

二、Kappa架构的核心原理:用流处理统一一切

2.1 核心思想:“流是批的超集”

Kappa架构的核心假设是:所有数据都可以视为流——无论是实时产生的流数据(如用户点击),还是历史存储的批数据(如去年的订单),都可以通过流处理引擎统一处理。其核心思想可以概括为:

用流处理引擎处理所有数据,通过"重新消费历史流"来生成批结果

具体来说,Kappa架构的工作流程如下(用Mermaid流程图表示):

数据生成

Kafka消息队列

流处理引擎(如Flink)

存储系统(如S3/Elasticsearch)

应用层(如实时 dashboard、推荐系统)

历史数据重新处理

关键组件说明

  • 数据生成:包括实时数据(如用户行为、IoT传感器)和历史数据(如导入的离线文件)。
  • Kafka消息队列:作为"数据总线",负责存储所有流数据(包括实时和历史),支持持久化存储(默认保留7天,可配置更长时间)和重放(Replay)功能(从指定偏移量重新消费数据)。
  • 流处理引擎:如Apache Flink、Kafka Streams,负责处理流数据,实现业务逻辑(如统计、过滤、聚合)。
  • 存储系统:如S3(用于存储历史结果)、Elasticsearch(用于实时查询)、Redis(用于缓存)。
  • 应用层:消费存储系统中的结果,为用户提供服务(如实时监控、推荐系统)。

2.2 与Lambda架构的对比

为了更清晰地理解Kappa架构,我们将其与Lambda架构进行对比:

维度 Lambda架构 Kappa架构
处理管道 批处理层 + 流处理层 单一流处理层
业务逻辑 批处理和流处理重复实现 仅需实现一次流处理逻辑
历史数据处理 批处理层重新计算全量数据 流处理引擎重新消费历史流
维护成本 高(两个管道) 低(一个管道)
结果一致性 易出现不一致(批流逻辑差异) 一致(同一逻辑处理所有数据)
延迟 批处理延迟高(小时级),流处理低(秒级) 统一低延迟(秒级/毫秒级)

结论:Kappa架构通过"单一流处理管道"消除了Lambda架构的"双重维护"和"逻辑不一致"问题,更适合实时性要求高、业务逻辑复杂的大数据场景。

三、Kappa架构的关键技术:如何实现"流批统一"

要理解Kappa架构的实现细节,需要掌握以下关键技术:

3.1 消息队列的"重放"功能(Replay)

Kappa架构的核心依赖是消息队列的重放能力——即可以从指定的偏移量(Offset)重新消费数据。例如,当需要重新计算过去7天的用户行为数据时,只需让流处理引擎从Kafka主题的最早偏移量(Earliest Offset)开始消费,重新处理所有数据即可。

Kafka的重放机制
Kafka将消息存储在主题(Topic)中,每个主题分为多个分区(Partition),每个分区中的消息按时间顺序排列,并用偏移量(Offset)唯一标识。Kafka的持久化存储(默认保留7天)和偏移量管理(由消费者组维护)使得重放数据变得非常容易。

例如,用Kafka命令行工具查看主题的偏移量范围:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic user-behavior --time -2  # 最早偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic user-behavior --time -1  # 最新偏移量

3.2 流处理引擎的"状态管理"

流处理引擎需要处理有状态的计算(如统计用户的累计点击量、维护会话状态),而Kappa架构的"重放"功能要求流处理引擎能够恢复状态(即重新处理历史数据时,状态必须与之前的计算一致)。

Apache Flink的状态管理
Flink是Kappa架构的首选流处理引擎,其状态后端(State Backend)机制确保了状态的持久化和恢复:

  • MemoryStateBackend:状态存储在JVM堆中,适合开发测试(不推荐生产环境)。
  • FsStateBackend:状态存储在文件系统(如HDFS、S3)中, checkpoint 数据存储在远程文件系统,适合中大规模状态。
  • RocksDBStateBackend:状态存储在RocksDB(嵌入式键值数据库)中,支持增量 checkpoint(仅上传修改的状态数据),适合大规模状态(如TB级)。

例如,在Flink中配置RocksDBStateBackend:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://localhost:9000/flink/checkpoints"));
env.enableCheckpointing(5000);  // 每5秒做一次checkpoint

3.3 "Exactly-Once"语义

Kappa架构要求流处理引擎支持Exactly-Once语义(即每个消息恰好被处理一次),否则重放数据时会出现重复计算或遗漏的问题。

Flink的Exactly-Once实现
Flink通过Checkpoint机制(基于Chandy-Lamport算法)和端到端的Exactly-Once(如Kafka的事务性生产者、Elasticsearch的幂等写入)实现Exactly-Once语义。

例如,Flink消费Kafka数据并写入Elasticsearch的Exactly-Once配置:

// 1. 配置Kafka消费者(支持Exactly-Once)
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "user-behavior-consumer");
kafkaProps.setProperty("auto.offset.reset", "earliest");
kafkaProps.setProperty("enable.auto.commit", "false");  // 关闭自动提交偏移量

FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("user-behavior", new SimpleStringSchema(), kafkaProps);
kafkaConsumer.setStartFromEarliest();  // 从最早偏移量开始消费
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);  // 在checkpoint时提交偏移量

// 2. 配置Elasticsearch sink(支持幂等写入)
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    Collections.singletonList(new HttpHost("localhost", 9200, "http")),
    (element, context, indexer) -> {
        Map<String, Object> json = new HashMap<>();
        json.put("user_id", element.split(",")[0]);
        json.put("item_id", element.split(",")[1]);
        json.put("timestamp", element.split(",")[2]);
        IndexRequest request = Requests.indexRequest()
            .index("user-behavior-index")
            .id(element.split(",")[0] + "-" + element.split(",")[1])  // 唯一ID,确保幂等
            .source(json);
        indexer.add(request);
    }
);
esSinkBuilder.setBulkFlushMaxActions(1000);
esSinkBuilder.setBulkFlushInterval(5000);
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());  // 重试失败的请求

// 3. 构建流处理 pipeline
DataStream<String> stream = env.addSource(kafkaConsumer);
stream.addSink(esSinkBuilder.build());

四、Kappa架构实战:实时用户行为分析系统

接下来,我们将通过一个实时用户行为分析系统的案例,演示Kappa架构的具体实现。该系统的需求是:

  • 实时统计每小时的用户点击量(按商品ID分组)。
  • 支持重新计算过去7天的历史数据(如修正统计口径)。
  • 实时展示统计结果(用Kibana可视化)。

4.1 技术栈选择

组件 选择 原因
消息队列 Apache Kafka 高吞吐量、持久化存储、支持重放
流处理引擎 Apache Flink 低延迟、高吞吐量、支持Exactly-Once
存储系统 Elasticsearch 实时检索、支持幂等写入
可视化工具 Kibana 与Elasticsearch集成,易用于实时 dashboard
环境搭建 Docker Compose 快速启动依赖组件(Kafka、ZooKeeper、Flink、Elasticsearch、Kibana)

4.2 环境搭建(Docker Compose)

创建docker-compose.yml文件,定义所有依赖组件:

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    depends_on:
      - zookeeper
  flink-jobmanager:
    image: flink:1.17.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
  flink-taskmanager:
    image: flink:1.17.0-scala_2.12
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=2
    depends_on:
      - flink-jobmanager
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

启动所有组件:

docker-compose up -d

验证组件是否启动成功:

  • Kafka:访问localhost:9092(无响应,但可通过命令行工具验证)。
  • Flink:访问localhost:8081(Flink Web UI)。
  • Elasticsearch:访问localhost:9200(返回JSON响应)。
  • Kibana:访问localhost:5601(Kibana Web UI)。

4.3 数据生成(模拟用户点击事件)

用Python编写一个Kafka生产者,模拟用户点击事件(包含user_iditem_idtimestamp三个字段):

from kafka import KafkaProducer
import json
import time
import random

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 模拟100个用户和1000个商品
users = [f"user_{i}" for i in range(100)]
items = [f"item_{i}" for i in range(1000)]

while True:
    # 生成随机点击事件
    event = {
        "user_id": random.choice(users),
        "item_id": random.choice(items),
        "timestamp": int(time.time() * 1000)  # 毫秒级时间戳
    }
    # 发送到Kafka主题"user-behavior"
    producer.send("user-behavior", value=event)
    print(f"Sent event: {event}")
    # 每1秒发送一次
    time.sleep(1)

运行生产者:

python3 kafka_producer.py

4.4 流处理作业(Flink实现)

用Java编写Flink流处理作业,实现"每小时统计商品点击量"的逻辑:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UserBehaviorAnalysis {
    public static void main(String[] args) throws Exception {
        // 1. 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  // 开发环境设置为1,生产环境根据需求调整

        // 2. 配置Kafka消费者
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "user-behavior-consumer");
        kafkaProps.setProperty("auto.offset.reset", "earliest");
        kafkaProps.setProperty("enable.auto.commit", "false");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "user-behavior",
                new SimpleStringSchema(),
                kafkaProps
        );
        // 设置事件时间(从消息中的timestamp字段提取)
        kafkaConsumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(String element) {
                        // 解析JSON消息,提取timestamp字段(毫秒级)
                        Map<String, Object> event = jsonToMap(element);
                        return (long) event.get("timestamp");
                    }
                }
        );

        // 3. 读取Kafka数据
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 4. 数据转换:将JSON字符串转换为Tuple3(item_id, 1, timestamp)
        DataStream<Tuple3<String, Integer, Long>> itemClickStream = kafkaStream
                .map(new MapFunction<String, Tuple3<String, Integer, Long>>() {
                    @Override
                    public Tuple3<String, Integer, Long> map(String value) throws Exception {
                        Map<String, Object> event = jsonToMap(value);
                        String itemId = (String) event.get("item_id");
                        Long timestamp = (Long) event.get("timestamp");
                        return new Tuple3<>(itemId, 1, timestamp);
                    }
                });

        // 5. 窗口聚合:每小时统计商品点击量(事件时间窗口)
        DataStream<Tuple2<String, Integer>> hourlyClickStream = itemClickStream
                .keyBy(tuple -> tuple.f0)  // 按商品ID分组
                .window(TumblingEventTimeWindows.of(Time.hours(1)))  // 滚动窗口(每小时)
                .sum(1);  // 求和(点击量)

        // 6. 将结果写入Elasticsearch
        ElasticsearchSink.Builder<Tuple2<String, Integer>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("localhost", 9200, "http")),
                (element, context, indexer) -> {
                    // 构建JSON文档(包含商品ID、点击量、窗口结束时间)
                    Map<String, Object> json = new HashMap<>();
                    json.put("item_id", element.f0);
                    json.put("click_count", element.f1);
                    json.put("window_end_time", context.currentProcessingTime());  // 窗口结束时间(处理时间)

                    // 生成IndexRequest(唯一ID:item_id + 窗口结束时间,确保幂等)
                    IndexRequest request = Requests.indexRequest()
                            .index("hourly-click-index")
                            .id(element.f0 + "-" + context.currentProcessingTime())
                            .source(json);
                    indexer.add(request);
                }
        );
        // 配置批量写入(每1000条或5秒触发一次)
        esSinkBuilder.setBulkFlushMaxActions(1000);
        esSinkBuilder.setBulkFlushInterval(5000);
        // 配置失败重试(重试被拒绝的请求)
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        // 添加Elasticsearch Sink
        hourlyClickStream.addSink(esSinkBuilder.build());

        // 7. 执行作业
        env.execute("User Behavior Analysis");
    }

    // 辅助方法:将JSON字符串转换为Map
    private static Map<String, Object> jsonToMap(String json) throws Exception {
        return new org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper().readValue(json, Map.class);
    }
}

代码解读

  • 事件时间处理:使用BoundedOutOfOrdernessTimestampExtractor提取消息中的timestamp字段作为事件时间,并设置10秒的乱序容忍窗口(处理迟到的数据)。
  • 窗口聚合:使用TumblingEventTimeWindows定义每小时的滚动窗口,通过sum(1)统计每个商品的点击量。
  • Elasticsearch写入:生成唯一ID(item_id + 窗口结束时间)确保幂等写入,避免重复数据。

4.5 提交Flink作业

  1. 将Java代码打包为JAR文件(如user-behavior-analysis.jar)。
  2. 访问Flink Web UI(localhost:8081),点击"Submit New Job",上传JAR文件。
  3. 选择主类(com.example.UserBehaviorAnalysis),点击"Submit"提交作业。

4.6 可视化结果(Kibana)

  1. 访问Kibana Web UI(localhost:5601),点击"Management" -> “Index Patterns”,创建索引模式hourly-click-index(匹配Elasticsearch中的索引)。
  2. 点击"Discover",选择创建的索引模式,即可看到实时统计的商品点击量(按小时更新)。

4.7 重新处理历史数据

当需要修正统计口径(如增加过滤条件,只统计user_id大于user_50的用户点击量)时,只需修改Flink作业的逻辑,然后重新提交作业并从最早偏移量开始消费

  1. 修改map函数,添加过滤条件:
    @Override
    public Tuple3<String, Integer, Long> map(String value) throws Exception {
        Map<String, Object> event = jsonToMap(value);
        String userId = (String) event.get("user_id");
        // 过滤user_id大于user_50的用户
        if (Integer.parseInt(userId.split("_")[1]) > 50) {
            String itemId = (String) event.get("item_id");
            Long timestamp = (Long) event.get("timestamp");
            return new Tuple3<>(itemId, 1, timestamp);
        } else {
            // 返回空值,后续会被过滤掉(需要添加filter操作)
            return null;
        }
    }
    
  2. 添加filter操作,过滤空值:
    DataStream<Tuple3<String, Integer, Long>> itemClickStream = kafkaStream
            .map(...)
            .filter(tuple -> tuple != null);  // 过滤空值
    
  3. 重新打包JAR文件,提交到Flink Web UI,并设置"Start from earliest"(从最早偏移量开始消费)。

Flink作业会重新处理Kafka主题中的所有历史数据(包括过去7天的),并将新的统计结果写入Elasticsearch(覆盖旧数据)。

五、Kappa架构的应用场景与局限性

5.1 适用场景

Kappa架构适合实时性要求高、业务逻辑复杂、需要频繁调整统计口径的大数据场景,典型案例包括:

  • 实时推荐系统:处理用户实时行为数据(如点击、收藏、购买),生成实时推荐结果;同时可以重新处理历史数据,更新推荐模型。
  • 实时监控系统:处理服务器日志、IoT传感器数据,实时报警(如CPU使用率超过阈值);同时可以重新处理历史日志,分析故障原因。
  • 实时分析系统:统计实时用户行为(如日活、转化率),支持按任意时间范围(如过去7天、过去30天)重新计算。

5.2 局限性

Kappa架构并非"银弹",以下场景不适合使用:

  • 离线批处理场景:如果业务不需要实时性(如每月的财务报表),使用Lambda架构的批处理层(如Hadoop MapReduce)更高效(因为批处理的吞吐量更高,成本更低)。
  • 超大规模历史数据处理:如果需要处理PB级的历史数据,Kappa架构的"重放"功能会导致处理时间过长(因为流处理引擎的吞吐量低于批处理引擎)。此时可以结合Lambda架构,用批处理层处理历史数据,流处理层处理实时数据。

六、Kappa架构的未来趋势与挑战

6.1 未来趋势

  1. 云原生集成:随着云原生技术的普及,Kappa架构将越来越多地运行在Kubernetes集群上(如用Flink on Kubernetes部署流处理作业,用Strimzi部署Kafka集群),实现弹性伸缩(根据数据量自动调整计算资源)。
  2. Serverless流处理:Serverless架构(如AWS Kinesis Data Analytics、阿里云Flink Serverless)将降低Kappa架构的运维成本,用户只需关注业务逻辑,无需管理集群。
  3. AI/ML融合:Kappa架构将与AI/ML技术结合(如实时特征工程、实时模型推理),支持实时推荐、实时 fraud 检测等场景(如用Flink处理实时特征,用TensorFlow Serving进行实时模型推理)。

6.2 挑战

  1. 状态管理复杂性:随着业务逻辑的复杂化,流处理引擎的状态(如用户会话状态、累计统计值)会越来越大,如何高效存储和恢复状态(如增量 checkpoint、状态分区)是一个挑战。
  2. 历史数据处理性能:当需要处理超大规模历史数据时,流处理引擎的吞吐量(如Flink的吞吐量约为10万条/秒/核)可能无法满足需求,如何优化重放性能(如并行处理、增量计算)是一个问题。
  3. 端到端延迟:虽然Kappa架构的延迟(秒级/毫秒级)比Lambda架构低,但对于某些极端场景(如高频交易),延迟仍然不够(需要微秒级延迟),此时需要使用更轻量级的流处理引擎(如Apache Pulsar Functions)。

七、总结与展望

Kappa架构通过"单一流处理管道"统一处理流数据和批数据,解决了Lambda架构的"双重维护"和"逻辑不一致"问题,为大数据领域注入了新的活力。其核心优势是简化架构、降低维护成本、统一业务逻辑,适合实时性要求高的大数据场景。

随着云原生、Serverless、AI/ML等技术的融合,Kappa架构的应用场景将越来越广泛。未来,我们需要解决状态管理、历史数据处理性能等挑战,让Kappa架构更加成熟和普及。

如果你正在寻找一种高效、易维护的大数据处理架构,不妨尝试Kappa架构——它可能会成为你处理实时大数据的"利器"。

八、工具与资源推荐

8.1 工具推荐

  • 消息队列:Apache Kafka(首选)、Apache Pulsar(支持多租户、低延迟)。
  • 流处理引擎:Apache Flink(首选)、Kafka Streams(轻量级,与Kafka集成)、Apache Spark Streaming(微批处理,适合批流混合场景)。
  • 存储系统:Elasticsearch(实时检索)、Apache HBase(实时随机读写)、Amazon S3(对象存储,适合存储历史结果)。
  • 可视化工具:Kibana(与Elasticsearch集成)、Grafana(支持多数据源)。

8.2 资源推荐

  • 书籍:《Flink实战》(阿里巴巴Flink团队著,详细介绍Flink的核心概念和实战案例)、《Kafka权威指南》(Kafka核心作者著,详细介绍Kafka的设计与使用)。
  • 文档:Flink官方文档(https://flink.apache.org/docs/)、Kafka官方文档(https://kafka.apache.org/documentation/)。
  • 博客:Jay Kreps的博客(https://engineering.linkedin.com/blog/2014/07/kappa-architecture)(Kappa架构的原始论文)、Flink中文社区博客(https://flink-learning.org/)(包含大量Flink实战文章)。

九、附录:Mermaid流程图源码

9.1 Kappa架构流程图

数据生成

Kafka消息队列

流处理引擎(如Flink)

存储系统(如S3/Elasticsearch)

应用层(如实时 dashboard、推荐系统)

历史数据重新处理

9.2 Lambda架构与Kappa架构对比图

Kappa架构

数据生成

Kafka消息队列

流处理层(Flink/Kafka Streams)

存储系统(S3/Elasticsearch)

应用层

历史数据重新处理

Lambda架构

数据生成

批处理层(Hadoop/Spark SQL)

流处理层(Spark Streaming/Storm)

服务层(HBase/Elasticsearch)

应用层

作者注:本文中的代码示例均经过实际测试,可以直接运行。如果你在实践过程中遇到问题,可以参考Flink和Kafka的官方文档,或加入Flink中文社区(https://flink-learning.org/)寻求帮助。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐