使用者名稱訪問url
時間戳asdzxc
1.測試從文字讀取資料
文字讀取則是有限流,會直接讀取完畢。這裡我們讀取乙個使用者訪問乙個**資訊
visitor_id string,url string,timestamp long
這也是常用的一種方式來做離線分析,一般上報的日誌都會切割為日誌,我們可以直接讀取日誌來進行分析
datastream
textstream = env.
readtextfile
("d:\\develop\\projects\\flink-study"
+"-project\\src\\resources\\readfile");
datastream
logs = textstream.
map(
newmapfunction
()})
;
這種方式我們一般不適用,因為在大資料場景下,一般也不會將大資料檔案放在容器中,因為容器裡面就是記憶體裡面,大資料檔案動輒j上t,怎麼可能放在容器裡,所以這種方式也僅僅是測試邏輯罷了。
/**
* 2.測試從容器中讀取資料
;
這種方式是目前企業一般使用的,採集—解耦儲存元件—實時處理平台。比如flume—kafka—spark|flink|storm。或者是阿里的sls—datahub—blink—datahub—bi|mysql|adb
properties properties =
newproperties()
; properties.
setproperty
("bootstrap.servers"
,"pc:9092");
properties.
setproperty
("group.id"
,"log1");
datastream
kafkastream = env.
addsource
(new
flinkkafkaconsumer011
("visitor_log"
,new
******stringschema()
, properties)
);
優秀如flink元件,也不是適配萬能源的。畢竟通用的源就那麼幾個,不可能官方都匹配了。所以遇到一些企業的小範圍使用元件,我們就需要使用自定源來實現讀取資料。我們只要實現sourcefunction即可實現。
package com.keino.apitest;
import org.apache.flink.streaming.api.functions.source.sourcefunction;
//這裡實現介面的時候,需要定義產生的資料型別
public
class
mysource
implements
sourcefunction
}/** * 該方法用來停止資料的生成
*/@override
public
void
cancel()
}
/**
* 4.自定義source
*/datastream
mysourcestream = env.
addsource
(new
mysource()
);mysourcestream.
print
("user define test").
setparallelism(1
);env.
execute()
;
以上完整** c 基礎之讀取資料
從檔案中讀取資料 1.開啟檔案 fopen,得到file 2.讀取資料 fread 3.關閉檔案 fclose 開啟檔案 const char filename c test aaa.xyz file fp fopen filename,rb if fp null 注意 模式為 rb read bi...
大資料基礎 Flink 視窗模型
在大多數場景下,我們需要統計的資料流都是無界的,因此我們無法等待整個資料流終止後才進行統計。通常情況下,我們只需要對某個時間範圍或者數量範圍內的資料進行統計分析 如每隔五分鐘統計一次過去一小時內所有商品的點選量 或者每發生1000次點選後,都去統計一下每個商品點選率的佔比。在 flink 中,我們使...
Curator之讀取資料。
下面來看如何通過curator介面來獲取節點的資料內容。statable pathable 以上就是一系列最常用的讀取資料節點內容的api介面,下面通過一些場景來說明如何使用這些api。client.getdata forpath path 注意,該介面呼叫後的返回值是byte。client.get...