/**
* 语法
* @param {Function} project - 投影函数
* @param {Function, Optional} concurrent - 并发订阅的最大输入Observable数量
* @param {Function, Optional} scheduler - 定时器调度
* @return {Function}
*/
expand(project, concurrent, scheduler);
与mergeMap
相似,不同的是expand
将投影函数也应用于新的Observable
,也就是说,每收到源发出的值时,立即输出这个值,同时调用投影函数返回一个新的Observable
并立即订阅它合并到输出中.如此一来,值映射为源,源又产生值,不断循环产生值,所以,一般会使用take
操作符限制个数或投影函数返回一个EMPTY
来终止递归
import { fromEvent, of } from "rxjs";
import { expand, mapTo, take, expand } from "rxjs/operators";
const source$ = of(1);
const result$ = source$.pipe(
expand(v => of(2 * v)),
take(5)
);
虽然原本source$
发出一个值就结束了,但通过expand
操作符的循环调用生产了一系列数据
注:本示例仅为展示效果,实际上result$
的发出的值(订阅源为of)都是同一时间发出的