在这里插入图片描述

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net


一、场景引入:为什么需要响应式数据流?

在现代移动应用开发中,实时数据更新是常见需求。想象一下这样的场景:你正在开发一个聊天应用,需要实时显示新消息;或者你在开发一个股票应用,需要实时更新股价信息;又或者你在开发一个下载管理器,需要实时显示下载进度。这些场景都需要应用能够响应数据的变化,自动更新 UI。

这就是为什么我们需要 响应式数据流StreamBuilder 是 Flutter 提供的响应式组件,它能够监听数据流(Stream)的变化,在数据更新时自动重建 UI,实现真正的响应式编程。

📱 1.1 响应式数据流的典型应用场景

在现代移动应用中,响应式数据流的需求非常广泛:

实时聊天应用:当有新消息到达时,聊天界面需要立即更新,显示新消息。消息可能来自 WebSocket、Firebase 或其他实时数据源。

股票/行情应用:股票价格实时变动,应用需要实时更新价格显示,包括价格变化动画、涨跌颜色等。

下载/上传进度:文件下载或上传时,需要实时显示进度百分比、速度、剩余时间等信息。

传感器数据:GPS 位置、加速度计、陀螺仪等传感器数据持续产生,应用需要实时处理和显示。

搜索建议:用户在搜索框输入时,实时显示搜索建议,需要处理防抖、取消等逻辑。

倒计时/计时器:秒表、倒计时器等需要精确更新显示时间。

1.2 StreamBuilder 与其他响应式方案对比

Flutter 提供了多种响应式编程方案:

方案 适用场景 学习成本 灵活度 性能
setState 简单状态更新
StreamBuilder 异步数据流
FutureBuilder 一次性异步操作
ValueNotifier 单值状态管理
ChangeNotifier 复杂状态管理
Provider/Riverpod 全局状态管理
BLoC/Cubit 企业级状态管理 极高

对于异步数据流场景,StreamBuilder 是最佳选择:

声明式 UI:你只需要描述 UI 如何根据数据状态显示,StreamBuilder 会自动处理数据更新和 UI 重建。

自动管理订阅:StreamBuilder 会自动管理 Stream 的订阅和取消订阅,避免内存泄漏。

状态处理:StreamBuilder 提供了 connectionState 属性,可以轻松处理加载中、错误等状态。

与 Flutter 生态集成:StreamBuilder 是 Flutter 框架的一部分,可以与其他 Flutter 组件无缝配合。

1.3 Stream 核心概念

理解 Stream 的核心概念是掌握 StreamBuilder 的关键:

Stream(流):Stream 是一系列异步事件的序列。你可以把它想象成一条管道,数据从一端流入,从另一端流出。与 Future 不同,Stream 可以产生多个值。

StreamController:Stream 控制器,用于创建和控制 Stream。它提供了 Stream 和 Sink 两个属性,分别用于监听数据和添加数据。

StreamSubscription:Stream 订阅,用于管理对 Stream 的订阅。可以通过它暂停、恢复或取消订阅。

StreamTransformer:Stream 转换器,用于转换 Stream 中的数据。例如,将一个数字 Stream 转换为字符串 Stream。

Sink:数据接收器,用于向 Stream 添加数据。


二、技术架构设计

在正式编写代码之前,我们需要设计一个清晰的架构。良好的架构设计可以让代码更易于理解、维护和扩展。

🏛️ 2.1 数据流架构设计

┌─────────────────────────────────────────────────────────────┐
│                    数据源层                                  │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │ WebSocket   │  │ Timer       │  │ Sensor              │  │
│  │ 网络数据     │  │ 定时器       │  │ 传感器              │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │           StreamController                          │    │
│  │  - 创建 Stream                                       │    │
│  │  - 提供 Sink 用于添加数据                            │    │
│  └─────────────────────────────────────────────────────┘    │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │           Stream<T>                                  │    │
│  │  - 异步数据序列                                       │    │
│  │  - 可被多个监听器订阅                                 │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
                               │
                               │ 监听
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                    UI 层                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │           StreamBuilder<T>                          │    │
│  │  - 监听 Stream 变化                                  │    │
│  │  - 自动管理订阅                                       │    │
│  │  - 根据状态构建 UI                                    │    │
│  └─────────────────────────────────────────────────────┘    │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │           AsyncSnapshot<T>                          │    │
│  │  - connectionState: 连接状态                         │    │
│  │  - data: 最新数据                                    │    │
│  │  - error: 错误信息                                   │    │
│  │  - hasData / hasError: 状态判断                      │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

🎯 2.2 AsyncSnapshot 状态

StreamBuilder 通过 AsyncSnapshot 提供数据状态:

状态 描述 典型用途
ConnectionState.none 未连接到 Stream 显示初始状态
ConnectionState.waiting 等待数据 显示加载指示器
ConnectionState.active 已连接,等待新数据 显示数据
ConnectionState.done Stream 已关闭 显示最终状态

📐 2.3 数据流处理流程

创建 StreamController
      │
      ▼
获取 Stream 并传递给 StreamBuilder
      │
      ▼
StreamBuilder 订阅 Stream
      │
      ├──▶ ConnectionState.waiting: 显示加载中
      │
      ├──▶ 数据到达: ConnectionState.active
      │    │
      │    └──▶ 显示数据
      │
      ├──▶ 错误发生: hasError = true
      │    │
      │    └──▶ 显示错误信息
      │
      └──▶ Stream 关闭: ConnectionState.done
            │
            └──▶ 显示最终状态
                  │
                  ▼
            dispose() 自动取消订阅

三、核心功能实现

🔧 3.1 基础 StreamBuilder 使用

import 'dart:async';
import 'package:flutter/material.dart';

class BasicStreamBuilderPage extends StatefulWidget {
  const BasicStreamBuilderPage({super.key});

  
  State<BasicStreamBuilderPage> createState() => _BasicStreamBuilderPageState();
}

class _BasicStreamBuilderPageState extends State<BasicStreamBuilderPage> {
  late StreamController<int> _controller;
  int _counter = 0;
  
  
  void initState() {
    super.initState();
    _controller = StreamController<int>();
  }
  
  
  void dispose() {
    _controller.close();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('基础 StreamBuilder')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _controller.stream,
          initialData: 0,
          builder: (context, snapshot) {
            // 处理不同状态
            if (snapshot.connectionState == ConnectionState.waiting) {
              return const CircularProgressIndicator();
            }
            
            if (snapshot.hasError) {
              return Text('错误: ${snapshot.error}');
            }
            
            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                const Text('计数器:', style: TextStyle(fontSize: 18)),
                Text(
                  '${snapshot.data}',
                  style: const TextStyle(fontSize: 48, fontWeight: FontWeight.bold),
                ),
              ],
            );
          },
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: () {
          _counter++;
          _controller.add(_counter);
        },
        child: const Icon(Icons.add),
      ),
    );
  }
}

⏱️ 3.2 倒计时器实现

/// 倒计时器
class CountdownTimer extends StatefulWidget {
  final int seconds;
  final VoidCallback? onCompleted;
  
  const CountdownTimer({
    super.key,
    required this.seconds,
    this.onCompleted,
  });

  
  State<CountdownTimer> createState() => _CountdownTimerState();
}

class _CountdownTimerState extends State<CountdownTimer> {
  late StreamController<int> _controller;
  late int _remainingSeconds;
  Timer? _timer;
  
  
  void initState() {
    super.initState();
    _remainingSeconds = widget.seconds;
    _controller = StreamController<int>();
    _controller.add(_remainingSeconds);
  }
  
  void _startTimer() {
    _timer?.cancel();
    _timer = Timer.periodic(const Duration(seconds: 1), (timer) {
      if (_remainingSeconds > 0) {
        _remainingSeconds--;
        _controller.add(_remainingSeconds);
      } else {
        timer.cancel();
        widget.onCompleted?.call();
      }
    });
  }
  
  void _pauseTimer() {
    _timer?.cancel();
  }
  
  void _resetTimer() {
    _timer?.cancel();
    _remainingSeconds = widget.seconds;
    _controller.add(_remainingSeconds);
  }
  
  
  void dispose() {
    _timer?.cancel();
    _controller.close();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Column(
      mainAxisAlignment: MainAxisAlignment.center,
      children: [
        StreamBuilder<int>(
          stream: _controller.stream,
          initialData: _remainingSeconds,
          builder: (context, snapshot) {
            final minutes = snapshot.data! ~/ 60;
            final seconds = snapshot.data! % 60;
            
            return Text(
              '${minutes.toString().padLeft(2, '0')}:${seconds.toString().padLeft(2, '0')}',
              style: const TextStyle(
                fontSize: 72,
                fontWeight: FontWeight.bold,
                fontFeatures: [FontFeature.tabularFigures()],
              ),
            );
          },
        ),
        const SizedBox(height: 24),
        Row(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            ElevatedButton(
              onPressed: _startTimer,
              child: const Text('开始'),
            ),
            const SizedBox(width: 16),
            ElevatedButton(
              onPressed: _pauseTimer,
              child: const Text('暂停'),
            ),
            const SizedBox(width: 16),
            ElevatedButton(
              onPressed: _resetTimer,
              child: const Text('重置'),
            ),
          ],
        ),
      ],
    );
  }
}

📊 3.3 进度条实现

/// 下载进度管理器
class DownloadProgressManager {
  final StreamController<double> _progressController = StreamController<double>();
  Stream<double> get progressStream => _progressController.stream;
  
  double _currentProgress = 0;
  
  Future<void> startDownload() async {
    _currentProgress = 0;
    
    // 模拟下载过程
    while (_currentProgress < 1.0) {
      await Future.delayed(const Duration(milliseconds: 100));
      _currentProgress += 0.02;
      _currentProgress = _currentProgress.clamp(0.0, 1.0);
      _progressController.add(_currentProgress);
    }
  }
  
  void dispose() {
    _progressController.close();
  }
}

/// 下载进度页面
class DownloadProgressPage extends StatefulWidget {
  const DownloadProgressPage({super.key});

  
  State<DownloadProgressPage> createState() => _DownloadProgressPageState();
}

class _DownloadProgressPageState extends State<DownloadProgressPage> {
  final DownloadProgressManager _manager = DownloadProgressManager();
  bool _isDownloading = false;
  
  
  void dispose() {
    _manager.dispose();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('下载进度')),
      body: Padding(
        padding: const EdgeInsets.all(24),
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: [
            StreamBuilder<double>(
              stream: _manager.progressStream,
              initialData: 0,
              builder: (context, snapshot) {
                final progress = snapshot.data ?? 0;
                final percentage = (progress * 100).toStringAsFixed(0);
                
                return Column(
                  children: [
                    // 进度条
                    LinearProgressIndicator(
                      value: progress,
                      minHeight: 12,
                      backgroundColor: Colors.grey.shade200,
                      borderRadius: BorderRadius.circular(6),
                    ),
                    const SizedBox(height: 16),
                    // 百分比
                    Text(
                      '$percentage%',
                      style: const TextStyle(
                        fontSize: 32,
                        fontWeight: FontWeight.bold,
                      ),
                    ),
                    const SizedBox(height: 8),
                    // 状态文本
                    Text(
                      progress >= 1.0 ? '下载完成' : '正在下载...',
                      style: TextStyle(color: Colors.grey.shade600),
                    ),
                  ],
                );
              },
            ),
            const SizedBox(height: 32),
            ElevatedButton(
              onPressed: _isDownloading
                  ? null
                  : () async {
                      setState(() => _isDownloading = true);
                      await _manager.startDownload();
                      setState(() => _isDownloading = false);
                    },
              child: Text(_isDownloading ? '下载中...' : '开始下载'),
            ),
          ],
        ),
      ),
    );
  }
}

🔍 3.4 实时搜索实现

/// 搜索服务
class SearchService {
  final StreamController<List<String>> _resultsController = StreamController<List<String>>();
  Stream<List<String>> get results => _resultsController.stream;
  
  Timer? _debounceTimer;
  
  // 模拟数据
  final List<String> _allItems = List.generate(100, (i) => 'Item ${i + 1}');
  
  void search(String query) {
    // 防抖处理
    _debounceTimer?.cancel();
    _debounceTimer = Timer(const Duration(milliseconds: 300), () {
      if (query.isEmpty) {
        _resultsController.add([]);
        return;
      }
      
      final results = _allItems
          .where((item) => item.toLowerCase().contains(query.toLowerCase()))
          .toList();
      
      _resultsController.add(results);
    });
  }
  
  void dispose() {
    _debounceTimer?.cancel();
    _resultsController.close();
  }
}

/// 搜索页面
class SearchPage extends StatefulWidget {
  const SearchPage({super.key});

  
  State<SearchPage> createState() => _SearchPageState();
}

class _SearchPageState extends State<SearchPage> {
  final SearchService _searchService = SearchService();
  final TextEditingController _searchController = TextEditingController();
  
  
  void dispose() {
    _searchService.dispose();
    _searchController.dispose();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: TextField(
          controller: _searchController,
          decoration: const InputDecoration(
            hintText: '搜索...',
            border: InputBorder.none,
          ),
          onChanged: _searchService.search,
        ),
        actions: [
          IconButton(
            icon: const Icon(Icons.clear),
            onPressed: () {
              _searchController.clear();
              _searchService.search('');
            },
          ),
        ],
      ),
      body: StreamBuilder<List<String>>(
        stream: _searchService.results,
        initialData: [],
        builder: (context, snapshot) {
          if (snapshot.connectionState == ConnectionState.waiting) {
            return const Center(child: CircularProgressIndicator());
          }
          
          final results = snapshot.data ?? [];
          
          if (_searchController.text.isEmpty) {
            return const Center(child: Text('输入关键词开始搜索'));
          }
          
          if (results.isEmpty) {
            return const Center(child: Text('没有找到结果'));
          }
          
          return ListView.builder(
            itemCount: results.length,
            itemBuilder: (context, index) {
              return ListTile(
                leading: const Icon(Icons.search),
                title: Text(results[index]),
                onTap: () {},
              );
            },
          );
        },
      ),
    );
  }
}

四、完整应用示例

下面是一个完整的实时数据监控应用示例:

import 'dart:async';
import 'package:flutter/material.dart';

void main() {
  runApp(const StreamBuilderApp());
}

class StreamBuilderApp extends StatelessWidget {
  const StreamBuilderApp({super.key});

  
  Widget build(BuildContext context) {
    return MaterialApp(
      title: '实时数据监控',
      debugShowCheckedModeBanner: false,
      theme: ThemeData(
        colorScheme: ColorScheme.fromSeed(seedColor: Colors.blue),
        useMaterial3: true,
      ),
      home: const MonitoringPage(),
    );
  }
}

/// 数据模型
class MonitoringData {
  final DateTime timestamp;
  final double value;
  final String status;
  
  MonitoringData({
    required this.timestamp,
    required this.value,
    required this.status,
  });
}

/// 数据服务
class MonitoringService {
  final StreamController<MonitoringData> _dataController = 
      StreamController<MonitoringData>.broadcast();
  Stream<MonitoringData> get dataStream => _dataController.stream;
  
  Timer? _timer;
  bool _isRunning = false;
  
  void start() {
    if (_isRunning) return;
    _isRunning = true;
    
    _timer = Timer.periodic(const Duration(seconds: 1), (timer) {
      final random = DateTime.now().millisecond / 1000;
      final value = 50 + random * 50;
      
      String status;
      if (value < 70) {
        status = '正常';
      } else if (value < 90) {
        status = '警告';
      } else {
        status = '危险';
      }
      
      _dataController.add(MonitoringData(
        timestamp: DateTime.now(),
        value: value,
        status: status,
      ));
    });
  }
  
  void stop() {
    _timer?.cancel();
    _isRunning = false;
  }
  
  void dispose() {
    stop();
    _dataController.close();
  }
}

/// 监控页面
class MonitoringPage extends StatefulWidget {
  const MonitoringPage({super.key});

  
  State<MonitoringPage> createState() => _MonitoringPageState();
}

class _MonitoringPageState extends State<MonitoringPage> {
  final MonitoringService _service = MonitoringService();
  final List<MonitoringData> _history = [];
  
  
  void dispose() {
    _service.dispose();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('📊 实时数据监控'),
        centerTitle: true,
        actions: [
          StreamBuilder<MonitoringData>(
            stream: _service.dataStream,
            builder: (context, snapshot) {
              final isRunning = snapshot.connectionState == ConnectionState.active;
              return Switch(
                value: isRunning,
                onChanged: (value) {
                  if (value) {
                    _service.start();
                  } else {
                    _service.stop();
                  }
                },
              );
            },
          ),
        ],
      ),
      body: StreamBuilder<MonitoringData>(
        stream: _service.dataStream,
        builder: (context, snapshot) {
          // 保存历史数据
          if (snapshot.hasData) {
            _history.insert(0, snapshot.data!);
            if (_history.length > 20) {
              _history.removeLast();
            }
          }
          
          return SingleChildScrollView(
            padding: const EdgeInsets.all(16),
            child: Column(
              crossAxisAlignment: CrossAxisAlignment.start,
              children: [
                // 当前状态卡片
                _buildCurrentStatusCard(snapshot),
                const SizedBox(height: 24),
                
                // 数值显示
                _buildValueDisplay(snapshot),
                const SizedBox(height: 24),
                
                // 历史记录
                _buildHistoryList(),
              ],
            ),
          );
        },
      ),
    );
  }
  
  Widget _buildCurrentStatusCard(AsyncSnapshot<MonitoringData> snapshot) {
    String statusText;
    Color statusColor;
    IconData statusIcon;
    
    if (!snapshot.hasData) {
      statusText = '等待数据...';
      statusColor = Colors.grey;
      statusIcon = Icons.hourglass_empty;
    } else {
      final data = snapshot.data!;
      switch (data.status) {
        case '正常':
          statusText = '系统运行正常';
          statusColor = Colors.green;
          statusIcon = Icons.check_circle;
          break;
        case '警告':
          statusText = '系统负载较高';
          statusColor = Colors.orange;
          statusIcon = Icons.warning;
          break;
        case '危险':
          statusText = '系统负载过高';
          statusColor = Colors.red;
          statusIcon = Icons.error;
          break;
        default:
          statusText = '未知状态';
          statusColor = Colors.grey;
          statusIcon = Icons.help;
      }
    }
    
    return Card(
      child: Padding(
        padding: const EdgeInsets.all(20),
        child: Row(
          children: [
            Container(
              width: 60,
              height: 60,
              decoration: BoxDecoration(
                color: statusColor.withOpacity(0.1),
                shape: BoxShape.circle,
              ),
              child: Icon(statusIcon, color: statusColor, size: 32),
            ),
            const SizedBox(width: 16),
            Expanded(
              child: Column(
                crossAxisAlignment: CrossAxisAlignment.start,
                children: [
                  Text(
                    statusText,
                    style: const TextStyle(
                      fontSize: 18,
                      fontWeight: FontWeight.bold,
                    ),
                  ),
                  const SizedBox(height: 4),
                  Text(
                    snapshot.hasData
                        ? '更新时间: ${_formatTime(snapshot.data!.timestamp)}'
                        : '点击右上角开关开始监控',
                    style: TextStyle(color: Colors.grey.shade600),
                  ),
                ],
              ),
            ),
          ],
        ),
      ),
    );
  }
  
  Widget _buildValueDisplay(AsyncSnapshot<MonitoringData> snapshot) {
    final value = snapshot.data?.value ?? 0;
    
    return Card(
      child: Padding(
        padding: const EdgeInsets.all(20),
        child: Column(
          children: [
            Row(
              mainAxisAlignment: MainAxisAlignment.spaceBetween,
              children: [
                const Text(
                  '当前数值',
                  style: TextStyle(fontSize: 16),
                ),
                Text(
                  value.toStringAsFixed(2),
                  style: const TextStyle(
                    fontSize: 36,
                    fontWeight: FontWeight.bold,
                  ),
                ),
              ],
            ),
            const SizedBox(height: 16),
            // 进度条
            ClipRRect(
              borderRadius: BorderRadius.circular(8),
              child: LinearProgressIndicator(
                value: value / 100,
                minHeight: 20,
                backgroundColor: Colors.grey.shade200,
                valueColor: AlwaysStoppedAnimation(
                  value < 70
                      ? Colors.green
                      : value < 90
                          ? Colors.orange
                          : Colors.red,
                ),
              ),
            ),
            const SizedBox(height: 8),
            Row(
              mainAxisAlignment: MainAxisAlignment.spaceBetween,
              children: [
                Text('0', style: TextStyle(color: Colors.grey.shade600)),
                Text('50', style: TextStyle(color: Colors.grey.shade600)),
                Text('100', style: TextStyle(color: Colors.grey.shade600)),
              ],
            ),
          ],
        ),
      ),
    );
  }
  
  Widget _buildHistoryList() {
    return Card(
      child: Padding(
        padding: const EdgeInsets.all(16),
        child: Column(
          crossAxisAlignment: CrossAxisAlignment.start,
          children: [
            const Text(
              '历史记录',
              style: TextStyle(fontSize: 16, fontWeight: FontWeight.bold),
            ),
            const SizedBox(height: 12),
            if (_history.isEmpty)
              const Center(
                child: Padding(
                  padding: EdgeInsets.all(20),
                  child: Text('暂无历史记录'),
                ),
              )
            else
              ...List.generate(_history.length.clamp(0, 10), (index) {
                final data = _history[index];
                return ListTile(
                  dense: true,
                  leading: Container(
                    width: 8,
                    height: 8,
                    decoration: BoxDecoration(
                      color: data.status == '正常'
                          ? Colors.green
                          : data.status == '警告'
                              ? Colors.orange
                              : Colors.red,
                      shape: BoxShape.circle,
                    ),
                  ),
                  title: Text(
                    data.value.toStringAsFixed(2),
                    style: const TextStyle(fontWeight: FontWeight.w500),
                  ),
                  subtitle: Text(
                    _formatTime(data.timestamp),
                    style: TextStyle(fontSize: 12, color: Colors.grey.shade600),
                  ),
                  trailing: Text(
                    data.status,
                    style: TextStyle(
                      color: data.status == '正常'
                          ? Colors.green
                          : data.status == '警告'
                              ? Colors.orange
                              : Colors.red,
                    ),
                  ),
                );
              }),
          ],
        ),
      ),
    );
  }
  
  String _formatTime(DateTime time) {
    return '${time.hour.toString().padLeft(2, '0')}:'
           '${time.minute.toString().padLeft(2, '0')}:'
           '${time.second.toString().padLeft(2, '0')}';
  }
}

五、进阶技巧

🌟 5.1 Stream 组合与转换

/// 组合多个 Stream
class StreamCombiner {
  final StreamController<String> _combinedController = StreamController<String>();
  Stream<String> get combinedStream => _combinedController.stream;
  
  late StreamSubscription _subscription1;
  late StreamSubscription _subscription2;
  
  void combine(Stream<int> stream1, Stream<String> stream2) {
    int? lastValue1;
    String? lastValue2;
    
    _subscription1 = stream1.listen((value) {
      lastValue1 = value;
      if (lastValue2 != null) {
        _combinedController.add('Stream1: $lastValue1, Stream2: $lastValue2');
      }
    });
    
    _subscription2 = stream2.listen((value) {
      lastValue2 = value;
      if (lastValue1 != null) {
        _combinedController.add('Stream1: $lastValue1, Stream2: $lastValue2');
      }
    });
  }
  
  void dispose() {
    _subscription1.cancel();
    _subscription2.cancel();
    _combinedController.close();
  }
}

/// Stream 转换示例
class StreamTransformers {
  // 过滤偶数
  Stream<int> filterEven(Stream<int> source) {
    return source.where((value) => value % 2 == 0);
  }
  
  // 转换为字符串
  Stream<String> mapToString(Stream<int> source) {
    return source.map((value) => 'Value: $value');
  }
  
  // 防抖
  Stream<T> debounce<T>(Stream<T> source, Duration duration) {
    return source.transform(
      StreamTransformer<T, T>.fromHandlers(
        handleData: (data, sink) {
          Timer(duration, () => sink.add(data));
        },
      ),
    );
  }
  
  // 去重
  Stream<T> distinct<T>(Stream<T> source) {
    return source.distinct();
  }
}

🔄 5.2 广播 Stream

/// 广播 Stream 示例
class BroadcastStreamExample extends StatefulWidget {
  const BroadcastStreamExample({super.key});

  
  State<BroadcastStreamExample> createState() => _BroadcastStreamExampleState();
}

class _BroadcastStreamExampleState extends State<BroadcastStreamExample> {
  late StreamController<int> _controller;
  late Stream<int> _broadcastStream;
  
  
  void initState() {
    super.initState();
    // 创建广播 Stream
    _controller = StreamController<int>.broadcast();
    _broadcastStream = _controller.stream;
    
    // 模拟数据产生
    Timer.periodic(const Duration(seconds: 1), (timer) {
      _controller.add(timer.tick);
    });
  }
  
  
  void dispose() {
    _controller.close();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      body: Column(
        children: [
          // 多个 StreamBuilder 可以监听同一个广播 Stream
          Expanded(
            child: StreamBuilder<int>(
              stream: _broadcastStream,
              builder: (context, snapshot) {
                return Center(
                  child: Text(
                    '监听器 1: ${snapshot.data ?? 0}',
                    style: const TextStyle(fontSize: 24),
                  ),
                );
              },
            ),
          ),
          Expanded(
            child: StreamBuilder<int>(
              stream: _broadcastStream,
              builder: (context, snapshot) {
                return Center(
                  child: Text(
                    '监听器 2: ${snapshot.data ?? 0}',
                    style: const TextStyle(fontSize: 24),
                  ),
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}

⚡ 5.3 错误处理

/// 带错误处理的 StreamBuilder
class SafeStreamBuilder<T> extends StatelessWidget {
  final Stream<T> stream;
  final T? initialData;
  final Widget Function(BuildContext context, T data) builder;
  final Widget Function(BuildContext context, Object error)? errorBuilder;
  final Widget? loadingWidget;
  
  const SafeStreamBuilder({
    super.key,
    required this.stream,
    this.initialData,
    required this.builder,
    this.errorBuilder,
    this.loadingWidget,
  });

  
  Widget build(BuildContext context) {
    return StreamBuilder<T>(
      stream: stream,
      initialData: initialData,
      builder: (context, snapshot) {
        // 处理加载状态
        if (snapshot.connectionState == ConnectionState.waiting) {
          return loadingWidget ?? 
              const Center(child: CircularProgressIndicator());
        }
        
        // 处理错误状态
        if (snapshot.hasError) {
          return errorBuilder?.call(context, snapshot.error!) ??
              Center(
                child: Column(
                  mainAxisAlignment: MainAxisAlignment.center,
                  children: [
                    const Icon(Icons.error_outline, size: 48, color: Colors.red),
                    const SizedBox(height: 16),
                    Text('错误: ${snapshot.error}'),
                  ],
                ),
              );
        }
        
        // 处理无数据状态
        if (!snapshot.hasData) {
          return const Center(child: Text('暂无数据'));
        }
        
        // 正常显示数据
        return builder(context, snapshot.data as T);
      },
    );
  }
}

六、最佳实践与注意事项

✅ 6.1 性能优化建议

  1. 使用 broadcast Stream:当多个组件需要监听同一个 Stream 时,使用广播 Stream 避免重复创建。

  2. 及时关闭 StreamController:在 dispose 中关闭 StreamController,避免内存泄漏。

  3. 使用 initialData:设置初始数据,避免不必要的加载状态。

  4. 避免过度重建:StreamBuilder 只会在数据变化时重建,不需要额外的优化。

  5. 使用 StreamTransformer:对于复杂的数据转换,使用 StreamTransformer 而不是在 builder 中处理。

⚠️ 6.2 常见问题与解决方案

问题 原因 解决方案
内存泄漏 未关闭 StreamController 在 dispose 中调用 close()
多次监听失败 使用了单订阅 Stream 改用 broadcast Stream
数据不更新 Stream 未发送数据 检查 Sink.add() 是否调用
状态异常 未正确处理连接状态 检查 connectionState
防抖失效 每次重建创建新 Stream 将 Stream 提升到 State

📝 6.3 代码规范建议

  1. 封装 Stream 服务:将 Stream 相关逻辑封装成独立的服务类。

  2. 使用类型参数:为 Stream 和 StreamBuilder 指定明确的类型参数。

  3. 处理所有状态:在 builder 中处理所有可能的连接状态。

  4. 添加注释:复杂的 Stream 逻辑应该添加注释说明。

  5. 错误处理:始终处理可能的错误情况。


七、总结

本文详细介绍了 Flutter 中 StreamBuilder 组件的使用方法,从基础概念到高级技巧,帮助你掌握响应式数据流的核心能力。

核心要点回顾:

📌 Stream 基础:理解 Stream、StreamController、Sink 的概念和用法

📌 StreamBuilder 使用:监听 Stream 变化,根据状态构建 UI

📌 状态处理:正确处理 waiting、active、done 等连接状态

📌 实际应用:倒计时、进度条、实时搜索等典型场景

📌 进阶技巧:Stream 组合、广播 Stream、错误处理等

通过本文的学习,你应该能够独立开发实时数据应用,并能够将响应式编程应用到更多场景中。


八、参考资料

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐