create table tbl (
id int8,
-- ..... 其他字段 (比如已完結狀態)
state int, -- 完結狀態(1 表示已完結)
deadts timestamp, -- 超時時間
nts interval, -- 超時間隔,用於更新下一次通知時間 (比如一天通知一次)
notify_times int default 0, -- 通知次數
deadts_next timestamp -- 下一次通知時間
);
create index idx_tbl_1 on tbl (deadts,notify_times,state) where notify_times=0 and state<>1;
create index idx_tbl_2 on tbl (deadts_next,state) where deadts_next is not null and state<>1;
with tmp1 as (
update tbl set
deadts_next=now()+nts,
notify_times=notify_times+1
where ctid = any (array(
select ctid from tbl where
( deadts < now() and notify_times=0 and state<>1)
union all
select ctid from tbl where
( deadts_next < now() and deadts_next is not null and state<>1)
limit 10000 -- 一次獲取1萬條超時資料
))
returning *
) select * from tmp1;
cte scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48)
cte tmp1
-> update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54)
initplan 1 (returns $0)
-> limit (cost=0.13..18151.03 rows=10000 width=6)
-> index scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6)
index cond: (deadts < now())
-> index scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6)
index cond: (deadts_next < now())
-> tid scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54)
tid cond: (ctid = any ($0))
(12 rows)
-- 1億條完結
insert into tbl select id, 1, now(), '5 min', 0, null from generate_series(1,100000000) t(id);
-- 100萬條超時
insert into tbl select id, 0, now(), '5 min', 0, null from generate_series(1,1000000) t(id);
with tmp1 as (
update tbl set
deadts_next=now()+nts,
notify_times=notify_times+1
where ctid = any (array(
select ctid from tbl where
( deadts < now() and notify_times=0 and state<>1)
union all
select ctid from tbl where
( deadts_next < now() and deadts_next is not null and state<>1)
limit 10000 -- 一次獲取1萬條超時資料
))
returning *
) select * from tmp1;
-- 計畫
cte scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)
output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next
buffers: shared hit=75094 read=49 dirtied=49
cte tmp1
-> update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)
output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next
buffers: shared hit=75094 read=49 dirtied=49
initplan 1 (returns $0)
-> limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)
output: tbl.ctid
buffers: shared hit=11395
buffers: shared hit=11395
-> index scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)
output: tbl.ctid
index cond: (tbl.deadts < now())
buffers: shared hit=1
-> index scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)
output: tbl_1.ctid
index cond: (tbl_1.deadts_next < now())
buffers: shared hit=11394
-> tid scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)
output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid
tid cond: (tbl_2.ctid = any ($0))
buffers: shared hit=21395
planning time: 0.301 ms
execution time: 79.905 ms
time: 79.905 ms
基於spark的流式資料處理 DStream概述
spark streaming工作機制 spark streaming程式的基本步驟 建立streamingcontext物件 spark streaming工作機制 在spark streaming中,會有乙個元件receiver,作為乙個長期執行的task跑在乙個executor上 每個rece...
訊息佇列 怎麼處理訊息佇列中重複的資料
在分布式系統中,上游發來的訊息佇列必然會存在重複的資料,這是不可避免的。producer在傳送訊息時比如遇到網路問題時,傳送後因超時得不到伺服器的ack,從而進行重發。如果傳送的訊息內容是銀行扣款,那麼發生的問題可想而知。有人會問為啥中介軟體不能幫我們做排重?中介軟體排重會有以下問題 名稱質量標準 ...
資料庫連線池的超時處理
資料庫連線池,能夠減少頻繁的建立,達到節約資源的效果,但是當連線遠端資料庫的時候,會出現資料庫連線池快速耗盡的問題,如果在資料連線池裡對超時的資料連線進行監視,超過時間就關閉,這樣往往會導致資料庫的異常,只能夠在呼叫遠端的資料連線的實際操作程式中去設定.private void updatestat...