Angular 应用架构指南(二)
在本章中,我们介绍了异步代码的使用情况及其目的。很明显,随着异步代码的增多,其可读性和可维护性会越来越低,从而产生了诸如回调地狱等模式。本章描述了处理这些问题的几种技术。改变你的编码风格是一种方法。查看诸如承诺(promises)之类的结构,尤其是在与 async/await 结合使用时,是另一种方法。使用 async/await 意味着我们突然获得了一种可以类比为异步代码中的顺序的东西。我们尽量
原文:
zh.annas-archive.org/md5/e32b0f23109b0daf0acc38b5411ab61e
译者:飞龙
第三章:异步编程
要了解异步代码是什么,我们首先来了解一下同步代码是什么。使用同步代码,一个语句在另一个语句之后执行。代码是可预测的;你知道会发生什么以及何时发生。这是因为你可以像这样从上到下阅读代码:
print('a')
print('b')
print('c')
// output
a, b, c
现在,使用异步代码,你将失去同步代码提供的所有美好的可预测性。事实上,关于异步代码,你了解的很少,除了它最终会执行完成。所以,异步代码,或者称为 async 代码,看起来更像是这样的:
asyncPrint('a')
asyncPrint('b')
asyncPrint('c')
// output
c, b, a
如你所见,一个语句完成的顺序并不是由该语句在代码中出现的顺序决定的。相反,有一个时间元素参与其中,它决定了何时一个语句完成了它的运行过程。
异步代码在事件循环中运行。这意味着 async 代码按照以下顺序运行:
-
运行异步代码
-
等待响应准备好,然后触发中断
-
运行事件处理器
在这里需要强调的一个重要事情是,异步代码是非阻塞的——其他操作可以在异步代码运行时进行。因此,异步代码是处理 I/O、长时间运行的任务和网络请求的好候选。
在本章中,我们将:
-
了解异步编程是什么以及它与同步编程有何不同
-
解释回调模型
-
描述承诺以及它们是如何完全改变我们编写异步代码的方式
-
看看其他存在的异步库以及它们应该在什么情况下使用
-
发现新的标准 async/await
回调模式
以前,我们描述了当你在作为开发者的日常生活中遇到异步和同步代码时,它们看起来是什么样子。可能有趣的是了解操作系统如何看待此类代码以及它是如何处理它们的。操作系统通过以下概念来处理异步代码:
-
事件,这些是向操作系统发出信号的消息,表明已经发生某种类型动作
-
事件处理器,这是当事件发生时应该运行的代码片段
-
事件队列,这是放置所有事件及其事件处理器的位置,等待执行
让我们在以下图中说明这种流程:
在前面的图像中,我们可以看到事件是如何从一个事件队列中被选取的。在这里,当分发器告诉它时,点击事件会被执行,并且相应的事件处理程序被执行。事件处理程序运行事件处理程序中的相关代码行,当完成时,将控制权交还给分发器。之后,队列中的下一个事件开始新一轮的循环。这就是在单线程系统中通常的样子,其中一次只执行一个事件处理程序。也存在多线程系统。在多线程系统中,存在多个线程。这意味着我们可能同时执行几个事件处理程序。但尽管有多个线程,只有一个活动线程。系统本身仍然是单线程的。困惑吗?这里的关键是:多线程系统中的线程是协作的,这意味着它们可以被中断。这意味着在完成一个工作单元后,活动线程会被改变。这产生了一种效果,似乎所有事情都在并行发生。让我们为了清晰起见来举例说明:
在这里,我们可以看到一段代码被分成了不同的区域。当某个区域被执行后,它将控制权交给下一个线程,这个线程成为新的活动线程。一旦该线程通过其某个区域执行了代码,它将控制权交给下一个线程。随着多个 CPU 的出现,我们能够从感知的并行性(之前已描述)转变为实际的并行执行。在这种现实中,每个 CPU 存在一个线程,因此我们有多条活动线程。
这些是您可以执行异步代码的不同方式。我们将关注单线程执行,因为这是 JavaScript 和网页中实现的方式。
网页上的回调模式
处理它的方法是附加函数到未来的事件上。当事件发生时,我们附加的函数被执行。一个例子是XMLHttpRequest
,它看起来像这样:
const xhr = new XMLHttpRequest();
xhr.open('GET','/path', true);
xhr.onload = () => {
// run me when the request is finished
}
xhr.send(null);
在这里,我们可以看到除了xhr.onload
之外的所有行都是同步执行的。将函数附加到onload
是同步的,但是运行onload
指向的函数不会发生,直到请求完成。我们也可以定义其他事件,例如onreadystatechange
,并将一个函数附加到它上:
xhr.onreadystatechange = () => {}
由于网页是单线程的,这就是我们处理异步代码的方式。onreadystatechange
对象及其回调被注册到操作系统中。一旦异步部分完成,操作系统会被一个事件分发唤醒。之后,回调被调用。
Node.js 中的回调模式
Node.js 是单线程的,就像网络一样。为了处理长时间运行的操作,它也使用回调模式。Node.js 中的回调模式有一些更详细的细节,可以描述为具有以下属性:
-
只有一个函数来处理成功和错误响应
-
回调只被调用一次
-
函数是调用函数的最后一个参数
-
回调包含参数的错误和结果,顺序排列,这也被称为错误优先
现在我们来展示调用代码的样子,其中回调作为函数的最后一个参数提供:
callAsync('1',2, (error, response) => {
if(error) {
console.error(error);
} else {
console.log('response', response);
// do something with the response
}
})
这段代码满足了模式所规定的所有属性,即函数调用中的最后一个参数是回调函数。此外,回调函数将错误作为第一个参数,将响应作为第二个参数。此外,回调函数的主体首先检查是否存在错误,然后在没有错误的情况下处理我们得到的响应。
作为参考,让我们也看看callAsync()
是如何实现的:
function callAsync(param, param2, fn) {
setTimeout(() => {
if(param > param2) {
fn(null, 'success');
} else {
fn('error', null);
}
}
之前的实现只是一个原型,但它确实展示了两个重要的方面。一方面是setTimeout()
函数所代表的时间因素,以及函数需要时间才能完成的事实。
另一方面是我们的第三个参数fn()
,它的调用方式不同。当一切顺利时,我们调用fn(null, 'success')
;当发生错误时,我们调用fn('error', null)
。我们调用fn()
的方式就是如何传达成功和失败。
结构化异步代码的问题——回调地狱
在上一节中,我们介绍了回调模式作为处理异步调用的方法。该模式提供了一种处理此类调用的结构化方式,因为我们总可以在其方法签名中知道期望什么;错误是第一个参数,第二个参数是响应,依此类推。但是,该模式确实有其缺点。这些缺点可能一开始并不明显,因为你可能只是像这样调用代码:
openFile('filename', (err, content) => {
console.log( content );
statement4;
statement5;
})
statement2;
statement3
我们在这里看到的是如何调用openFile()
方法。一旦它运行完成,回调就会被调用,并在回调内部,我们继续调用statement4
和statement5
。
从可读性的角度来看,这看起来是不错的。但是,当你需要连续进行多个异步调用,并且这些调用相互依赖时,问题就出现了。可能首先需要登录到系统中,然后获取其他数据,或者可能意味着你需要进行一个调用,以确定哪些数据需要作为下一个调用的输入,就像这个例子中一样:
getData('url', (err, data) => {
getMoreData('newurl/'+ data.id, (moreData) => {
getEvenMoreData('moreurl/'+ moreData.id, () => {
console.log('done here');
})
})
})
我们在这里看到的一个反模式是表格化和可读性的丧失。对于每一次调用,我们看到代码缩进了一步;它是嵌套的。当我们有三次这样的调用时,我们可以看到代码看起来并不美观;它是可读的,但并不那么吸引人。另一个缺点是,从技术上讲,正确地放置括号和大括号也是一项技术挑战,我们可能会在放置这些符号时遇到困难。如果在其中加入几个if...else
语句,你将很难匹配所有符号。
有几种方法可以解决这个问题:
-
保持代码简洁,并使用命名函数而不是匿名函数
-
减少认知负担,并将函数移动到它们自己的模块中
-
使用更高级的结构,例如 Promise、生成器和 ES7 及其他异步库中的异步函数
保持代码简洁是关于给我们的匿名函数一个专有的名字并将它们拆分成自己的函数;这样我们的代码看起来会是这样:
function getEvenMoreDataCallback(err, evenMoreData) {
console.log('done here');
}
function getMoreDataCallback(err, moreData){
getEvenMoreData('moreurl/'+ moreData.id, getEvenMoreDataCallback);
}
function getDataCallback(err, data){
getMoreData('newurl/'+ data.id, getMoreDataCallback);
}
getData('url', getDataCallback)
这清楚地扁平化了代码,并使其更容易阅读。它还消除了正确匹配大括号的必要性,因为函数只有一层深度。
这将代码部分移除,但仍然存在认知负担,因为我们不得不处理三个函数定义和一个函数调用。我们可以将它们移动到它们自己的专用模块中,如下所示:
let getDataCallback = require('./datacallback');
getData('url', getDataCallback);
对于其他方法,它看起来会是这样:
function getEvenMoreDataCallback(err, evenMoreData) {
console.log('done here');
}
以及这个:
var getEvenMoreDataCallback = require('./evenmorecallback');
function getMoreDataCallback(err, moreData){
getEvenMoreData('moreurl/'+ moreData.id, getEvenMoreDataCallback);
}
现在我们已经移除了很多认知代码。在这个例子中,它可能没有物有所值,因为方法并不长,但想象一下,如果方法有 30 或 40 行长;将它们放入一个单独的模块中会更有意义。
第三个选择是使用更高级的结构来处理这类代码。我们将在接下来的章节中讨论这些内容。
Promises
Promises 的出现是对上一节中描述的回调地狱问题的一种回应。它们有着相当长的历史,可以追溯到 20 世纪 80 年代初,当时传奇人物芭芭拉·利斯科夫提出了Promise
这个术语。Promise
的想法是将异步代码扁平化。一个Promise
据说有以下状态:
-
Pending:这意味着它尚未决定,或者数据尚未可用
-
Fulfilled:数据已返回
-
Rejected:操作过程中发生了错误
Thenables
有一个重要的事情要知道,Promise
会立即返回,但结果不会立即可用。Promise 也被称为thenables,因为一旦数据接收,你需要使用其then()
方法注册一个回调,如下所示:
const promise = new Promise((resolve, reject) => {
// either call resolve() if we have a success or reject() if it fails
});
// the 'promise' variable points to a construct
// that will eventually contain a value
promise((data) => { // <- registering a callback on then()
// our data has arrived at this point
})
在前面的代码中,我们展示了如何创建一个 Promise 以及如何使用then()
方法注册它。promise
变量实例包含一个会立即返回的结构。then()
方法中的回调会在数据准备好供我们使用时被调用。从这个意义上说,Promise
类似于回调模式。
Promise
实际上只是围绕异步构造的一个包装。
简而言之,要使用 Promises,我们需要:
-
创建
promise
并确保在数据到达或发生错误时调用resolve()
或reject()
-
使用其
then()
方法注册回调 -
注册一个回调来处理错误,因为这是负责任的做法
要使用承诺,我们需要实例化它并使其成为方法的一部分,如下所示:
function getData() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('data');
},1000);
})
}
我们看到,当我们实例化一个Promise
对象时,它的构造函数接受两个参数,resolve
和reject
。让我们将这个与我们所知的承诺可以有的状态联系起来,即挂起、已解决和拒绝。当getData()
最初被调用时,返回的promise
处于pending
状态。两秒后,承诺将得到解决,因为我们调用了resolve()
方法。让我们看看getMoreData()
方法,看看我们如何将Promise
置于拒绝状态:
function getMoreData() {
return new Promise((resolve, reject) => {
setTimeout(() => {
reject('error from more data')
},1000);
})
}
在这种情况下,我们在两秒后调用reject()
方法。这将使承诺处于拒绝状态。要从promise
实例获取数据,我们需要在其上调用then()
方法,如下所示:
promise.then( successCallback, <optional error call back> );
承诺的then()
方法接受两个回调:第一个回调是数据回调,第二个回调是可选的错误回调。让我们在我们的定义的getData()
方法中使用它,如下所示:
getData().then( (data) => {
console.log('data', data);
})
很明显,我们不能直接在方法上调用getData()
来获取数据,但我们需要在它返回的promise
上调用.then()
。一旦我们提供了一个回调,我们就能获取数据并按我们的意愿处理它。
处理拒绝的承诺
对于拒绝的承诺,我们有两种处理方式:我们可以在.then()
方法中使用第二个回调,或者我们可以使用.catch()
方法。以下是我们可以使用的两个版本:
// alternative 1
getMoreData().then(
data => {
console.log('data',data);
},
err => {
console.log('error',err);
}
)
// alternative 2
getMoreData().then(data => {
console.log('data', data);
})
.catch((err) => {
console.log('error', err);
});
在第一种情况下,我们在then()
方法中添加了第二个回调,而在第二种版本中,我们将一个catch()
方法链接到现有的then()
方法上。它们是等效的,所以你可以使用任何一个,但只能使用一个。
链式操作 – 处理多个承诺
承诺最强大的功能在于其链式调用的能力,从而使代码看起来是同步的。一个链式调用看起来像这样:
getData()
.then(getMoreData)
.then(getEvenMoreData)
.catch(handleError)
这使得代码非常易于阅读。你可以知道事情发生的顺序;即,getData()
之后是getMoreData()
,然后是getEvenMoreData()
。我们不仅能够按照我们想要的顺序运行方法,而且还可以访问前一个promise
中的数据,如下所示:
function getData() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('data');
})
})
}
function getMoreData(data) { // data is from getData
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve('data');
})
})
}
getData().then(getMoreData)
我们还可以看到如何将.catch()
方法添加到末尾以处理错误。链式承诺的性质是这样的,错误会一直传播到catch()
方法。
然而,在特定级别处理错误是完全可能的,如下所示:
getData()
.then(getMoreData, (err) => {}) // local error handler
.then(getEvenMoreData )
.then(data => {} )
.catch(handleError ) // global error handler
现在我们有两个错误处理器,一个在本地级别,即.then(getMoreData, (err) => {})
作为then()
方法的第二个参数。这与只在调用链底部添加.catch()
的效果不同。如果只有底部的.catch()
方法存在,那么链路就会短路。目前,当getMoreData()
方法拒绝promise
时,当前链路将调用本地错误函数、.catch()
方法和最后一个.then()
方法。然而,如果promise
被拒绝,最后一个.then()
方法中的数据参数将不会被设置。链式调用非常强大,它给我们带来了以下功能:
-
按顺序调用异步方法
-
将之前解析的承诺(promise)数据作为我们方法的输入
-
能够全局处理错误以及针对每个承诺(promise)处理错误,尽管结果可能不同
异步库
到目前为止,我们讨论了回调模式以及使用承诺如何给你的代码带来急需的秩序感。编写异步代码不仅仅是停止自己陷入混乱的代码中,它还关乎生产力。有些库可以让你在真正致力于直接处理异步编程时变得非常高效。在撰写本文时,最知名的库包括:
-
Async:这是最广为人知的。它可以在
caolan.github.io/async/
找到。 -
步骤:这个库将自己定位为一个可以帮助你进行串行执行、并行执行,并承诺使错误处理变得轻松的库。它可以在
github.com/creationix/step
找到。 -
Node fibers:这是一个与前面两个库非常不同的库,可以将其视为为 JavaScript 带来轻量级线程(light-thread)支持。它可以在
github.com/laverdet/node-fibers
找到。
Async 库
我们已经展示了回调和承诺。我们从回调的问题,即回调地狱,以及承诺是如何解决这个问题的。然而,有一个名为async的库,它是回调和承诺的替代品。那么,我们为什么要使用 async 库呢?async 库旨在在异步上下文中操作集合。库的作者自己是这样说的:
Async 是一个提供直接、强大函数的实用模块,用于处理异步 JavaScript
因此,如果你的异步代码开始变得难以管理,而你又发现自己想要操作异步集合而不是零散的几个调用,这个库可能适合你。在大多数情况下,承诺(promises)可能是你想要的。
Async 库提供了许多有用的功能。Async 库的思路是让你的代码看起来更好,这样你就可以专注于构建事物,而不是挣扎着去理解代码在做什么。
要使用它,只需通过输入以下命令进行安装:
npm install async --save
async.map()
让我们看看一个例子,其中async
大放异彩,能够移除不必要的代码。以下示例展示了我们如何调用fs.stat()
方法,该方法将异步地告诉我们关于文件的信息,例如其大小、创建时间等。一个普通的调用fs.stat()
看起来像这样:
// async-demo/app.js
const fs = require('fs');
const basePath = __dirname + '/files/';
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
fs.stat(basePath + 'file1.txt', (err, result) => {
if(err) {
console.log('err');
} else {
const { size, birthtime } = result;
console.log('Size',size);
console.log('Created', birthtime);
}
});
如果我们想要进行多个电话并且想要知道几个文件的状态呢?一次发送多个电话——每个文件一个电话——意味着我们的电话会在不同时间返回,这取决于文件的大小。如果我们不介意在所有电话都返回之前不关心响应怎么办?这正是异步库能帮助我们解决的问题。这里有一个map()
函数,它允许我们同时发送多个电话,并且只有在所有电话都完成后才返回,如下所示:
// app-map.js
const async = require('async');
const fs = require('fs');
const basePath = __dirname + '/files/';
const files = ['file1.txt', 'file2.txt', 'file3.txt'];
const mappedFiles = files.map( f => basePath + f);
async.map(mappedFiles, fs.stat,(err, results) => {
if(err) {
console.log('error', err);
}else {
// looping through our results array
results.forEach(({size, birthtime}) => {
console.log('Size',size);
console.log('Created', birthtime);
});
}
});
那么,是什么让它如此出色呢?首先,我们的代码旨在找出每个文件的一些文件统计信息。让我们看看没有异步库的生活会是什么样子:
// example of running a callback method in a forEach()
['file1','file2','file3'].forEach( f => {
var states = [];
fs.stat(f, (err, stat) => {
console.log('stat', stat);
states.push( stat );
})
})
我们可以看到,我们需要引入一个状态数组来收集所有结果,即使这样,我们可能还需要添加一些逻辑来知道我们是否处于数组的最后一个项目,因此可以根据我们现在已经拥有所有结果的事实开始处理。
因此,从所有这些中我们可以得出的结论是,async.map()
帮助我们调用一系列异步调用到一个调用中,使我们能够在每个电话完成后处理所有结果,而不是在之前。
async.parallel()
这个库中另一个重要的方法是async.parallel()
,它允许我们并行发送很多语句,如下所示:
// async-demo/app-parallell.js
const async = require('async');
function getMessages(fn) {
setTimeout(() => {
fn(null,['mess1', 'mess2', 'mess3']);
}, 3000);
}
function getOrders(fn) {
setTimeout(() => {
fn(null, ['order1', 'order2', 'order3']);
}, 5000);
}
async.parallel([
getMessages,
getOrders
],(error, results) => {
if(error) {
console.log('error', error);
} else {
console.log('results', results);
}
});
从前面的代码中我们可以看到,它允许我们并行启动多个电话。我们在提供给async.parallell([])
方法的数组中指定电话。从你在这里可以辨别出的信息来看,我们提供的函数接受一个参数,fn
,它是回调函数,例如getOrders(fn) {}
。
async.series()
另一种情况是,你可能希望电话一个接一个地发生。为此,我们得到了async.series()
方法,我们这样调用它:
async.series([
function login(){}
function loadUserDetails() {}
],(result) => {})
以这种方式运行代码保证了代码的运行顺序,同时也确保了如果发生错误,调用链不会继续。
这个库中有许多有用的函数,我们强烈建议您查看caolan.github.io/async/docs.html
上的文档。
Async/await
async/await 是 ECMAScript 标准 ES2017 的一部分。这个构造在处理异步操作时提供了同步的体验。目前,您需要像 Babel 这样的工具在前端运行它,但对于 Node.js 来说,在版本>= 8 上运行它就足够了。Async/await 在后台通过一个称为生成器(generators)的概念来实现。生成器是可以在之后退出和重新进入的函数。要了解更多关于生成器的信息,请查看以下链接:developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function*
。这是处理异步代码的新方法,它确实有助于使我们的代码看起来更同步,从而减少了与异步编程相关的认知痛苦。
让我们回顾一下我们的旧例子,说明回调地狱的情况:
getData()
.then( data => {
getMoreData(moreData => {
getEvenMoreData(() => {
// do stuff
})
})
});
我们清楚地看到了以这种方式调用代码的缺点。async/await 扮演了救世主的角色,因为它确实清理了这里的事情。然而,让我们首先解释不同的部分以及我们如何努力重构先前的例子。使用 async/await 的方法通常是最高级的方法;在链式async
方法中的最高级意味着它是第一个被调用的方法。在先前的例子中,这将是由getData()
方法。让我们将getData()
转换成如下形式:
async function getData() {
// more to come
}
在这一点上,我们需要意识到的是,我们需要将另外两个方法getMoreData()
和getEvenMoreData()
重构为返回 Promise 而不是基于回调的方法。为什么你会这样问呢?好吧,当我们使用 async/await 时,我们希望以某种方式调用代码。正如之前所暗示的,我们将在getData()
函数前使用关键字async
。更重要的是,我们希望以下方式使用关键字await
:
async function getData() {
let data = await getMoreData();
let otherData = await getEvenMoreData();
}
看看前面的代码,我们意识到我们的现有方法签名存在不匹配。不匹配不是我们需要将我们的实现切换为基于 Promise 的主要原因。真正的原因是await
关键字能够展开 Promise,但不能展开基于回调的方法。展开意味着它可以从我们的异步操作的结果中取出值并返回。
在将它们转换为 Promise 之前,我们的方法当前的状态是:
function getMoreData(cb) {
setTimeout(() => cb('more data'), 3000);
}
function getEvenMoreData(cb) {
setTimeout( () => cb('even more data'), 3000 );
}
将它们转换为基于 Promise 的方法意味着它们现在应该看起来像这样:
function getMoreData() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('more data'))
});
}
function getEvenMoreData() {
return new Promise((resolve, reject) => {
setTimeout(() => resolve('more data'))
});
}
在这一点上,我们准备返回到getData()
方法并添加缺失的代码。当我们调用getMoreData()
和getEvenMoreData()
时,我们现在可以使用关键字await
等待 Promise 解析,如下所示:
async function getData() {
var data = await Promise.resolve('data');
var moreData = await getMoreData(data);
var evenMoreData = await getEvenMoreData(moreData);
return evenMoreData;
}
现在我们得到的是完全同步看起来似的代码。那么我们如何从getData()
中检索数据呢?很简单——它返回一个promise
。因此,我们可以这样调用它:
getData().then((result) => console.log('result', result) );
async/await 真的是一个强大的结构,因为它消除了由回调地狱引起的许多认知痛苦,并进一步改进了承诺的概念。
摘要
在本章中,我们介绍了异步代码的使用情况及其目的。很明显,随着异步代码的增多,其可读性和可维护性会越来越低,从而产生了诸如回调地狱等模式。本章描述了处理这些问题的几种技术。改变你的编码风格是一种方法。查看诸如承诺(promises)之类的结构,尤其是在与 async/await 结合使用时,是另一种方法。使用 async/await 意味着我们突然获得了一种可以类比为异步代码中的顺序的东西。我们尽量保持尽可能不依赖框架,因为理解所有提到的概念而不将它们与特定应用框架的概念混淆是很重要的。尽管如此,可以说:Angular 允许你使用任何你想要的异步方法来组织你的代码。例如,进行 HTTP 调用时,使用的是与 RxJS 库紧密相关的 Angular 服务,但你也可以自由地使用基于承诺的样式,如fetch()
API。使用 Babel 和它支持的转换器,也可以利用 Angular 中的 async/await。
本章为异步编程奠定了基础。下一章将通过介绍函数式响应式编程(FRP)的概念来在此基础上进行构建。它更多地涉及如何处理数据似乎在它想出现的时候出现的事实。尽管听起来很混乱,但如果我们将数据视为流,即使是这种情况也可以被建模,以创建一种结构和秩序的感觉。更多内容将在下一章中介绍。
第四章:函数式响应式编程
根据维基百科,函数式响应式编程(FRP)是一种响应式编程的编程范式,它使用函数式编程的构建块。好吧,这听起来很复杂,但它究竟意味着什么呢?为了理解整个句子,我们需要将其分解一下。让我们尝试定义以下内容:
-
编程范式是一个涵盖性理论,或工作方式,它围绕程序应该如何组织和结构化。面向对象编程和函数式编程是编程范式的例子。
-
响应式编程简而言之,是使用异步数据流的编程。异步数据流是值可以在任何时间点到达的数据流。
-
函数式编程是一种编程范式,它采用更数学化的方法,将函数调用视为数学计算,从而避免改变状态或处理可变数据。
因此,简而言之,我们的维基百科定义意味着我们有一种函数式编程方法来处理可能在任何时间点到达的值。这实际上并没有太多意义,但希望到本章结束时,事情会有所澄清。
在本章中,我们将学习以下内容:
-
声明式编程与命令式编程之间的区别
-
异步数据流
-
如何操作这些流
函数式编程与命令式编程的比较
我们将讨论和描述两种不同的编程风格:命令式编程和声明式编程。函数式编程是声明式编程的一个子集。解释声明式编程的最简单方法就是将其与它的对立面——命令式编程——进行比较。命令式编程关注的是程序应该如何达到其结果。另一方面,函数式编程是一种声明式编程范式,这意味着它的重点是程序应该完成什么,或者说“什么”。这是一个重要的区别。
命令式编程与声明式编程的比较
命令式编程由帮助改变程序状态的语句组成。如前所述,它关注的是“如何”而不是“什么”。让我们看看代码中这可能是什么样子,以便使其更清晰:
let sum = 0;
function updateSum(records) {
for( let i = 0; i< records.length; i++ ) {
sum += records[i];
}
}
updateSum([1,2,3,4]);
上述代码具有以下效果:当我们调用updateSum()
时,变量sum
会被更新。我们还可以看到,该函数非常明确地说明了求和应该“如何”发生。
声明式编程更关注“什么”要实现。很容易将其视为更高级的,因为你说出了你想要实现什么。让我们看看一些 SQL 代码。SQL 是一种声明式编程语言:
// content of table 'orderitem'
-------------------
id price productId
-------------------
1 100 1
1 50 11
SELECT
SUM(price) as total
FROM orderitem
// result of the query
150
在这里,我们正在查询一个表以获取多个记录,同时告诉 SQL 我们想要汇总的内容。我们明显在进行相同类型的操作,即汇总某物。区别在于,在我们的声明性示例中,我们告诉 SQL 我们想要做什么;我们信任 SQL 知道如何汇总。
一等高阶函数
“一等”这个术语意味着语言本身将函数视为值;它们可以作为其他函数的参数传递。高阶函数是接受其他函数作为参数的函数。让我们通过一个例子来使这一点更清晰:
function project(obj, fn) {
return fn(obj);
}
project( { name : 'chris', age: 37 }, (obj) => obj['name'] ); // 'chris'
project({ name : 'chris', age: 37 }, (obj) => obj['age'] ) // 37
在这里,我们可以看到我们的project()
函数的第二个参数是一个函数。该函数被应用于第一个参数。我们还可以看到,根据我们给高阶函数作为其第二个参数的输入参数,高阶函数的行为会有所不同。
纯函数
纯函数是一个没有副作用的函数。函数所做的任何操作都不会影响它之外的变量。这意味着在计算中使用输入参数时,不应引起副作用,例如与文件系统交互或打开网络连接等。让我们来看一个例子:
function notAPureFunction(filePath) {
const fileContent = fs.readFileSync(filePath);
const rows = fileContent.split(',');
let sum = 0;
rows.forEach(row => { sum += row; });
return sum;
}
如我们所见,我们的函数打开一个文件,遍历其行,并计算所有行内容的总和。不幸的是,这个函数与文件系统进行交互,这被认为是一个副作用。这看起来可能有点牵强,但在一个更长的函数中,同时看到计算——记录日志和与数据库交互——发生,或者至少是我的经验是这样的。这样的代码远非理想,它存在关注点分离和其他许多问题。然而,当涉及到纯函数时,将纯部分隔离到它们自己的函数中是一个好主意,这将导致以下结果:
function calculateSum(rows) { // now it's pure
let sum = 0;
rows.forEach(row => { sum += row; });
return sum;
}
function getRows(filePath) { // still not pure, but some things needs to perform side-effects
const fileContent = fs.readFileSync(filePath);
const rows = fileContent.split(',');
}
如您所见,我们现在有两个函数。我们设法将纯部分隔离到一个名为calculateSum()
的函数中,并最终创建了执行副作用的getRows()
函数。大多数程序都以某种形式具有副作用,但作为程序员,你的任务是尽可能地将这些函数与纯函数分开。
实际上,我们在这里描述了两件事:
-
纯函数:它们更像是没有副作用的数学计算。
-
单一职责原则(SRP):做好函数式编程的一部分是编写小而专注的函数。尽管这并不是函数式编程或纯函数的严格属性,但它是一个重要的原则,将帮助你采用函数式编程生活方式时拥有正确的思维方式。
我们没有提到的一件事是为什么纯函数在函数式编程中扮演着至关重要的角色。它们通过其计算性质是可预测的,这使得它们易于测试。构建主要由许多小型可预测函数组成的系统,使整个系统可预测。
递归
“要理解递归这个词,请看递归这个词。”
这是在大多数工程学院中流传的一个笑话,它以非常简短的方式解释了它。递归是一个数学概念。让我们进一步解释一下。官方定义如下:
递归是当程序的一个步骤涉及调用程序本身时,程序所经历的过程。经历递归的程序被称为“递归”。
好吧,这在人类语言中意味着什么?它说在运行我们的函数的某个时刻,我们将调用自己。这意味着我们有一个看起来像这样的函数:
function something() {
statement;
statement;
if(condition) {
something();
}
return someValue;
}
我们可以看到,在函数something()
的某个地方,它的主体调用了自身。一个递归函数应该遵守以下规则:
-
应该调用自身
-
最终应该遇到退出条件
如果递归函数没有退出条件,由于函数将无限期地调用自身,我们将耗尽内存。某些类型的问题比其他类型的问题更适合应用递归编程。这些问题的例子包括:
-
遍历树
-
编译代码
-
编写压缩算法
-
排序列表
有许多更多例子,但重要的是要记住,尽管它是一个伟大的工具,但它不应该到处使用。让我们看看递归真正闪耀的例子。我们的例子是一个链表。链表由知道它们连接到的节点的节点组成。Node
结构的代码如下:
class Node {
constructor(
public left,
public value
) {}
}
使用Node
这样的结构,我们可以构建由多个链接节点组成的链表。我们可以以下列方式连接一组节点实例:
const head = new Node(null, 1);
const firstNode = new Node(head, 2);
const secondNode = new Node(firstNode, 3);
上述代码的图形表示如下图。在这里,我们可以清楚地看到我们的节点由什么组成以及它们是如何连接的:
在这里,我们有一个链表,其中我们有三个连接的节点实例。头节点没有连接到左侧的节点。然而,第二个节点连接到第一个节点,第一个节点连接到头节点。以下类型的列表操作可能很有趣:
-
给定链表中的任何节点,找到头节点
-
在链表中给定位置插入一个节点
-
从链表中给定位置删除一个节点
让我们看看我们如何解决第一个要点。首先,我们将使用命令式方法,然后我们将使用递归方法来查看它们之间的区别。更重要的是,让我们讨论为什么递归方法可能更受欢迎:
// demo of how to find the head node, imperative style
const head = new Node(null, 1);
const firstNode = new Node(head, 2);
const secondNode = new Node(firstNode, 3);
function findHeadImperative (startNode) {
while (startNode.left !== null) {
startNode = startNode.left;
}
return startNode;
}
const foundImp = findHeadImperative(secondNode);
console.log('found', foundImp);
console.log(foundImp === head);
正如我们所见,我们正在使用一个while
循环遍历列表,直到我们找到left
属性为空的节点实例。现在,让我们展示递归方法:
// demo of how to find head node, declarative style using recursion
const head = new Node(null, 1);
const firstNode = new Node(head, 2);
const secondNode = new Node(firstNode, 3);
function findHeadRecursive(startNode) {
if(startNode.left !== null) {
return findHeadRecursive(startNode.left);
} else {
return startNode;
}
}
const found = findHeadRecursive(secondNode);
console.log('found', found);
console.log(found === head);
在前面的代码中,我们检查startNode.left
是否为空。如果是这种情况,我们就达到了退出条件。如果我们还没有达到退出条件,我们就继续调用自己。
好的,所以我们有两种方法:命令式方法和递归方法。为什么后者会好得多呢?嗯,使用递归方法,我们从一个长长的列表开始,每次调用自己时都会使列表变短:有点像一种分而治之的方法。递归方法中明显突出的一点是我们通过说“不,我们的退出条件还没有满足,继续处理”来推迟执行。继续处理意味着我们像在if
子句中那样调用自己。递归编程的目的是我们得到更少的代码行数吗?嗯,这可能是一个结果,但更重要的是:它改变了我们解决问题的思维方式。在命令式编程中,我们有一种从上到下解决问题的思维方式,而在递归编程中,我们的思维方式更倾向于,定义何时完成,将问题切割成更容易处理的部分。在前面的例子中,我们丢弃了不再有趣的链表部分。
没有更多的循环
当开始以更函数式的方式编写代码时,一个更显著的变化是我们摆脱了for
循环。现在我们知道了递归,我们可以直接使用它。让我们看看一个简单的命令式代码片段,它打印一个数组:
// demo of printing an array, imperative style
let array = [1, 2, 3, 4, 5];
function print(arr) {
for(var i = 0, i < arr.length; i++) {
console.log(arr[i]);
}
}
print(arr);
使用递归的相应代码看起来像这样:
// print.js, printing an array using recursion
let array = [1, 2, 3, 4, 5];
function print(arr, pos, len) {
if (pos < len) {
console.log(arr[pos]);
print(arr, pos + 1, len);
}
return;
}
print(array, 0, array.length);
如我们所见,我们的命令式代码在精神上仍然存在。我们仍然从0
开始。此外,我们继续进行,直到我们到达数组的最后一个位置。一旦我们达到退出条件,我们就退出方法。
重复模式
到目前为止,我们还没有真正将递归作为一个概念推销出去。我们有点理解,但可能还没有说服自己为什么好的老式while
或for
循环不能被替换。递归在解决看起来像重复模式的问题时特别出色。一个例子就是树。树有一些类似的概念,比如由节点组成。没有子节点连接的节点被称为叶子。有子节点但没有向上节点连接的节点被称为根节点。让我们用一张图来展示这一点:
我们在树上想要执行的一些有趣的操作包括:
-
总结节点值
-
计算节点数量
-
计算宽度
-
计算深度
为了尝试解决这个问题,我们需要思考如何将树作为数据结构存储。最常见的方法是通过创建一个表示节点具有值、left
属性和right
属性的表示,然后这两个属性依次指向节点。因此,该节点类的代码可能看起来像这样:
class NodeClass {
constructor(left, right, value) {
this.left = left;
this.right = right;
this.value = value;
}
}
下一步是思考如何创建树本身。此代码显示了如何创建一个具有根节点和两个子节点的树,以及如何将这些节点绑定在一起:
// tree.js
class NodeClass {
constructor(left, right, value) {
this.left = left;
this.right = right;
this.value = value;
}
}
const leftLeftLeftChild = new NodeClass(null, null, 7);
const leftLeftChild = new NodeClass(leftLeftLeftChild, null, 1);
const leftRightChild = new NodeClass(null, null, 2);
const rightLeftChild = new NodeClass(null, null, 4);
const rightRightChild = new NodeClass(null, null, 2);
const left = new NodeClass(leftLeftChild, leftRightChild, 3);
const right = new NodeClass(rightLeftChild, rightRightChild, 5);
const root = new NodeClass(left, right, 2);
module.exports = root;
值得注意的是,实例left
和right
没有子节点。我们可以看到这一点,因为我们创建时将它们的值设置为null
。另一方面,我们的根节点有对象实例left
和right
作为子节点。
总结
此后,我们需要思考如何总结节点。仅从外观上看,这似乎意味着我们应该总结顶层节点及其两个子节点。因此,代码实现将开始如下:
// tree-sum.js
const root = require('./tree');
function summarise(node) {
return node.value + node.left.value + node.right.value;
}
console.log(summarise(root)) // 10
如果我们的树增长并突然看起来像这样:
让我们向前面的代码添加一些内容,使其看起来像这样:
// example of a non recursive code
function summarise(node) {
return node.value +
node.left.value +
node.right.value +
node.right.left.value +
node.right.right.value +
node.left.left.value +
node.left.right.value;
}
console.log(summarise(root)) // 19
这实际上是正常工作的代码,但可以改进。此时,我们应该在树中看到的是重复的模式。我们有以下三角形:
一个三角形由2、3、5组成,另一个由3、1、2组成,最后一个由5、4、2组成。每个三角形通过取节点本身,加上其左子节点和右子节点来计算其和。递归就是关于这个:发现重复的模式并将其编码化。我们现在可以使用递归来实现我们的summarise()
函数,如下所示:
function summarise(node) {
if(node === null) {
return 0;
}
return node.value + summarise(node.left) + summarise(left.right);
}
我们在这里所做的是将重复的模式表示为节点 + 左节点 + 右节点
。当我们调用summarise(node.left)
时,我们只是再次为该节点运行summarise()
。前面的实现既简短又优雅,能够遍历整个树。一旦你发现你的问题可以看作是一个重复的模式,递归就真正变得优雅了。完整的代码如下:
// tree.js
class NodeClass {
constructor(left, right, value) {
this.left = left;
this.right = right;
this.value = value;
}
}
const leftLeftLeftChild = new NodeClass(null, null, 7);
const leftLeftChild = new NodeClass(leftLeftLeftChild, null, 1);
const leftRightChild = new NodeClass(null, null, 2);
const rightLeftChild = new NodeClass(null, null, 4);
const rightRightChild = new NodeClass(null, null, 2);
const left = new NodeClass(leftLeftChild, leftRightChild, 3);
const right = new NodeClass(rightLeftChild, rightRightChild, 5);
const root = new NodeClass(left, right, 2);
module.exports = root;
// tree-sum.js
const root = require("./tree");
function sum(node) {
if (node === null) {
return 0;
}
return node.value + sum(node.left) + sum(node.right);
}
console.log("sum", sum(root));
计数
现在我们开始理解递归的本质,实现一个计算树中所有节点数量的函数变得相当简单。我们可以重用之前的总结函数,简单地计算每个非空节点为1
,空节点为0
。所以,我们只需修改现有的总结函数,如下所示:
//tree-count.js
const root = require("./tree");
function count(node) {
if (node === null) {
return 0;
} else {
return 1 + count(node.left) + count(node.right);
}
}
console.log("count", count(root));
前面的代码确保我们成功遍历了每个节点。我们的退出条件发生在我们到达 null 时。也就是说,我们试图从一个节点移动到其不存在的子节点之一。
宽度
要创建一个宽度函数,我们首先需要定义我们所说的宽度是什么。让我们再次看看我们的树:
这棵树的宽度是4。这是怎么回事?对于树中的每一步向下,我们的节点向左和向右各扩展一步。这意味着为了正确计算宽度,我们需要遍历树的边缘。每次我们必须向左或向右遍历一个节点时,我们就会增加宽度。从计算的角度来看,我们感兴趣的是像这样遍历树:
因此,代码应该反映这一事实。我们可以这样实现:
// tree-width.js
const root = require("./tree");
function calc(node, direction) {
if (node === null) {
return 0;
} else {
return (
1 + (direction === "left" ?
calc(node.left, direction) :
calc(node.right, direction))
);
}
}
function calcWidth(node) {
return calc(node.left, "left") + calc(node.right, "right");
}
console.log("width", calcWidth(root));
特别注意在calcWidth()
函数中,我们分别用node.left
和node.right
作为参数调用calc()
。我们还添加了left
和right
参数,在calc()
方法中意味着我们将继续朝那个方向前进。我们的退出条件是最终遇到 null。
异步数据流
异步数据流是一系列数据,其中值一个接一个地发出,它们之间有延迟。异步这个词意味着发出的数据可以在任何时间出现,比如一秒后甚至两分钟后。异步流的一种建模方法是将在时间轴上放置发出的值,如下所示:
有很多事情可以被认为是异步的。其中之一是通过 AJAX 获取数据。数据何时到达取决于许多因素,例如:
-
您的连接速度
-
后端 API 的响应性
-
数据的大小,以及许多其他因素。
重点是数据并不是在这个非常时刻到达。
可以被认为是异步的其他事情包括用户发起的事件,如滚动或鼠标点击。这些是在任何时间点都可能发生的事件,取决于用户的交互。因此,我们可以将这些 UI 事件视为时间轴上的连续数据流。以下图表描绘了代表用户多次点击的数据流。每次点击都会导致一个点击事件,c,我们将它放置在时间轴上:
初看我们的图表描绘了四个点击事件。仔细观察后,我们发现点击事件似乎被分组了。前一个图表包含以下两条信息:
-
发生了多次点击事件
-
点击事件之间有特定的延迟发生
在这里,我们可以看到前两次点击似乎在时间上非常接近;当两个事件在时间上非常接近时,这将被解释为双击。因此,我们上面的图像告诉我们发生了哪些事件;它还告诉我们它们何时以及多久发生一次。查看前面的图表,区分单击和双击相当容易。
我们可以为每种点击行为分配不同的动作。双击可能意味着我们想要放大,而单击可能意味着我们想要选择某个东西;具体取决于你正在编写的应用程序。
第三个例子是输入的情况。如果我们有一个用户在输入一段时间后停止输入的情况呢?在经过一定时间后,用户期望 UI 做出反应。这就是搜索字段的情况。在这种情况下,用户可能在搜索字段中输入一些内容,并在完成后按下搜索按钮。在 UI 中模拟这种情况的另一种方式是提供一个搜索字段,并等待用户停止输入,作为开始搜索用户想要的内容的信号。最后一个例子被称为自动完成行为。它可以按以下方式建模:
输入的前三个字符似乎属于同一个搜索查询,而第四个字符输入得晚得多,可能属于另一个查询。
本节的目的在于强调不同的事物适合作为流来建模,以及时间轴上发出的值的放置可以意味着某些东西。
将列表与异步流比较——为 RxJS 做准备
到目前为止,我们已经讨论了如何将异步事件建模为时间轴上的连续数据流,或称为流建模。事件可以是 AJAX 数据、鼠标点击或其他类型的事件。以这种方式建模事物为事物提供了一个有趣的视角,但例如,在双击的情况下,除非我们能够挖掘出数据,否则这并没有什么意义。可能还有另一种情况,我们需要过滤掉某些数据。我们在这里讨论的是如何操作流。如果没有这种能力,流建模本身就没有实际价值。
有不同的方法来操作数据:有时我们希望将发出的数据改变为其他数据,有时我们可能希望改变数据被发送到监听器的频率。有时,我们希望我们的数据流变成一个完全不同的数据流。我们将尝试模拟以下情况:
-
投影:改变发出的值的 数据
-
过滤:改变被发出的内容
将函数式编程范式与流结合
本章已经涵盖了函数式编程和异步数据流。使用 RxJS 不需要对函数式编程有深入的了解,但你确实需要理解声明式意味着什么,以便专注于正确的事情。你的关注点应该是你想要完成的事情,而不是你想要如何完成它。作为一个库,RxJS 将负责如何完成。更多内容将在下一章中介绍。
这些可能看起来是两个不同的主题。然而,将它们结合起来,我们就能获得操纵流的能力。流可以被看作是一系列数据,其中数据在某个时间点可用。如果我们开始将我们的流视为列表,特别是不可变列表,那么就有一些与列表一起进行的操作,通过应用操作符来操纵列表。操纵的结果是一个新列表,而不是一个被修改的列表。所以,让我们开始应用我们的列表哲学及其操作符到以下情况。
投影
在这里,我们可以看到我们的流正在发出值1、2、3和4,然后发生了一个操作,将每个值增加一个。这是一个相当简单的情况。如果我们将其视为一个列表,我们可以看到我们在这里所做的只是一个投影,我们会这样编码:
let newList = list.map(value => value + 1)
过滤
列表和流中可能有一些你不希望的项目。为了解决这个问题,你需要创建一个过滤器来过滤掉不需要的数据。通过模拟初始数组、操作和结果数组,我们得到以下内容:
在 JavaScript 中,我们可以通过编写以下代码来完成这个任务:
let array = [1,2,3];
let filtered = array.filter(data => data % 2 === 0);
结合思维模式
那么,我们在这个部分试图表达什么?显然,我们已经展示了如何操纵列表的例子。好吧,我们所做的是展示了我们如何显示轴上的项目。从这个意义上说,我们可以看到异步事件和值列表以相同的方式思考是很容易的,因为我们以相同的方式图形化地描绘它们。问题是,我们为什么要这样做?原因在于,这正是 RxJS 库希望你在下一章开始操纵和制作流时拥有的心态。
概述
本章已经确立,我们可以将异步事件建模为时间轴上的值。我们引入了将这些流与列表进行比较的想法,并因此应用了不会改变列表本身但仅创建一个新列表的功能方法。应用函数式范式的优点在于,我们可以专注于要实现什么而不是如何实现,从而采用声明式方法。我们意识到将异步和列表结合起来并从中创建可读的代码并不容易。幸运的是,这正是 RxJS 库为我们做的事情。正是这种认识使我们为即将到来的第五章,RxJS 基础,做好了准备,在那里我们介绍 RxJS 作为一个库:在异步混乱中创建秩序,将一切建模为流。有了 RxJS,我们真正可以专注于要实现什么而不是如何实现,因为它附带了一系列流操作函数。在阅读下一章之后,你将了解 RxJS 在基本层面的工作原理,以及它如何解决本章中提到的问题。
第五章:RxJS 基础
JavaScript 的响应式扩展(RxJS)是由 Matt Podwysocky 创建的一系列库。库的第四版由微软维护和开发。第四版可以在以下链接找到:github.com/Reactive-Extensions/RxJS
。
第五版是对第四版的完全重写,可以在以下地址找到:github.com/ReactiveX/rxjs
。其最大贡献者是 Ben Lesh,其他值得注意的贡献者包括 Andre Staltz。第五版也是 Angular 在处理 HTTP 等方面的库选择。
在本章中,您将学习:
-
组成 RxJS 的模式有哪些
-
RxJS 的核心概念
-
如何手动创建自己的 Observables 并订阅它们
-
你可以创建 Observable 的多种方式
-
管理清理的重要性
-
通过学习实现 RxJS 库的核心部分来理解其底层原理
观察者模式
观察者模式是四人帮模式。这是一个因被包含在 Erich Gamma、Richard Helm、Ralph Johnson 和 John Vlissides 所著的《设计模式:可复用面向对象软件元素》一书中而闻名的设计模式。该模式有两个关键参与者:一个 Subject 和一个 Observer。Subject 被观察者观察。通常,Subject 持有一个内部观察者列表,当 Subject 上发生更改时应该通知这些观察者。Subject 通常是一个模型,而观察者是一些 UI 组件。简而言之,Subject 应该能够:
-
维护观察者列表
-
添加观察者
-
移除观察者
-
当发生更改时通知所有观察者
相反,观察者应该只持有一个属性,那就是一个可以在更新发生时由主题调用的 update()
方法。这个模式背后的想法是创建不同层之间的松散耦合。主题和观察者都不应该直接通过名称了解对方,而应该通过抽象。因此,一个主题的类图可能如下所示:
在这里,我们包括了所有必需的方法:attach()
、detach()
和 notify()
,并且我们明确指出我们处理的是抽象观察者,而不是具体类型。至于观察者,这通常是一个只有一个方法 update()
的接口,可以由以下类图表示:
给定这些类图,让我们编写一些代码来演示实现可能的样子,并且我们从主题开始。对于这个例子,我们将使用 TypeScript,因为 TypeScript 知道接口是什么:
// observer-subject/subject.ts
import { Observer } from "./observer";
export class Subject {
observers: Array<Observer>;
constructor() {
this.observers = new Array<Observer>();
}
attach(observer: Observer) {
if (this.observers.indexOf(observer) === -1) {
this.observers.push(observer);
}
}
detach(observer) {
let index = this.observers.indexOf(observer);
if (index !== -1) {
this.observers = this.observers.slice(index, 1);
}
}
notify() {
this.observers.forEach(observer => observer.update());
}
}
如您所见,基本实现非常简短,但它是一个强大的结构。至于 Observer
,它甚至更短:
// observer-subject/observer.ts
export interface Observer {
update();
}
我们可以通过创建一个文件,例如 app.ts
,来尝试这个例子:
// observer-subject/app.ts
import { Subject } from "./subject";
import { Observer } from "./observer";
const subject = new Subject();
const observer = <Observer>{
update: () => console.log("First Observer Updated")
};
const observer2 = <Observer>{
update: () => console.log("Second Observer updated")
};
subject.attach(observer);
subject.attach(observer2);
subject.notify();
// should emit:
// First Observer Updated
// Second Observer updated
通过运行前面的代码,我们看到Subject
实例允许我们通过调用attach()
方法将其附加到Observer
实例上。然后我们在Subject
实例上调用notify()
,以确保所有订阅的Observer
实例都得到通知。
好的,所以现在我们已经有一些核心实现,一个实际的使用案例是什么样的呢?想象一下,我们有一个扮演Subject
角色的ProductModel
类和一个扮演Observer
角色的ProductUI
类。ProductModel
类的一个简单实现可能如下所示:
// product-model/product.model.ts
import { Subject } from "./subject";
export class ProductModel extends Subject {
private titleValue = "";
private makeValue = "";
get title(){
return this.titleValue;
}
set title(value) {
this.titleValue = value;
this.notify();
}
get make() {
return this.makeValue;
}
set make(value) {
this.makeValue = value;
this.notify();
}
}
在这里,我们可以看到我们有两个属性,title
和make
,当它们两者都发生变化时,我们调用从基类Subject
继承的notify()
方法。让我们看看ProductUI
类可能是什么样子:
// product-model/product.ui.ts
import { Observer } from "./observer";
import { ProductModel } from "./product.model";
export class ProductUI implements Observer {
constructor(private model: ProductModel) {
this.model.attach(this); // add ProductUI to the observer list
this.renderUI();
}
renderUI() {
console.log("calling renderUI");
this.draw();
}
draw() {
// implement
console.log("calling draw");
}
update() {
console.log("calling update");
this.renderUI(); // rerender the UI when update() is called
}
}
在前面的代码中,我们看到我们在构造函数中接收一个ProductModel
实例,并且我们还对该实例调用attach()
方法,以便将其注册为Observer
。我们还定义了一个update()
方法,其中我们决定如果它被调用,我们将重新渲染 UI。
这是一个使用观察者模式并用于模型到 UI 通信的典型示例,这只是许多使用可能性之一。一般原则是在Subject
和Observer
实例之间以松耦合的方式进行通信。真正的优势是能够在单个Subject
上拥有多个Observer
实例,这样如果Subject
发生变化,所有其Observer
实例也会随之变化。这也被称为发布/订阅,通常简称为 Pub/Sub。
RxJS 核心概念
RxJS 由一些核心概念组成,这些概念对于你早期理解非常重要。那些是:
-
可观察的:这是一个表示数据流的类。
-
观察者:这是一个能够发出数据的类。
-
生产者:这是内部产生数据的东西,观察者最终会发出这些数据。
-
操作符:这是
Observable
上的一个方法,它允许我们操作流本身或它发出的数据。 -
流:这与
Observable
的一个实例同义。之所以称之为流,是因为你应该将数据视为连续的,而不是真正有结束,除非你明确地定义一个结束。
可观察的和观察者
在定义了我们最初需要了解的所有概念之后,现在是我们将这些概念放入上下文中,以进一步加深我们的理解。让我们从一个定义Observable
开始,并逐步深入到之前提到的每个概念。Observable
可以通过以下代码创建:
let stream$ = Rx.Observable.create(observer => observer.next(1));
这是创建一个Observable
所需的最少代码量。在这个阶段,屏幕上没有写入任何内容,因为我们需要订阅流。让我们给我们的Observable
添加一个订阅者。我们通过在流实例上调用subscribe()
方法来实现这一点:
let stream$ = Rx.Observable.create(observer => observer.next(1));
stream$.subscribe(data => console.log('data',data) ) // write data, 1 to the console
看看这段代码,我们看到Observable
调用了create()
方法,该方法反过来创建了一个Observable
的实例。有趣的是,create()
方法接受一个函数作为参数;这个函数本身接受一个观察者实例。因此,我们有一个看起来像这样的 API:Observer.create(fn(observerInstance))
。在这个函数内部发生的事情是,我们调用observer.next(1)
。在更高层次上,我们通过使用create()
这个factory
函数创建了一个Observable
。在这个例子中,我们的Observable
行为非常简单,就是发射值 1。当我们调用observer.next(1)
时,我们发射数据。为了获取发射的数据,我们需要调用subscribe()
方法。
生产者
如果我们尝试将此与观察者模式进行比较,我们会看到一些概念是重复的,例如观察者。在这个模式中,当发生某些事情时,观察者会收到通知,而主题会主动改变。看看之前的代码,看起来像观察者是主动改变的一方。但这并不完全正确;它更像是一个中介,这带我们来到了 RxJS 的下一个概念,即Producer
。Producer
负责生成我们需要的值。通过在我们的代码中引入Producer
,我们看到观察者更像是一个中介:
// rxjs-example/producer.js
const Rx = require("rxjs/Rx");
class Producer {
constructor() {
this.counterMax = 5;
this.current = 0;
}
hasValues() {
return this.current < this.counterMax;
}
next() {
return this.current++;
}
}
let stream$ = Rx.Observable.create(observer => {
let producer = new Producer();
while (producer.hasValues()) {
observer.next(producer.next());
}
});
stream$.subscribe(data => console.log("data", data));
// data 0, data 1, data 2, data 3, data 4
如我们所见,生产者是负责生成数据的一方,而观察者负责将数据传递给订阅者。
可观察的错误和完成
流不仅仅是生成数据;流还可以生成错误以及达到其完成状态。如果发生错误或完成,流将不再生成任何值。为了表示我们有一个错误,我们在观察者上调用error()
方法,如下所示:
let stream$ = Rx.Observable.create(observer => {
observer.error('we have an error');
});
为了捕获发射的错误,我们需要在我们的subscribe()
调用中引入第二个回调,如下所示:
// rxjs-example/error.js
const Rx = require("rxjs/Rx");
let stream$ = Rx.Observable.create(observer => {
observer.error("we have an error");
});
stream$.subscribe(
data => console.log("data", data),
error => console.error("err", error)
)
到目前为止,我们已经学习了如何发射数据,以及如何发出错误信号。我们能做的最后一件事是关闭流,或者完成它,因为关闭流也被称为完成。我们通过在观察者上调用complete()
来实现这一点。这将确保不再发射任何值。为了捕获完成信号,我们需要在我们的subscribe()
调用中添加另一个回调。你可以这样使用它:
// rxjs-example/completion.js
const Rx = require("rxjs/Rx");
let stream$ = Rx.Observable.create(observer => {
observer.next(1);
observer.complete();
});
stream$.subscribe(
data => console.log("data", data), // 1
error => console.error("err", error), // never hit
() => console.log("complete") ); // will be hit
操作符
我们要讨论的最后一个概念是操作符。操作符简单地说是一个作用于Observable
并按某种方式改变流的函数。操作符本质上是不可变的。这种不可变性使得代码更容易测试和推理。RxJS 提供了 60 多个操作符,以帮助在大多数情况下定义你的流及其行为。
可能会有这样的情况,你需要创建自己的操作符,但很可能是已经有了一个操作符可以完成你想要的功能。
当你定义你的流及其行为时,你将使用一个或多个操作符。它可能看起来像以下这样:
let stream$ = Rx.Observable.of(1,2)
.map( x => x +1 )
.filter( x > 2 );
stream$.subscribe( data => console.log('data', data))
// data 3
在这里,我们可以看到我们正在使用 .map()
操作符和 .filter()
来改变我们的流数据。.map()
通过将每个值增加一来对流中的每个值进行操作。.filter()
对改变后的流进行操作;由调用 .map()
引起的改变。它也针对流中的每个值进行操作,但条件性地决定应该发出什么。最终结果只发出一个值,3
。还有许多其他的操作符,但这应该能给你一个关于操作符是什么以及如何使用它们的想法。
创建 Observables
大多数时候,在创建 Observables 时,你不会使用create()
方法。你将使用其他方法。为什么是这样呢?好吧,一个 Observable 实例通常源于某种异步概念。在使用 RxJS 创建 Angular 应用程序的上下文中,Observable 实例将通过执行以下操作之一来创建:
-
使用 AJAX 通过 HTTP 创建或获取数据
-
使用响应式表单监听输入变化
-
监听路由变化
-
监听 UI 事件
-
包装异步概念
在 RxJS 中,有一些不同的创建操作符可以帮助你解决这些任务,但 Angular 框架实际上可能在内部创建 Observables。让我们看看除了create()
方法之外的一些创建操作符:
创建操作符
如我们之前所述,Observable 是一种表示随时间发出的数据的表现形式。有时,数据会立即到达,有时则需要时间。无论哪种情况,能够以相同的方式对数据进行建模都是非常强大的。
of()
让我们看看一个非常简单的创建操作符,of()
。它接受一个可变数量的参数,这些参数将被作为值发出,如下所示:
let stream$ = Rx.Observable.of(1, 2, 3, 4);
stream$.subscribe( data => console.log(data)) // 1, 2, 3 ,4
值会立即触发。这在只想测试事情时非常有用。
interval()
另一个有趣的操作符是interval()
操作符,它接受一个毫秒数作为参数。这定义了每条发出数据之间的延迟时间(以毫秒为单位)。它将从数字 0 开始。需要注意的是,除非例如应用了take()
操作符,否则它将无限期地生成值。take()
操作符将限制发出的值的数量并关闭流。该操作符的典型用法如下:
let stream$ = Rx.Observable.interval(1000)
.take(3); // 1s delay between values, starting at 0
stream$.subscribe(data => console.log(data))
// 0, 1, 2
from()
from()
操作符允许我们从其他异步/同步概念创建一个Observable
。当几乎所有东西都可以被制作成Observable
时,这非常强大,因为它允许丰富的组合。以下是一个典型的代码片段示例:
let stream$ = Rx.Observable.from(new Promise(resolve, reject) => {
resolve('some data');
});
stream$.subscribe( data => console.log(data)); // some data
let stream2$ = Rx.Observable.from([1,2,3,4]);
stream2$.subscribe( data => console.log(data)); // 1,2,3,4
fromEvent()
我们已经多次提到丰富的组合以及将一切转换为 Observable 的力量。我们已经将承诺转换为 Observables,这使得一切变成了数据流,使得整个情况更容易推理。我们的意思是,当每个异步概念都被转换为 Observable 时,我们突然能够以相同的方式思考它们。可以应用于鼠标点击的操作符也可以应用于 AJAX 请求等等。
此外,我们甚至可以将 UI 事件转换为 Observables。通过使用 .fromEvent()
操作符,我们能够将一个元素及其对应的事件转换为一个 Observable。这是我们手中的真正力量,它允许我们将诸如自动完成等场景简化为 3-4 行代码。此操作符的典型用法如下:
let elem = document.getElementById('input');
// we assume we have a <input id="input"> in our markup
let keyStream$ = Rx.Observable.fromEvent(elem, 'keyUp');
// listens to the keyUp event
bindCallback()
到目前为止,我们已经列出了很多方法,无论是同步还是异步,都可以将一个结构转换为 Observable。回调是第一个尝试解决整个异步问题的模式,并且应该指出,由于可读性差,回调可能是解决异步代码的最差方式。幸运的是,有一个操作符可以将回调转换为 Observable,称为 bindCallback()
。它可以按以下方式使用:
function fnWithCallback(cb) {
setTimeout(() => cb('data'), 3000);
}
let fnWithCallbackBinded = Rx.Observable.bindCallback(fnWithCallback);
let source$ = fnWithCallbackBinded();
source$.subscribe(data => console.log('callback', data));
我们可以看到,我们首先定义了一个名为 fnWithCallback()
的函数。我们将这个函数作为参数传递给 bindCallback()
方法。这会产生一个 fnWithCallbbackBinded()
函数。调用该函数将生成一个我们可以订阅的 Observable
。因此,每当 fnWithCallback()
函数中的 cb('data')
因为 setTimeout()
而被调用时,这将导致我们的 source$
的数据回调被调用。这在实践中是如何工作的呢?这实际上非常简单。让我们尝试实现我们自己的 Observable
。我们已经学习了以下内容:
-
bindCallback()
方法接受一个函数作为参数 -
调用
bindCallback()
应该生成一个函数 -
调用
bindCallback()
的结果应该生成一个Observable
-
调用
subscribe()
应意味着我们的数据回调应该是fnWithCallback()
中的cb
参数
因此,最终的实现应该看起来像这样:
// rxjs-creation-operators/bind-callback.ts
class Observable {
behaviorFn;
constructor(behaviorFn) {
this.behaviorFn = behaviorFn;
}
static bindCallback(behaviorFn): Function {
return (): Observable => {
return new Observable(behaviorFn);
};
}
subscribe(dataCb) {
this.behaviorFn(dataCb);
}
}
let fn = Observable.bindCallback(cb => {
setTimeout(() => cb("data"), 3000);
});
const stream$ = fn();
stream$.subscribe(data => console.log("data", data));
// outputs: data data
清理
我们现在已经涵盖了核心概念,如 Observable、Observer、Producer 和操作符。我们还探讨了如何手动创建一个 Observable,但意识到有不同类型的创建操作符可以帮助您从其他结构创建 Observable,有时 Angular 框架本身会为您创建 Observable。但我们遗漏了一个重要的事情,那就是清理。会有一些情况,Observable 会分配资源或简单地永远持续,就像interval()
操作符一样。有一种明确的补救措施——在我们完成 Observable 后定义并运行一个清理函数。定义这样的函数迫使我们回到create
操作符,并在其行为函数中修改一些代码,如下所示:
let stream$ = Rx.Observable.create(observer => {
let counter = 0;
let id = setInterval(() => observer.next(counter++), 1000);
return function cleanUpFn() { clearInterval(id); }
});
订阅
前面的代码描述了一个需要清理发生的情况。我们定义了一个setInterval()
结构,它似乎会无限期地发出值。cleanUpFn()
有取消这种行为的能力,前提是它被调用。我们在行为函数的末尾返回cleanUpFn()
。
问题是,我们如何获取它?答案是,我们需要讨论一个新概念:订阅。订阅是在对流调用subscribe()
时返回的东西。让我们用这个来修改前面的代码:
let stream$ = Rx.Observable.create(observer => {
let counter = 0;
let id = setInterval(() => observer.next(counter++), 1000);
return function cleanUpFn() { clearInterval(id); }
});
let subscription = stream$.subscribe((data) => console.log('data'));
setTimeout(() => subscription.unsubscribe(), 2000);
在前面的代码中,我们通过调用subscribe()
创建了变量subscription
,但真正有趣的部分发生在最后一行:我们定义了一个超时,它会在我们的subscription
上调用unsubscribe()
。这将调用我们的cleanUpFn()
,以便取消间隔。
你处理的大多数流都不需要取消订阅,但那些分配资源或启动一些永远持续的结构,而我们没有拦截的,将需要有一个清理行为,我们在完成我们的流后需要调用这个行为。
创建 RxJS 的核心实现
理解某事物有不同的阶段。理解一个库就是学习其概念,并正确地利用其方法。然后是更深层次的理解,比如根据你在博客中找到的一些最佳实践指南,知道应该使用哪些方法。最后,你将达到一个真正深入的理解阶段,你想要理解正在发生的事情,开始对源代码本身进行修改,并可能通过向项目提交 Pull Request 来尝试增强它,这很可能是基于 GitHub 的。
本节旨在立即向您提供部分这种更深入的知识。我们意识到,在这个时候,您的大脑可能因为所有新学的概念和一些巧妙的操作符而有些混乱。让我们从头开始,先从最初介绍的概念入手,尝试逆向工程正在发生的事情。
实现 create()
在本章的开头,我们学习了如何创建一个 Observable。代码看起来是这样的:
let stream$ = Rx.Observable.create( observer => observer.next(1));
stream$.subscribe( data => console.log(data));
只需看一下代码,我们就可以对底层正在发生的事情做出有根据的猜测。很明显,我们需要一个Observable
类。
这个类需要一个接受函数作为参数的create()
方法。create()
方法应该返回一个Observable
。此外,我们的Observable
类需要一个接受函数作为参数的subscribe()
方法。让我们从这里开始,看看我们会走到哪里。
首先,让我们定义我们的Observable
类,并使用上述方法:
class MyObservable {
static create(behaviourFn): MyObservable {}
constructor() {}
subscribe(dataFn) {}
}
好的,所以我们有一个包含三个方法的类;让我们尝试实现这些方法。让我们从我们所知道的create()
方法开始:
class MyObservable {
static create(behaviourFn): MyObservable {
return new Observable(behaviourFn);
}
constructor(private behaviourFn) {}
subscribe(dataFn) {}
}
我们用粗体标出了所需更改,并在类中引入了一个名为behaviourFn()
的字段。此外,我们的create()
方法通过传递create()
方法参数中的behaviourFn
来实例化一个Observable
。这意味着构造函数需要接受一个函数作为参数,并将其保存以供以后使用。关于传递给create
方法的behaviourFn()
,我们知道它接受一个观察者实例作为参数,并规定了观察者实例应该发出哪些值。为了使任何东西能够捕获这些发出的值,我们需要实现我们的最后一个方法,subscribe()
。我们知道subscribe()
接受dataFn()
作为参数,并在调用subscribe()
方法时以某种方式调用我们的behaviourFn
以触发行为。因此,让我们在我们的现有代码中修改这一点:
class MyObservable {
static create(private behaviourFn): MyObservable {
return new MyObservable(behaviourFn);
}
constructor(behaviourFn) { this.behaviourFn = behaviourFn; }
subscribe(dataFn) {
this.behaviourFn(observer);
}
}
在这一点上,我们意识到我们需要一个Observer
类,这样我们才能向behaviourFn()
传递一些东西。我们还需要弄清楚如何调用dataFn()
以及何时调用。经过一分钟思考,我们意识到观察者必须负责调用dataFn()
,因此似乎只有将dataFn()
传递到我们的Observer
类的构造函数中,以便以后使用,才是合理的,如下所示:
class Observer {
constructor(private dataFn) {}
next(value) { this.dataFn(val) }
}
通过实现这个Observer
类,我们做了三件事:一是通过构造函数传递dataFn()
并将其作为Observer
类的一个字段;二是创建了一个next()
方法在Observer
上,这是我们必须做的,因为我们了解到观察者实例应该调用next()
来生成值;第三,我们确保在next()
方法内部调用dataFn()
,以确保每当通过调用next()
方法生成值时,订阅者都会被告知。将所有这些代码放在一起,我们创建了一个非常基础的 RxJS 实现,实际上它是可以工作的!为了更好地理解我们目前所拥有的,让我们显示到目前为止使用的所有代码:
// rxjs-core/Observable.ts
class Observer {
constructor(private dataFn) {}
next(value) { this.dataFn(value) }
}
class MyObservable {
behaviourFn;
static create(behaviourFn): MyObservable {
return new Observable(behaviourFn);
}
constructor(behaviourFn) { this.behaviourFn = behaviourFn; }
subscribe(dataFn) {
let observer = new Observer(dataFn);
this.behaviourFn( observer );
}
}
let stream$ = MyObservable.create( observer => observer.next(1)); // 1
处理订阅
在上一节中,我们学习了如何实现一个非常基本的内核。然而,在本章的早期部分,提到有时你的 Observable 会分配资源或显示一种明显无法停止生成值的行为。处理这种情况是我们的责任。RxJS 明确地在这里指出了路径,即定义一个清理函数并确保在调用 unsubscribe()
时调用它。让我们展示这样一个场景,其中我们显然需要关注清理:
// rxjs-core/Observer-with-subscription.ts
interface Subscription {
unsubscribe();
}
class MyObservableWithSubscription {
static create(behaviourFn): MyObservableWithSubscription {
return new MyObservableWithSubscription(behaviourFn);
}
constructor(private behaviourFn) {}
subscribe(dataFn): Subscription {
let observer = new MyObserver(dataFn);
let cleanUpFn = this.behaviourFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
let streamWithSubscription$ = MyObservableWithSubscription.create(observer => {
let counter = 0;
let id = setInterval(() => observer.next(counter++), 1000);
return function cleanUpFn() {
clearInterval(id);
};
});
const subscription = streamWithSubscription$.subscribe(data =>
console.log("data", data)
);
subscription.unsubscribe();
查看代码,我们发现当我们定义行为函数(代码片段的底部)时,我们设置了一个 setInterval()
构造,该构造定期调用 observer.next(
)。我们确保将引用保存在变量 ID 中。我们需要确保当我们选择取消 setInterval()
行为时可以做到这一点。我们通过在 behaviourFn
函数的最后一行定义一个 cleanUpFn()
来做到这一点。这使我们来到了代码片段的上半部分。在这里,我们看到我们通过确保将调用 this.behaviourFn()
的结果保存到名为 cleanUpFn
的变量中来修改 subscribe()
方法。这确实是我们在 behaviourFn()
中定义的 cleanUpFn()
。最后,我们通过将其作为对象的一部分返回并将其分配给 unsubscribe()
属性来公开 cleanUpFn()
属性。最后我们需要做的是调用 unsubscribe()
方法以确保我们的分配资源被释放,或者在这个特定例子中,取消 setInterval()
构造。调用 unsubscribe 将会调用 cleanUpFn()
,然后它将调用 clearInterval()
,这将取消间隔。
添加操作符
我们在定义自己的 RxJS 内核实现方面已经走了很长的路,但我们还缺少一个重要的拼图——操作符。操作符是 RxJS 的真正力量,可以被视为一个实用方法,它允许我们轻松地操作我们的流。让我们以 filter()
作为我们的示例目标。一个过滤操作符是一个你可以对其流调用的方法。想法是提供一个函数,能够逐个值地确定特定值是否应该被发出。一个典型的用例如下:
let stream$ = Observable.of(1,2,3)
.filter( x => x > 1 );
stream$.subscribe( data => console.log(data))
// will emit 2,3
在前面的代码中,我们可以看到我们提供给过滤函数的参数函数有效地排除了任何不符合条件的值。在这种情况下,所有大于 1
的值将被发出,从而对值 1
进行排序。让我们将 filter()
方法添加到我们之前定义的 MyObservable
类中,如下所示:
// rxjs-core/operator/Observable.ts, starting off with MyObservable, more to come
import { MyObserver } from "./Observer";
class MyObservable {
behaviorFn;
static create(behaviourFn): MyObservable {
return new MyObservable(behaviourFn);
}
constructor(behaviorFn) {
this.behaviorFn = behaviorFn;
}
filter(filterFn): FilterableObservable {
/* implement */
}
subscribe(dataFn) {
let observer = new MyObserver(dataFn);
let cleanUpFn = this.behaviorFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
从前面的代码片段中我们可以看到,filter()
方法被添加到了MyObservable
中,我们看到它本身返回一个 Observable,同时接受一个filterFn()
参数。你需要问自己的问题是,我们现有的MyObservable
构造函数是否足够。我们现有的构造函数接受一个behaviourFn()
,我们很可能需要存储传入的filterFn
参数,因此我们需要扩展构造函数或选择一个新的MyObservable
实现。我们思考了一下,意识到选择一个新的、更专门的MyObservable
可能更好,因为我们想避免大量的分支逻辑。因此,该方法的实现应该修改为类似以下的样子:
// rxjs-core/operator/Observable.ts, starting off with MyObservable, more to come
import { MyObserver } from "./Observer";
class MyObservable {
behaviorFn;
static create(behaviourFn): MyObservable {
return new MyObservable(behaviourFn);
}
constructor(behaviorFn) {
this.behaviorFn = behaviorFn;
}
filter(filterFn): FilterableObservable {
return new FilterableObservable(filterFn, this.behaviorFn);
}
subscribe(dataFn) {
let observer = new MyObserver(dataFn);
let cleanUpFn = this.behaviorFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
好的,现在我们有一个新的类要实现,FilterableObservable
。这个类应该共享MyObservable
的大部分行为,但展示我们如何发出数据。因此,我们是在从MyObservable
继承,但有自己的特别之处。让我们尝试一个实现:
// rxjs-core/operator/Observable.ts
import { MyObserver } from "./Observer";
class MyObservable {
behaviorFn;
static create(behaviourFn): MyObservable {
return new MyObservable(behaviourFn);
}
constructor(behaviorFn) {
this.behaviorFn = behaviorFn;
}
filter(filterFn): FilterableObservable {
return new FilterableObservable(filterFn, this.behaviorFn);
}
subscribe(dataFn) {
let observer = new MyObserver(dataFn);
let cleanUpFn = this.behaviorFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
export class FilterableObservable extends MyObservable {
constructor(private filterFn, behaviourFn) {
super(behaviourFn);
}
subscribe(dataFn) {
let observer = new MyObserver(dataFn);
observer.next = value => {
if (this.filterFn(value)) {
dataFn(value);
}
};
let cleanUpFn = this.behaviorFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
const stream$ = new MyObservable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
}).filter(x => x > 2);
stream$.subscribe(data => console.log("data", data));
// prints 3
在前面的代码片段中,我们可以看到我们重写了subscribe()
实现,或者更具体地说,我们在Observer
实例上重写了next()
方法。我们使用filterFn()
来评估是否应该生成某个值。现在我们已经成功实现了filter()
操作符。
回顾基础知识,添加错误和完成
在完成了 RxJS 基础实现的英勇壮举之后,我们希望对理解其内部工作原理感到相当满意。到目前为止,我们只在subscribe()
中实现了dataFn
;subscribe()
方法中还有两个回调需要实现。让我们看一个代码片段并突出显示缺失的部分:
let stream$ = Rx.Observable.of(1,2,3);
stream$.subscribe(
data => console.log(data),
err => console.error(err),
() => console.log('complete');
)
我们已经突出了最后两个回调作为缺失的功能。我们知道,为了触发错误回调,我们需要调用observer.error('some message')
。我们也知道,在抛出错误后不应再发出任何值。让我们提供一个这样的例子:
let stream$ = Rx.Observable.create( observer => {
observer.next(1);
observer.error('err');
observer.next(2);
});
stream$.subscribe(
data => console.log(data),
err => console.error(err)
);
// should emit 1, err
在这个阶段,我们意识到需要修改我们的Observer
类以支持error()
方法调用。我们还需要警惕我们刚才描述的条件,因为错误发生后不应再发出更多值。让我们直接进入实现:
class Observer {
hasError: boolean;
constructor(private dataFn, private errorFn) {}
next(value) {
if (!this.hasError) {
this.dataFn(value);
}
}
error(err) {
this.errorFn(err);
this.hasError = true;
}
}
在前面的代码片段中,我们可以看到我们向errorFn
构造函数传递了另一个参数。next()
方法需要更新,因此我们需要用条件包装它,以确定是否生成值。最后,我们需要定义error()
方法,调用传入的errorFn
并设置hasError
字段为true
。
我们还需要做一件事,那就是更新Observable
类中的subscribe()
方法:
class Observable {
behaviourFn;
static create(behaviourFn): Observable {
return new Observable(behaviourFn);
}
constructor(behaviourFn) {
this.behaviourFn = behaviourFn;
}
subscribe(dataFn, errorFn) {
let observer = new Observer(dataFn, errorFn);
let cleanUpFn = this.behaviourFn(observer);
return {
unsubscribe: cleanUpFn
};
}
}
提前提醒一下,当我们定义 filter()
操作符以覆盖 next()
方法时,我们需要确保这个操作符在确定是否生成值时考虑到 hasError
。我们将把这个留给你,亲爱的读者,去实现。
最后一件待办事项是支持完成。完成与抛出错误有许多相似之处,即不应再发出更多值。区别在于我们应该触发最后一个回调。与 error()
方法实现一样,我们从 Observer
实现开始:
// rxjs-core/error-complete/Observer.ts
class Observer {
hasError: boolean;
isCompleted: boolean;
constructor(
private dataFn,
private errorFn,
private completeFn ) {}
next(value) {
if(!this.hasError && !this.isCompleted) {
this.dataFn(value);
}
}
error(err) {
this.errorFn(err);
this.hasError = true;
}
complete() {
this.completeFn();
this.isCompleted = true;
}
}
根据前面的代码,我们看到我们的更改包括添加一个 isCompleted
字段。我们还向构造函数中传递了一个 completeFn()
。需要在 next()
值中添加逻辑,因为完成现在是我们需要寻找的另一个状态,除了错误之外。最后,我们添加了 complete()
方法,它只是调用传入的函数并将 isComplete
字段设置为 true
。
与之前一样,我们需要更新 Observable
类以传递完成函数:
// rxjs-core/error-complete/Observable.ts
import { Observer } from './Observer';
class Observable {
behaviourFn;
static create(behaviourFn): Observable {
return new Observable(behaviourFn);
}
constructor(behaviourFn) {
this.behaviourFn = behaviourFn;
}
filter(filterFn):Observable {
return new FilterableObservable(
filterFn,
this.behaviourFn
);
}
subscribe(dataFn, errorFn, completeFn) {
let observer = new Observer(dataFn, errorFn, completeFn);
let cleanUpFn = this.behaviourFn( observer );
return {
unsubscribe: cleanUpFn
};
}
}
const stream$ = new Observable(observer => {
observer.next(1);
observer.error("error");
observer.next(2);
});
stream$.subscribe(
data => console.log("data", data),
err => console.log("error", err),
() => console.log("completed")
);
// prints 1, error, no more is emitted after that
这里做一个快速的实际情况检查:我们实际上已经实现了 RxJS 的核心功能——观察者、Observable 和一个操作符。我们离理解正在发生的事情更近了。我们意识到实现其他 59 个操作符是一项相当大的成就,而且当有一个团队维护现有的 RxJS 存储库时,这可能不是一个好主意。我们新获得的知识并非徒劳;理解正在发生的事情永远不会错。谁知道呢?也许你们中的某位读者将成为贡献者;你们确实已经得到了工具。
摘要
我们首先讨论了构成 RxJS 的模式。接着,我们描述了其核心概念。随后,我们解释了何时以及为什么需要创建自己的 Observable,选择 RxJS 的众多创建操作符之一,或者依赖 Angular 框架来完成这项工作。我们简要讨论了清理 Observable 的重要性以及何时这样做是个好主意。
最后,我们承担了实现 RxJS 核心部分的任务,以更深入地理解其核心概念以及它是如何结合在一起的。这希望给你们提供了一个相当坚实的基础和深入的理解,当我们进入下一章时,将涵盖更多操作符和一些更高级的概念。
第六章:操作流及其值
让我们从回顾上一章开始,提醒自己我们已经对 RxJS 有了多深的理解。我们学习了诸如Observable
、Observer
和Producer
等概念,以及它们是如何相互作用的。此外,我们还了解了订阅过程,以便我们实际上可以接收我们渴望的值。我们还研究了如何从流中取消订阅,以及在哪些情况下需要定义这种行为。最后,我们通过学习如何构建 RxJS 的核心实现,从而看到了所有这些概念的实际应用。拥有所有这些知识,我们应该对 RxJS 的基础感到相当自信,但正如上一章提到的,我们需要操作符的帮助来真正对我们的流做些有意义的事情。
让我们不再拖延,开始讨论本章内容。操作符是我们可以在流上调用的函数,可以以许多不同的方式执行操作。操作符是不可变的,这使得流易于推理,也将使测试变得相当容易。正如你将在本章中看到的那样,我们很少只处理一个流,而是处理许多流,理解如何构建和控制这些流将使你从认为它是黑暗魔法转变为实际上能够在需要时应用 RxJS。
在本章中,我们将涵盖以下内容:
-
如何使用基本操作符
-
使用操作符以及现有工具调试流
-
深入了解不同的操作符类别
-
培养以 Rx 方式解决问题的思维方式
开始
你几乎总是通过创建一个静态值的流来开始使用 RxJS 进行编码。为什么是静态值呢?好吧,没有必要让它变得过于复杂,而你真正需要开始推理的只是一个Observable
。随着你在解决问题的过程中逐渐进步,你可能会用更合适的 AJAX 调用或来自其他异步源值的调用来替换静态值。
然后,你开始思考你想要实现的目标。这会让你考虑你可能需要的操作符以及它们的顺序。你也可能会考虑如何将问题分解;这通常意味着创建多个流,其中每个流解决一个特定的问题,这些问题与你试图解决的更大问题相连接。
让我们从创建流开始,看看我们如何迈出与流一起工作的第一步。
以下代码创建了一个静态值的流:
const staticValuesStream$ = Rx.Observable.of(1, 2, 3, 4);
staticValuesStream$.subscribe(data => console.log(data));
// emits 1, 2, 3, 4
这是一个非常基础的例子,展示了我们如何创建一个流。我们使用of()
创建操作符,它接受任意数量的参数。所有参数都会在有订阅者时依次发出。在上面的代码中,我们还通过调用subscribe()
方法并传递一个以发出值为参数的函数来订阅staticValuesStream$
。
让我们引入一个操作符 map()
,它像一个投影一样工作,允许你改变正在发出的内容。map()
操作符在发出之前对流中的每个值进行调用。
你可以通过提供一个函数并执行一个投影来使用 map()
操作符,如下所示:
const staticValuesStream$ =
Rx.Observable
.of(1, 2, 3, 4)
.map(data => data + 1);
staticValuesStream$.subscribe(data => console.log(data))
// emits 2, 3, 4, 5
在前面的代码中,我们将 map()
操作符附加到 staticValuesStream$
上,并在发出之前对每个值应用它,并将其增加一。因此,结果数据发生了变化。这就是将操作符附加到流上的方法:简单地创建流,或者使用现有的一个,然后逐个添加操作符。
让我们添加另一个操作符 filter()
,以确保我们真正理解如何使用操作符。filter()
做什么?嗯,就像 map()
操作符一样,它应用于每个值,但它不是创建一个投影,而是决定哪些值将被发出。filter()
接收一个布尔值。任何评估为 true
的表达式意味着值将被发出;如果为 false
,则表达式将不会发出。
你可以使用以下方式使用 filter()
操作符:
const staticValuesStream$ =
Rx.Observable
.of(1, 2, 3, 4)
.map(data => data + 1)
.filter(data => data % 2 === 0 );
staticValuesStream$.subscribe(data => console.log(data));
// emits 2, 4
我们通过将其链接到现有的 map()
操作符来添加 filter()
操作符。我们给 filter()
操作符的条件是只对能被 2
整除的值返回 true
,这就是模运算符的作用。我们知道从之前的内容中,map()
操作符本身确保了值 2
、3
、4
和 5
被发出。这些是现在由 filter()
操作符评估的值。在这四个值中,只有 2
和 4
满足 filter()
操作符设定的条件。
当然,当在流上工作并应用操作符时,事情可能并不总是像前面的代码那样简单。可能无法准确预测将发出什么。在这些场合,我们有一些技巧可以使用。其中一种技巧是使用 do()
操作符,这将允许我们检查每个值而不改变它。这为我们提供了充足的机会来用于调试目的。根据我们在流中的位置,do()
操作符将输出不同的值。让我们看看 do()
操作符应用时不同情况下的不同影响:
const staticValuesStream$ =
Rx.Observable.of(1, 2, 3, 4)
.do(data => console.log(data)) // 1, 2, 3, 4
.map(data => data + 1)
.do(data => console.log(data)) // 2, 3, 4, 5
.filter(data => data % 2 === 0 )
.do(data => console.log(data)); // 2, 4
// emits 2, 4
staticValuesStream$.subscribe(data => console.log(data))
如您所见,仅通过使用 do()
操作符,我们就有了调试流的好方法,随着流复杂性的增加,这变得是必要的。
理解操作符
到目前为止,我们已经展示了如何创建一个流,并在它上面使用一些非常基本的运算符来改变发出的值。我们还介绍了如何使用 do()
运算符在不改变流的情况下检查流。并不是所有的运算符都像 map()
、filter()
和 do()
运算符那样容易理解。你可以使用不同的策略来尝试理解每个运算符的作用,以便你知道何时使用它们。使用 do()
运算符是一种方法,但还有一种图形化的方法可以采用。这种方法被称为弹珠图。它由一个代表时间从左到右流逝的箭头组成。在这个箭头上有一些代表发出的值的圆圈或弹珠。弹珠中有一个值,但弹珠之间的距离也可能描述了随时间发生的事情。弹珠图通常至少由两个带有弹珠的箭头和一个运算符组成。其想法是表示应用运算符后流发生了什么。第二个箭头通常表示结果流。
这里是一个弹珠图的例子:
RxJS 中的大多数运算符都在 RxMarbles 网站上用弹珠图表示:rxmarbles.com/
。这是一个真正伟大的资源,可以快速了解运算符的作用。然而,要真正理解 RxJS,你需要编写代码;这是不可避免的。当然,有不同方法可以做到这一点。你可以轻松设置自己的项目并从 NPM 安装 RxJS,通过 CDN 链接引用它,或者你可以使用像 JS Bin (www.jsbin.com) 这样的页面,它让你能够轻松地将 RxJS 作为库添加,并允许你立即开始编码。它看起来像这样:
JS Bin 让开始变得容易,但如果我们能将弹珠图和 JS Bin 结合起来,并得到你编码时的图形表示,那岂不是更好?你可以用 RxFiddle 实现这一点:rxfiddle.net/
。你可以输入你的代码,点击运行,然后你会看到一个弹珠图,显示你刚刚编写的代码,它看起来像这样:
流中的流
我们一直在研究不同的运算符,它们会改变正在发出的值。流还有一个不同的方面:如果你需要从一个现有的流中创建一个新的流怎么办?另一个好问题是:这种情况下通常在什么时候发生?有很多情况,例如:
-
基于 keyup 事件流进行 AJAX 调用。
-
计算点击次数并确定用户是单击、双击还是三击。
你应该明白了;我们从一个需要变成另一种类型的流的流类型开始。
让我们先看看如何创建一个流,并看看当我们尝试使用运算符创建流时会发生什么:
let stream$ = Rx.Observable.of(1,2,3)
.map(data => Rx.Observable.of(data));
// Observable, Observable, Observable
stream$.subscribe(data => console.log(data));
在这个阶段,通过map()
操作符传递的每个值都会产生一个新的Observable
。当你订阅stream$
时,每个发出的值都将是一个流。你的第一个本能可能是为这些值中的每一个都附加一个subscribe()
,就像这样:
let stream$ = Rx.Observable
.of(1,2,3)
.map(data => Rx.Observable.of(data))
stream$.subscribe(data => {
data.subscribe(val => console.log(val))
});
// 1, 2, 3
抵制这种冲动。这将只会创建难以维护的代码。你想要做的是将这些流合并成一个,这样你只需要一个subscribe()
。有一个操作符正是为此而设计的,称为flatMap()
。flatMap()
的作用是将你的流数组转换成一个流,一个元流。
它的使用方式如下:
let stream$ = Rx.Observable.of(1,2,3)
.flatMap(data => Rx.Observable.of(data))
stream$.subscribe(data => {
console.log(val);
});
// 1, 2, 3
好的,我们明白了,我们不想得到一个 Observable 流,而是一个值流。这个操作符看起来真的很棒。但我们仍然不确定何时使用它。让我们使这个例子更现实一些。想象一下,你有一个由一个输入字段组成的 UI。用户将字符输入到这个输入字段中。想象一下,你想要对输入的一个或多个字符做出反应,例如,在字符输入后执行一个 AJAX 请求。在这里,我们关注两个问题:如何收集输入的字符以及如何执行 AJAX 请求。
让我们从第一件事开始,捕捉输入字段中输入的字符。为此,我们需要一个 HTML 页面和一个 JavaScript 页面。让我们从 HTML 页面开始:
<html>
<body>
<input id="input" type="text">
<script src="img/Rx.min.js"></script>
<script src="img/app.js"></script>
</body>
</html>
这展示了我们的输入元素和 RxJS 的脚本引用,以及app.js
文件的引用。然后是app.js
文件,其中我们获取输入元素的引用,并在输入被输入时立即开始监听按键:
let elem = document.getElementById('input');
let keyStream$ = Rx.Observable
.fromEvent(elem, 'keyup')
.map( ev => ev.key);
keyStream$.subscribe( key => console.log(key));
// emits entered key chars
值得强调的是,我们通过调用fromEvent()
创建操作符来开始监听由keyup
事件发出的内容。然后,我们应用map()
操作符来挖掘ev.key
上的字符值存储。最后,我们订阅这个流。正如预期的那样,运行这段代码将在你输入 HTML 页面中的值时立即在控制台中打印出输入的字符。
让我们通过基于我们输入的内容执行一个 AJAX 请求来使这个例子更具体。为此,我们将使用fetch()
API 和一个名为 swapi(swapi.com)的在线 API,它包含了一系列包含《星球大战》电影信息的 API。让我们首先定义我们的 AJAX 调用,然后看看它如何融入我们现有的键流中。
我们说过我们会使用fetch()
。它允许我们像这样简单地制定一个 GET 请求:
fetch('https://swapi.co/api/people/1')
.then(data => data.json())
.then(data => console.log('data', data));
当然,我们希望将这个请求转换成一个Observable
,以便它能很好地与我们的keyStream$
协同工作。幸运的是,通过使用from()
操作符,我们可以轻松地实现这一点。然而,让我们首先将fetch()
调用重写成一个易于操作的方法。重写后的结果如下:
function getStarwarsCharacterStream(id) {
return fetch('https://swapi.co/api/people/' + id)
.then(data => data.json());
}
这段代码允许我们提供一个用于构造 URL 的参数,我们使用它通过 AJAX 获取一些数据。在这个阶段,我们准备将我们的函数连接到现有的流。我们通过输入以下内容来实现这一点:
let keyStream$ = Rx.Observable.fromEvent(elem, 'keyup')
.map(ev => ev.key)
.filter(key => key !== 'Backspace')
.flatMap( key =>
Rx.Observable
.from(getStarwarsCharacterStream(key))
);
我们使用from()
转换操作符以粗体形式突出显示flatMap()
操作符的使用。最后提到的操作符将我们的getStarwarsCharacterStream()
函数作为参数。from()
操作符将此函数转换为流。
在这里,我们学习了如何连接两个不同的流,以及如何将Promise
转换为流。尽管这种方法在纸面上看起来很好,但使用flatMap()
有其局限性,了解这些局限性非常重要。因此,让我们接下来谈谈switchMap()
操作符。使用switchMap()
操作符的好处将在执行长时间运行的任务时变得更加明显。为了论证,让我们定义这样一个任务,如下所示:
function longRunningTask(input) {
return new Promise(resolve => {
setTimeout(() => {
resolve('response based on ' + input);
}, 5000);
});
}
在此代码中,我们有一个执行需要 5 秒钟的函数;这足以展示我们试图说明的点。接下来,让我们展示如果我们继续在以下代码中使用flatMap()
操作符会产生什么效果:
let longRunningStream$ = keyStream$
.map(ev => ev.key)
.filter(key => elem.value.length >3)
.filter( key => key !== 'Backspace')
.flatMap( key =>
Rx.Observable
.from(longRunningTask(elem.value))
);
longRunningStream$.subscribe(data => console.log(data));
上述代码的工作方式如下:每次我们按下键,它都会生成一个事件。然而,我们有一个.filter()
操作符,它确保只有当至少输入了四个键时才会生成事件,filter(key => elem.value.length >3)
。让我们谈谈此时用户的期望。如果一个用户在一个输入控件中输入键,他们最可能期望在完成输入时发起一个请求。用户定义完成输入为输入一些字符,并且如果输入错误,他们应该能够删除字符。因此,我们可以假设以下输入序列:
// enters abcde
abcde
// removes 'e'
到目前为止,他们已经输入了字符,并在合理的时间内编辑了他们的答案。用户期望根据abcd
得到一个答案。然而,使用flatMap()
操作符意味着用户将得到两个答案,因为在现实中,他们输入了abcde
和abcd
。想象一下,如果我们根据这两个输入得到一个结果列表;它很可能是两个看起来有些不同的列表。基于我们代码的响应将看起来像这样:
我们的代码很可能能够通过在收到新响应时立即重新渲染结果列表来处理所描述的情况。然而,这里有两个问题:首先,我们对abcde
进行了不必要的网络请求,其次,如果后端响应足够快,我们将在结果列表渲染一次后,基于第二个响应再次渲染,从而在 UI 中看到闪烁。这不是一个好的情况,我们希望有一个情况,即如果我们继续输入,第一个请求将被放弃。这就是switchMap()
操作符发挥作用的地方。它确实做到了这一点。因此,让我们将前面的代码更改为以下代码:
let longRunningStream$ = keyStream$
.map(ev => ev.key)
.filter(key => elem.value.length >3)
.filter( key => key !== 'Backspace')
.switchMap( key =>
Rx.Observable
.from(longRunningTask(elem.value))
);
在此代码中,我们只是将flatMap()
切换为switchMap()
。当我们现在以完全相同的方式执行代码时,即用户首先输入12345
,然后很快将其更改为1234
,最终结果是:
如我们所见,我们只得到一个请求。这是因为当发生新事件时,前一个事件会被中止——switchMap()
正在施展它的魔法。用户很高兴,我们也很高兴。
AJAX
我们已经触及了制作 AJAX 请求的主题。有许多方法可以制作 AJAX 请求;最常见的方法有两种:
-
使用 fetch API;fetch API 是一个网络标准,因此内置在大多数浏览器中
-
使用内置在 RxJS 库中的
ajax()
方法;它曾经存在于一个名为 Rx.Dom 的库中
fetch()
fetch()
API 是一个网络标准。您可以在以下链接中找到官方文档:developer.mozilla.org/en-US/docs/Web/API/Fetch_API
。fetch()
API 是基于 Promise
的,这意味着在使用之前我们需要将其转换为 Observable
。该 API 暴露了一个 fetch()
方法,它以强制性的 URL 参数作为第一个参数,第二个参数是一个可选对象,允许您控制发送哪个正文(如果有),使用哪个 HTTP 动词,等等。
我们已经在 RxJS 的上下文中提到了如何最好地处理它。尽管如此,这仍然值得重复。但这并不像只是将我们的 fetch 操作符放入 from()
操作符中那么简单。让我们写一些代码来看看原因:
let convertedStream$ =
Rx.Observable.from(fetch('some url'));
convertedStream$.subscribe(data => 'my data?', data);
我们得到了我们的数据吗?抱歉,没有,我们得到了一个 Response
对象。但这很容易,只需在 map()
操作符中调用一个 json()
方法,然后我们肯定就有数据了?再次抱歉,没有,当你输入以下内容时,json()
方法返回一个 Promise
:
let convertedStream$ = Rx.Observable.from(fetch('some url'))
.map( r=> r.json());
// returns PromiseObservable
convertedStream$.subscribe(data => 'my data?', data);
我们已经在上一节中展示了可能的解决方案,如下所示:
getData() {
return fetch('some url')
.then(r => r.json());
}
let convertedStream$ = Rx.Observable.from(getData());
convertedStream$.subscribe(data => console.log('data', data));
在这段代码中,我们所做的是在将数据交给 from()
操作符之前简单地处理我们的数据。与 RxJS 不太一样,感觉与 Promise 玩耍。你可以采取一个更基于流的方案;我们几乎做到了,我们只需要做一些小的调整:
let convertedStream$ = Rx.Observable.from(fetch('some url'))
.flatMap( r => Rx.Observable.from(r.json()));
// returns data
convertedStream$.subscribe(data => console.log('data'), data);
就这样:我们的 fetch()
调用现在像流一样提供数据。那么我们做了什么?嗯,我们将 map()
调用更改为 flatMap()
调用。这样做的原因是当我们调用 r.json()
时,我们得到了一个 Promise
。我们通过将其包裹在 from()
调用中,Rx.Observable.from(r.json())
来解决这个问题。如果不将 map()
更改为 flatMap()
,那么流将发出一个 PromiseObservable
。正如我们在上一节中学到的,如果我们冒着在流中创建流的危险,我们需要 flatMap()
来拯救我们,它确实做到了。
ajax() 操作符
与基于 Promise
的 fetch()
API 不同,ajax()
方法实际上是基于 Observable
的,这使得我们的工作变得稍微容易一些。使用它非常直接,如下所示:
Rx.Observable
.ajax('https://swapi.co/api/people/1')
.map(r => r.response)
.subscribe(data => console.log('from ajax()', data));
如我们所见,前面的代码使用 URL 作为参数调用ajax()
操作符。值得提及的第二件事是调用map()
操作符,它从response
属性中提取我们的数据。因为它是一个Observable
,我们只需像往常一样通过调用subscribe()
方法并给它提供一个监听函数作为参数来订阅它。
这涵盖了当你想使用 HTTP 动词GET
获取数据时的简单情况。幸运的是,对于我们的需求来说,通过使用重载版本的ajax()
操作符来创建、更新或删除数据相当容易,这个操作符接受一个AjaxRequest
对象实例,它具有以下字段:
url?: string;
body?: any;
user?: string;
async?: boolean;
method?: string;
headers?: Object;
timeout?: number;
password?: string;
hasContent?: boolean;
crossDomain?: boolean;
withCredentials?: boolean;
createXHR?: () => XMLHttpRequest;
progressSubscriber?: Subscriber<any>;
responseType?: string;
从这个对象规范中我们可以看出,所有字段都是可选的,我们还可以通过我们的请求配置相当多的事情,例如headers
、timeout
、user
、crossDomain
等等;基本上这是我们期望从良好的 AJAX 包装功能中得到的。除了ajax()
操作符的重载之外,还存在一些简写选项:
-
get()
: 使用GET
动词获取数据 -
put()
: 使用PUT
动词更新数据 -
post()
: 使用POST
动词创建数据 -
patch()
: 使用PATCH
动词的目的是更新部分资源 -
delete()
: 使用DELETE
动词删除数据 -
getJSON()
: 使用GET
动词获取数据,并将响应类型设置为application/json
级联调用
到目前为止,我们已经介绍了你将使用 AJAX 发送或接收数据的两种主要方式。当涉及到接收数据时,通常并不像获取数据并渲染它那样简单。实际上,你很可能依赖于何时可以获取哪些数据。一个典型的例子是在获取剩余数据之前需要执行登录调用。在某些情况下,可能需要首先登录,然后获取登录用户的资料,一旦有了这些资料,就可以获取消息、订单或任何可能特定于某个用户的数据。这种以这种方式获取数据的现象被称为级联调用。
让我们看看如何使用承诺(Promises)进行级联调用,并逐步学习如何使用 RxJS 做同样的事情。我们之所以这样做,是因为我们假设大多数阅读这本书的人对承诺(Promises)都很熟悉。
让我们看看我们最初提到的依赖情况,我们需要按以下顺序执行以下步骤:
-
用户首先登录到系统中
-
然后我们获取用户的资料
-
然后我们获取用户订单的信息
使用承诺(promises),代码中可能看起来是这样的:
// cascading/cascading-promises.js
login()
.then(getUser)
.then(getOrders);
// we collect username and password from a form
const login = (username, password) => {
return fetch("/login", {
method: "POST",
body: { username, password }
})
.then(r => r.json())
.then(token => {
localStorage.setItem("auth", token);
});
};
const getUser = () => {
return fetch("/users", {
headers: {
Authorization: "Bearer " + localStorage.getToken("auth")
}
}).then(r => r.json());
};
const getOrders = user => {
return fetch(`/orders/user/${user.id}`, {
headers: {
Authorization: "Bearer " + localStorage.getToken("auth")
}
}).then(r => r.json());
};
这段代码描述了我们首先使用login()
方法登录系统,并获取一个令牌。我们使用这个令牌在未来的任何调用中确保我们进行认证调用。我们还看到我们如何执行getUser()
调用并获取一个用户实例。我们使用相同的用户实例来执行我们的最后一个调用getOrders()
,其中用户 ID 用作路由参数:`/orders/user/${user.id}`
。
我们已经展示了如何使用承诺(promises)来执行级联调用;我们这样做是为了为我们要解决的问题建立一个共同的基础。RxJS 的方法非常相似:我们已经展示了ajax()
操作符的存在,并且当处理 AJAX 调用时,它使我们的生活变得更简单。为了使用 RxJS 实现级联调用效果,我们只需简单地使用switchMap()
操作符。这将使我们的代码看起来像这样:
// cascading/cascading-rxjs.js
let user = "user";
let password = "password";
login(user, password)
.switchMap(getUser)
.switchMap(getOrders);
// we collect username and password from a form
const login = (username, password) => {
return Rx.Observable.ajax("/login", {
method: "POST",
body: { username, password }
})
.map(r => r.response)
.do(token => {
localStorage.setItem("auth", token);
});
};
const getUser = () => {
return Rx.Observable.ajax("/users", {
headers: {
Authorization: "Bearer " + localStorage.getToken("auth")
}
}).map(r => r.response);
};
const getOrders = user => {
return Rx.Observable.json(`/orders/user/${user.id}`, {
headers: {
Authorization: "Bearer " + localStorage.getToken("auth")
}
}).map(r => r.response);
};
我们已经指出了前面代码中需要更改的部分。简而言之,更改如下:
-
fetch()
被ajax()
操作符替换 -
我们调用
.map(r => r.response)
而不是.then(r => r.json())
-
我们对每个级联调用执行
.switchMap()
调用,而不是.then(getOrders)
还有一个有趣的方面需要我们探讨,那就是并行调用。当我们获取用户和订单时,我们在发起下一个调用之前等待前一个调用完全完成。在很多情况下,这可能并不是严格必要的。想象一下,我们有一个与之前类似的情况,但围绕用户有很多有趣的信息我们需要获取。除了获取订单之外,用户可能还有一个朋友集合或消息集合。获取这些数据的前提条件只是我们已经获取了用户,因此我们知道应该查询哪个朋友集合以及需要查询哪个消息集合。在承诺的世界中,我们会使用Promise.all()
构造来实现并行化。考虑到这一点,我们更新我们的Promise
代码,使其看起来像这样:
// parallell/parallell-promise.js
// we collect username and password from a form
login(username, password) {
return new Promise(resolve => {
resolve('logged in');
});
}
getUsersData(user) {
return Promise.all([
getOrders(user),
getMessages(user),
getFriends(user)
// not implemented but you get the idea, another call in parallell
])
}
getUser() {
// same as before
}
getOrders(user) {
// same as before
}
login()
.then(getUser)
.then(getUsersData);
如前所述的代码所示,我们引入了新的getUsersData()
方法,该方法并行获取订单、消息和朋友集合,使我们的应用更快地响应,因为数据将比逐个获取更快地到达。
我们可以通过引入forkJoin()
操作符轻松地使用 RxJS 实现相同的效果。它接受一系列流,并并行获取所有内容。因此,我们更新我们的 RxJS 代码,使其看起来如下:
// parallell/parallell-rxjs.js
import Rx from 'rxjs/Rx';
// imagine we collected these from a form
let user = 'user';
let password = 'password';
login(user, password)
.switchMap(getUser)
.switchMap(getUsersData)
// we collect username and password from a form
login(username, password) {
// same as before
}
getUsersData(user) {
return Rx.Observable.forkJoin([
getOrders(),
getMessages(),
getFriends()
])
}
getUser() {
// same as before
}
getOrders(user) {
// same as before
}
login()
.then(getUser)
.then(getUsersData);
深入探讨
到目前为止,我们已经查看了一些可以让你使用map()
和filter()
操作符创建或更改流的操作符,我们学习了如何管理不同的 AJAX 场景,等等。基础是有的,但我们还没有以结构化的方式真正接近操作符的话题。我们这是什么意思呢?嗯,操作符可以被认为是属于不同的类别。我们可用的操作符数量令人震惊,有 60 多个。如果我们真的要学习所有这些,这将需要时间。不过,这里的关键是:我们只需要知道存在哪些不同类型的操作符,这样我们就可以在适当的地方应用它们。这减少了我们的认知负担和记忆。一旦我们知道有哪些类别,我们只需要深入挖掘,很可能会最终知道总共 10-15 个操作符,其余的我们可以在需要时查阅。
目前,我们有以下类别:
-
创建操作符:这些操作符帮助我们首先创建流。几乎任何东西都可以通过这些操作符转换为流。
-
组合操作符:这些操作符帮助我们结合值以及流。
-
数学操作符:这些操作符对正在发射的值执行数学评估。
-
基于时间的操作符:这些操作符改变值发射的速度。
-
分组操作符:这些操作符的思路是对一组值而不是单个值进行操作。
创建操作符
我们使用创建操作符来创建流本身,因为坦白说:我们需要转换为流的东西并不总是流,但通过将其转换为流,它将必须与其他流很好地协同工作,最好的是,将能够利用使用操作符的全部力量。
那么,这些其他非流由什么组成呢?嗯,可以是任何异步或同步的内容。重要的是,这些是需要在某一点发射的数据。因此,存在一系列创建操作符。在接下来的小节中,我们将展示所有这些操作符中的一部分,足够你认识到将任何事物转换为流的力量。
of()操作符
我们已经有机会使用这个操作符几次了。它接受未知数量的以逗号分隔的参数,可以是整数、字符串或对象。如果你只想发射一组有限的值,这是一个你想要使用的操作符。要使用它,只需输入:
// creation-operators/of.js
const numberStream$ = Rx.Observable.of(1,2, 3);
const objectStream$ = Rx.Observable.of({ age: 37 }, { name: "chris" });
// emits 1 2 3
numberStream$.subscribe(data => console.log(data));
// emits { age: 37 }, { name: 'chris' }
objectStream$.subscribe(data => console.log(data));
从代码中可以看出,我们在of()
操作符中放置什么内容其实并不重要,它无论如何都能发射出来。
from()操作符
这个操作符可以接受数组或Promise
作为输入,并将它们转换为流。要使用它,只需像这样调用:
// creation-operators/from.js
const promiseStream$ = Rx.Observable.from(
new Promise(resolve => setTimeout(() => resolve("data"),3000))
);
const arrayStream$ = Rx.Observable.from([1, 2, 3, 4]);
promiseStream$.subscribe(data => console.log("data", data));
// emits data after 3 seconds
arrayStream$.subscribe(data => console.log(data));
// emits 1, 2, 3, 4
这样做可以节省我们很多麻烦,不必处理不同类型的异步调用。
range()操作符
这个操作符允许你指定一个范围,一个起始数字和一个结束数字。这是一个很好的简写,可以快速创建一个具有数字范围的流。要使用它,只需输入:
// creation-operators/range.js
const stream$ = Rx.Observable.range(1,99);
stream$.subscribe(data => console.log(data));
// emits 1... 99
fromEvent() 操作符
现在事情变得非常有趣。fromEvent()
操作符允许我们将 UI 事件(如click
或scroll
事件)混合起来,并将其转换成一个流。到目前为止,我们一直假设异步调用只与 AJAX 调用有关。这远非事实。我们可以将 UI 事件与任何类型的异步调用混合,这创造了一个非常有趣的情况,使我们能够编写非常强大、表达力丰富的代码。我们将在下一节中进一步探讨这个话题,在流中思考。
要使用这个操作符,你需要给它提供两个参数:一个 DOM 元素和事件名称,如下所示:
// creation-operators/fromEvent.js
// we imagine we have an element in our DOM looking like this <input id="id" />
const elem = document.getElementById("input");
const eventStream$ = Rx.Observable
.fromEvent(elem, "click")
.map(ev => ev.key);
// outputs the typed key
eventStream$.subscribe(data => console.log(data));
组合
组合操作符是关于组合不同流中的值。我们有几个操作符可以帮助我们。当我们需要从多个地方而不是一个地方获取数据时,这种类型的操作符是有意义的。如果没有我们即将描述的强大操作符,从不同来源组合数据结构可能会很繁琐且容易出错。
merge() 操作符
merge()
操作符从不同的流中获取数据并将其合并。然而,这些流可以是任何类型,只要它们是Observable
类型。这意味着我们可以将定时操作、承诺、of()
操作符的静态数据等的数据组合起来。合并所做的就是交错发出的数据。这意味着在以下示例中,它将同时从两个流中发出。使用这个操作符有两种方式,作为静态方法,也可以作为实例方法:
// combination/merge.js
let promiseStream = Rx.Observable
.from(new Promise(resolve => resolve("data")))
let stream = Rx.Observable.interval(500).take(3);
let stream2 = Rx.Observable.interval(500).take(5);
// instance method version of merge(), emits 0,0, 1,1 2,2 3, 4
stream.merge(stream2)
.subscribe(data => console.log("merged", data));
// static version of merge(), emits 0,0, 1,1, 2, 2, 3, 4 and 'data'
Rx.Observable.merge(
stream,
stream2,
promiseStream
)
.subscribe(data => console.log("merged static", data));
这里的要点是,如果你只需要将一个流与另一个流组合,那么使用这个操作符的实例方法版本,但如果你有多个流,那么使用静态版本。此外,指定流的顺序也很重要。
combineLatest()
想象一下,你与几个提供数据的端点建立了连接。你所关心的是每个端点发出的最新数据。你可能处于这样的情况:一段时间后,一个或多个端点停止发送数据,你想要知道最后发生了什么。在这种情况下,我们希望能够结合所有相关端点的最新值。这就是combineLatest()
操作符发挥作用的地方。你可以按照以下方式使用它:
// combination/combineLatest.js
let firstStream$ = Rx.Observable
.interval(500)
.take(3);
let secondStream$ = Rx.Observable
.interval(500)
.take(5);
let combinedStream$ = Rx.Observable.combineLatest(
firstStream$,
secondStream$
)
// emits [0, 0] [1,1] [2,2] [2,3] [2,4] [2,5]
combinedStream$.subscribe(data => console.log(data));
我们可以看到,由于take()
操作符限制了项目数量,firstStream$
在一段时间后停止发出值。然而,combineLatest()
操作符确保我们仍然得到了firstStream$
发出的最后一个值。
zip()
这个运算符的目的是尽可能多地拼接值。我们可能正在处理连续的流,也可能在处理有值数限制的流。你使用这个运算符的方式如下:
// combination/zip.js
let stream$ = Rx.Observable.of(1, 2, 3, 4);
let secondStream$ = Rx.Observable.of(5, 6, 7, 8);
let thirdStream$ = Rx.Observable.of(9, 10);
let zippedStream$ = Rx.Observable.zip(
stream$,
secondStream$,
thirdStream$
)
// [1, 5, 9] [2, 6, 10]
zippedStream$.subscribe(data => console.log(data))
如我们所见,在这里,我们垂直拼接值,并通过最小公倍数,thirdStream$
是最短的,计算发出的值的数量。这意味着我们将从左到右取值并将它们压缩在一起。由于 thirdStream$
只有两个值,我们最终只发出两个值。
concat()
初看,concat()
运算符看起来像另一个 merge()
运算符,但这并不完全正确。区别在于 concat()
会等待其他流完成后再从下一个流中发出流。你在调用 concat()
时的流排列方式很重要。运算符的使用方式如下:
// combination/concat.js
let firstStream$ = Rx.Observable.of(1,2,3,4);
let secondStream$ = Rx.Observable.of(5,6,7,8);
let concatStream$ = Rx.Observable.concat(
firstStream$,
secondStream$
);
concatStream$.subscribe(data => console.log(data));
数学
数学运算符是执行数学运算的运算符,例如找到最大或最小值,汇总所有值等。
max
max()
运算符用于找到最大值。它有两种形式:我们或者不带参数直接调用 max()
运算符,或者提供一个 compare
函数。然后 compare
函数决定某个值是否大于、小于或等于一个发出的值。让我们看看两种不同的版本:
// mathematical/max.js
let streamWithNumbers$ = Rx.Observable
.of(1,2,3,4)
.max();
// 4
streamWithNumbers$.subscribe(data => console.log(data));
function comparePeople(firstPerson, secondPerson) {
if (firstPerson.age > secondPerson.age) {
return 1;
} else if (firstPerson.age < secondPerson.age) {
return -1;
}
return 0;
}
let streamOfObjects$ = Rx.Observable
.of({
name : "Yoda",
age: 999
}, {
name : "Chris",
age: 38
})
.max(comparePeople);
// { name: 'Yoda', age : 999 }
streamOfObjects$.subscribe(data => console.log(data));
我们可以从前面的代码中看到,我们得到一个结果,并且它是最大的。
min
min()
运算符基本上是 max()
运算符的相反;它有两种形式:带参数和不带参数。它的任务是找到最小值。要使用它,请输入:
// mathematical/min.js
let streamOfValues$ = Rx.Observable
.of(1, 2, 3, 4)
.min();
// emits 1
streamOfValues$.subscribe(data => console.log(data));
sum
曾经有一个名为 sum()
的运算符,但它在几个版本中已经不存在了。取而代之的是 .reduce()
。使用 reduce()
运算符,我们可以轻松地实现相同的功能。以下是如何使用 reduce()
编写 sum()
运算符的示例:
// mathematical/sum.js
let stream = Rx.Observable.of(1, 2, 3, 4)
.reduce((acc, curr) => acc + curr);
// emits 10
stream.subscribe(data => console.log(data));
这个运算符的作用是遍历所有发出的值并将结果相加。所以,本质上,它汇总了所有内容。当然,这种运算符不仅可以应用于数字,也可以应用于对象。区别在于你如何执行 reduce()
操作。以下示例涵盖了这种情况:
let stream = Rx.Observable.of({ name : "chris" }, { age: 38 })
.reduce((acc, curr) => Object.assign({},acc, curr));
// { name: 'chris', age: 38 }
stream.subscribe(data => console.log(data));
如前述代码所示,reduce()
运算符确保所有对象的属性都合并到一个对象中。
时间
当谈论流时,时间是一个非常重要的概念。想象一下,你有多个具有不同带宽的流,或者一个流比另一个流快,或者你有一个在特定时间间隔内重试 AJAX 调用的场景。在这些所有情况下,我们需要控制数据发出的速度,时间在这些场景中都起着重要作用。在我们手中,有一大堆运算符,就像魔术师一样,使我们能够根据需要构建和控制我们的值。
interval()
操作符
在 JavaScript 中,有一个setInterval()
函数,允许你以固定的时间间隔执行代码,直到你选择停止它。RxJS 有一个与此行为相同的操作符,即interval()
操作符。它接受一个参数:通常是发出值之间的毫秒数。你可以按以下方式使用它:
// time/interval.js
let stream$ = Rx.Observable.interval(1000);
// emits 0, 1, 2, 3 ... n with 1 second in between emits, till the end of time
stream$.subscribe(data => console.log(data));
注意:这个操作符会一直发出,直到你停止它。停止它的最佳方法是将其与一个take()
操作符结合。take()
操作符接受一个参数,指定在停止之前它想要发出多少个值。更新后的代码如下:
// time/interval-take.js
let stream$ = Rx.Observable.interval(1000)
.take(2);
// emits 0, 1, stops emitting thanks to take() operator
stream$.subscribe(data => console.log(data));
timer()
操作符
timer()
操作符的任务是在一定时间后发出值。它有两种风味:你可以在一定毫秒数后发出一个值,或者你可以在它们之间保持一定的延迟继续发出值。让我们看看可用的两种不同风味:
// time/timer.js
let stream$ = Rx.Observable.timer(1000);
// delay with 500 milliseconds
let streamWithDelay$ = Rx.Observable.timer(1000, 500)
// emits 0 after 1000 milliseconds, then no more
stream$.subscribe(data => console.log(data));
streamWithDelay$.subscribe(data => console.log(data));
delay()
操作符
delay()
操作符延迟所有发出值,并按以下方式使用:
// time/delay.js
let stream$ = Rx.Observable
.interval(100)
.take(3)
.delay(500);
// 0 after 600 ms, 1 after 1200 ms, 2 after 1800 ms
stream.subscribe(data => console.log(data));
sampleTime()
操作符
sampleTime()
操作符用于在样本周期过后才发出值。一个很好的用例是当你想要有一个冷却功能。想象一下,你有用户频繁地按保存按钮。保存可能需要几秒钟才能完成。一种方法是保存时禁用保存按钮。另一种有效的方法是简单地忽略按钮的任何点击,直到操作有机会完成。以下代码正是这样做的:
// time/sampleTime.js
let elem = document.getElementById("btn");
let stream$ = Rx.Observable
.fromEvent(elem, "click")
.sampleTime(8000);
// emits values every 8th second
stream$.subscribe(data => console.log("mouse clicks",data));
debounceTime()
操作符
sampleTime()
操作符能够忽略用户一段时间,但debounceTime()
操作符采取了不同的方法。防抖作为一个概念意味着我们在发出值之前等待事情平静下来。想象一下用户输入的输入元素。用户最终会停止输入。我们想确保用户确实已经停止了,所以我们等待一段时间后才真正采取行动。这正是debounceTime()
操作符为我们做的事情。以下示例展示了我们如何监听用户在输入元素中输入,等待用户停止输入,最后执行一个 AJAX 调用:
// time/debounceTime.js
const elem = document.getElementById("input");
let stream$ = Rx.Observable.fromEvent(elem, "keyup")
.map( ev => ev.key)
.filter(key => key !== "Backspace")
.debounceTime(2000)
.switchMap( x => {
return new Rx.Observable.ajax(`https://swapi.co/api/people/${elem.value}`);
})
.map(r => r.response);
stream$.subscribe(data => console.log(data));
当用户在文本框中输入一个数字时,在 2 秒的无操作后,keyup 事件将被触发。之后,将使用我们的文本框输入执行一个 AJAX 调用。
分组
分组操作符允许我们对一组收集的事件进行操作,而不是一次只对一个发出的事件进行操作。
buffer()
操作符
buffer()
操作符的想法是我们可以收集大量事件,而无需立即发出。该操作符本身接受一个参数,一个Observable
,它定义了何时停止收集事件。在那个时刻,我们可以选择对这些事件做什么。以下是你可以使用此操作符的方法:
// grouping/buffer.js
const elem = document.getElementById("input");
let keyStream$ = Rx.Observable.fromEvent(elem,"keyup");
let breakStream$ = keyStream$.debounceTime(2000);
let chatStream$ = keyStream$
.map(ev => ev.key)
.filter(key => key !== "Backspace")
.buffer(breakStream$)
.switchMap(newContent => Rx.Observable.of("send text as I type", newContent));
chatStream$.subscribe(data=> console.log(data));
这所做的是收集事件,直到有 2 秒的空闲时间。在那个时刻,我们释放所有已经缓冲起来的关键事件。当我们释放所有这些事件时,例如,我们可以通过 AJAX 将它们发送到某个地方。这在聊天应用中是一个典型的场景。使用前面的代码,我们总是可以发送最新输入的字符。
bufferTime()运算符
与buffer()
非常相似的运算符是bufferTime()
。这个运算符允许我们指定我们希望缓冲事件多长时间。它比buffer()
稍微灵活一些,但仍然非常有用。
流式思维
到目前为止,我们已经经历了一系列场景,这些场景展示了我们有哪些运算符可供使用,以及它们如何被串联起来。我们还看到了像flatMap()
和switchMap()
这样的运算符如何在我们从一种类型的可观察对象移动到另一种类型时真正改变事情。那么,当与可观察对象一起工作时,你应该采取哪种方法?显然,我们需要使用运算符来表示一个算法,但我们应该从哪里开始呢?我们首先需要做的是思考起点和终点。我们想要捕获哪些类型的事件,最终结果应该是什么样子?这已经给我们提供了关于我们需要执行多少转换才能达到那里的线索。如果我们只想转换数据,那么我们可能只需要一个map()
运算符和一个filter()
运算符。如果我们想从一个Observable
转换到下一个,那么我们需要一个flatMap()
或switchMap()
。我们是否有特定的行为,比如等待用户停止输入?如果有,那么我们需要查看debounceTime()
或类似的运算符。这实际上与所有问题是一样的:分解问题,看看你有哪些部分,分解并征服。不过,让我们尝试将其分解成一系列步骤:
-
输入是什么?UI 事件或其他什么?
-
输出是什么?最终结果是什么?
-
根据第二个要点,我需要哪些转换才能达到目标?
-
我是否需要处理多个流?
-
我是否需要处理错误,如果是的话,应该如何处理?
希望这能让你了解如何思考流。记住,从小处着手,逐步实现你的目标。
摘要
我们着手学习更多关于基本操作符的知识。在这个过程中,我们遇到了map()
和filter()
操作符,它们使我们能够控制被发射的内容。对do()
操作符的了解为我们提供了调试流的方法。此外,我们还了解了存在沙盒环境,例如 JS Bin 和 RxFiddle,以及它们如何帮助我们快速开始使用 RxJS。接下来,我们深入探讨了 AJAX 这一主题,并构建了对可能出现的不同场景的理解。在深入 RxJS 的过程中,我们研究了不同的操作符类别。我们对这一点只是略作了解,但它为我们提供了一种方法来了解库中哪些类型的操作符。最后,我们通过探讨如何改变和发展我们的思维方式来思考流,结束了这一章节。
正是凭借所有这些获得的知识,我们现在准备进入下一章更高级的 Rx 主题。我们掌握了基础知识,现在是时候精通它们了。
更多推荐
所有评论(0)