hive streming sink 投入生產,發現寫入效能底下,排查過後,發現是hive在每寫入一條資料,都會判斷檔案大小來決定檔案是否需要滾動,判斷檔案大小使用的hdfs的api,需要訪問namenode,這就是寫入效能底下的根源。截止到flink 1.11.2這個問題任然沒有解決。
這個問題解決,需要自己實現hivebulkwrite***ctory,**如下:
public class hivebulkwrite***ctory implements hadooppathbasedbulkwriter.factory
@override
public hadooppathbasedbulkwritercreate(path targetpath, path inprogresspath) throws ioexception
@override
public void dispose() catch (ioexception ignored)
}@override
public void addelement(rowdata element) throws ioexception
@override
public void flush()
@override
public void finish() throws ioexception
};}}
在hivetablesink#consumedatastream方法中,修改自己定義的hivebulkwrite***ctory。
hivebulkwrite***ctory hadoopbulkfactory = new hivebulkwrite***ctory(recordwrite***ctory);
Flink1 11記憶體模型與引數調整
flink taskmanager啟動日誌 total process memory flink總資源數 2048m,引數 taskmanager.memory.process.size jvm metaspace jvm元空間,引數 taskmanager.memory.jvm metaspace...
flink寫入HDFS中文亂碼
客戶端埋點日誌進行解析時需要獲取地區編碼和名稱,程式是通過flink分布式快取將地區編碼和名稱資料傳到每個task節點進行讀取。本地測試時沒有問題,但是部署到集群資料寫入hdfs後發現中文亂碼,部分 如下 設定分布式快取檔案位址 streamexecutionenvironment bsenv st...
1 11 flink中的動態載入udf jar包
專案中想要把flink做到平台化,只需要編輯sql便能把任務跑起來,開發過程中遇到乙個問題,就是如何能夠自動的載入自定義的函式包,因為專案中已經把main打包成乙個通用的jar,使用時只需要把sql資訊用引數形式傳入就可以.但是如果sql中需要使用到udf,那麼就需要實現flink的動態載入jar ...