rxjs学习笔记: 冷流/热流(Cold/Hot) Observables

RxJS 中 Observables 分为两种:Cold Observables 和 Hot Observables,这两个到底有什么区别呢?
更新于: 2021-12-19 12:57:28

定义

Cold Observables: 

  • 只有被 observers 订阅的时候,才会开始产生值
  • 是单播的,有多少个订阅就会生成多少个订阅实例,每个订阅都是从第一个产生的值开始接收值
  • 所以每个订阅接收到的值都是一样的

Hot Observables:

  • 不管有没有被订阅都会产生值
  • 是多播的,多个订阅共享同一个实例,是从订阅开始接受到值,每个订阅接收到的值是不同的
  • 取决于它们是从什么时候开始订阅(obs.connect)。

其它定义

  • Cold Observables: 在被订阅后运行,observables 序列仅在 subscribe 函数被调用后才会推送数据
  • Hot Observables: 在被订阅之前就已经开始产生数据,例如 mouse move 事件。

publish

  1. publish 会产生一个 ConnectableObservable 的对象,即有 connect 方法
  2. 两次 subscribe 都不会接入到从0开始的值,因为 connect 先执行,subscribe 行为后进行
  3. 等2者都 subscribe 之后,会接收到相同的 data
import { interval, Observable, Subject } from 'rxjs';
import { publish, refCount, share } from 'rxjs/operators';

const obs$ = interval(1000).pipe(publish());

setTimeout(() => {
  obs$.subscribe((data) => {
    console.log('1st subscriber:' + data);
  });
  setTimeout(() => {
    obs$.subscribe((data) => {
      console.log('2st subscriber:' + data);
    });
  }, 1100);
}, 2100);

obs$.connect();

refCount

  • 这个通常(其它情况暂时没有用过)会与 publish 配合使用
  • 这个会返回一个 Observable ,所以不需要  connect 方式调用
  • 而且,他会保证第一个 subscribe 从 0 开始接收到数据 (当前 example 中会产生0,1,2,3…这种序列)
import { interval, Observable, Subject } from 'rxjs';
import { publish, refCount, share } from 'rxjs/operators';

const obs$ = interval(1000).pipe(publish(), refCount());

setTimeout(() => {
  obs$.subscribe((data) => {
    console.log('1st subscriber:' + data);
  });
  setTimeout(() => {
    obs$.subscribe((data) => {
      console.log('2st subscriber:' + data);
    });
  }, 1100);
}, 2100);


// 这里不需要了
// obs$.connect();

share

// share = publish() + refCount()

import { interval, Observable, Subject } from 'rxjs';
import { publish, refCount, share } from 'rxjs/operators';

const obs$ = interval(1000).pipe(share());

setTimeout(() => {
  obs$.subscribe((data) => {
    console.log('1st subscriber:' + data);
  });
  setTimeout(() => {
    obs$.subscribe((data) => {
      console.log('2st subscriber:' + data);
    });
  }, 1100);
}, 2100);

shareReplay

在用 share() 的时候,第二个或者更后面的订阅者开始订阅的时候,都是共享第一订阅者的 Observables,比如在上面的例子中,第二个订阅比第一个订阅晚五秒再开始订阅,那么第二个订阅者从5开始接收值。但是实际情况中,如果我想让第二个订阅者也能拿到前面的值,那怎么办呢?用 shareReplay() 可以实现。

参考:

https://limeii.github.io/2019/07/rxjs-coldhot-observable/