蛋黄派碎冰冰

rxjs学习记录

介绍rxjs的常用操作符和使用场景。

2021-03-13 18:53:00

rxjs 学习记录

基础概念

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

常用操作符

  • of: 返回一个 observable,依次发出传入的值
  • switchMap: 会取消上次发出的 observable,通常和 interval 配合使用
  • mergeMap/flatMap: 与 map 的区别是能将返回结果打平,返回一个 Observable,可用于串联请求数据
  • merge: 将多个 Observable 合并为一个
  • mergeAll: 收集并订阅所有 Observable,map + mergeAll === mergeMap
  • skipWhile: 符合条件时跳过本次,但是一旦出现不满足条件的项,则会发出此之后的所有项
  • takeWhile:第一个返回 false 时完成
  • takeUntil: 传入一个 Observable,发出信号时时完成当前 Observable
  • startWith: 可以和 interval 一起使用,定义第一次触发的 delay 参数
  • take:指定第几次执行
  • filter: 过滤掉不符合条件的数据项,和 skipWhile 类似但是会一直生效

关于取消订阅

结束 observable 的方式

  1. Observable 发送完数据之后执行 Observable.onComplete()
  2. Observable 发生错误,执行 Observable.OnError()
  3. 订阅者主动取消,subscription.unsubscribe()

angular 中常见的 Observable 哪些需要取消订阅

  • 调用 httpClient 方法返回的 Observable 不需要取消订阅,因为发送数据之后,angular 会调用 Observable.onComplete()
  • 使用 AsyncPipe 不需要取消订阅
  • 通过 Subject,BehaviorSubject,AsyncSubject,ReplaySubject 在各个 Component 之间通信,需要调用 unsubscribe()取消订阅
  • 使用 rxjs 自带的操作符不需要取消订阅

应用实例

处理多个 http 请求

遇到需要调用多个接口返回数据并统一处理的时候,可以使用 forkJoin,类似 Promise.all()

  GetDetails(projectId: string) {
    const projects = this._http.get<any>(`/api/factory/projects/${projectId}`);
    const powers = this._http.get<any>(`/api/factory/projectpermissions/projects/${projectId}/userpermissions`);
    return forkJoin([projects, powers]).pipe(
      map(values => {
        const project = values[0];
        const power = values[1];
        return { ...project, power };
      })
    );
  }

事件节流

    this.ngZone.runOutsideAngular(() => {
      fromEvent(window, 'resize')
        .pipe(debounceTime(300), takeUntil(this.destroy$))
        .subscribe(() => {
          // ...
        });
		});

subscribe 嵌套问题

在使用 rxjs 的过程中,经常会碰到在一个 subscribe 函数里需要订阅另一个 Observable 的情况,subscribe 嵌套虽然也能达到目的,但是如果没有正确的取消订阅容易造成内存泄漏,并且写起来也不优雅。这种时候可以使用合并操作符来解决,如mergeMapcombineLatest等。

比如监听路由参数变化并使用路由参数调用某个接口获取数据,这种串联的可以使用mergeMap:

ngOnInit(): void {
    this.activatedRoute.queryParamMap
      .pipe(
        map((params) => params.get('id')),
        mergeMap((id) => fetchProject(id))
      )
      .subscribe((data) => {
        console.log('data', data);
      });
}

BehaviorSubject、ReplaySubject 和 AsyncSubject 区别

BehaviorSubject

可以设置初始值,并且订阅的时候会马上发送存储的最后一个值给订阅者

ReplaySubject

可以实现重播,new ReplaySubject(存储几次广播,过期时间)

AsyncSubject

一样可以存储数据,但是只会在 Observable 结束时发送数据

rxjs 调试

https://blog.angular-university.io/debug-rxjs/