基于微服务架构的大数据数据服务平台搭建:从0到1构建可扩展的数据服务体系

摘要/引言

一、开门见山:单体大数据平台的“崩溃瞬间”

想象一下:你是某电商公司的大数据工程师,负责维护用户行为分析平台。凌晨3点,突然收到报警:“用户行为日志堆积超过100万条,实时推荐系统宕机!” 你急忙登录服务器,发现单体架构的大数据平台已经“瘫痪”——Spark集群因任务过载崩溃,Hadoop HDFS的写入延迟高达10分钟,而要扩容只能停机重启,导致业务中断3小时。

这不是虚构的场景,而是很多企业面临的真实痛点。传统单体大数据平台(如“采集-处理-存储”一体化的架构)在面对高并发、海量数据、快速迭代的需求时,暴露出致命缺陷:

  • 扩展性差:新增数据源或处理逻辑需要修改整个系统,无法动态扩容;
  • 维护困难:单体应用的代码耦合度高,一个模块出错可能导致整个系统崩溃;
  • 响应滞后:实时数据处理延迟高,无法满足推荐、风控等低延迟业务需求;
  • 资源浪费:不同组件(如采集、处理、存储)的资源需求不同,单体架构无法按需分配资源。

二、问题陈述:如何构建“可扩展、高可用、易维护”的数据服务平台?

面对这些问题,我们需要一种既能发挥大数据技术优势,又能解决单体架构瓶颈的解决方案。而微服务架构(Microservices Architecture)正是答案——它将复杂系统拆分为独立、可部署的微服务,通过轻量级通信机制(如REST、消息队列)协同工作,完美匹配大数据平台的“分布式、模块化”需求。

三、核心价值:本文能给你带来什么?

本文将带你从0到1搭建基于微服务架构的大数据数据服务平台,解决以下关键问题:

  • 如何将大数据组件(采集、处理、存储)拆分为微服务?
  • 如何设计微服务之间的通信与协作机制?
  • 如何保证微服务与大数据组件的高可用性?
  • 如何监控和治理复杂的分布式系统?

通过本文,你将掌握:

  • 微服务与大数据的互补设计模式
  • 核心组件(采集、处理、存储、服务层)的实现细节
  • 实时用户行为分析等实际场景的落地经验
  • 避免踩坑的最佳实践

四、文章概述

本文将按照“理论-设计-实现-实践”的逻辑展开:

  1. 基础理论:微服务与大数据的碰撞,为什么需要结合?
  2. 组件设计:数据服务平台的核心组件与架构图;
  3. 搭建准备:技术选型与环境配置;
  4. 分步实现:从微服务基础框架到大数据组件整合的完整步骤;
  5. 案例研究:实时用户行为分析服务的落地实践;
  6. 最佳实践:微服务拆分、事务处理、监控等关键问题的解决技巧;
  7. 未来展望:Serverless、AI等技术对数据服务平台的影响。

一、微服务与大数据的碰撞:为什么需要结合?

1.1 传统大数据平台的“单体病”

传统大数据平台通常采用“单体架构+批处理”模式,例如:

  • 采集层:用Flume采集日志,直接写入HDFS;
  • 处理层:用Spark SQL批量处理HDFS中的数据,存入HBase;
  • 服务层:用Java Web应用直接调用HBase API,提供数据服务。

这种架构的问题在于:

  • 耦合度高:采集、处理、服务层的代码高度耦合,修改一个模块需要重启整个系统;
  • 扩展性差:当数据量激增时,无法单独扩容处理层(如Spark集群),只能整体升级硬件;
  • 实时性差:批处理模式的延迟通常在小时级,无法满足实时推荐、风控等需求;
  • 维护成本高:单体应用的故障排查难度大,比如HDFS的写入延迟可能由Flume的配置错误或Spark的任务积压导致,难以快速定位。

1.2 微服务架构的“治病良方”

微服务架构的核心思想是“拆分与协同”,将系统拆分为独立部署、职责单一的微服务,通过轻量级通信(如REST、Kafka)实现协同。它能解决传统大数据平台的“单体病”:

  • 可扩展性:每个微服务可以独立扩容,例如当数据采集压力大时,增加采集微服务的副本;
  • 高可用性:微服务的故障不会影响整个系统,例如处理微服务崩溃时,采集微服务可以将数据暂存到Kafka,待处理微服务恢复后继续处理;
  • 易维护性:每个微服务的代码量小、职责明确,故障排查和迭代速度快;
  • 技术多样性:不同微服务可以选择适合的技术栈,例如采集微服务用Spring Boot,处理微服务用Flink,存储微服务用HBase。

1.3 微服务与大数据的“互补性”

微服务架构与大数据技术的结合,不是简单的“微服务+大数据”,而是优势互补

  • 大数据技术提供“数据处理能力”:例如Flink的实时处理、Spark的批处理、HBase的高吞吐量存储;
  • 微服务架构提供“系统组织能力”:将大数据组件拆分为微服务,实现动态扩容、独立部署、故障隔离。

举个例子:实时用户行为分析场景中,微服务架构与大数据的结合方式如下:

  • 采集微服务:用Flume采集用户行为日志,发送到Kafka;
  • 处理微服务:用Flink消费Kafka中的日志,实时计算用户兴趣标签;
  • 存储微服务:将用户兴趣标签存入HBase;
  • 服务层微服务:用Spring Boot提供REST API,供推荐系统调用。

这种架构的优势在于:

  • 实时性:Flink的实时处理延迟低至秒级;
  • 扩展性:当用户量激增时,增加Flink的TaskManager副本即可提高处理能力;
  • 可靠性:Kafka的消息持久化保证了数据不丢失,微服务的故障隔离保证了系统不会崩溃。

二、核心组件设计:数据服务平台的“骨架”

基于微服务架构的大数据数据服务平台,核心组件包括数据采集层、数据处理层、数据存储层、数据服务层、监控与治理层。以下是详细设计:

2.1 架构图

+-------------------+     +-------------------+     +-------------------+
|   数据采集层       |     |   数据处理层       |     |   数据存储层       |
| (Flume/Kafka微服务)| ←→ | (Flink/Spark微服务)| ←→ | (HBase/ES微服务)  |
+-------------------+     +-------------------+     +-------------------+
          ↓                      ↓                      ↓
+-------------------+     +-------------------+     +-------------------+
|   消息队列(Kafka)|     |   分布式缓存(Redis)|   |   元数据管理(MySQL)|
+-------------------+     +-------------------+     +-------------------+
          ↓                      ↓                      ↓
+---------------------------------------------------------------+
|                     数据服务层(Spring Cloud Gateway)         |
+---------------------------------------------------------------+
          ↓
+---------------------------------------------------------------+
|                     监控与治理层(Prometheus/Grafana/Istio)   |
+---------------------------------------------------------------+
          ↓
+---------------------------------------------------------------+
|                     业务应用(推荐系统/BI工具)                 |
+---------------------------------------------------------------+

2.2 核心组件说明

2.2.1 数据采集层:“数据入口”的微服务化

职责:从各种数据源(日志、数据库、消息队列)采集数据,发送到消息队列(Kafka)。
设计要点

  • 多源采集:支持日志文件(Taildir)、数据库(CDC,如Debezium)、消息队列(RabbitMQ)等数据源;
  • 微服务封装:将每个采集任务封装为微服务,例如“日志采集微服务”“数据库CDC微服务”;
  • 动态配置:通过Nacos配置中心动态修改采集规则(如Flume的Source、Sink配置),无需重启微服务;
  • 可靠性:用Kafka的消息持久化保证数据不丢失,采集微服务的重试机制处理临时故障。

技术选型:Flume(采集工具)、Kafka(消息队列)、Spring Boot(微服务框架)、Nacos(配置中心)。

2.2.2 数据处理层:“数据加工”的微服务化

职责:从消息队列(Kafka)接收数据,进行清洗、转换、计算(批处理/实时处理),将结果存入存储层。
设计要点

  • 分离批处理与实时处理:批处理用Spark SQL(处理历史数据),实时处理用Flink(处理实时数据);
  • 微服务封装:将每个处理任务封装为微服务,例如“实时用户行为处理微服务”“历史订单统计微服务”;
  • ** Exactly-Once 语义**:用Flink的Checkpoint和Kafka的事务性生产者,保证数据不丢失不重复;
  • 动态扩展:通过K8s动态调整处理微服务的副本数量,应对数据量波动。

技术选型:Flink(实时处理)、Spark(批处理)、Kafka(消息队列)、Spring Boot(微服务框架)、K8s(容器编排)。

2.2.3 数据存储层:“数据仓库”的微服务化

职责:存储处理后的数据,提供高吞吐量、低延迟的CRUD接口。
设计要点

  • 分层存储:根据数据的访问频率,选择不同的存储引擎:
    • 高频访问数据:用HBase(高吞吐量、低延迟);
    • 全文检索数据:用Elasticsearch(快速查询);
    • 历史归档数据:用HDFS(低成本存储);
  • 微服务封装:将每个存储引擎封装为微服务,例如“HBase微服务”“ES微服务”;
  • 统一接口:用REST API或gRPC提供统一的存储接口,隐藏底层存储细节;
  • 高可用性:存储引擎采用集群部署(如HBase的HA集群、ES的分片集群),微服务采用多副本部署。

技术选型:HBase(实时存储)、Elasticsearch(检索存储)、HDFS(归档存储)、Spring Boot(微服务框架)、MyBatis(数据访问)。

2.2.4 数据服务层:“数据出口”的微服务化

职责:整合存储层的微服务,提供统一的、面向业务的API,处理权限控制、缓存、限流等共性问题。
设计要点

  • API网关:用Spring Cloud Gateway作为API网关,实现路由、负载均衡、权限控制;
  • 面向业务:API设计以业务需求为中心,例如“获取用户兴趣标签”“查询订单统计”;
  • 共性能力:整合缓存(Redis)、限流(Sentinel)、权限(OAuth2)等共性能力,避免重复开发;
  • 文档化:用Swagger生成API文档,便于业务团队使用。

技术选型:Spring Cloud Gateway(API网关)、Redis(缓存)、Sentinel(限流)、OAuth2(权限)、Swagger(文档)。

2.2.5 监控与治理层:“系统大脑”的微服务化

职责:监控系统的运行状态,治理微服务与大数据组件的故障,保证系统的高可用性。
设计要点

  • 全链路监控:监控微服务(如Spring Boot的Actuator)、大数据组件(如Kafka的JMX、Flink的Metrics)的 metrics;
  • 可视化展示:用Grafana创建Dashboard,展示吞吐量、延迟、错误率等关键指标;
  • 智能报警:用Prometheus的Alertmanager设置报警规则,当指标超过阈值时发送报警(邮件、钉钉);
  • 服务治理:用Istio作为服务网格,实现熔断、降级、链路追踪(Jaeger)。

技术选型:Prometheus(监控)、Grafana(可视化)、Istio(服务网格)、Jaeger(链路追踪)、Alertmanager(报警)。

三、搭建前的准备:先决条件与技术选型

3.1 先决条件

3.1.1 知识储备
  • 微服务:Spring Cloud Alibaba(Nacos、Sentinel、Seata)、Docker、K8s;
  • 大数据:Flink、Spark、Kafka、HBase、HDFS;
  • 其他:REST API、JSON、SQL。
3.1.2 工具与环境
  • 操作系统:Linux(CentOS 7/8)或 macOS;
  • 容器:Docker(版本≥20.10)、Docker Compose(版本≥1.29);
  • 集群:K8s集群(可通过Minikube或Kind搭建本地集群);
  • 大数据组件:Hadoop集群(版本≥3.3)、Kafka集群(版本≥2.8)、Flink集群(版本≥1.15)、HBase集群(版本≥2.4);
  • 开发工具:IntelliJ IDEA、Maven、Git。

3.2 技术选型说明

组件 技术选型 选型理由
微服务框架 Spring Cloud Alibaba 国内主流,整合了Nacos(注册/配置)、Sentinel(限流)、Seata(分布式事务)
消息队列 Kafka 高吞吐量、低延迟,适合大数据场景
实时处理 Flink 支持Exactly-Once语义,实时处理延迟低
批处理 Spark 成熟的批处理框架,生态丰富
实时存储 HBase 高吞吐量、低延迟,适合键值对存储
检索存储 Elasticsearch 快速全文检索,适合日志、商品等数据
归档存储 HDFS 低成本、高可靠,适合历史数据存储
API网关 Spring Cloud Gateway 轻量级、易整合,支持路由、负载均衡
缓存 Redis 高吞吐量、低延迟,适合缓存常用数据
监控 Prometheus + Grafana 开源生态丰富,支持多组件监控
服务网格 Istio 支持熔断、降级、链路追踪,无需修改代码

四、分步搭建:从组件到平台的实现

4.1 第一步:搭建微服务基础框架(Spring Cloud Alibaba)

目标:搭建包含注册中心(Nacos)、配置中心(Nacos)、限流(Sentinel)、分布式事务(Seata)的微服务基础框架。

4.1.1 部署Nacos(注册/配置中心)
  1. 用Docker部署Nacos:
    docker run -d --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server:v2.1.1
    
  2. 访问Nacos控制台:http://localhost:8848/nacos(用户名/密码:nacos/nacos)。
4.1.2 创建微服务父项目

用Maven创建父项目data-service-platform, pom.xml 配置:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.10</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>com.example</groupId>
<artifactId>data-service-platform</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>data-service-platform</name>
<description>基于微服务架构的大数据数据服务平台</description>

<modules>
    <module>data-collection-service</module> <!-- 数据采集微服务 -->
    <module>data-processing-service</module> <!-- 数据处理微服务 -->
    <module>data-storage-service</module> <!-- 数据存储微服务 -->
    <module>data-api-gateway</module> <!-- 数据服务层(API网关) -->
</modules>

<properties>
    <java.version>1.8</java.version>
    <spring-cloud-alibaba.version>2021.0.5.0</spring-cloud-alibaba.version>
</properties>

<dependencyManagement>
    <dependencies>
        <!-- Spring Cloud Alibaba -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
4.1.3 创建数据采集微服务(data-collection-service)
  1. 新建子模块data-collection-service, pom.xml 引入依赖:
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Nacos Discovery -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Nacos Config -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <!-- Sentinel -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
        <!-- Flume Client -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>
    
  2. 配置application.yml
    spring:
      application:
        name: data-collection-service
      cloud:
        nacos:
          discovery:
            server-addr: localhost:8848 # Nacos注册中心地址
          config:
            server-addr: localhost:8848 # Nacos配置中心地址
            file-extension: yaml # 配置文件格式
        sentinel:
          transport:
            dashboard: localhost:8080 # Sentinel控制台地址
    server:
      port: 8001 # 微服务端口
    
  3. 编写启动类DataCollectionServiceApplication.java
    @SpringBootApplication
    @EnableDiscoveryClient // 开启Nacos服务发现
    public class DataCollectionServiceApplication {
        public static void main(String[] args) {
            SpringApplication.run(DataCollectionServiceApplication.class, args);
        }
    }
    
  4. 编写Flume管理接口FlumeController.java
    @RestController
    @RequestMapping("/api/flume")
    public class FlumeController {
        @Value("${flume.agent.name}")
        private String agentName;
        @Value("${flume.conf.path}")
        private String confPath;
    
        /**
         * 启动Flume Agent
         */
        @PostMapping("/start")
        public String startAgent() {
            String command = String.format("flume-ng agent -n %s -c %s -f %s", agentName, confPath, confPath + "/agent.conf");
            try {
                Process process = Runtime.getRuntime().exec(command);
                int exitCode = process.waitFor();
                if (exitCode == 0) {
                    return "Flume Agent启动成功";
                } else {
                    return "Flume Agent启动失败, exit code: " + exitCode;
                }
            } catch (Exception e) {
                return "Flume Agent启动失败:" + e.getMessage();
            }
        }
    }
    
4.1.4 测试微服务基础框架
  1. 启动Nacos、Sentinel(可通过Docker部署);
  2. 启动data-collection-service
  3. 访问Nacos控制台,查看data-collection-service是否注册成功;
  4. 访问http://localhost:8001/api/flume/start,测试Flume Agent启动接口。

4.2 第二步:实现数据采集微服务(Flume + Kafka)

目标:用Flume采集用户行为日志,发送到Kafka。

4.2.1 配置Flume Agent

data-collection-servicesrc/main/resources目录下创建flume文件夹,编写agent.conf

# 定义Agent名称
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1

# 配置Source(Taildir Source,监控日志文件)
agent1.sources.source1.type = TAILDIR
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /logs/user-behavior.log
agent1.sources.source1.positionFile = /logs/taildir_position.json
agent1.sources.source1.fileHeader = true

# 配置Channel(Memory Channel,内存缓存)
agent1.channels.channel1.type = MEMORY
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000

# 配置Sink(Kafka Sink,发送到Kafka)
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.sink1.kafka.topic = user-behavior
agent1.sinks.sink1.kafka.producer.acks = 1
agent1.sinks.sink1.kafka.producer.batch.size = 16384
agent1.sinks.sink1.kafka.producer.linger.ms = 1

# 绑定Source、Channel、Sink
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
4.2.2 配置Nacos动态参数

在Nacos控制台创建data-collection-service.yaml配置文件,添加以下内容:

flume:
  agent:
    name: agent1 # Flume Agent名称
  conf:
    path: /opt/data-collection-service/flume # Flume配置文件路径(容器内路径)
4.2.3 测试数据采集
  1. 启动Kafka集群(可通过Docker部署);
  2. 创建Kafka topic user-behavior
    kafka-topics.sh --create --topic user-behavior --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
    
  3. /logs/user-behavior.log写入测试数据:
    echo '{"user_id": "123", "action": "click", "product_id": "456", "timestamp": "2023-10-01 10:00:00"}' >> /logs/user-behavior.log
    
  4. 调用http://localhost:8001/api/flume/start启动Flume Agent;
  5. 用Kafka消费者查看数据:
    kafka-console-consumer.sh --topic user-behavior --bootstrap-server localhost:9092 --from-beginning
    
    若能看到测试数据,说明数据采集成功。

4.3 第三步:实现数据处理微服务(Flink + Kafka)

目标:用Flink消费Kafka中的用户行为日志,实时计算用户兴趣标签(点击次数最多的产品类别),存入HBase。

4.3.1 配置Flink集群

用Docker部署Flink集群(JobManager + TaskManager):

# 启动JobManager
docker run -d --name flink-jobmanager -p 8081:8081 -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.15.3 jobmanager

# 启动TaskManager(可启动多个)
docker run -d --name flink-taskmanager -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.15.3 taskmanager
4.3.2 创建数据处理微服务(data-processing-service)
  1. 新建子模块data-processing-service, pom.xml 引入依赖:
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Nacos Discovery -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.15.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2</artifactId>
            <version>1.15.3</version>
        </dependency>
    </dependencies>
    
  2. 配置application.yml
    spring:
      application:
        name: data-processing-service
      cloud:
        nacos:
          discovery:
            server-addr: localhost:8848
    server:
      port: 8002
    flink:
      jobmanager:
        address: localhost:8081 # Flink JobManager地址
    kafka:
      bootstrap-servers: localhost:9092
      topic: user-behavior
    hbase:
      zookeeper:
        quorum: localhost:2181 # HBase ZooKeeper地址
      table: user_tags # HBase表名
    
4.3.3 编写Flink处理逻辑

创建UserBehaviorProcessor.java,实现实时计算用户兴趣标签:

public class UserBehaviorProcessor {
    public static void main(String[] args) throws Exception {
        // 1. 创建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3); // 设置并行度
        env.enableCheckpointing(5000); // 开启Checkpoint,间隔5秒
        env.getCheckpointConfig().setCheckpointStorage("hdfs://localhost:9000/flink/checkpoints"); // Checkpoint存储路径

        // 2. 读取Kafka数据
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("user-behavior")
                .setGroupId("user-behavior-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 3. 解析JSON数据
        DataStream<UserBehavior> userBehaviorStream = kafkaStream
                .map(json -> JSON.parseObject(json, UserBehavior.class))
                .filter(behavior -> "click".equals(behavior.getAction())) // 过滤点击行为
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));

        // 4. 计算用户兴趣标签(5分钟窗口内点击次数最多的产品类别)
        DataStream<UserTag> userTagStream = userBehaviorStream
                .keyBy(UserBehavior::getUserId) // 按用户ID分组
                .window(TumblingEventTimeWindows.of(Time.minutes(5))) // 5分钟滚动窗口
                .aggregate(new UserTagAggregateFunction(), new UserTagWindowFunction());

        // 5. 将结果存入HBase
        HBaseSink.Builder<UserTag> hbaseSinkBuilder = HBaseSink.builder();
        hbaseSinkBuilder.setConfiguration(HBaseConfiguration.create());
        hbaseSinkBuilder.setTable("user_tags");
        hbaseSinkBuilder.setSerializationSchema(new UserTagSerializationSchema());
        userTagStream.addSink(hbaseSinkBuilder.build());

        // 6. 执行Job
        env.execute("User Behavior Processor");
    }

    // 用户行为实体类
    @Data
    public static class UserBehavior {
        private String userId;
        private String action;
        private String productId;
        private Date timestamp;
    }

    // 用户标签实体类
    @Data
    public static class UserTag {
        private String userId;
        private String category;
        private Long count;
    }

    // 聚合函数(计算每个用户每个类别的点击次数)
    public static class UserTagAggregateFunction implements AggregateFunction<UserBehavior, Map<String, Long>, Map<String, Long>> {
        @Override
        public Map<String, Long> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public Map<String, Long> add(UserBehavior value, Map<String, Long> accumulator) {
            String category = getProductCategory(value.getProductId()); // 假设从产品库获取类别
            accumulator.put(category, accumulator.getOrDefault(category, 0L) + 1);
            return accumulator;
        }

        @Override
        public Map<String, Long> getResult(Map<String, Long> accumulator) {
            return accumulator;
        }

        @Override
        public Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
            a.putAll(b);
            return a;
        }
    }

    // 窗口函数(获取点击次数最多的类别)
    public static class UserTagWindowFunction implements WindowFunction<Map<String, Long>, UserTag, String, TimeWindow> {
        @Override
        public void apply(String userId, TimeWindow window, Iterable<Map<String, Long>> input, Collector<UserTag> out) {
            Map<String, Long> categoryCount = input.iterator().next();
            String topCategory = categoryCount.entrySet().stream()
                    .max(Map.Entry.comparingByValue())
                    .map(Map.Entry::getKey)
                    .orElse("unknown");
            Long topCount = categoryCount.get(topCategory);
            UserTag userTag = new UserTag();
            userTag.setUserId(userId);
            userTag.setCategory(topCategory);
            userTag.setCount(topCount);
            out.collect(userTag);
        }
    }

    // HBase序列化 schema(将UserTag转换为HBase的Put操作)
    public static class UserTagSerializationSchema implements HBaseSerializationSchema<UserTag> {
        @Override
        public Put serialize(UserTag element, byte[] rowKey) {
            Put put = new Put(Bytes.toBytes(element.getUserId()));
            put.addColumn(Bytes.toBytes("tags"), Bytes.toBytes(element.getCategory()), Bytes.toBytes(element.getCount().toString()));
            return put;
        }
    }

    // 模拟从产品库获取类别(实际应调用产品服务)
    private static String getProductCategory(String productId) {
        return productId.startsWith("1") ? "electronics" : "clothing";
    }
}
4.3.4 编写微服务接口提交Flink Job

创建FlinkJobController.java,提供提交Flink Job的接口:

@RestController
@RequestMapping("/api/flink")
public class FlinkJobController {
    @Value("${flink.jobmanager.address}")
    private String jobManagerAddress;
    @Value("${flink.job.jar.path}")
    private String jobJarPath; // Flink Job的Jar包路径

    /**
     * 提交Flink Job
     */
    @PostMapping("/submit")
    public String submitJob() {
        try {
            // 创建Flink Rest Client
            RestClient restClient = new RestClient(
                    new RestClientConfiguration.Builder()
                            .setHost(jobManagerAddress.split(":")[0])
                            .setPort(Integer.parseInt(jobManagerAddress.split(":")[1]))
                            .build()
            );

            // 提交Job
            JobSubmitRequestBody requestBody = JobSubmitRequestBody.builder()
                    .setJarPath(jobJarPath)
                    .setEntryClass("com.example.data.processing.UserBehaviorProcessor")
                    .build();
            CompletableFuture<JobID> jobIdFuture = restClient.submitJob(requestBody);
            JobID jobId = jobIdFuture.get();

            return "Flink Job提交成功, Job ID: " + jobId.toString();
        } catch (Exception e) {
            return "Flink Job提交失败:" + e.getMessage();
        }
    }
}
4.3.5 测试数据处理
  1. 启动HBase集群(可通过Docker部署);
  2. 创建HBase表user_tags
    hbase shell
    create 'user_tags', 'tags'
    
  3. data-processing-service打包成Jar包(data-processing-service-0.0.1-SNAPSHOT.jar),上传到Flink JobManager的/opt/flink/jobs目录;
  4. 调用http://localhost:8002/api/flink/submit提交Flink Job;
  5. 访问Flink控制台(http://localhost:8081),查看Job是否运行成功;
  6. /logs/user-behavior.log写入更多测试数据,用HBase shell查看结果:
    hbase shell
    get 'user_tags', '123'
    
    若能看到用户123的兴趣标签(如electronics),说明数据处理成功。

4.4 第四步:实现数据存储微服务(HBase)

目标:封装HBase的CRUD接口,提供REST API供数据服务层调用。

4.4.1 创建数据存储微服务(data-storage-service)
  1. 新建子模块data-storage-service, pom.xml 引入依赖:
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Nacos Discovery -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.17</version>
        </dependency>
        <!-- MyBatis(可选,用于元数据管理) -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
    </dependencies>
    
  2. 配置application.yml
    spring:
      application:
        name: data-storage-service
      cloud:
        nacos:
          discovery:
            server-addr: localhost:8848
    server:
      port: 8003
    hbase:
      zookeeper:
        quorum: localhost:2181
      table:
        user_tags: user_tags
    
4.4.2 编写HBase客户端

创建HBaseClient.java,封装HBase的CRUD操作:

@Component
public class HBaseClient {
    private Connection connection;
    private Admin admin;

    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;

    @PostConstruct
    public void init() throws IOException {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", zookeeperQuorum);
        connection = ConnectionFactory.createConnection(config);
        admin = connection.getAdmin();
    }

    @PreDestroy
    public void close() throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * 插入数据
     */
    public void put(String tableName, String rowKey, String family, String qualifier, String value) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
        table.put(put);
        table.close();
    }

    /**
     * 获取数据
     */
    public String get(String tableName, String rowKey, String family, String qualifier) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        Result result = table.get(get);
        table.close();
        byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        return value != null ? Bytes.toString(value) : null;
    }

    /**
     * 扫描数据(按行键前缀)
     */
    public List<Map<String, String>> scan(String tableName, String rowPrefix) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes(rowPrefix));
        ResultScanner scanner = table.getScanner(scan);
        List<Map<String, String>> results = new ArrayList<>();
        for (Result result : scanner) {
            Map<String, String> row = new HashMap<>();
            row.put("rowKey", Bytes.toString(result.getRow()));
            for (Cell cell : result.listCells()) {
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                row.put(family + ":" + qualifier, value);
            }
            results.add(row);
        }
        scanner.close();
        table.close();
        return results;
    }
}
4.4.3 编写REST API

创建UserTagController.java,提供用户标签的CRUD接口:

@RestController
@RequestMapping("/api/hbase/user-tags")
public class UserTagController {
    @Autowired
    private HBaseClient hbaseClient;
    @Value("${hbase.table.user_tags}")
    private String tableName;

    /**
     * 获取用户标签
     */
    @GetMapping("/{userId}")
    public ResponseEntity<Map<String, String>> getUserTags(@PathVariable String userId) {
        try {
            List<Map<String, String>> results = hbaseClient.scan(tableName, userId);
            if (results.isEmpty()) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(results.get(0));
        } catch (IOException e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
        }
    }

    /**
     * 添加用户标签
     */
    @PostMapping("/{userId}")
    public ResponseEntity<String> addUserTag(@PathVariable String userId, @RequestParam String category, @RequestParam String count) {
        try {
            hbaseClient.put(tableName, userId, "tags", category, count);
            return ResponseEntity.ok("用户标签添加成功");
        } catch (IOException e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("用户标签添加失败:" + e.getMessage());
        }
    }
}
4.4.4 测试数据存储微服务
  1. 启动data-storage-service
  2. 调用http://localhost:8003/api/hbase/user-tags/123,查看用户123的标签;
  3. 调用http://localhost:8003/api/hbase/user-tags/123?category=electronics&count=10,添加用户标签;
  4. 再次调用获取接口,确认标签已添加。

4.5 第五步:实现数据服务层(API网关)

目标:用Spring Cloud Gateway整合数据存储微服务,提供统一的、面向业务的API,处理权限控制、缓存、限流等共性问题。

4.5.1 创建数据服务层微服务(data-api-gateway)
  1. 新建子模块data-api-gateway, pom.xml 引入依赖:
    <dependencies>
        <!-- Spring Cloud Gateway -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <!-- Nacos Discovery -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- Sentinel -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>
        <!-- Redis(缓存) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- OAuth2(权限) -->
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-resource-server</artifactId>
        </dependency>
    </dependencies>
    
  2. 配置application.yml
    spring:
      application:
        name: data-api-gateway
      cloud:
        nacos:
          discovery:
            server-addr: localhost:8848
        gateway:
          routes:
            # 数据存储微服务路由(用户标签)
            - id: data-storage-service
              uri: lb://data-storage-service # 负载均衡到data-storage-service
              predicates:
                - Path=/api/user-tags/**
              filters:
                - RewritePath=/api/user-tags/(?<segment>.*), /api/hbase/user-tags/$\{segment} # 重写路径
                - Sentinel=api/user-tags/**, default # 限流
        sentinel:
          transport:
            dashboard: localhost:8080
      redis:
        host: localhost
        port: 6379
    server:
      port: 8000 # API网关端口
    # OAuth2配置(假设用Keycloak
    
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐