expand

/**
 * 语法
 * @param {Function} project - 投影函数
 * @param {Function, Optional} concurrent - 并发订阅的最大输入Observable数量
 * @param {Function, Optional} scheduler - 定时器调度
 * @return {Function}
 */
expand(project, concurrent, scheduler);

mergeMap相似,不同的是expand将投影函数也应用于新的Observable,也就是说,每收到源发出的值时,立即输出这个值,同时调用投影函数返回一个新的Observable并立即订阅它合并到输出中.如此一来,值映射为源,源又产生值,不断循环产生值,所以,一般会使用take操作符限制个数或投影函数返回一个EMPTY来终止递归

示例

import { fromEvent, of } from "rxjs";
import { expand, mapTo, take, expand } from "rxjs/operators";

const source$ = of(1);
const result$ = source$.pipe(
  expand(v => of(2 * v)),
  take(5)
);
 

虽然原本source$发出一个值就结束了,但通过expand操作符的循环调用生产了一系列数据

注:本示例仅为展示效果,实际上result$的发出的值(订阅源为of)都是同一时间发出的