RxJS学习笔记

under rxjs

in tech

Published: 2019-12-02

该笔记基于RxJS 6

基础概念

RxJS Primer介绍了RxJS中常见概念和基本用法。

Demystifying RxJS Observable介绍了RxJS Observable的概念模型和简单实现。简单地说,RxJS Observable本质上是一个函数,接受Observer作为其参数,当其被调用时,它调用Observer的nexterrorcomplete方法处理数据,最后用类封装出API。这个模型有助于理解Observable为什么默认是“冷的”、“懒的”。

// 举例:根据此思想实现的`of`函数
interface IObserver {
  next: (data: any) => void
  error: (error: Error) => void
  complete: () => void
}

type IPartialObserver = Partial<IObserver>;

type ICleanupLogic = () => void

type IObservable = (observer: IPartialObserver) => ICleanupLogic

const patch = (partialObserver: IPartialObserver): IObserver => {
  return {
    next: partialObserver.next || (() => {}),
    error: partialObserver.error || (() => {}),
    complete: partialObserver.complete || (() => {})
  };
};

const of = (...args: any[]): IObservable => {
  return (observer: IPartialObserver) => {
    args.forEach((data) => patch(observer).next(data));
    return () => {
      console.log('Clean up');
    };
  }
};

// Create source
const source = of(1, 2, 3, 4);

// Subscribe
const unsub = source({
  next: (x) => console.log(x)
});

// Unsubscribe
unsub();

// 输出:
//
// 1
// 2
// 3
// 4
// Clean up

同样也可以创建一个“热的”Observable:

// interval和ConnectableObserver
const interval = (ms: number) => {
  return (observer: IPartialObserver) => {
    let value = 0;
    let timeoutHandle: number;
    const emit = () => {
      timeoutHandle = setTimeout(() => {
        patch(observer).next(value);
        value += 1;
        emit();
      }, ms);
    };
    emit();
    return () => {
      if (timeoutHandle != null) {
        clearTimeout(timeoutHandle);
      }
    };
  };
};

type IConnectable = IObservable & ((action: 'connect') => ICleanupLogic);

const makeConnectable = (source: IObservable) => {
  const partialObservers: IPartialObserver[] = [];
  return (actionOrObserver: 'connect' | IPartialObserver) => {
    if (actionOrObserver === 'connect') {
      return source({
        next: value => partialObservers.forEach(ob => patch(ob).next(value)),
        error: e => partialObservers.forEach(ob => patch(ob).error(e)),
        complete: () => partialObservers.forEach(ob => patch(ob).complete)
      });
    } else {
      const observer = actionOrObserver;
      if (partialObservers.indexOf(observer) < 0) {
        partialObservers.push(observer);
      }
      return () => {
        const index = partialObservers.indexOf(observer);
        if (index >= 0) {
          partialObservers.splice(index, 1);
        }
      }
    }
  };
};

const connectable = makeConnectable(interval(1000));
// Sub1准备处理数据,此时还没有数据产生,否则Sub2会错过前2个数据
const unsub1 = connectable({
  next: value => console.log(`Sub1: ${value}`)
});
setTimeout(() => {
  // 开始产生数据
  const disconnect = connectable('connect');
  setTimeout(() => {
    // Sub2会错过第1个数据
    const unsub2 = connectable({
      next: value => console.log(`Sub2: ${value}`)
    });
    setTimeout(() => {
      // Sub2停止处理数据,Sub1会继续处理后续数据
      unsub2();
      setTimeout(() => {
        // 停止产生数据,Sub1将不会收到任何数据
        disconnect();
        setTimeout(() => {
          // Sub1停止处理数据
          unsub1();
        }, 1100);
      }, 1100);
    }, 1100);
  }, 1100);
}, 1100);

// 输出:
//
// Sub1: 0
// Sub1: 1
// Sub2: 1
// Sub1: 2

Observable对象的“冷热”

Observable默认是“冷的”,这个“冷”有两种含义:

  1. 只有其.subscribe方法被调用时才会产生数据
  2. 其对每个.subscribe调用产生独立的数据

这篇文章详细描述了“冷”和“热”Observable的区别和使用场景,这篇文章中的示例代码基于RxJS 5编写。

publish运算符及其伙伴们

publish运算符返回一个ConnectableObserver对象,该对象在其.connect方法被调用后才开始产生数据,即使当前没有subscription,如果存在多个subscription,每个subscription都接收到来自同一数据流的相同数据。

import { of, zip, interval, merge, ConnectableObservable } from "rxjs";
import { map, publish, tap } from "rxjs/operators";

console.clear();

const obs1 = interval(1000);
const obs2 = of(1, 2, 3, 4);
const obs$ = zip(obs1, obs2).pipe(
  map(values => values[1])
);

const published = obs$
  .pipe(
    publish()
  )

setTimeout(() => {
  // 该subscription没有接收到第一个数据
  published.subscribe((x) => console.log(`Sub 1: ${x}`));
  setTimeout(() => {
    // 该subscription没有接收到第1个和第2个数据
    published.subscribe((x) => console.log(`Sub 2: ${x}`));
  }, 1100);
}, 1100);

// connect之后马上开始产生数据
published.connect();

// 输出
//
// Sub 1: 2
// Sub 1: 3
// Sub 2: 3
// Sub 1: 4
// Sub 2: 4

publish运算符还支持selector参数,这是一个函数,输入一个Observable对象,该对象的所有subscription接收同一个参数

import { of, zip, interval, merge, ConnectableObservable } from "rxjs";
import { map, publish, tap } from "rxjs/operators";

console.clear();

const obs1 = interval(1000);
const obs2 = of(1, 2);
const obs$ = zip(obs1, obs2).pipe(
  map(values => values[1])
);

const published = obs$
  .pipe(
    publish(
      $multicasted => merge(
        // 多次使用$multicasted其数据来自同一数据流
        $multicasted.pipe(tap(x => console.log(`Tap x1: ${x}`))),
        $multicasted.pipe(
          map(x => x * 10),
          tap(x => console.log(`Tap x10: ${x}`)),
        ),
      )
    )
  )

setTimeout(() => {
  published.subscribe((x) => console.log(`Sub 1: ${x}`));
}, 1100);

// 输出
//
// Tap x1: 1
// Sub 1: 1
// Tap x10: 10
// Sub 1: 10
// Tap x1: 2
// Sub 1: 2
// Tap x10: 20
// Sub 1: 20

另外,在传入selector参数时,publish运算符返回selector的返回值,这不一定是ConnectableObserver对象,也不一定是“热”的Observer:

import { of, zip, interval, merge, ConnectableObservable } from "rxjs";
import { map, publish, tap } from "rxjs/operators";

console.clear();

const obs1 = interval(1000);
const obs2 = of(1, 2);
const obs$ = zip(obs1, obs2).pipe(
  map(values => values[1])
);

const published = obs$
  .pipe(
    publish(
      $multicasted => $multicasted.pipe(
        tap(x => console.log(`Tap : ${x}`))
      )
    )
  )

// 这两个subscription都独自收到了所有的数据,因此published是“冷”Observer
setTimeout(() => {
  published.subscribe((x) => console.log(`Sub 1: ${x}`));
  setTimeout(() => {
    published.subscribe((x) => console.log(`Sub 2: ${x}`));
  }, 1100);
}, 1100);

if (typeof published.connect === 'function') {
  published.connect();
  console.log('Connected');
} else {
  // 这不是ConnectableObservable对象
  console.log('Not connectable');
}

// 输出:
//
// Not connectable
// Tap : 1
// Sub 1: 1
// Tap : 2
// Sub 1: 2
// Tap : 1
// Sub 2: 1
// Tap : 2
// Sub 2: 2

ConnectableObservable对象可以和refCount运算符串联使用,refCount运算符返回一个Observable对象,该对象的subscription数量>0时,refCount上游的ConnectableObservable对象连接到源,该对象的subscription数量减少到0时,其上游的ConnectableObservable对象从源断开连接。

import { interval } from 'rxjs';
import { tap, publish, refCount } from 'rxjs/operators';

const obs$ = interval(1000).pipe(
  tap(x => console.log(`Tap: ${x}`)),
  publish(),
  refCount(),
)

setTimeout(() => {
  // 开始产生数据
  const sub = obs$.subscribe();
  setTimeout(() => {
    // 停止产生数据
    sub.unsubscribe();
  }, 2100);
}, 1100);

// 输出
//
// Tap: 0
// Tap: 1

refCount返回的Observable对象的subscription数量减少到0之后,如果又有新的subscription,则前后两段subscription接收到的数据流是否相同取决于refCount上游的ConnectableObservable对象的源是“冷”还是“热”。

import {interval} from 'rxjs';
import {take, publish, refCount} from 'rxjs/operators';

const coldSource = interval(1000).pipe(
  take(3)
);
const hotSource = coldSource.pipe(
  publish()
);

const obsWithColdSource = coldSource.pipe(
  publish(),
  refCount(),
);

const obsWithHotSource = hotSource.pipe(
  publish(),
  refCount(),
);

const cs1 = obsWithColdSource.subscribe(
  x => console.log(`Cold 1: ${x}`)
);
const hs1 = obsWithHotSource.subscribe(
  x => console.log(`Hot 1: ${x}`)
);
setTimeout(() => {
  cs1.unsubscribe();
  // 输出新数据流中的内容
  const cs2 = obsWithColdSource.subscribe(
    x => console.log(`Cold 2: ${x}`)
  )
  hs1.unsubscribe();
  // 继续输出原先数据流中的内容
  const hs2 = obsWithHotSource.subscribe(
    x => console.log(`Hot 2: ${x}`)
  );
}, 1100);

hotSource.connect();

// 输出:
//
// Cold 1: 0
// Hot 1: 0
// Hot 2: 1
// Cold 2: 0
// Hot 2: 2
// Cold 2: 1
// Cold 2: 2

publish + refCount运算符的组合很常见,以至于有专门的share运算符来表示这种组合:

obs$.pipe(share())
// 等价于
obs$.pipe(publish(), refCount())

publishReplay(1)publishLast()这两种方式返回的Observable对象都会缓存上游数据流中的最新一份数据,不管当前是否有subscription存在,该Observable对象的新subscription会马上接收到其缓存的数据。两者的区别在于publishLast()等到上游的Observable对象完成,才缓存其完成前的最新一份数据,其subscription收到其缓存的数据后会马上收到其完成的信号。publishReplay(1)不必等到上游的Observable对象完成,其subscription收到其缓存的数据后,可能收到新的数据,也可能收到其完成的信号,取决于其上游的Observable对象是否完成。

import { interval } from 'rxjs';
import { take, publishLast, publishReplay, refCount } from 'rxjs/operators';

const source = interval(1000).pipe(
  take(3)
);

const lastObs = source.pipe(
  publishLast(),
  refCount(),
);

const replayObs = source.pipe(
  publishReplay(1),
  refCount(),
);

setTimeout(() => {
  // 只收到数据流中最后一个数据,然后马上complete
  lastObs.subscribe({
    next: x => console.log(`Last: ${x}`),
    complete: () => console.log('Last complete')
  });
  // 收到数据流中缓存的最新数据以及之后的数据,直至complete
  replayObs.subscribe({
    next: x => console.log(`Replay: ${x}`),
    complete: () => console.log('Replay complete')
  });
}, 2100);

// 开始产生数据
lastObs.subscribe();
replayObs.subscribe();

// 输出:
//
// Replay: 1
// Last: 2
// Last complete
// Replay: 2
// Replay complete

publishReplayrefCount的组合也很常见,因此有单独的运算符shareReplay来表示二者的组合。

tap运算符

tap运算符对上游数据流中的每一项数据调用指定的处理函数进行处理,然后将数据原封不动地发送给下游,处理函数的返回值(如果有的话)被丢弃,只为其副作用(如打印日志)。

上游的错误也会被原封不动地发送给下游,处理函数中发生的错误也会被发送到下游。无论是上游出现错误还是tap处理函数中发生了错误,数据流都会终止。

import { of, throwError, concat } from 'rxjs';
import { tap } from 'rxjs/operators';

const processStream = (source) => {
  source.pipe(
    tap(x => {
      if (typeof x === 'number') {
        console.log(`Tap, number: ${x}`);
      } else {
        // 该错误会被发送到下游,数据流终止
        throw new Error(`Tap: ${x} is not number`);
      }
    }, e => {
      // 处理上游的错误,下游仍会收到同样的错误,数据流终止
      console.log(`Tap, handle error: ${e}`)
    })
  ).subscribe({
    next: x => console.log(`Sub, value: ${x}`),
    error: e => console.log(`Sub, error: ${e}`),
  });
};

processStream(concat(
  of(1, 2),
  throwError('Source: error'),
  of(3, 'a'),
));

setTimeout(() => {
  processStream(concat(
    of(1, 2, 'a', 3),
    throwError('Source: error'),
  ));
}, 1000);

// 输出:
//
// Tap, number: 1
// Sub, value: 1
// Tap, number: 2
// Sub, value: 2
// Tap, handle error: Source: error
// Sub, error: Source: error
// Tap, number: 1
// Sub, value: 1
// Tap, number: 2
// Sub, value: 2
// Sub, error: Error: Tap: a is not number

Subject

Subject是实现了Observer接口的Observable,因为RxJS中控制数据流的方式就是调用Observer接口中的方法,这就给了用户直接控制数据流的方式。同时这也意味着Subject一定是“热的”,因为其Observer接口方法被调用后,只有Subject当前的subscription及其下游可能看到数据的变动。之后的subscription即使看到以前的数据,也是Subject中缓存的数据,不是实时的数据。

RxJS 6提供了4种Subject:

它们之间的表现差异可以查看Learn RxJS的这篇文章实例演示

BehaviorSubjectReplaySubject(1)的区别在于前者可以确保有数据,后者不提供这种担保。因此BehaviorSubject提供了value属性,可以同步地获取其最新的数据,而其它类型的Subject只能异步地获取其数据。

Subject经常用来充当用户操作和数据流之间的桥梁,虽然fromEvent也能做到这一点,但使用Subject可以避免暴露控件底层的DOM元素细节,在大多数情况下是更好的选择。