正常情況下,乙個流在執行一次終端操作之後便結束了。本文通過複製流內資料的方式,曲折的實現了同乙個流上執行多次操作。
demo只是思路,其效能並不一定高效,尤其是資料都在記憶體中處理時複製的開銷很大。但如果流涉及大量i/o,也許效能會有提高。
public class streamforker
public streamforkerfork(object key, function, ?> f)
public results getresults() finally
return consumer;
}private forkingstreamconsumerbuild() , (m1, m2) -> );
return new forkingstreamconsumer<>(queues, actions);
}private future<?> getoperationresult(list> queues, function, ?> f)
}
accept
方法將原始流中所有的資料新增到各個blockingqueue內,此處實現了複製
class forkingstreamconsumerimplements consumer, results
@override
public void accept(t t)
@suppresswarnings("unchecked")
void finish()
@suppresswarnings("unchecked")
@override
public r get(object key) catch (exception e)
}}
此處重寫了tryadvance
介面,只是簡單的從blockingqueue中取出資料,執行action。業務邏輯中複製流是為了做什麼事情,action就是這件事情。forkingstreamconsumer.end_of_stream
是queue中資料結束的標示
class blockingqueuespliteratorimplements spliterator
@override
public boolean tryadvance(consumer<? super t> action) catch (interruptedexception e)
}if (t != forkingstreamconsumer.end_of_stream)
return false;
}@override
public spliteratortrysplit()
@override
public long estimatesize()
@override
public int characteristics()
}
多執行緒併發同乙個表問題
table for update for update of a.id a1.有where條件時,鎖定條件中指定的資料行 行級封鎖 2.無where條件是,鎖定表a 表級封鎖 1.有where條件時,鎖定條件中指定的資料行 行級封鎖 2.無where條件是,鎖定表a 表級封鎖 a,b直接封鎖a,b表...
如何同步共享同乙個list
例如多個執行緒要從同乙個list 中取物件,別的執行緒取了,其他執行緒則不可以再去這個物件.1.同步多執行緒 對 linklist 的removefirst 的操作或者 其他list 的remove 再get第乙個物件 的方法來實現.class sendsmstask implements runn...
celery 重複執行同乙個task
今天用celery 執行 task的時候碰到了 重複執行的情況,而且是重複執行了8次 電腦是8核的 谷歌了一下,celery 在執行task時有個機制,就是任務時長超過了 visibility timeout 時還沒執行完,就會指定其他worker重新開始task,預設的時長是一小時.但是我這個肯定...