本篇文章帶大家解讀一下Node.js流源碼,深入了解下Node可讀流,看看其基本原理、使用方法與工作機制,希望對大家有所幫助!
1. 基本概念
1.1. 流的歷史演變
流不是 Nodejs 特有的概念。 它們是幾十年前在 Unix 操作系統中引入的,程序可以通過管道運算符(|)對流進行相互交互。
在基于Unix系統的MacOS以及Linux中都可以使用管道運算符(|),他可以將運算符左側進程的輸出轉換成右側的輸入。
在Node中,我們使用傳統的readFile去讀取文件的話,會將文件從頭到尾都讀到內存中,當所有內容都被讀取完畢之后才會對加載到內存中的文件內容進行統一處理。
這樣做會有兩個缺點:
-
內存方面:占用大量內存
-
時間方面:需要等待數據的整個有效負載都加載完才會開始處理數據
為了解決上述問題,Node.js效仿并實現了流的概念,在Node.js流中,一共有四種類型的流,他們都是Node.js中EventEmitter的實例:
-
可讀流(Readable Stream)
-
可寫流(Writable Stream)
-
可讀可寫全雙工流(Duplex Stream)
-
轉換流(Transform Stream)
為了深入學習這部分的內容,循序漸進的理解Node.js中流的概念,并且由于源碼部分較為復雜,本人決定先從可讀流開始學習這部分內容。
1.2. 什么是流(Stream)
流是一種抽象的數據結構,是數據的集合,其中存儲的數據類型只能為以下類型(僅針對objectMode === false的情況):
- string
- Buffer
我們可以把流看作這些數據的集合,就像液體一樣,我們先把這些液體保存在一個容器里(流的內部緩沖區BufferList),等到相應的事件觸發的時候,我們再把里面的液體倒進管道里,并通知其他人在管道的另一側拿自己的容器來接里面的液體進行處理。
1.3. 什么是可讀流(Readable Stream)
可讀流是流的一種類型,他有兩種模式三種狀態
兩種讀取模式:
-
流動模式:數據會從底層系統讀取,并通過EventEmitter盡快的將數據傳遞給所注冊的事件處理程序中
-
暫停模式:在這種模式下將不會讀取數據,必須顯示的調用Stream.read()方法來從流中讀取數據
三種狀態:
-
readableFlowing === null:不會產生數據,調用Stream.pipe()、Stream.resume會使其狀態變為true,開始產生數據并主動觸發事件
-
readableFlowing === false:此時會暫停數據的流動,但不會暫停數據的生成,因此會產生數據積壓
-
readableFlowing === true:正常產生和消耗數據
2. 基本原理
2.1. 內部狀態定義(ReadableState)
ReadableState
_readableState: ReadableState { objectMode: false, // 操作除了string、Buffer、null之外的其他類型的數據需要把這個模式打開 highWaterMark: 16384, // 水位限制,1024 * 16,默認16kb,超過這個限制則會停止調用_read()讀數據到buffer中 buffer: BufferList { head: null, tail: null, length: 0 }, // Buffer鏈表,用于保存數據 length: 0, // 整個可讀流數據的大小,如果是objectMode則與buffer.length相等 pipes: [], // 保存監聽了該可讀流的所有管道隊列 flowing: null, // 可獨流的狀態 null、false、true ended: false, // 所有數據消費完畢 endEmitted: false, // 結束事件收否已發送 reading: false, // 是否正在讀取數據 constructed: true, // 流在構造好之前或者失敗之前,不能被銷毀 sync: true, // 是否同步觸發'readable'/'data'事件,或是等到下一個tick needReadable: false, // 是否需要發送readable事件 emittedReadable: false, // readable事件發送完畢 readableListening: false, // 是否有readable監聽事件 resumeScheduled: false, // 是否調用過resume方法 errorEmitted: false, // 錯誤事件已發送 emitClose: true, // 流銷毀時,是否發送close事件 autoDestroy: true, // 自動銷毀,在'end'事件觸發后被調用 destroyed: false, // 流是否已經被銷毀 errored: null, // 標識流是否報錯 closed: false, // 流是否已經關閉 closeEmitted: false, // close事件是否已發送 defaultEncoding: 'utf8', // 默認字符編碼格式 awaitDrainWriters: null, // 指向監聽了'drain'事件的writer引用,類型為null、Writable、Set<Writable> multiAwaitDrain: false, // 是否有多個writer等待drain事件 readingMore: false, // 是否可以讀取