基于微服务架构的大数据数据服务平台搭建
基础理论:微服务与大数据的碰撞,为什么需要结合?组件设计:数据服务平台的核心组件与架构图;搭建准备:技术选型与环境配置;分步实现:从微服务基础框架到大数据组件整合的完整步骤;案例研究:实时用户行为分析服务的落地实践;最佳实践:微服务拆分、事务处理、监控等关键问题的解决技巧;未来展望:Serverless、AI等技术对数据服务平台的影响。
基于微服务架构的大数据数据服务平台搭建:从0到1构建可扩展的数据服务体系
摘要/引言
一、开门见山:单体大数据平台的“崩溃瞬间”
想象一下:你是某电商公司的大数据工程师,负责维护用户行为分析平台。凌晨3点,突然收到报警:“用户行为日志堆积超过100万条,实时推荐系统宕机!” 你急忙登录服务器,发现单体架构的大数据平台已经“瘫痪”——Spark集群因任务过载崩溃,Hadoop HDFS的写入延迟高达10分钟,而要扩容只能停机重启,导致业务中断3小时。
这不是虚构的场景,而是很多企业面临的真实痛点。传统单体大数据平台(如“采集-处理-存储”一体化的架构)在面对高并发、海量数据、快速迭代的需求时,暴露出致命缺陷:
- 扩展性差:新增数据源或处理逻辑需要修改整个系统,无法动态扩容;
- 维护困难:单体应用的代码耦合度高,一个模块出错可能导致整个系统崩溃;
- 响应滞后:实时数据处理延迟高,无法满足推荐、风控等低延迟业务需求;
- 资源浪费:不同组件(如采集、处理、存储)的资源需求不同,单体架构无法按需分配资源。
二、问题陈述:如何构建“可扩展、高可用、易维护”的数据服务平台?
面对这些问题,我们需要一种既能发挥大数据技术优势,又能解决单体架构瓶颈的解决方案。而微服务架构(Microservices Architecture)正是答案——它将复杂系统拆分为独立、可部署的微服务,通过轻量级通信机制(如REST、消息队列)协同工作,完美匹配大数据平台的“分布式、模块化”需求。
三、核心价值:本文能给你带来什么?
本文将带你从0到1搭建基于微服务架构的大数据数据服务平台,解决以下关键问题:
- 如何将大数据组件(采集、处理、存储)拆分为微服务?
- 如何设计微服务之间的通信与协作机制?
- 如何保证微服务与大数据组件的高可用性?
- 如何监控和治理复杂的分布式系统?
通过本文,你将掌握:
- 微服务与大数据的互补设计模式;
- 核心组件(采集、处理、存储、服务层)的实现细节;
- 实时用户行为分析等实际场景的落地经验;
- 避免踩坑的最佳实践。
四、文章概述
本文将按照“理论-设计-实现-实践”的逻辑展开:
- 基础理论:微服务与大数据的碰撞,为什么需要结合?
- 组件设计:数据服务平台的核心组件与架构图;
- 搭建准备:技术选型与环境配置;
- 分步实现:从微服务基础框架到大数据组件整合的完整步骤;
- 案例研究:实时用户行为分析服务的落地实践;
- 最佳实践:微服务拆分、事务处理、监控等关键问题的解决技巧;
- 未来展望:Serverless、AI等技术对数据服务平台的影响。
一、微服务与大数据的碰撞:为什么需要结合?
1.1 传统大数据平台的“单体病”
传统大数据平台通常采用“单体架构+批处理”模式,例如:
- 采集层:用Flume采集日志,直接写入HDFS;
- 处理层:用Spark SQL批量处理HDFS中的数据,存入HBase;
- 服务层:用Java Web应用直接调用HBase API,提供数据服务。
这种架构的问题在于:
- 耦合度高:采集、处理、服务层的代码高度耦合,修改一个模块需要重启整个系统;
- 扩展性差:当数据量激增时,无法单独扩容处理层(如Spark集群),只能整体升级硬件;
- 实时性差:批处理模式的延迟通常在小时级,无法满足实时推荐、风控等需求;
- 维护成本高:单体应用的故障排查难度大,比如HDFS的写入延迟可能由Flume的配置错误或Spark的任务积压导致,难以快速定位。
1.2 微服务架构的“治病良方”
微服务架构的核心思想是“拆分与协同”,将系统拆分为独立部署、职责单一的微服务,通过轻量级通信(如REST、Kafka)实现协同。它能解决传统大数据平台的“单体病”:
- 可扩展性:每个微服务可以独立扩容,例如当数据采集压力大时,增加采集微服务的副本;
- 高可用性:微服务的故障不会影响整个系统,例如处理微服务崩溃时,采集微服务可以将数据暂存到Kafka,待处理微服务恢复后继续处理;
- 易维护性:每个微服务的代码量小、职责明确,故障排查和迭代速度快;
- 技术多样性:不同微服务可以选择适合的技术栈,例如采集微服务用Spring Boot,处理微服务用Flink,存储微服务用HBase。
1.3 微服务与大数据的“互补性”
微服务架构与大数据技术的结合,不是简单的“微服务+大数据”,而是优势互补:
- 大数据技术提供“数据处理能力”:例如Flink的实时处理、Spark的批处理、HBase的高吞吐量存储;
- 微服务架构提供“系统组织能力”:将大数据组件拆分为微服务,实现动态扩容、独立部署、故障隔离。
举个例子:实时用户行为分析场景中,微服务架构与大数据的结合方式如下:
- 采集微服务:用Flume采集用户行为日志,发送到Kafka;
- 处理微服务:用Flink消费Kafka中的日志,实时计算用户兴趣标签;
- 存储微服务:将用户兴趣标签存入HBase;
- 服务层微服务:用Spring Boot提供REST API,供推荐系统调用。
这种架构的优势在于:
- 实时性:Flink的实时处理延迟低至秒级;
- 扩展性:当用户量激增时,增加Flink的TaskManager副本即可提高处理能力;
- 可靠性:Kafka的消息持久化保证了数据不丢失,微服务的故障隔离保证了系统不会崩溃。
二、核心组件设计:数据服务平台的“骨架”
基于微服务架构的大数据数据服务平台,核心组件包括数据采集层、数据处理层、数据存储层、数据服务层、监控与治理层。以下是详细设计:
2.1 架构图
+-------------------+ +-------------------+ +-------------------+
| 数据采集层 | | 数据处理层 | | 数据存储层 |
| (Flume/Kafka微服务)| ←→ | (Flink/Spark微服务)| ←→ | (HBase/ES微服务) |
+-------------------+ +-------------------+ +-------------------+
↓ ↓ ↓
+-------------------+ +-------------------+ +-------------------+
| 消息队列(Kafka)| | 分布式缓存(Redis)| | 元数据管理(MySQL)|
+-------------------+ +-------------------+ +-------------------+
↓ ↓ ↓
+---------------------------------------------------------------+
| 数据服务层(Spring Cloud Gateway) |
+---------------------------------------------------------------+
↓
+---------------------------------------------------------------+
| 监控与治理层(Prometheus/Grafana/Istio) |
+---------------------------------------------------------------+
↓
+---------------------------------------------------------------+
| 业务应用(推荐系统/BI工具) |
+---------------------------------------------------------------+
2.2 核心组件说明
2.2.1 数据采集层:“数据入口”的微服务化
职责:从各种数据源(日志、数据库、消息队列)采集数据,发送到消息队列(Kafka)。
设计要点:
- 多源采集:支持日志文件(Taildir)、数据库(CDC,如Debezium)、消息队列(RabbitMQ)等数据源;
- 微服务封装:将每个采集任务封装为微服务,例如“日志采集微服务”“数据库CDC微服务”;
- 动态配置:通过Nacos配置中心动态修改采集规则(如Flume的Source、Sink配置),无需重启微服务;
- 可靠性:用Kafka的消息持久化保证数据不丢失,采集微服务的重试机制处理临时故障。
技术选型:Flume(采集工具)、Kafka(消息队列)、Spring Boot(微服务框架)、Nacos(配置中心)。
2.2.2 数据处理层:“数据加工”的微服务化
职责:从消息队列(Kafka)接收数据,进行清洗、转换、计算(批处理/实时处理),将结果存入存储层。
设计要点:
- 分离批处理与实时处理:批处理用Spark SQL(处理历史数据),实时处理用Flink(处理实时数据);
- 微服务封装:将每个处理任务封装为微服务,例如“实时用户行为处理微服务”“历史订单统计微服务”;
- ** Exactly-Once 语义**:用Flink的Checkpoint和Kafka的事务性生产者,保证数据不丢失不重复;
- 动态扩展:通过K8s动态调整处理微服务的副本数量,应对数据量波动。
技术选型:Flink(实时处理)、Spark(批处理)、Kafka(消息队列)、Spring Boot(微服务框架)、K8s(容器编排)。
2.2.3 数据存储层:“数据仓库”的微服务化
职责:存储处理后的数据,提供高吞吐量、低延迟的CRUD接口。
设计要点:
- 分层存储:根据数据的访问频率,选择不同的存储引擎:
- 高频访问数据:用HBase(高吞吐量、低延迟);
- 全文检索数据:用Elasticsearch(快速查询);
- 历史归档数据:用HDFS(低成本存储);
- 微服务封装:将每个存储引擎封装为微服务,例如“HBase微服务”“ES微服务”;
- 统一接口:用REST API或gRPC提供统一的存储接口,隐藏底层存储细节;
- 高可用性:存储引擎采用集群部署(如HBase的HA集群、ES的分片集群),微服务采用多副本部署。
技术选型:HBase(实时存储)、Elasticsearch(检索存储)、HDFS(归档存储)、Spring Boot(微服务框架)、MyBatis(数据访问)。
2.2.4 数据服务层:“数据出口”的微服务化
职责:整合存储层的微服务,提供统一的、面向业务的API,处理权限控制、缓存、限流等共性问题。
设计要点:
- API网关:用Spring Cloud Gateway作为API网关,实现路由、负载均衡、权限控制;
- 面向业务:API设计以业务需求为中心,例如“获取用户兴趣标签”“查询订单统计”;
- 共性能力:整合缓存(Redis)、限流(Sentinel)、权限(OAuth2)等共性能力,避免重复开发;
- 文档化:用Swagger生成API文档,便于业务团队使用。
技术选型:Spring Cloud Gateway(API网关)、Redis(缓存)、Sentinel(限流)、OAuth2(权限)、Swagger(文档)。
2.2.5 监控与治理层:“系统大脑”的微服务化
职责:监控系统的运行状态,治理微服务与大数据组件的故障,保证系统的高可用性。
设计要点:
- 全链路监控:监控微服务(如Spring Boot的Actuator)、大数据组件(如Kafka的JMX、Flink的Metrics)的 metrics;
- 可视化展示:用Grafana创建Dashboard,展示吞吐量、延迟、错误率等关键指标;
- 智能报警:用Prometheus的Alertmanager设置报警规则,当指标超过阈值时发送报警(邮件、钉钉);
- 服务治理:用Istio作为服务网格,实现熔断、降级、链路追踪(Jaeger)。
技术选型:Prometheus(监控)、Grafana(可视化)、Istio(服务网格)、Jaeger(链路追踪)、Alertmanager(报警)。
三、搭建前的准备:先决条件与技术选型
3.1 先决条件
3.1.1 知识储备
- 微服务:Spring Cloud Alibaba(Nacos、Sentinel、Seata)、Docker、K8s;
- 大数据:Flink、Spark、Kafka、HBase、HDFS;
- 其他:REST API、JSON、SQL。
3.1.2 工具与环境
- 操作系统:Linux(CentOS 7/8)或 macOS;
- 容器:Docker(版本≥20.10)、Docker Compose(版本≥1.29);
- 集群:K8s集群(可通过Minikube或Kind搭建本地集群);
- 大数据组件:Hadoop集群(版本≥3.3)、Kafka集群(版本≥2.8)、Flink集群(版本≥1.15)、HBase集群(版本≥2.4);
- 开发工具:IntelliJ IDEA、Maven、Git。
3.2 技术选型说明
组件 | 技术选型 | 选型理由 |
---|---|---|
微服务框架 | Spring Cloud Alibaba | 国内主流,整合了Nacos(注册/配置)、Sentinel(限流)、Seata(分布式事务) |
消息队列 | Kafka | 高吞吐量、低延迟,适合大数据场景 |
实时处理 | Flink | 支持Exactly-Once语义,实时处理延迟低 |
批处理 | Spark | 成熟的批处理框架,生态丰富 |
实时存储 | HBase | 高吞吐量、低延迟,适合键值对存储 |
检索存储 | Elasticsearch | 快速全文检索,适合日志、商品等数据 |
归档存储 | HDFS | 低成本、高可靠,适合历史数据存储 |
API网关 | Spring Cloud Gateway | 轻量级、易整合,支持路由、负载均衡 |
缓存 | Redis | 高吞吐量、低延迟,适合缓存常用数据 |
监控 | Prometheus + Grafana | 开源生态丰富,支持多组件监控 |
服务网格 | Istio | 支持熔断、降级、链路追踪,无需修改代码 |
四、分步搭建:从组件到平台的实现
4.1 第一步:搭建微服务基础框架(Spring Cloud Alibaba)
目标:搭建包含注册中心(Nacos)、配置中心(Nacos)、限流(Sentinel)、分布式事务(Seata)的微服务基础框架。
4.1.1 部署Nacos(注册/配置中心)
- 用Docker部署Nacos:
docker run -d --name nacos -p 8848:8848 -e MODE=standalone nacos/nacos-server:v2.1.1
- 访问Nacos控制台:
http://localhost:8848/nacos
(用户名/密码:nacos/nacos)。
4.1.2 创建微服务父项目
用Maven创建父项目data-service-platform
, pom.xml 配置:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.10</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>data-service-platform</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>data-service-platform</name>
<description>基于微服务架构的大数据数据服务平台</description>
<modules>
<module>data-collection-service</module> <!-- 数据采集微服务 -->
<module>data-processing-service</module> <!-- 数据处理微服务 -->
<module>data-storage-service</module> <!-- 数据存储微服务 -->
<module>data-api-gateway</module> <!-- 数据服务层(API网关) -->
</modules>
<properties>
<java.version>1.8</java.version>
<spring-cloud-alibaba.version>2021.0.5.0</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Spring Cloud Alibaba -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
4.1.3 创建数据采集微服务(data-collection-service)
- 新建子模块
data-collection-service
, pom.xml 引入依赖:<dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Nacos Discovery --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- Nacos Config --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <!-- Sentinel --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> <!-- Flume Client --> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.11.0</version> </dependency> </dependencies>
- 配置
application.yml
:spring: application: name: data-collection-service cloud: nacos: discovery: server-addr: localhost:8848 # Nacos注册中心地址 config: server-addr: localhost:8848 # Nacos配置中心地址 file-extension: yaml # 配置文件格式 sentinel: transport: dashboard: localhost:8080 # Sentinel控制台地址 server: port: 8001 # 微服务端口
- 编写启动类
DataCollectionServiceApplication.java
:@SpringBootApplication @EnableDiscoveryClient // 开启Nacos服务发现 public class DataCollectionServiceApplication { public static void main(String[] args) { SpringApplication.run(DataCollectionServiceApplication.class, args); } }
- 编写Flume管理接口
FlumeController.java
:@RestController @RequestMapping("/api/flume") public class FlumeController { @Value("${flume.agent.name}") private String agentName; @Value("${flume.conf.path}") private String confPath; /** * 启动Flume Agent */ @PostMapping("/start") public String startAgent() { String command = String.format("flume-ng agent -n %s -c %s -f %s", agentName, confPath, confPath + "/agent.conf"); try { Process process = Runtime.getRuntime().exec(command); int exitCode = process.waitFor(); if (exitCode == 0) { return "Flume Agent启动成功"; } else { return "Flume Agent启动失败, exit code: " + exitCode; } } catch (Exception e) { return "Flume Agent启动失败:" + e.getMessage(); } } }
4.1.4 测试微服务基础框架
- 启动Nacos、Sentinel(可通过Docker部署);
- 启动
data-collection-service
; - 访问Nacos控制台,查看
data-collection-service
是否注册成功; - 访问
http://localhost:8001/api/flume/start
,测试Flume Agent启动接口。
4.2 第二步:实现数据采集微服务(Flume + Kafka)
目标:用Flume采集用户行为日志,发送到Kafka。
4.2.1 配置Flume Agent
在data-collection-service
的src/main/resources
目录下创建flume
文件夹,编写agent.conf
:
# 定义Agent名称
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
# 配置Source(Taildir Source,监控日志文件)
agent1.sources.source1.type = TAILDIR
agent1.sources.source1.filegroups = f1
agent1.sources.source1.filegroups.f1 = /logs/user-behavior.log
agent1.sources.source1.positionFile = /logs/taildir_position.json
agent1.sources.source1.fileHeader = true
# 配置Channel(Memory Channel,内存缓存)
agent1.channels.channel1.type = MEMORY
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
# 配置Sink(Kafka Sink,发送到Kafka)
agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sink1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.sink1.kafka.topic = user-behavior
agent1.sinks.sink1.kafka.producer.acks = 1
agent1.sinks.sink1.kafka.producer.batch.size = 16384
agent1.sinks.sink1.kafka.producer.linger.ms = 1
# 绑定Source、Channel、Sink
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
4.2.2 配置Nacos动态参数
在Nacos控制台创建data-collection-service.yaml
配置文件,添加以下内容:
flume:
agent:
name: agent1 # Flume Agent名称
conf:
path: /opt/data-collection-service/flume # Flume配置文件路径(容器内路径)
4.2.3 测试数据采集
- 启动Kafka集群(可通过Docker部署);
- 创建Kafka topic
user-behavior
:kafka-topics.sh --create --topic user-behavior --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- 向
/logs/user-behavior.log
写入测试数据:echo '{"user_id": "123", "action": "click", "product_id": "456", "timestamp": "2023-10-01 10:00:00"}' >> /logs/user-behavior.log
- 调用
http://localhost:8001/api/flume/start
启动Flume Agent; - 用Kafka消费者查看数据:
若能看到测试数据,说明数据采集成功。kafka-console-consumer.sh --topic user-behavior --bootstrap-server localhost:9092 --from-beginning
4.3 第三步:实现数据处理微服务(Flink + Kafka)
目标:用Flink消费Kafka中的用户行为日志,实时计算用户兴趣标签(点击次数最多的产品类别),存入HBase。
4.3.1 配置Flink集群
用Docker部署Flink集群(JobManager + TaskManager):
# 启动JobManager
docker run -d --name flink-jobmanager -p 8081:8081 -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.15.3 jobmanager
# 启动TaskManager(可启动多个)
docker run -d --name flink-taskmanager -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:1.15.3 taskmanager
4.3.2 创建数据处理微服务(data-processing-service)
- 新建子模块
data-processing-service
, pom.xml 引入依赖:<dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Nacos Discovery --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- Flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.15.3</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.15.3</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hbase-2.2</artifactId> <version>1.15.3</version> </dependency> </dependencies>
- 配置
application.yml
:spring: application: name: data-processing-service cloud: nacos: discovery: server-addr: localhost:8848 server: port: 8002 flink: jobmanager: address: localhost:8081 # Flink JobManager地址 kafka: bootstrap-servers: localhost:9092 topic: user-behavior hbase: zookeeper: quorum: localhost:2181 # HBase ZooKeeper地址 table: user_tags # HBase表名
4.3.3 编写Flink处理逻辑
创建UserBehaviorProcessor.java
,实现实时计算用户兴趣标签:
public class UserBehaviorProcessor {
public static void main(String[] args) throws Exception {
// 1. 创建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 设置并行度
env.enableCheckpointing(5000); // 开启Checkpoint,间隔5秒
env.getCheckpointConfig().setCheckpointStorage("hdfs://localhost:9000/flink/checkpoints"); // Checkpoint存储路径
// 2. 读取Kafka数据
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("user-behavior")
.setGroupId("user-behavior-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. 解析JSON数据
DataStream<UserBehavior> userBehaviorStream = kafkaStream
.map(json -> JSON.parseObject(json, UserBehavior.class))
.filter(behavior -> "click".equals(behavior.getAction())) // 过滤点击行为
.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp().getTime()));
// 4. 计算用户兴趣标签(5分钟窗口内点击次数最多的产品类别)
DataStream<UserTag> userTagStream = userBehaviorStream
.keyBy(UserBehavior::getUserId) // 按用户ID分组
.window(TumblingEventTimeWindows.of(Time.minutes(5))) // 5分钟滚动窗口
.aggregate(new UserTagAggregateFunction(), new UserTagWindowFunction());
// 5. 将结果存入HBase
HBaseSink.Builder<UserTag> hbaseSinkBuilder = HBaseSink.builder();
hbaseSinkBuilder.setConfiguration(HBaseConfiguration.create());
hbaseSinkBuilder.setTable("user_tags");
hbaseSinkBuilder.setSerializationSchema(new UserTagSerializationSchema());
userTagStream.addSink(hbaseSinkBuilder.build());
// 6. 执行Job
env.execute("User Behavior Processor");
}
// 用户行为实体类
@Data
public static class UserBehavior {
private String userId;
private String action;
private String productId;
private Date timestamp;
}
// 用户标签实体类
@Data
public static class UserTag {
private String userId;
private String category;
private Long count;
}
// 聚合函数(计算每个用户每个类别的点击次数)
public static class UserTagAggregateFunction implements AggregateFunction<UserBehavior, Map<String, Long>, Map<String, Long>> {
@Override
public Map<String, Long> createAccumulator() {
return new HashMap<>();
}
@Override
public Map<String, Long> add(UserBehavior value, Map<String, Long> accumulator) {
String category = getProductCategory(value.getProductId()); // 假设从产品库获取类别
accumulator.put(category, accumulator.getOrDefault(category, 0L) + 1);
return accumulator;
}
@Override
public Map<String, Long> getResult(Map<String, Long> accumulator) {
return accumulator;
}
@Override
public Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
a.putAll(b);
return a;
}
}
// 窗口函数(获取点击次数最多的类别)
public static class UserTagWindowFunction implements WindowFunction<Map<String, Long>, UserTag, String, TimeWindow> {
@Override
public void apply(String userId, TimeWindow window, Iterable<Map<String, Long>> input, Collector<UserTag> out) {
Map<String, Long> categoryCount = input.iterator().next();
String topCategory = categoryCount.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse("unknown");
Long topCount = categoryCount.get(topCategory);
UserTag userTag = new UserTag();
userTag.setUserId(userId);
userTag.setCategory(topCategory);
userTag.setCount(topCount);
out.collect(userTag);
}
}
// HBase序列化 schema(将UserTag转换为HBase的Put操作)
public static class UserTagSerializationSchema implements HBaseSerializationSchema<UserTag> {
@Override
public Put serialize(UserTag element, byte[] rowKey) {
Put put = new Put(Bytes.toBytes(element.getUserId()));
put.addColumn(Bytes.toBytes("tags"), Bytes.toBytes(element.getCategory()), Bytes.toBytes(element.getCount().toString()));
return put;
}
}
// 模拟从产品库获取类别(实际应调用产品服务)
private static String getProductCategory(String productId) {
return productId.startsWith("1") ? "electronics" : "clothing";
}
}
4.3.4 编写微服务接口提交Flink Job
创建FlinkJobController.java
,提供提交Flink Job的接口:
@RestController
@RequestMapping("/api/flink")
public class FlinkJobController {
@Value("${flink.jobmanager.address}")
private String jobManagerAddress;
@Value("${flink.job.jar.path}")
private String jobJarPath; // Flink Job的Jar包路径
/**
* 提交Flink Job
*/
@PostMapping("/submit")
public String submitJob() {
try {
// 创建Flink Rest Client
RestClient restClient = new RestClient(
new RestClientConfiguration.Builder()
.setHost(jobManagerAddress.split(":")[0])
.setPort(Integer.parseInt(jobManagerAddress.split(":")[1]))
.build()
);
// 提交Job
JobSubmitRequestBody requestBody = JobSubmitRequestBody.builder()
.setJarPath(jobJarPath)
.setEntryClass("com.example.data.processing.UserBehaviorProcessor")
.build();
CompletableFuture<JobID> jobIdFuture = restClient.submitJob(requestBody);
JobID jobId = jobIdFuture.get();
return "Flink Job提交成功, Job ID: " + jobId.toString();
} catch (Exception e) {
return "Flink Job提交失败:" + e.getMessage();
}
}
}
4.3.5 测试数据处理
- 启动HBase集群(可通过Docker部署);
- 创建HBase表
user_tags
:hbase shell create 'user_tags', 'tags'
- 将
data-processing-service
打包成Jar包(data-processing-service-0.0.1-SNAPSHOT.jar
),上传到Flink JobManager的/opt/flink/jobs
目录; - 调用
http://localhost:8002/api/flink/submit
提交Flink Job; - 访问Flink控制台(
http://localhost:8081
),查看Job是否运行成功; - 向
/logs/user-behavior.log
写入更多测试数据,用HBase shell查看结果:
若能看到用户hbase shell get 'user_tags', '123'
123
的兴趣标签(如electronics
),说明数据处理成功。
4.4 第四步:实现数据存储微服务(HBase)
目标:封装HBase的CRUD接口,提供REST API供数据服务层调用。
4.4.1 创建数据存储微服务(data-storage-service)
- 新建子模块
data-storage-service
, pom.xml 引入依赖:<dependencies> <!-- Spring Boot Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- Nacos Discovery --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- HBase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.4.17</version> </dependency> <!-- MyBatis(可选,用于元数据管理) --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency> </dependencies>
- 配置
application.yml
:spring: application: name: data-storage-service cloud: nacos: discovery: server-addr: localhost:8848 server: port: 8003 hbase: zookeeper: quorum: localhost:2181 table: user_tags: user_tags
4.4.2 编写HBase客户端
创建HBaseClient.java
,封装HBase的CRUD操作:
@Component
public class HBaseClient {
private Connection connection;
private Admin admin;
@Value("${hbase.zookeeper.quorum}")
private String zookeeperQuorum;
@PostConstruct
public void init() throws IOException {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", zookeeperQuorum);
connection = ConnectionFactory.createConnection(config);
admin = connection.getAdmin();
}
@PreDestroy
public void close() throws IOException {
if (admin != null) {
admin.close();
}
if (connection != null) {
connection.close();
}
}
/**
* 插入数据
*/
public void put(String tableName, String rowKey, String family, String qualifier, String value) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
table.put(put);
table.close();
}
/**
* 获取数据
*/
public String get(String tableName, String rowKey, String family, String qualifier) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
Result result = table.get(get);
table.close();
byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
return value != null ? Bytes.toString(value) : null;
}
/**
* 扫描数据(按行键前缀)
*/
public List<Map<String, String>> scan(String tableName, String rowPrefix) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setRowPrefixFilter(Bytes.toBytes(rowPrefix));
ResultScanner scanner = table.getScanner(scan);
List<Map<String, String>> results = new ArrayList<>();
for (Result result : scanner) {
Map<String, String> row = new HashMap<>();
row.put("rowKey", Bytes.toString(result.getRow()));
for (Cell cell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
row.put(family + ":" + qualifier, value);
}
results.add(row);
}
scanner.close();
table.close();
return results;
}
}
4.4.3 编写REST API
创建UserTagController.java
,提供用户标签的CRUD接口:
@RestController
@RequestMapping("/api/hbase/user-tags")
public class UserTagController {
@Autowired
private HBaseClient hbaseClient;
@Value("${hbase.table.user_tags}")
private String tableName;
/**
* 获取用户标签
*/
@GetMapping("/{userId}")
public ResponseEntity<Map<String, String>> getUserTags(@PathVariable String userId) {
try {
List<Map<String, String>> results = hbaseClient.scan(tableName, userId);
if (results.isEmpty()) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(results.get(0));
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
}
}
/**
* 添加用户标签
*/
@PostMapping("/{userId}")
public ResponseEntity<String> addUserTag(@PathVariable String userId, @RequestParam String category, @RequestParam String count) {
try {
hbaseClient.put(tableName, userId, "tags", category, count);
return ResponseEntity.ok("用户标签添加成功");
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("用户标签添加失败:" + e.getMessage());
}
}
}
4.4.4 测试数据存储微服务
- 启动
data-storage-service
; - 调用
http://localhost:8003/api/hbase/user-tags/123
,查看用户123
的标签; - 调用
http://localhost:8003/api/hbase/user-tags/123?category=electronics&count=10
,添加用户标签; - 再次调用获取接口,确认标签已添加。
4.5 第五步:实现数据服务层(API网关)
目标:用Spring Cloud Gateway整合数据存储微服务,提供统一的、面向业务的API,处理权限控制、缓存、限流等共性问题。
4.5.1 创建数据服务层微服务(data-api-gateway)
- 新建子模块
data-api-gateway
, pom.xml 引入依赖:<dependencies> <!-- Spring Cloud Gateway --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency> <!-- Nacos Discovery --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- Sentinel --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> <!-- Redis(缓存) --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- OAuth2(权限) --> <dependency> <groupId>org.springframework.security</groupId> <artifactId>spring-security-oauth2-resource-server</artifactId> </dependency> </dependencies>
- 配置
application.yml
:spring: application: name: data-api-gateway cloud: nacos: discovery: server-addr: localhost:8848 gateway: routes: # 数据存储微服务路由(用户标签) - id: data-storage-service uri: lb://data-storage-service # 负载均衡到data-storage-service predicates: - Path=/api/user-tags/** filters: - RewritePath=/api/user-tags/(?<segment>.*), /api/hbase/user-tags/$\{segment} # 重写路径 - Sentinel=api/user-tags/**, default # 限流 sentinel: transport: dashboard: localhost:8080 redis: host: localhost port: 6379 server: port: 8000 # API网关端口 # OAuth2配置(假设用Keycloak
更多推荐
所有评论(0)