實際業務中可能出現重複消費乙個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關許可權及身份認證;認證通過後框架或者後置過濾器再次解析請求體傳遞給業務上下文。因此,重複消費同乙個流的需求並不奇葩,這類似於js上下文中通過 deep clone乙個物件來操作這個物件副本,防止源資料被汙染。
const koa = require('koa');
let parse = function(ctx));
ctx.req.on('end',()=>);
})}// 認證
let body = json.parse(decodeuricomponent(await parse(ctx)));
if(body.name != 'admin')
await next();
})// 解析body體,傳遞給業務層
let body = await parse(ctx);
ctx.postbody = body;
await next();
}) ctx.body = 'hello world\n';
ctx.body += `post body: $`;
});
上述**片段無法正常執行,請求無法得到響應。這是因為在前置過濾器的認證邏輯中消費了請求體,在第二級過濾器中就無法再次消費請求體,因此請求會阻塞。實際業務中,認證邏輯往往是與每個公司規範相關的,是乙個「二方庫」;而示例中的第二季過濾器則通常作為乙個三方庫存在,因此為了不影響第三方包消費請求體,必須在認證的二方包中儲存 ctx.req 這個可讀流的資料仍然存在,這就涉及到本文的主旨了。
複製流並不像複製乙個物件一樣簡單與直接,流的使用是一次性的,一旦乙個可讀流被消費(寫入乙個writeable物件中),那麼這個可讀流就是不可再生的,無法再使用。可是通過一些簡單的技巧可以再次復原乙個可讀流,不過這個復原出來的流雖然內容和之前的流相同,但卻不是同乙個物件了,因此這兩個物件的屬性及原型都不同,這往往會影響後續的使用,不過辦法總是有的,且看下文。
可讀流的「影分身之術」和鳴人的差不多,不過僅限於被轉殖物件的流這一特性,即保證轉殖出的流有著相同的資料。但是轉殖出來的流卻無法擁有原物件的其他屬性,但我們可通過原型鏈繼承的方式實現屬性及方法的繼承。
let readable = require('stream').readable;點評: 這種影分身之術可以同時複製出多個可讀流,同時需要針對原來的流重新進行賦值,並繼承原有屬性,這樣才能不影響後續的重複消費。let fs = require('fs');
let path = require('path');
class newreadable extends readable
start() );
this.originreadable.on('end',()=>);
this.originreadable.on('error',(e)=>);
}// 作為readable的實現類,必須實現_read函式,否則會throw error
_read()
} let clonereq = new newreadable(ctx.req);
let clonereq2 = new newreadable(ctx.req);
// 此時,ctx.req已被消費完(沒有內容),所有的資料都完全在轉殖出的兩個流上
// 消費clonereq,獲取認證資料
let body = json.parse(decodeuricomponent(await parse()));
// 將轉殖出的clonereq2重新設定原型鏈,繼承ctx.req原有屬性
clonereq2.__proto__ = ctx.req;
// 此後重新給ctx.req複製,留給後續過濾器消費
ctx.req = clonereq2;
if(body.name != 'admin')
await next();
})
stream模組有乙個特殊的類,即transform。關於transfrom的特性,我曾在深入node之transform一文中詳細介紹過,他擁有可讀可寫流雙重特性,那麼利用transfrom可以快速簡單的實現轉殖。
首先,通過 pipe 函式將可讀流導向兩個 transform流(之所以是兩個,是因為需要在前置過濾器消費乙個流,後續的過濾器消費第二個)。
let clonereq = new transform(
});let clonereq2 = new transform(
});ctx.req.pipe(clonereq)
ctx.req.pipe(clonereq2)
上述**中,看似 ctx.req 流被消費(pipe)了兩次,實際上pipe函式則可以看成 readable和writeable實現backpressure的一種「語法糖」實現,具體可通過 node中的stream-readable和writeable解讀 了解,因此得到的結果就是「ctx.req被消費了一次,可是資料卻複製在clonereq和clonereq2這兩個transfrom物件的讀緩衝區裡,實現了clone」
其實pipe針對readable和writeable做了限流,首先針對readable的data事件進行偵聽,並執行writeable的write函式,當writeable的寫緩衝區大於乙個臨界值(highwatermark),導致write函式返回false(此時意味著writeable無法匹配readable的速度,writeable的寫緩衝區已經滿了),此時,pipe修改了readable模式,執行pause方法,進入paused模式,停止讀取讀緩衝區。而同時writeable開始重新整理寫緩衝區,重新整理完畢後非同步觸發drain事件,在該事件處理函式中,設定readable為flowing狀態,並繼續執行flow函式不停的重新整理讀緩衝區,這樣就完成了pipe限流。需要注意的是,readable和writeable各自維護了乙個緩衝區,在實現的上有區別:readable的緩衝區是乙個陣列,存放buffer、string和object型別;而writeable則是乙個有向鍊錶,依次存放需要寫入的資料。最後,在資料複製的同時,再給其中乙個物件複製額外的屬性即可:
// 將轉殖出的clonereq2重新設定原型鏈,繼承ctx.req原有屬性
clonereq2.__proto__ = ctx.req;
// 此後重新給ctx.req複製,留給後續過濾器消費
ctx.req = clonereq2;
至此,通過transform實現clone已完成。完整的**如下(最前置過濾器):
// 認證
// let clonereq = new newreadable(ctx.req);
// let clonereq2 = new newreadable(ctx.req);
let clonereq = new transform(
});let clonereq2 = new transform(
});ctx.req.pipe(clonereq)
ctx.req.pipe(clonereq2)
// 此時,ctx.req已被消費完(沒有內容),所有的資料都完全在轉殖出的兩個流上
// 消費clonereq,獲取認證資料
let body = json.parse(decodeuricomponent(await parse()));
// 將轉殖出的clonereq2重新設定原型鏈,繼承ctx.req原有屬性
clonereq2.__proto__ = ctx.req;
// 此後重新給ctx.req複製,留給後續過濾器消費
ctx.req = clonereq2;
if(body.name != 'admin')
await next();
})
說明
ctx.req執行兩次pipe到對應clonereq和clonereq2,然後立即消費clonereq物件,這樣合理嗎?如果源資料夠大,pipe還未結束就在消費clonereq,會不會有什麼問題?
其實 pipe函式裡面大多是非同步操作,即針對 源和目的流做的一些流控措施。目的流使用的是clonereq物件,該物件在例項化的過程中transform函式直接通過呼叫next函式將接受到的資料傳入到transform物件的可讀流快取中,同時觸發『readable和data事件』。這樣,我們在下文消費clonereq物件也是通過「偵聽data事件」實現的,因此即使ctx.req的資料仍沒有被消費完,下文仍可以正常消費clonereq物件。資料流仍然可以看做是從ctx.req --> clonereq --> 消費。
使用transform流實現clone 可讀流的弊端:
上例中,transfrom流的例項化傳入了乙個引數highwatermark,該引數在transfrom中的作用 在 上文深入node之transform中有過詳解,即當transfrom流的讀緩衝大小 < highwatermark時,transfrom流就會將接收到的資料儲存在讀緩衝裡,等待消費,同時執行 transfrom函式;否則什麼都不做。
因此,當要clone的源內容大於highwatermark時,就無法正常使用這種方式進行clone了,因為由於源內容》highwatermark,在沒有後續消費transfrom流的情況下就不執行transfrom方法(當transfrom流被消費時,transfrom流的讀緩衝就會變小,當其大小所以設定乙個合理的highwatermark大小很重要,預設的highwatermark為 16kb。
巧妙複製乙個流
實際業務中可能出現重複消費乙個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關許可權及身份認證 認證通過後框架或者後置過濾器再次解析請求體傳遞給業務上下文。因此,重複消費同乙個流的需求並不奇葩,這類似於js上下文中通過 deep clone乙個物件來操作這個物件副本,防止源資料被汙染。...
巧妙複製乙個流
實際業務中可能出現重複消費乙個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關許可權及身份認證 認證通過後框架或者後置過濾器再次解析請求體傳遞給業務上下文。因此,重複消費同乙個流的需求並不奇葩,這類似於js上下文中通過 deep clone乙個物件來操作這個物件副本,防止源資料被汙染。...
巧妙複製乙個流
實際業務中可能出現重複消費乙個可讀流的情況,比如在前置過濾器解析請求體,拿到body進行相關許可權及身份認證 認證通過後框架或者後置過濾器再次解析請求體傳遞給業務上下文。因此,重複消費同乙個流的需求並不奇葩,這類似於js上下文中通過 deep clone乙個物件來操作這個物件副本,防止源資料被汙染。...