RxJS相关介绍

陈忠杰

前言

我从很早时候就开始关注ReactX(响应式编程),那时候我是用RxJava实现的库,不过很遗憾我并没有学会如何使用,只是觉得这个东西很高大上,没有搞懂的原因有很多比如个人经验不够,没有很强的动力去研究,网上的资料很难懂过于碎片等。

鉴于之前的个人遗憾,我研究了一段时间RxJS,我决定写下这篇文章来对RxJS做一个相对详细的、针对前端开发者的介绍。

这篇文章的目的有两个: 1. 给对rxjs感兴趣的人一些入门的指引
2.

RxJS到底是什么

RxJS就是一个ReactiveX的一个语言实现库,除了js实现库还有很多比如RxJava RxJS Rx.NET RxScala RxClojure RxSwift...

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

官方解释一次词:异步

可以把RxJS 想成处理非同步行为的Lodash

RxJS好在哪

JS处理异步有Callback Promise等为什么还要用RxJS?

Rx可以更简洁的处理异步行为。

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。虽然JS目前有很多简洁的做法,RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁

举个例子

现在有一个需求,拿到文章列表http://gank.io/api/data/Android/10/1返回的下面的所有图片,并且在浏览器中展示这些图片。

一般的JS实现方法方法:

fetch('http://gank.io/api/data/Android/10/1').then(res => res.json()).then(data => {  
  const { results } = data;
  const mImages = [];
  const mResults = results.filter(x => !!x.images);
  for (let i = 0; i < mResults.length; i +=1 ) {
    const imgs = mResults[i].images;
    for (let j = 0; j < imgs.length; j += 1) {
      mImages.push(imgs[j])
    }
  }
  console.log(mImages)
  // render(mImages)
})

用RxJS实现的方法:

Rx.Observable.fromPromise(fetch('http://gank.io/api/data/Android/10/1').then(res => res.json()))  
.mergeMap(x => x.results)
.filter(x => !!x.images)
.map(x => x.images)
.concatAll()
.subscribe(
  v => { console.log( v) },
  e => { console.log( e ) },
  () => { console.log('complete') }
);

代码更简洁,可维护易读性强,链式调用就像一条流。

API 介绍和原理简析

Rx最主要的概念就是Observable,可以看做一个序列sequences组合异步行为事件等。可以把RxJS 想成处理非同步行为的Lodash。

可以理解为面向流的编程。

响应式编程的原理主要有以下两点:

1 设计模式观察者模式迭代器模式

  • 观察者模式 Observer Pattern 其实很常见,在许多API 的设计上都用了Observer Pattern 实现,最简单的例子就是DOM 对象的事件监听,被监听的DOM称为Observable,可以叫做可观察者也可以是主题,具体点击后的执行行为称为可观察者Observer。
    可观察者跟观察者是一对多的关系,每当观察者发布信息,观察者能主动订阅到这些信息,执行相应的代码逻辑。(=)

  • 迭代器模式 Iterator 是一个对象,它的就像是一个指针(pointer),指向一个数据结构并产生一个序列(sequence),这个序列会有数据结构中的所有元素(element)。

2 编程范式函数式跟响应式

函数是一等公民,可以作为函数的参数或返回值,以函数的思维解决问题,通过函数组合实现复杂的逻辑; 强调纯函数、无副作用。 只做运算不碰IO,适合高并发处理,不用担心锁的问题。(=)

API介绍

主要介绍Rx的Observable(流)相关API,其它Scheduler Subject暂不介绍。

1)Observable.create

这是创建流的最原始方法

console.log('start');  
const observable = Rx.Observable.create((subscriber) => {  
  subscriber.next('1');
  subscriber.next('2');
  subscriber.next('3');
  subscriber.complete();
  // throw new Error();
})

observable.subscribe(  
  v => { console.log(v) },
  e => { console.log(e) },
  () => { console.log('complete') }
);
console.log('end');  

上面代码会依次打印 start 1 2 3 complete end create传入一个函数subscriber或者说是observer为参数,并且有next complete error方法,返回值被订阅,里面的传入三个参数分别对应next error complete的调用。 虽然Rx主要处理的异步,但是上面的代码完全是同步执行的。

2)Observable.from等

一般很少用create去创建Observable,Observable提供大量的创建流的方法,比如from有点类似ES6的创建数组。

Rx.Observable.from([1, 2, 3, 4]) // 依次输出1 2 3 4的流  
Rx.Observable.of(1, 2, 3, 4) // 依次输出1 2 3 4的流  
Rx.Observable.fromEvent(btn, 'click')  // 监听btn点击的流  
const mPromise = new Promise((resolve, reject) => {  
  setTimeout(() => {
    resolve(100)
  }, 2000)
})
Rx.Observable.fromPromise(mPromise)  // promise点击的流  
Rx.Observable.interval(5000) // 每隔5000ms输出从0开始递增的值  

3)关于Marble Diagrams

Marble Diagrams类似一串弹珠一样来表示每条流的一个过程。

/**
 * 转换observable 
 * 流的转换
 * 流转换类似对序列的一些操作 map filter concat reduce等
 * 在流中这些方法称为Operator
 */
/**
 * '-' => 代表一段时间比如200ms
 * -----0----1----2----3-----... |
 * 
 *      .map(v => v * 2)
 * 
 * -----0----2----4----6-----... |
 */
Rx.Observable.interval(5000)  
.map(v => v * 2)
.subscribe(
  v => { console.log( v) },
  e => { console.log( e ) },
  () => { console.log('complete') }
);

数字表示发射出去的值 X表示错误 |表示完成 ()表示同步执行 map操作符,对原始流遍历计算生成新的流 。

4)take mapTo concatAll

/**
 * -----0----1----2----3----4----5----....
 * 
 *    take(5)
 * 
 * -----0----1----2----3----4|
 * 
 *    .mapTo(v => Rx.Observable.of(1, 2, 3))
 * 
 * -----o----o----o----o----o|
 *      \     \    \   \    \
 *   (123)| (123)| ... ... ...
 * 
 *    .concatAll()
 * 
 * -----(123)----(123)----(123)----(123)----(123)|
 */
Rx.Observable.interval(500)  
.take(5)
.mapTo(Rx.Observable.of(1, 2, 3))
.concatAll()
.subscribe(
  v => { console.log( v) },
  e => { console.log( e ) },
  () => { console.log('complete') }
);

take表示取前几个,mapTo表示转换成什么,concatAll表示把所有的值做concat处理生成新的流

5)takeUntil让Promise可以取消

/**
 * 让promise可以取消
 * ---------r|
 * -----e----e------e
 * 
 *  takeUntil
 * 
 * -----|
 */
const source =  
  Rx.Observable
  .from(fetch(URL).then(res => res.json()))
  .takeUntil(Rx.Observable.fromEvent(cancelBtn, 'click'));
source.subscribe(  
   v => { console.log( v) },
   e => { console.log( e ) },
   () => { console.log('complete') }
 );

点击按钮后可以取消,promise对应的方法将不执行。

6)merge concat

/**
 * cancat merge
 * ----0---1---2---3|
 * (123)|
 *  concat
 * ----0---1---2---3(123)|
 * ---0--1--3|
 *    map(x => x + 100))
 * ---100--101--102|
 *  concat
 * ----0---1---2---3(123)--100--101--102|
 */
Rx.Observable.interval(400).take(4)  
  .concat(Rx.Observable.of(1, 2, 3))
  .concat(Rx.Observable.interval(200).take(3).map(x => x + 100))
  .subscribe(
    v => { console.log( v) },
    e => { console.log( e ) },
    () => { console.log('complete') }
  );
// merge
Rx.Observable.interval(400).take(4)  
  .merge(Rx.Observable.of(1, 2, 3))
  .merge(Rx.Observable.interval(300).take(3).map(x => x + 100))
  .subscribe(
    v => { console.log( v) },
    e => { console.log( e ) },
    () => { console.log('complete') }
  );

这里画的可能有问题。 concat必须等待前面的执行完,下一个concat进去的流才会执行;而merge是直接合并跟时间无关。

7)throttleTime debounceTime

这两个操作符都是用来过滤流的,一个是控制流的闸阀,一个是必须等待一定时间才能接受的的操作。 比如一个按钮点击设置了throttleTime 500ms 你一直点击这个按钮,得到的结果是每个500ms获取一个点击的发出的值; 比如设置了debounceTime 500ms你一直点击这个按钮只有第一次才有效果,你必须在点击之间的间隔大于500ms 才能把这个点击算上

RxJS的实践