全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2022-11-16_Flutter 异步编程 | 学习 Stream 的元素转换操作

您的位置:首页 >> 新闻 >> 行业资讯

Flutter 异步编程 | 学习 Stream 的元素转换操作 本文为稀土掘金技术社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究! 张风捷特烈 - 出品 0. 前言 - 测试说明Stream是一种可连续监听的事件序列,它本质上是对发布-订阅模式的具体实现,人们习惯于称其为响应式模式。很多编程语言都有Rx[language], 它们由【ReactiveX】组织负责维护,如下也包含RxDart版本,它是对Stream的拓展。 image.pngRx中对元素的操作方式被称为操作符,Stream本身有很多内置操作符,本篇我们学习这些内置的操作。这里测试中使用的流如下所示,通过StreamProvider#createStream提供:一共流中有七个int元素,每隔100 ms产出一个。 image.pngclassStreamProvider{ StreamintcreateStream()async*{ Listintres=[1,9,9,4,3,2,8]; for(inti=0ires.length;i++){ yieldres[i]; awaitFuture.delayed(constDuration(milliseconds:100)); } } } 输入流测试如下,运行时会每隔100 ms打印一个数字:代码见00_original.dart voidmain(){ StreamintintStream=StreamProvider().createStream(); intStream.listen((e){ print(e); } 一、简单的 Stream 之间的转换首先我们从下面8个操作符简单认识一下Stream-Stream间的转换。这几个方法,都是Stream的成员方法,且返回另一个Stream对象: image.png1. map 操作符: 映射转换Streammap(Sconvert(Tevent)) 从方法定义上可以看出:map可以将T类型的元素转换成S型,返回为一个S型的新流。map意为映射,入参是单参回调,用来决定转换的映射关系。如下所示,通过map操作,将一个int型的Stream转换成一个String型的Stream。 image.png测试代码01_map.dart如下,这里的映射关系是:将输入的元素作为key, 从numMap中取值返回: Mapint,StringnumMap={ 0:"零",1:"壹",2:"贰",3:"叁",4:"肆", 5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾", }; voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamStringnewStream=intStream.mapString((inte)=numMap[e]!); newStream.listen((e){ print(e); } 2. distinct 操作符: 区分过滤Streamdistinct([boolequals(Tprevious,Tnext)?]) 从方法定义上可以看出:distinct会生成一个与原流相同泛型的新流,入参是两参回调函数,返回bool值。该操作符可以根据当前元素和前一元素比较结果,决定是否将当前元素加入输出流。如下示意图,在200 ms时,输入流产出的元素是9,当把条件设为: 前后元素相同则不产出,就可以把连续相同的元素过滤掉: image.png测试代码02_distinct.dart如下,这里的判断关系是:前后两个元素相同是为一致,不加入新流中。这在状态管理中,可以很方便地实现连续相同的状态不做响应,从而避免无意义的更新。 voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.distinct((inta,intb)=a== newStream.listen((e){ print(e); } 3. where 操作符:条件过滤Streamwhere(booltest(Tevent)) 从方法定义上可以看出:where会生成一个与原流相同泛型的新流,入参是一参回调函数,返回bool值。该操作符可以对当前元素进行校验,决定是否将其加入输出流。如下示意图,判断标准是元素大于5才能加入新流,所以输出流只有9,9,8三个元素,在其他时段不产出 : image.png测试代码03_where.dart如下,这里的判断关系是:元素e 5时允许加入新流中。通过where可以忽略原流中的某些元素,当元素激活时不作响应。 voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.where((inte)=e newStream.listen((e){ print(e); } 4. take 操作符:截取Streamtake(intcount) 从方法定义上可以看出:take会生成一个与原流相同泛型的新流,入参是 int 型数字。该操作符可以选取前面count个元素加入输出流。如下示意图,take(3)表示只取输入流的前三个。值得注意的是:如果输入流此时未进行监听,输出流达到count后,时间线就会结束。也就是说输入流不会继续产出元素。 image.png测试代码04_take.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.take(3); newStream.listen((e){ print(e); } 5. takeWhile 操作符:条件截取StreamtakeWhile(booltest(Telement)) 从方法定义上可以看出:takeWhile会生成一个与原流相同泛型的新流,入参是一参回调函数,返回bool值。该操作符可以根据条件截取前面满足条件元素加入输出流,直到出现不满足条件为止。值得注意的是:如果输入流未进行监听,输出流出现不满足条件元素时,时间线就会结束。 image.png测试代码05_takeWhile.dart如下:条件是小于4,或者等于9,所以前面三个元素满足情况,第四个元素不满足,会使流结束: voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.takeWhile((inte)=e=4||e== newStream.listen((e){ print(e); } 6. skip 操作符:跳过Streamskip(intcount) 从方法定义上可以看出:skip会生成一个与原流相同泛型的新流,入参是 int 型数字。该操作符可以跳过前面count个元素,使他们不加入输出流。如下所示,是skip(3)的效果。 image.png测试代码06_skip.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.skip(3); newStream.listen((e){ print(e); } 7. skipWhile 操作符:条件跳过StreamskipWhile(booltest(Telement)) 从方法定义上可以看出:skipWhile会生成一个与原流相同泛型的新流,入参是一参回调函数,返回bool值。该操作符可以根据条件跳过前面满足条件元素,使其不加入输出流,直到出现不满足条件为止。 值得注意的是:第一个不满足条件的元素出现后,该条件不会影响后续的元素。比如下面条件是e 4, 第一个元素是1,满足条件跳过。第二个元素是9不满足,接下来的3、2元素不会受到条件影响。 image.png测试代码07_skipWhile.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamintnewStream=intStream.skipWhile((inte)=e newStream.listen((e){ print(e); } 8. cast 与 castFrom : 强制类型转换Streamcast()=Stream.castFromT,R(this); 从方法定义上可以看出:cast没有参数,允许生成一个与原流不同泛型的新流。该方法使用Stream的castFrom静态方法实现的,本质上是对元素进行强制类型转换,所以需要注意的是:前后类型必须满足转换要求,否则会出现异常,一般使用场景非常有限。 image.pngimage.png测试代码08_cast_castFrom.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamnumnewStream=intStream.castnum intstart=DateTime.now().millisecondsSinceEpoch; newStream.listen((e){ print(e); } 二、较复杂的 Stream 之间的转换image.png1. expand 操作符:展开Streamexpand(Iterableconvert(Telement)) 从方法定义上可以看出:expand允许生成一个与原流不同泛型 S的新流,入参是一参回调函数,返回Iterable值。也就是说,该操作符可以让一个输入流元素,激发出多个其他类型的元素,放入输出流中:比如下面每个元素可以输出两个字符元素信息: image.png测试代码09_expend.dart如下: Mapint,StringnumMap={ 0:"零",1:"壹",2:"贰",3:"叁",4:"肆", 5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾", }; Mapint,StringnumMap2={ 0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ", 5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ", }; voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamStringnewStream=intStream.expandString((e)=[numMap[e]!,numMap2[e]!]); intstart=DateTime.now().millisecondsSinceEpoch; newStream.listen((e){ print(e); } 2. asyncMap 操作符:异步映射StreamasyncMap(FutureOrconvert(Tevent)) 从方法定义上可以看出:asyncMap允许生成一个与原流不同泛型 E的新流,入参是一参回调函数,返回FutureOr类型值。也就是说,该操作符允许通过异步方法对流元素进行转换。如下所示,1元素激发时,延时100 ms模拟异步操作,在操作完成之后才会继续触发延时100 ms产出2元素,以此类推。 说个实际场景比较任意明白:比如读取文件夹会生成一个Stream对象,通过where可以过滤出文件元素,在通过asyncMap可以直接通过异步读取File元素内容,获取一个StreamString的文件夹内部文件内容流: image.png测试代码10_asyncMap.dart如下: Mapint,StringnumMap={ 0:"零",1:"壹",2:"贰",3:"叁",4:"肆", 5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾", }; voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamStringnewStream=intStream.asyncMap(delayMap); intstart=DateTime.now().millisecondsSinceEpoch; newStream.listen((e){ print(e); } FutureStringdelayMap(inte)async{ //模拟异步耗时: awaitFuture.delayed(constDuration(milliseconds:100)); returnnumMap[e]!; } 3. asyncExpand 操作符:异步展开StreamasyncExpand(Stream?convert(Tevent)) 从方法定义上可以看出:asyncExpand允许生成一个与原流不同泛型 E的新流,入参是一参回调函数,返回Stream?类型值。也就是说,该操作符允许会将元素转换成E 泛型的流。这可能让人很难理解,不过结合Expand可以将元素转化为Iterable,那异步情况转换成Stream也在情理之中。 如下示例中,每个数字会被转换成含有两个元素的StreamString,且每个String元素间延时50 ms模拟异步处理。感觉这个方法一旦派上用场,肯定有大用,我暂时没想出什么应用场景 ~ image.png测试代码11_asyncExpend.dart如下: Mapint,StringnumMap={ 0:"零",1:"壹",2:"贰",3:"叁",4:"肆", 5:"伍",6:"陆",7:"柒",8:"捌",9:"玖",10:"拾", }; Mapint,StringnumMap2={ 0:"0",1:"Ⅰ",2:"Ⅱ",3:"Ⅲ",4:"Ⅳ", 5:"Ⅴ",6:"Ⅵ",7:"Ⅶ",8:"Ⅷ",9:"Ⅸ",10:"Ⅹ", }; voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamStringnewStream=intStream.asyncExpand(_streamExpand); newStream.listen((e){ print(e); } StreamString?_streamExpand(inte)async*{ awaitFuture.delayed(constDuration(milliseconds:50)); yieldnumMap[e]!; awaitFuture.delayed(constDuration(milliseconds:50)); yieldnumMap2[e]!; } 4. transform 操作符:转化流Streamtransform(StreamTransformerT,SstreamTransformer) 从方法定义上可以看出:transform允许生成一个与原流不相同泛型 S的新流,入参是StreamTransformerT, S类型对象。该方法可以通过自定义StreamTransformer来实现流从T类型到S类型的转化。 Dart中对StreamTransformer的实现比较少,只有第Ascii的编解码的流转换器。 image.png如下代码,简单通过 AsciiEncoder 看一下使用方式,代码12_transform.dart voidmain(){ StreamintintStream=StreamProvider().createStream(); StreamListintnewStream=intStream .map((inte)=e.toString()) .transform(constAsciiEncoder()); intstart=DateTime.now().millisecondsSinceEpoch; newStream.listen((e){ print("$e===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 在bloc中对事件的转换时,可以看到transform的身影: image.png另外在rxdart库中有大量的StreamTransformer的使用,这个有机会再细说。 image.png能通过一个Stream生成另一个Stream内置的的就只这些方法。另外stream_transform库对Stream的流转换进行了一些拓展,虽然没有rxdart那么全面和系统,但基本可以满足日常需要。作为一个小巧的对rxdart的下位替代还是不错的。 三、 产生单元素的操作符除了转换成其他Stream之外,还有一些操作符可以产出一个元素,也就是Future对象。这小节来看一下如下的五个操作符: image.png1. reduce 操作符:迭代合并Futurereduce(Tcombine(Tprevious,Telement)) 从方法定义上可以看出:reduce返回与原流相同泛型的Future对象 ,入参是两参回调函数,返回泛型对象。该操作符的特点是:每当非首元素激活时,都会使用combine对前面结果p和当前元素e进行处理。 image.png断点调试一下就会非常清晰,测试代码见13_reduce.dart。比如这里combine操作是前后元素相加,第一次combine触发是在100ms时,9被激活。 此时的p是前一元素,也就是1,e是当前值9 返回结果是p+e = 10; image.png在200 ms第二个9激活时,会触发第二次combine,此时的p是上一次的结果值10,9是当前值9 返回结果是p+e = 19。以此类推,知道600ms时流结束,返回结果36。这样就实现了异步的元素累加效果。当然除了加法,你可以提供combine函数满足特点的需求。 image.png值得注意的是: reduce 只会返回最终的合并结果元素,也就是一个 Future 对象,但在元素激发的过程中会不断通过combine进行迭代合并结果。 2. fold 操作符:迭代合并转换Futurefold(SinitialValue,Scombine(Sprevious,Telement)) 从方法定义上可以看出:fold返回与原流不同泛型的Future对象 ,也就是说,除了迭代合并之外,fold还有类型转换的功能。 我们知道reduce是在第二个元素激活时,才和第一个元素开始迭代的。fold有两个入参,第一个是S类型的初始值,表示最初迭代的前元素,也就是说在第一个元素激活时就可以使用combine来处理。第二个入参是函数对象combine,它有将T泛型的元素,和结果S型元素,组合成新S型对象的能力。 image.png测试代码14_fold.dart如下:fold和reduce在语义上是折叠和减少的含义。从图中可以看出,它们都是对Stream元素按照规则进行合并操作,从输入和输出来看,是将多个元素合并成一个元素进行输出,这就是折叠和减少的形象体现。可以感受到reduce相当于一个搭配版的fold。 voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureStringreduceResult= intStream.foldString("大写:",(Stringp,inte)=p+numMap[e]!); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 3. drain 操作符: 排干Futuredrain([E?futureValue]) 从方法定义上可以看出:drain返回与原流不同泛型的Future对象 ,其中可以传入E?泛型的目标值。 这个方法非常形象,把Stream中的元素看作是水,drain的意思是把水排干。该方法的作用就返回Future对象作为Stream 结束或异常的信号,无视其中的元素触发。 image.png测试代码15_drain.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureintreduceResult=intStream.drain(4); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("流已结束===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 4. every 操作符: 全匹配校验Futureboolevery(booltest(Telement)) 从方法定义上可以看出:every返回bool的Future对象 ,一个入参是用于校验元素的回调函数,返回bool值。 该方法用于校验是否流中的每个元素都满足校验条件,如果有一个元素不满足,在原流没有其他监听的情况下,会立刻终止时间线。如下测试所示,判断条件是e4,第一个元素即不满足,所以会立刻停止,后续的流元素也不会被激发。 image.png测试代码16_every.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureboolreduceResult=intStream.every((e)=e intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 5. any 操作符: 任意匹配校验Futureboolany(booltest(Telement)) 从方法定义上可以看出:any它返回bool的Future对象 ,一个入参是用于校验元素的回调函数,返回bool值。 该方法用于校验是否流中的是否存在任一元素校验条件,如果有一个元素满足,在原流没有其他监听的情况下,会立刻终止时间线。如下测试所示,判断条件是e4,第二个元素满足条件,所以会立刻停止,返回true, 后续的流元素也不会被激发。 image.png测试代码17_any.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureboolreduceResult=intStream.any((e)=e intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 四、其他操作符下面看一下剩下的7的相对简单的操作符: image.png1. singleWhere 操作符: 单元素查询FuturesingleWhere(booltest(Telement),{TorElse()?}) 从方法定义上可以看出:singleWhere它返回与原流相同泛型的Future对象 ,一个入参是用于校验元素的回调函数,返回bool值。 还有可选回调orElse, 用于在为查询到元素时提供默认值。 值得注意的是: 当未提供orElse,在流结束时没有匹配元素时,会出现No element异常。另外,如果在元素激活中发现第二个满足条件的元素,会抛出Too many elements异常,并终止后续元素的发出。也就是说,singleWhere需要保证有且仅有一个元素满足条件,所以在匹配成功后,时间线并不会停下。 image.png测试代码18_singleWhere.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureintreduceResult= intStream.singleWhere((e)=e==4,orElse:()=-1); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print(value); } 2. firstWhere 操作符: 首匹配元素FuturefirstWhere(booltest(Telement),{TorElse()?}) firstWhere在入参和返回值上和singleWhere一致,但其功能上有些差异。当某个元素激活时,符合firstWhere的条件,时间线就会停止。也就是说firstWhere只负责得到第一个匹配的元素,拿到即止。另外,如果未提供orElse且未发现元素,出现No element异常。 image.png测试代码19_firstWhere.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureintreduceResult=intStream.firstWhere((e)=e==4,orElse:()=-1); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 3. lastWhere 操作符: 尾匹配元素FuturelastWhere(booltest(Telement),{TorElse()?}) lastWhere在入参和返回值上和firstWhere一致,从名称上也能看出它的作用是匹配最后一个满足条件的元素。因为在流停止前,都不能确定未来的元素是否满足条件,所以lastWhere的时间线会到流结束。这是它和firstWhere一个很大的差异。另外同样,如果未提供orElse且未发现元素,出现No element异常。 image.png测试代码20_lastWhere.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureintreduceResult=intStream.lastWhere((e)=e==9,orElse:()=-1); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 4. elementAt 操作符: 索引元素FutureelementAt(intindex) elementAt非常简单, 通过指定索引进行匹配,返回对应索引位的元素,通过Future回调。在指定索引为的元素激活后,时间线会停止。 image.png测试代码21_elementAt.dart如下: voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureintreduceResult=intStream.elementAt(4); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 5. forEach、contains 和 join剩下的三个比较简单,就一起说吧,forEach是对Stream的元素进行遍历操作,返回的Future对象会在流结束时完成,返回null。如下代码22_forEach.dart,每次元素激活时都会触发process方法进行处理: voidmain(){ StreamintintStream=StreamProvider().createStream(); FuturedynamicreduceResult=intStream.forEach(process); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } voidprocess(inte){ print(e); } contains传入Object?对象,用于校验流中是否包含元素,返回Futurebool对象,当检测到包含时,时间线会停止,返回true。测试代码如下23_contains.dart voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureboolreduceResult=intStream.contains(4); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } join方法返回FutureString对象, 其作用是将元素通过入参符号进行连接成字符串。这和 List 中的join作用是类似的,只不过Stream支持异步处理join的过程。测试代码如下24_join.dart voidmain(){ StreamintintStream=StreamProvider().createStream(); FutureStringreduceResult=intStream.join(","); intstart=DateTime.now().millisecondsSinceEpoch; reduceResult.then((value){ print("$value===${DateTime.now().millisecondsSinceEpoch-start}ms"); } 这24个就是Stream类中内置的操作方法,可以满足绝大多数使用场景。但对于一些特殊场景,比如说debounce防抖 、throttle节流等,就无法支持。可以通过三方库进行拓展,另外 Flutter 中内置的StopWatch和Bloc中的Emitter都是比较有趣的东西。下一篇,也是本专题的最后一篇,将进一步探索Stream流转换的实现方式,来达到自己拓展 Stream 的目的。敬请期待 ~ 阅读原文

上一篇:2023-04-21_【招聘】寻找一个广告天才 下一篇:2025-01-04_瑞幸新周边被玩坏,雪王成最大“受害者”哈哈哈哈哈

TAG标签:

14
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价