實時資料是無邊界的,即不斷地有資料輸入,但我們的統計一般是有時間範圍的,離線統計以年月日為統計週期,最小能到小時週期,如果是分鐘甚至秒級別計算,則可認為是實時計算,我們把實時資料流按時間段分割成乙個個視窗,則可基於視窗進行資料統計。
我司開源pike支援三種視窗,結合各種udaf,通過sql就能能實現各種聚合統計:
跳動視窗是最直觀,最簡單的介面,如下圖,t1->t2為乙個視窗,t2->t3為乙個視窗,各個視窗之間沒有重疊,實時統計結果也是基於各個視窗內的資料,結果輸出頻率等於統計週期。
跳動視窗比較常用,一般適用於統計無交叉分鐘級別實時流量。
pike example:
withperiod 5m
select top 100 output(dt(outputctx())) as dt,
output('pc客戶端') as plt,
mjoin(dim_channelid,'dim_sync_channel','series_titlechinese') as title,
count(distinct userid, ipvalue) as uv
from dol_client
group by title
order by uv desc
滑動視窗相對跳動視窗稍複雜,主要在於相鄰視窗間有重疊,如下圖,t1->t3為視窗w1,t2->t4為視窗w2,w2與w1重疊t2->t3,此時w2相對w1滑動視窗為t1->t2,w1統計視窗為t1->t3, 統計視窗必須為滑動視窗的整數倍,即(t3-t1)%(t2-t1)=0。若統計視窗等於滑動視窗,則滑動視窗轉化為跳動視窗,因此,可認為跳動視窗是滑動視窗的特例。
滑動視窗主要適用於統計最近m時間內資料,輸出結果間隔為n, n <= m 且 m % n = 0。
pike example:
//以5分鐘為輸出頻率,同時統計最近5分鐘,最近10分鐘,最近4小時網頁端點直播uv和vv
withperiod 5m
select output(dt(outputctx())) as dt,
'ikan' as plt,
case when dim_liveondemand_c=102 then '點播' else '直播' end as type,
count(distinct userid,ipvalue) as uv_5m,
count(distinct userid,ipvalue,channelid) as vv_5m,
move('10m',count(distinct userid,ipvalue)) as uv_10m,
move('10m',count(distinct userid,ipvalue,channelid)) as vv_10m,
move('4h',linearcount(10000000,userid,ipvalue)) as uv_4h,
move('4h',linearcountex(100,userid,ipvalue,channelid)) as vv_4h,
from dol_ikan
group by plt,type
累計視窗則是累計乙個時間段內資料不斷輸出,例如w1為從t1開始累計到t2的資料,w2為從t1開始累計到t3的資料,w3為從t1開始累計到t4的資料,w1、w2、w3共享初始狀態;在乙個完整的累計週期內,完整累計週期必須為輸出頻率的整數倍,t1為初始狀態,t2輸出w1統計結果,t3輸出w2統計結果,t4輸出w3統計結果,t2-t1=t3-t2=t4-t3,(t3-t1)%(t2-t1)=0,(t4-t1)%(t2-t1)=0;下乙個完整累計週期則清零為初始化狀態重新開始統計,例如w6,w3都是乙個完整累計視窗,且w6,w3無交集,w6,w3之間如同跳動視窗。
累計視窗主要適用於獲取從整點或整天開始,累計到當前時間的統計資料,一般完整累計視窗與離線週期對應,但卻需要獲取當前時刻的實時統計資料,例如實時獲取當天累計vv、當前小時累計uv。
pike examlpe:
withperiod 5m
select output(dt(outputctx())) as dt,
channelid,
count(1) as logcount,
count(distinct userid, ipvalue) as uv,
count(distinct if(strisnullorempty(vvid), userid + channelid, vvid)) as vv,
accumulate('1h', count(1)) as logcount_thishour,
accumulate('1h', linearcount(10000000, userid, ipvalue)) as uv_thishour
accumulate('1h', linearcountex(100, if(strisnullorempty(vvid), userid + channelid, vvid))) as vv_thishour,
accumulate('1d', count(1)) as logcount_thisday,
accumulate('1d', hyperloglogcount(5, userid, ipvalue)) as uv_thisday,
accumulate('1d', loglogadaptivecount(5, if(strisnullorempty(vvid), userid + channelid, vvid))) as vv_thisday
from dol_smart
group by channelid
基於gst launch的實時轉碼
目標是實現乙個實時轉碼,可用於iptv提供節目源。相關工作在ubuntu作業系統下進行。需要對源 進行修改的時候,直接採用apt get source命令獲取源 根據需要進行修改,然後安裝,這樣能最大限度的保證相容性和穩定性。命令列示例 gstreamer是通過不同功能的element構成pipel...
spark之實時統計
這篇部落格其實和spark之spark streaming處理檔案流資料區別不是特別的大,權可以看作為畢業設計作準備的,使用了執行緒和通訊的模式處理檔案流,最後對5秒內的輸入資料進行統計,如下 package openclass import org.apache.spark.streaming.d...
hbase基於solr的實時索引
實時查詢方案 hbase key value store solr web前端實時查詢展示 1.hbase 提供海量資料儲存 2.solr提供索引構建與查詢 3.key value store 提供自動化索引構建 從hbase到solr 使用流程 前提 cdh5.3.2solr集群搭建好,cdh5....