獲取資料源,自定義下沉器本處暫時不贅述,主要是對核心topn的**進行解析
獲取資料流並轉化成物件
datastream
datastream = datastreamsource.
map(value-
> jsonobject.
parseobject
(value,useraction.
class))
;
將亂序資料抽取出來,設定watermark
datastream
timeddata = datastream.
assigntimestampsandwatermarks
(new
useractiont***tractor()
)public
static
class
useractiont***tractor
extends
boundedoutofordernesstimestampextractor
@override
public
long
extracttimestamp
(useraction useraction)
}
過濾出購買行為
datastream
filterdata = timeddata.
filter
(new
filterfunction
})
視窗統計購買數量
datastream
windoweddata = filterdata.
keyby
("itemid").
timewindow
(time.
minutes
(60l)
,time.
minutes
(5l)).
aggregate
(new
countagg()
,new
windowresultfunction()
)//商品購買實體類
public
static
class
itembuycount
//count聚合函式
public
static
class
countagg
implements
aggregatefunction
@override
public long add
(useraction useraction,long acc)
@override
public long getresult
(long acc)
@override
public long merge
(long acc1,long acc2)
}public
static
class
windowresultfunction
implements
windowfunction
}
計算topn
datastream
> topitems = windoweddata.
keyby
("windowend").
process
(new
topnhotitems(3
));
public
static topnhotitems extends
keyedprocessfunction
>
@override
public
void
open
(configuration parameters)
throws exception
@override
public
void
processelement
(itembuycount input,context context, collector
> collector)
throws exception
@override
public
void
ontimer
(long timestamp,ontimercontext ctx,collector
> out)
throws exception
//請空狀態,釋放空間
itemstate.
clear()
; allitems.
sort
(new
comparator
()})
; list
itembuycounts =
newarraylist
<
>()
;for
(int i=
0;i) out.
collect
(itembuycounts);}
}
10.最後把得到的流輸出到自定義的sink中 python 簡單的socket應用(針對TCP)
出處 可以總結為以下五個步驟 1.建立socket套接字 2.繫結ip和port 3.listen使套接字變為被動連線 4.accept等待客戶端連線 5.recv send接受和傳送資料 先做一些準備工作 import socket server socket.socket family,type...
Linux共享目錄 特殊許可權t的應用
建立乙個共享目錄,使其他使用者可以在這個目錄下建立檔案,但是不能刪除別的使用者建立的檔案 建立乙個共享目錄,並設定許可權 root catyuan mkdir test1 root catyuan chmod 777 var test root catyuan chmod o t var test ...
python實現excel內容逐行寫入txt
最近在做文字分類,拿到的資料很亂。要做下一步,不管是分詞還是tfidf都要先做資料的分類。3萬篇文章,在乙個excel中,每行有每篇文章的id 內容 title content 分類 relative breeds 共三列 按分類建立子目錄,文章按分類放入子目錄中,每篇文章寫入乙個txt檔案,txt...