#flinksql寫入mysql
//source
tableenv.sqlupdate("create table cmis_hs_lm_loan(\n" +
" cust_id varchar,\n" +
" loan_typ varchar,\n" +
" orig_prcp double,\n" +
" loan_actv_dt varchar,\n" +
" dataeventtype varchar\n" +
") with (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = 'universal',\n" +
" 'connector.topic'='cmis_hs-lm_loan',\n" +
" 'connector.properties.bootstrap.servers' = '60.60.90.11:9092,60.60.90.12:9092,60.60.90.13:9092',\n" +
" 'connector.properties.zookeeper.connect' = '60.60.90.11:2181',\n" +
" 'connector.properties.group.id' = 'test',\n" +
" 'format.type' = 'json',\n" +
" 'connector.startup-mode' = 'latest-offset'\n" +
")");
tableenv.sqlupdate("create table tssp (\n" +
" cust_id varchar(30),\n" +
" orig_prcp decimal(16,2),\n" +
" loan_actv_dt varchar(10)\n" +
") with (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql:',\n" +
" 'connector.table' = 'testsql',\n" +
" 'connector.driver' = 'com.mysql.jdbc.driver',\n" +
" 'connector.username' = 'liuyang',\n" +
" 'connector.password' = 'liuyang@2019',\n" +
" 'connector.write.flush.max-rows' = '1' \n" + //注意,要設定為1,預設為5000,所以半天沒寫入mysql
")"); tableenv.sqlupdate("insert into tssp\n" +
"select cust_id, orig_prcp, loan_actv_dt\n" +
"from cmis_hs_lm_loan\n" +
"where loan_actv_dt = '"+dt+"'" +
"and dataeventtype='insert'"
);
注意:**『connector.write.flush.max-rows』 = 『1』,**預設為5000條,所以導致資料一直無法寫入mysql,程式停止執行也不會將緩衝池中的資料寫入到mysql中的;將其設定為1之後,問題解決; function其實也沒有那麼難
function其實也沒有那麼難 1.最簡單的function函式,你想到了嗎?library plyr data iris head iris test1 1,function dat head test1 class test1 用subset實現也完全麼問題,但是就是輸出的結果形式不同 hea...
什麼基礎也沒有,怎麼自學程式設計?
作為自學程式設計的過來人,我深知學習方法永遠都不是重點,學習方法,學習路線基本千篇一律,大同小異,每個人或多或少都能總結幾條,但是自學者的心裡建設誰去指導,如果乙個人沒有實踐過的真實自學程式設計經歷,自學的感受和經驗是總結不出來的,比如大學計算機專業的學生,肯定不會有我我這樣的感受,因為學習環境不一...
沒有報錯也沒有提示如何如何列印日誌
在mybatis的jar包裡有乙個log4j的jar包,放入專案的lib裡 然後加上log4j的配置檔案放入資源資料夾 開啟log4j.rootlogger debug log4j.rootlogger debug,console關閉log4j.rootlogger error或者log4j.roo...