一 flink基礎之資料讀取

2021-10-12 10:14:42 字數 3018 閱讀 5901

使用者名稱訪問url

時間戳asdzxc

1.測試從文字讀取資料

文字讀取則是有限流,會直接讀取完畢。這裡我們讀取乙個使用者訪問乙個**資訊

visitor_id string,url string,timestamp long

這也是常用的一種方式來做離線分析,一般上報的日誌都會切割為日誌,我們可以直接讀取日誌來進行分析

datastream

textstream = env.

readtextfile

("d:\\develop\\projects\\flink-study"

+"-project\\src\\resources\\readfile");

datastream

logs = textstream.

map(

newmapfunction

()})

;

這種方式我們一般不適用,因為在大資料場景下,一般也不會將大資料檔案放在容器中,因為容器裡面就是記憶體裡面,大資料檔案動輒j上t,怎麼可能放在容器裡,所以這種方式也僅僅是測試邏輯罷了。

/**

* 2.測試從容器中讀取資料

;

這種方式是目前企業一般使用的,採集—解耦儲存元件—實時處理平台。比如flume—kafka—spark|flink|storm。或者是阿里的sls—datahub—blink—datahub—bi|mysql|adb

properties properties =

newproperties()

; properties.

setproperty

("bootstrap.servers"

,"pc:9092");

properties.

setproperty

("group.id"

,"log1");

datastream

kafkastream = env.

addsource

(new

flinkkafkaconsumer011

("visitor_log"

,new

******stringschema()

, properties)

);

優秀如flink元件,也不是適配萬能源的。畢竟通用的源就那麼幾個,不可能官方都匹配了。所以遇到一些企業的小範圍使用元件,我們就需要使用自定源來實現讀取資料。我們只要實現sourcefunction即可實現。

package com.keino.apitest;

import org.apache.flink.streaming.api.functions.source.sourcefunction;

//這裡實現介面的時候,需要定義產生的資料型別

public

class

mysource

implements

sourcefunction

}/** * 該方法用來停止資料的生成

*/@override

public

void

cancel()

}

/**

* 4.自定義source

*/datastream

mysourcestream = env.

addsource

(new

mysource()

);mysourcestream.

print

("user define test").

setparallelism(1

);env.

execute()

;

以上完整**

c 基礎之讀取資料

從檔案中讀取資料 1.開啟檔案 fopen,得到file 2.讀取資料 fread 3.關閉檔案 fclose 開啟檔案 const char filename c test aaa.xyz file fp fopen filename,rb if fp null 注意 模式為 rb read bi...

大資料基礎 Flink 視窗模型

在大多數場景下,我們需要統計的資料流都是無界的,因此我們無法等待整個資料流終止後才進行統計。通常情況下,我們只需要對某個時間範圍或者數量範圍內的資料進行統計分析 如每隔五分鐘統計一次過去一小時內所有商品的點選量 或者每發生1000次點選後,都去統計一下每個商品點選率的佔比。在 flink 中,我們使...

Curator之讀取資料。

下面來看如何通過curator介面來獲取節點的資料內容。statable pathable 以上就是一系列最常用的讀取資料節點內容的api介面,下面通過一些場景來說明如何使用這些api。client.getdata forpath path 注意,該介面呼叫後的返回值是byte。client.get...