理解 Node.js 事件驱动机制的原理
学习Node.js一定要理解的内容之一,文中主要涉及到了EventEmitter的使用和一些异步情况的处理,比较偏基础,值得一读。
大多数Node.js对象都依赖了EventEmitter模块来监听和响应事件,比如我们常用的HTTPrequests,responses,以及streams。
constEventEmitter=require('events');
事件驱动机制的最简单形式,是在Node.js中十分流行的回调函数,例如fs.readFile。在回调函数这种形式中,事件每被触发一次,回调就会被触发一次。
我们先来探索下这个最基本的方式。
你准备好了就叫我哈,Node!
很久很久以前,在js里还没有原生支持Promise,async/await还只是一个遥远的梦想,回调函数是处理异步问题的最原始的方式。
回调从本质上讲是传递给其他函数的函数,在JavaScript中函数是第一类对象,这也让回调的存在成为可能。
一定要搞清楚的是,回调在代码中的并不表示异步调用。回调既可以是同步调用的,也可以是异步调用的。
举个例子,这里有一个宿主函数fileSize,它接受一个回调函数cb,并且可以通过条件判断来同步或者异步地调用该回调函数:
functionfileSize(fileName,cb){ if(typeoffileName!=='string'){ //Sync returncb(newTypeError('argumentshouldbestring')); } fs.stat(fileName,(err,stats)=>{ if(err){ //Async returncb(err); } //Async cb(null,stats.size); }); }
这其实也是个反例,这样写经常会引起一些意外的错误,在设计宿主函数的时候,应当尽可能的使用同一种风格,要么始终都是同步的使用回调,要么始终都是异步的。
我们来研究下一个典型的异步Node函数的简单示例,它用回调样式编写:
constreadFileAsArray=function(file,cb){ fs.readFile(file,function(err,data){ if(err){ returncb(err); } constlines=data.toString().trim().split('\n'); cb(null,lines); }); };
readFileAsArray函数接受两个参数:一个文件路径和一个回调函数。它读取文件内容,将其拆分成行数组,并将该数组作为回调函数的参数传入,调用回调函数。
现在设计一个用例,假设我们在同一目录中的文件numbers.txt包含如下内容:
10 11 12 13 14 15
如果我们有一个需求,要求统计该文件中的奇数数量,我们可以使用readFileAsArray来简化代码:
readFileAsArray('./numbers.txt',(err,lines)=>{ if(err)throwerr; constnumbers=lines.map(Number); constoddNumbers=numbers.filter(n=>n%2===1); console.log('Oddnumberscount:',oddNumbers.length); });
这段代码将文件内容读入字符串数组中,回调函数将其解析为数字,并计算奇数的个数。
这才是最纯粹的Node回调风格。回调的第一个参数要遵循错误优先的原则,err可以为空,我们要将回调作为宿主函数的最后一个参数传递。你应该一直用这种方式这样设计你的函数,因为用户可能会假设。让宿主函数把回调当做其最后一个参数,并让回调函数以一个可能为空的错误对象作为其第一个参数。
回调在现代JavaScript中的替代品
在现代JavaScript中,我们有Promise,Promise可以用来替代异步API的回调。回调函数需要作为宿主函数的一个参数进行传递(多个宿主回调进行嵌套就形成了回调地狱),而且错误和成功都只能在其中进行处理。而Promise对象可以让我们分开处理成功和错误,还允许我们链式调用多个异步事件。
如果readFileAsArray函数支持Promise,我们可以这样使用它,如下所示:
readFileAsArray('./numbers.txt') .then(lines=>{ constnumbers=lines.map(Number); constoddNumbers=numbers.filter(n=>n%2===1); console.log('Oddnumberscount:',oddNumbers.length); }) .catch(console.error);
我们在宿主函数的返回值上调用了一个函数来处理我们的需求,这个.then函数会把刚刚在回调版本中的那个行数组传递给这里的匿名函数。为了处理错误,我们在结果上添加一个.catch调用,当发生错误时,它会捕捉到错误并让我们访问到这个错误。
在现代JavaScript中已经支持了Promise对象,因此我们可以很容易的将其使用在宿主函数之中。下面是支持Promise版本的readFileAsArray函数(同时支持旧有的回调函数方式):
constreadFileAsArray=function(file,cb=()=>{}){ returnnewPromise((resolve,reject)=>{ fs.readFile(file,function(err,data){ if(err){ reject(err); returncb(err); } constlines=data.toString().trim().split('\n'); resolve(lines); cb(null,lines); }); }); };
我们使该函数返回一个Promise对象,该对象包裹了fs.readFile的异步调用。Promise对象暴露了两个参数,一个resolve函数和一个reject函数。
当有异常抛出时,我们可以通过向回调函数传递error来处理错误,也同样可以使用Promise的reject函数。每当我们将数据交给回调函数处理时,我们同样也可以用Promise的resolve函数。
在这种同时可以使用回调和Promise的情况下,我们需要做的唯一一件事情就是为这个回调参数设置默认值,防止在没有传递回调函数参数时,其被执行然后报错的情况。在这个例子中使用了一个简单的默认空函数:()=>{}。
通过async/await使用Promise
当需要连续调用异步函数时,使用Promise会让你的代码更容易编写。不断的使用回调会让事情变得越来越复杂,最终陷入回调地狱。
Promise的出现改善了一点,Generator的出现又改善了一点。处理异步问题的最新解决方式是使用async函数,它允许我们将异步代码视为同步代码,使其整体上更加可读。
以下是使用async/await版本的调用readFileAsArray的例子:
asyncfunctioncountOdd(){ try{ constlines=awaitreadFileAsArray('./numbers'); constnumbers=lines.map(Number); constoddCount=numbers.filter(n=>n%2===1).length; console.log('Oddnumberscount:',oddCount); }catch(err){ console.error(err); } } countOdd();
首先,我们创建了一个async函数——就是一个普通的函数声明之前,加了个async关键字。在async函数内部,我们调用了readFileAsArray函数,就像把它的返回值赋值给变量lines一样,为了真的拿到readFileAsArray处理生成的行数组,我们使用关键字await。之后,我们继续执行代码,就好像readFileAsArray的调用是同步的一样。
要让代码运行,我们可以直接调用async函数。这让我们的代码变得更加简单和易读。为了处理异常,我们需要将异步调用包装在一个try/catch语句中。
有了async/await这个特性,我们不必使用任何特殊的API(如.then和.catch)。我们只是把这种函数标记出来,然后使用纯粹的JavaScript写代码。
我们可以把async/await这个特性用在支持使用Promise处理后续逻辑的函数上。但是,它无法用在只支持回调的异步函数上(例如setTimeout)。
EventEmitter模块
EventEmitter是一个处理Node中各个对象之间通信的模块。EventEmitter是Node异步事件驱动架构的核心。Node的许多内置模块都继承自EventEmitter。
它的概念其实很简单:emitter对象会发出被定义过的事件,导致之前注册的所有监听该事件的函数被调用。所以,emitter对象基本上有两个主要特征:
- 触发定义过的事件
- 注册或者取消注册监听函数
为了使用EventEmitter,我们需要创建一个继承自EventEmitter的类。
classMyEmitterextendsEventEmitter{ }
我们从EventEmitter的子类实例化的对象,就是emitter对象:
constmyEmitter=newMyEmitter();
在这些emitter对象的生命周期里,我们可以调用emit函数来触发我们想要的触发的任何被命名过的事件。
myEmitter.emit('something-happened');
emit函数的使用表示发生某种情况发生了,让大家去做该做的事情。这种情况通常是某些状态变化引起的。
我们可以使用on方法添加监听器函数,并且每次emitter对象触发其关联的事件时,将执行这些监听器函数。
事件!==异步
先看看这个例子:
constEventEmitter=require('events'); classWithLogextendsEventEmitter{ execute(taskFunc){ console.log('Beforeexecuting'); this.emit('begin'); taskFunc(); this.emit('end'); console.log('Afterexecuting'); } } constwithLog=newWithLog(); withLog.on('begin',()=>console.log('Abouttoexecute')); withLog.on('end',()=>console.log('Donewithexecute')); withLog.execute(()=>console.log('***Executingtask***'));
WithLog是一个事件触发器,它有一个方法——execute,该方法接受一个参数,即具体要处理的任务函数,并在其前后包裹log以输出其执行日志。
为了看到这里会以什么顺序执行,我们在两个命名的事件上都注册了监听器,最后执行一个简单的任务来触发事件。
下面是上面程序的输出结果:
Beforeexecuting Abouttoexecute ***Executingtask*** Donewithexecute Afterexecuting
这里我想证实的是以上的输出都是同步发生的,这段代码里没有什么异步的成分。
- 第一行输出了"Beforeexecuting"
- begin事件被触发,输出"Abouttoexecute"
- 真正应该被执行的任务函数被调用,输出"Executingtask"
- end事件被触发,输出"Donewithexecute"
- 最后输出"Afterexecuting"
就像普通的回调一样,不要以为事件意味着同步或异步代码。
跟之前的回调一样,不要一提到事件就认为它是异步的或者同步的,还要具体分析。
如果我们传递taskFunc是一个异步函数,会发生什么呢?
//... withLog.execute(()=>{ setImmediate(()=>{ console.log('***Executingtask***') }); });
输出结果变成了这样:
Beforeexecuting Abouttoexecute Donewithexecute Afterexecuting ***Executingtask***
这样就有问题了,异步函数的调用导致"Donewithexecute"和"Afterexecuting"的输出并不准确。
要在异步函数完成后发出事件,我们需要将回调(或Promise)与基于事件的通信相结合。下面的例子说明了这一点。
使用事件而不是常规回调的一个好处是,我们可以通过定义多个监听器对相同的信号做出多个不同的反应。如果使用回调来完成这件事,我们要在单个回调中写更多的处理逻辑。事件是应用程序允许多个外部插件在应用程序核心之上构建功能的好办法。你可以把它们当成钩子来挂一些由于状态变化而引发执行的程序。
异步事件
我们把刚刚那些同步代码的示例改成异步的:
constfs=require('fs'); constEventEmitter=require('events'); classWithTimeextendsEventEmitter{ execute(asyncFunc,...args){ this.emit('begin'); console.time('execute'); asyncFunc(...args,(err,data)=>{ if(err){ returnthis.emit('error',err); } this.emit('data',data); console.timeEnd('execute'); this.emit('end'); }); } } constwithTime=newWithTime(); withTime.on('begin',()=>console.log('Abouttoexecute')); withTime.on('end',()=>console.log('Donewithexecute')); withTime.execute(fs.readFile,__filename);
用WithTime类执行asyncFunc函数,并通过调用console.time和console.timeEnd报告该asyncFunc所花费的时间。它在执行之前和之后都将以正确的顺序触发相应的事件,并且还会发出error/data事件作为处理异步调用的信号。
我们传递一个异步的fs.readFile函数来测试一下withTimeemitter。我们现在可以直接通过监听data事件来处理读取到的文件数据,而不用把这套处理逻辑写到fs.readFile的回调函数中。
执行这段代码,我们以预期的顺序执行了一系列事件,并且得到异步函数的执行时间,这些是十分重要的。
Abouttoexecute execute:4.507ms Donewithexecute
请注意,我们是将回调与事件触发器emitter相结合实现的这部分功能。如果asynFunc支持Promise,我们可以使用async/await函数来做同样的事情:
classWithTimeextendsEventEmitter{ asyncexecute(asyncFunc,...args){ this.emit('begin'); try{ console.time('execute'); constdata=awaitasyncFunc(...args); this.emit('data',data); console.timeEnd('execute'); this.emit('end'); }catch(err){ this.emit('error',err); } } }
我认为这段代码比之前的回调风格的代码以及使用.then/.catch风格的代码更具可读性。async/await让我们更加接近JavaScript语言本身(不必再使用.then/.catch这些api)。
事件参数和错误
在之前的例子中,有两个事件被发出时还携带了别的参数。
error事件被触发时会携带一个error对象。
this.emit('error',err);
data事件被触发时会携带一个data对象。
this.emit('data',data);
我们可以在emit函数中不断的添加参数,当然第一个参数一定是事件的名称,除去第一个参数之外的所有参数都可以在该事件注册的监听器中使用。
例如,要处理data事件,我们注册的监听器函数将访问传递给emit函数的data参数,而这个data也正是由asyncFunc返回的数据。
withTime.on('data',(data)=>{ //dosomethingwithdata });
error事件比较特殊。在我们基于回调的那个示例中,如果不使用监听器处理error事件,node进程将会退出。
举个由于错误使用参数而造成程序崩溃的例子:
classWithTimeextendsEventEmitter{ execute(asyncFunc,...args){ console.time('execute'); asyncFunc(...args,(err,data)=>{ if(err){ returnthis.emit('error',err);//NotHandled } console.timeEnd('execute'); }); } } constwithTime=newWithTime(); withTime.execute(fs.readFile,'');//BADCALL withTime.execute(fs.readFile,__filename);
第一次调用execute将会触发error事件,由于没有处理error,Node程序随之崩溃:
events.js:163 thrower;//Unhandled'error'event ^ Error:ENOENT:nosuchfileordirectory,open''
第二次执行调用将受到此崩溃的影响,并且可能根本不会被执行。
如果我们为这个error事件注册一个监听器函数来处理error,结果将大不相同:
withTime.on('error',(err)=>{ //dosomethingwitherr,forexamplelogitsomewhere console.log(err) });
如果我们执行上述操作,将会报告第一次执行execute时发送的错误,但是这次node进程不会崩溃退出,其他程序的调用也都能正常完成:
{Error:ENOENT:nosuchfileordirectory,open''errno:-2,code:'ENOENT',syscall:'open',path:''}
execute:4.276ms
需要注意的是,基于Promise的函数有些不同,它们暂时只是输出一个警告:
UnhandledPromiseRejectionWarning:Unhandledpromiserejection(rejectionid:1):Error:ENOENT:nosuchfileordirectory,open''
DeprecationWarning:Unhandledpromiserejectionsaredeprecated.Inthefuture,promiserejectionsthatarenothandledwillterminatetheNode.jsprocesswithanon-zeroexitcode.
另一种处理异常的方式是在监听全局的uncaughtException进程事件。然而,使用该事件全局捕获错误并不是一个好办法。
关于uncaughtException,一般都会建议你避免使用它,但是如果必须用它,你应该让进程退出:
process.on('uncaughtException',(err)=>{ //somethingwentunhandled. //Doanycleanupandexitanyway! console.error(err);//don'tdojustthat. //FORCEexittheprocesstoo. process.exit(1); });
但是,假设在同一时间发生多个错误事件,这意味着上面的uncaughtException监听器将被多次触发,这可能会引起一些问题。
EventEmitter模块暴露了once方法,这个方法发出的信号只会调用一次监听器。所以,这个方法常与uncaughtException一起使用。
监听器的顺序
如果针对一个事件注册多个监听器函数,当事件被触发时,这些监听器函数将按其注册的顺序被触发。
//first withTime.on('data',(data)=>{ console.log(`Length:${data.length}`); }); //second withTime.on('data',(data)=>{ console.log(`Characters:${data.toString().length}`); }); withTime.execute(fs.readFile,__filename);
上述代码会先输出Length信息,再输出Characters信息,执行的顺序与注册的顺序保持一致。
如果你想定义一个新的监听函数,但是希望它能够第一个被执行,你还可以使用prependListener方法:
withTime.on('data',(data)=>{ console.log(`Length:${data.length}`); }); withTime.prependListener('data',(data)=>{ console.log(`Characters:${data.toString().length}`); }); withTime.execute(fs.readFile,__filename);
上述代码中,Charaters信息将首先被输出。
最后,你可以用removeListener函数来删除某个监听器函数。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。