自己定義乙個單並行度的source,需要自己實現乙個sourcefunction介面 !
import org.apache.flink.streaming.api.functions.source.sourcefunction;
/** * 自己定義乙個單並行度的source
* 需要自己實現乙個sourcefunction介面**/
public
class
mynoparallesource
implements
sourcefunction
}public
void
cancel()
}
具體操作類:
import org.apache.flink.api.common.functions.mapfunction;
import org.apache.flink.streaming.api.datastream.datastream;
import org.apache.flink.streaming.api.datastream.datastreamsource;
import org.apache.flink.streaming.api.datastream.singleoutputstreamoperator;
import org.apache.flink.streaming.api.environment.streamexecutionenvironment;
import org.apache.flink.streaming.api.windowing.time.time;
public
class
streamingdemowithnoparallesource})
;//每2秒鐘處理一次資料
datastream
sum = num.
timewindowall
(time.
seconds(2
)).sum(0
);sum.
print()
.setparallelism(1
);env.
execute
("noparallesource");
}}
需要自己實現乙個sourcefunction介面 ! 注意在scala中介面的實現用extends實現!
package streaming
import org.apache.flink.streaming.api.functions.source.sourcefunction
class mynoparallelscala extends sourcefunction[long]
}override def cancel()=
}
具體的操作**:
package streaming
import org.apache.flink.streaming.api.scala.streamexecutionenvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.time
object strreamingdemowithmynoparallelsourcescala )
val num = mapdata.timewindowall(time.seconds(2)).sum(0)
num.print().setparallelism(1)
env.execute("strreamingdemowithmynoparallelsourcescala")
}}
結果是:
/**
* 結果是:
** 接收到資料42
* 接收到資料43
* 85
* 接收到資料44
* 接收到資料45
* 89
* 接收到資料46
* 接收到資料47
* 93
* 接收到資料48
* 接收到資料49
* 97
* 接收到資料50
* 接收到資料51
* 101
*/
flink自定義trigger詳解
1 中有句話是這樣的 其實,我們要實現基於事件時間的視窗隨意輸出,比如1000個元素觸發一次輸出,那麼我們就可以通過修改這個觸發器來實現。這句話的意思是,預設的自帶的trigger一般是基於eventtime的。那麼這1000 個元素可能跨度是一小時,也可能跨度是兩小時,對吧 但是顯然預設的trig...
C 自定義單鏈表
在c 中模擬庫中已存在的集合,鍊錶linkedlist。可以做新增 插入等操作 可以先寫乙個鍊錶介面,然後再去實現介面的功能 inte ce ilist 建立乙個索引器 t getele int index 根據索引來得到元素 int locate t value 根據元素值,從前往後找到對應索引 ...
flink寫入kafka之自定義分割槽器
直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章 接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器 直接上 自定義分割槽 suppresswarnings unchecke...