Skip to main content

18 Dart操作流stream常识

1 在dart中,什么是流(stream)? 它又解决了什么问题?

流是一种编程范式概念,就像是一个管道一样,从一端把元素放入,然后从另一端流出。 它主要解决的是一个代码控制流程中提供一个概念简化的通信问题。借由流(stream)或 管道的概念让2个分离的处理逻辑能进行比较直白易懂的通信。比如说,我创建一个流,然后进行监听,然后把这个流的操作句柄交给其它的处理逻辑,当它那边处理好后,便直接 往流添加元素,而我这边就能监听到消息了。 听起来不就是回调吗? 这个在底层的实现还真是回调,但相对于回调的解释,用流的概念会更好理解些,为什么呢? 因为流的操作不止是简单的回调,还有各种操作符,这相当于元素在流中传播时, 会经过重重关卡,而这些关卡会对流中的元素进行操作,而这些个关卡在流的另一个名称为操作符,尽管流的本质就是回调,但加上这些功能性的关卡,尽管底层还是回调来实现的,但 用"回调"一词已经难以形象去形容这一过程了,所以取名为“流(stream)”更加贴切。

综上总结一下,就是流(stream)是一种把数个元素抽象成单一的流转流程,从一端流向另一端,且可在流转的期间可对流转的元素进行如: 过虑、修改等等操作。而它主要解决的是处理流程的通信问题和通信数据的处理问题。

2 在dart中如何声明Stream?

  test('How to declare stream', () async {
// Method 1.1: 通过数列方式去声明
final Stream<String> streamStrList = Stream.fromIterable(['Mars', 'Venus', 'Earth']);
streamStrList.listen(print); // ['Mars', 'Venus', 'Earth']
// Method 1.2: 通过Future去声明
final Stream<String> fromFutureStream = Stream.fromFuture(Future.value('Hello'));
fromFutureStream.listen(print); // Hello
// Method 1.3: 通过Futures去声明
final Stream<String> fromFuturesStream = Stream.fromFutures([Future.value('foo'), Future.value('bar')]);
fromFuturesStream.listen(print); // foo bar
// Method 2: 通过StreamController去声明
final StreamController<String> streamController = StreamController<String>();
streamController.add('hello');
streamController.stream.listen(print); // hello
});

::tip 提示 无论是Stream还是StreamController都可以声明Stream, 而二者的区别在于,前者的元素是固定的,在声明时就已确定了,而后者是可以在处理的过程中进行动态地把元素 加入把流中。

:::

3 流(stream)基本使用示例代码

  test('Basic usage', () async {
final StreamController<String> strStreamController = StreamController<String>();
strStreamController.stream.listen((event) { // <-- 接收元素
print(event); // hello
});
strStreamController.add('hello'); // <-- 添加元素进去
await Future.delayed(Duration(seconds: 20));
});

4 dart中流(stream)的处理事件

4.1 onListen-监听事件

onListen
  test('onListen event in dart stream', () async {
// step 1 创建流
final StreamController<String> strStreamController = StreamController<String>(
onListen: () {
// step 1.1 : 当注册监听事件时,则触发这个回调
print('onListen');
},
);
// step 2: 注册监听事件,然后触发 step 1.1 的回调事件
strStreamController.stream.listen((event) {
print(event);
});

await Future.delayed(Duration(seconds: 20));
});

4.2 onCancel - 取消事件

示例代码
  test('onCancel event in dart stream', () async {
// step 1 创建流
final StreamController<String> strStreamController = StreamController<String>(
onCancel: () {
// step 1.1 : 监听取消事件
print('onCancel'); // step 4.1 关闭到触发取消事件
},
);
// step 2: 监听流中的元素
final StreamSubscription<String> subscription = strStreamController.stream.listen((event) {
print(event); // e1
});
// step 3: 添加元素到流中
strStreamController.add('e1');
await Future.delayed(Duration(seconds: 1));
// step 4: 取消流
subscription.cancel();
strStreamController.add('a2'); // step 5: 无法添加元素进流了,因为流已经取消了

await Future.delayed(Duration(seconds: 20));
});

4.3 onPause - 停止事件

onPause
  test('onPause event in dart stream ', () async {
// step 1: 创建流
final StreamController<String> strStreamController = StreamController<String>(
onPause: () {
// step 4.1 触发中断事件
print('onPause');
},
);
// step 2: 监听流
StreamSubscription subscription = strStreamController.stream.listen((event) {
print(event); // e1 step 5.1: 最能收到e1元素,而e2发送之前就已经停止了流,所以无法接收到
});

// step 3: 发送元素到流中
strStreamController.add('e1');
await Future.delayed(Duration(seconds: 1));
// step 4: 停止流
subscription.pause();
// step 5: 再发送元素到流中
strStreamController.add('e2');
await Future.delayed(Duration(seconds: 10));
});

4.4 onResume - 播放事件

onResume
  test('onResume event in dart stream', () async {
// step 1: 创建流
final StreamController<String> strStreamController = StreamController<String>(
// step 5.1 触发停止事件
onPause: () {
print('onPause');
},
// step: 5.4: 触发播放事件
onResume: () {
print('onResume');
},
);
// step 2: 监听流
final StreamSubscription<String> subscription = strStreamController.stream.listen((event) {
print(event);
// step 3.1: 输出 e1
// step 5.5: 输出 e2
});
// step 3: 添加元素
strStreamController.add('e1');
// step 4: 等1s
await Future.delayed(Duration(seconds: 1));
// step 5: 停止流
subscription.pause();
// step 5.2: 停止后再添加元素
strStreamController.add('e2');
// step 5.3: 然后再播放流
subscription.resume();
await Future.delayed(Duration(seconds: 20));
});

5 流的处理关卡(操作符)

5.1 where-过虑操作

  test('Where method in dart stream', () async {
// step 1: 声明流
final StreamController<String> strStreamController = StreamController<String>();

strStreamController.stream.where((event) {
// step 2: 只过滤字符大于3个的字符串
return event.length > 3;
}).listen((event) {
// step 3: 监听流数据
print(event); // step 5: 1234 <-- 经过上where的过虑,现在已经只剩下1234到达这里了
});

// step 4: 发送数据到流中
strStreamController.add('1');
strStreamController.add('12');
strStreamController.add('123');
strStreamController.add('1234');
await Future.delayed(Duration(seconds: 20));
});

5.2 Join-拼接操作

  test("Join method in dart stream", () async {
// step 1: 声明流
final StreamController<String> strStreamController = StreamController<String>();
// step 2: 给流添加拼接(join)的处理环节
strStreamController.add('foo');
strStreamController.add('baa');
await Future.delayed(Duration(seconds: 1));
strStreamController.close();
String result = await strStreamController.stream.join('/');
print(result); // foo/baa
await Future.delayed(Duration(seconds: 20));
});
提示

join操作需要流的元素已经加载完成了,才能进行join操作,如以上关闭了流,表示当前流已经没有后继加载元素了,join才异步回来。

5.3 take-获取操作

  test('Take method in dart stream', () async {
final StreamController<String> streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
// 获取2个元素
streamController.stream.take(2).listen(print); // [e1, e2]
});

5.4 takeWhile-条件获取

  test('TakeWhile method in dart stream', () async {
// Step 1: 声明流
final StreamController<String> streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
// Step 2: 只获取元素到 e2, 后面的元素不再获取
streamController.stream.takeWhile((e) => e != 'e3').listen(print); // e1 e2
});

5.5 containers-查看元素是否在流中

  test('Contains method in dart Stream', () async {
// Step 1: 声明流
final StreamController streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
// Step 2: 检测元素中有没有 e2 元素
bool result = await streamController.stream.contains('e2');
print(result); // true
});

5.6 any-懒加载遍历

  test('any method in dart stream', () async {
// Step 1: 声明流
final StreamController<String> streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
// Step 2: 懒加载遍历找出 e2 元素,直到找到目标才停止遍历
final bool result = await streamController.stream.any((element) {
return element == 'e2';
});
print(result); // true
});
提示

虽然takeWhilewhere的操作结果一样,但遍历的机制不一样,takeWhile是一遇到false就不再遍历后面的剩下的元素了,相当于懒加载. 而where则是全量遍历。 官方说明如下:

5.7 every-懒加载遍历检查

  test('every method in dart stream', () async {
// Step 1: 声明流
final StreamController<String> streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
// step 2: 遍历每一元素
final result = await streamController.stream.every((e) {
if (e == 'e2') {
return false; // step 2.1: 如果有 e2 则返回false,且中断遍历
}
return true;
});
print(result); // false
});
提示

如果检查元素并返回检查的结果,如果元素检查返回的是false则中断遍历,并返回结果。

5.8 firstWhere-懒遍历首次查找匹配

  test('firstWhere method in dart stream', () async {
// Step 1: 声明流
final StreamController<String> streamController = StreamController<String>();
streamController.add('e1');
streamController.add('e2');
streamController.add('e3');
String el = await streamController.stream.firstWhere((element) {
return element == 'e2';
});
print(el); // e2
});

5.9 fold-重复迭代元素

  test('fold method in dart stream', () async {
final Stream<String> stream = Stream.fromIterable(['e1', 'e2', 'e3']);
final String el = await stream.fold<String>('hello ', (previous, element) {
return previous + element;
});
print(el); // hello e1e2e3
});

5.10 lastWhere-查找最后的元素

  test('lastWhere method in dart stream', () async {
Stream<String> stream = Stream.fromIterable(['e1', 'e2', '3']);
final String result = await stream.lastWhere((el) {
if (el == 'e2') {
return true;
}
return false;
}, orElse: () => '');
print(result);
});

5.11 reduce-元素迭代

  test('reduce method in dart stream', () async {
final String result = await Stream.fromIterable(['e1', 'e2', '3']).reduce((previous, el) {
return previous + el;
});
print(result); // e1e2e3
});

5.12 singleWhere-找出单一元素

  test('singleWhere method in dart stream', () async {
final String result = await Stream.fromIterable(['e1', 'e1', 'e2', 'e2', 'e2', 'e3']).singleWhere((element) {
if (element == 'e3') {
return true;
}
return false;
}, orElse: () => '');
print(result); // e3
});
提示

singleWhere只能返回一次true

5.13 toList-转换为列表

  test('toList method in dart stream', () async {
List<String> result = await Stream.fromIterable(['e1', 'e2', 'e3']).toList();
print(result); // ['e1', 'e2', 'e3']
});

5.14 toSet-转换为set

  test('toSet method in dart stream', () async {
Set<String> result = await Stream.fromIterable(['e1', 'e2', 'e3']).toSet();
print(result); // {e1, e2, e3}
});

参考资料