pyflink基礎應用之kafka
執行環境
pyflink需要特定的python版本(3.5、3.6或3.7)。執行一下命令,以確保python版本滿足要求。
$ python -v
pyflink已經發布到pypi,可以直接安裝:
$ python -m pip install apache-flink
拷貝三個jar包到flink_home/lib下。
flink-connector-kafka_2.11-1.11.0.jar
flink-sql-connector-kafka_2.11-1.11.0.jar
kafka-clients-2.4.1.jar
作者的執行環境為python3.7.6、flink1.11.0、kafka2.11(broker1.0.0)。預研的時候遇到最多的問題是缺少jar包和jar包衝突,多看執行輸出的日誌,根據日誌錯誤提示補充相應的jar包。
參考資料有:
執行kafka
建立主題
bin/kafka-topics.sh --zookeeper 192.168.113.11:2181/kafka --create --replication-factor 1 --partitions 1 --topic flink_test2
啟動生產者-發出測試資料
bin/kafka-console-producer.sh --broker-list 192.168.113.11:9092 --topic flink_test2
測試資料格式為:
啟動消費者-檢測是否接受到資料
例項**
本應用採用pyflink+sql方式編寫**。
#!/usr/bin/python3.7
from pyflink.datastream import streamexecutionenvironment, checkpointingmode
from pyflink.table import streamtableenvironment, tableconfig, datatypes, csvtablesink, writemode, sqldialect
s_env = streamexecutionenvironment.get_execution_environment()
s_env.set_parallelism(1)
#必須開啟checkpoint,時間間隔為毫秒,否則不能輸出資料
s_env.enable_checkpointing(3000)
st_env = streamtableenvironment.create(s_env, tableconfig())
st_env.use_catalog(「default_catalog」)
st_env.use_database(「default_database」)
sourcekafkaddl = 「」"
create table sourcekafka(
id int comment 『序號』,
name varchar comment 『姓名』
)comment 『從kafka中源源不斷獲取資料』
with(
『connector』 = 『kafka』,
『topic』 = 『flink_test2』,
『properties.bootstrap.servers』 = 『192.168.113.11:9092』,
『scan.startup.mode』 = 『earliest-offset』,
『format』 = 『json』
)「」"
st_env.execute_sql(sourcekafkaddl)
fieldnames = [「id」, 「name」]
fieldtypes = [datatypes.int(), datatypes.string()]
csvsink = csvtablesink(fieldnames, fieldtypes, 「/root/tiamaes/result.csv」, 「,」, 1, writemode.overwrite)
st_env.register_table_sink(「csvtablesink」, csvsink)
resultquery = st_env.sql_query(「select * from sourcekafka」)
resultquery.insert_into(「csvtablesink」)
st_env.execute(「pyflink-kafka-v2」)
儲存檔案為pyflink_kafka.py
**執行
採用local-single部署模式執行:
python pyflink_kafka.py
持續檢查result.cvs的內容:
tail –f result.cvs
執行時沒有錯誤日誌時,在result.cvs能持續看到通過kafka生產者發生的資料。
WEB應用之httpd基礎入門(五)
前文我們聊到了httpd的啟動使用者和相關許可權的說明,資源壓縮配置 https的實現,回顧請參考今天我們來說說httpd的重定向 hsts 反向 的配置 首先來了解下重定向吧,什麼意思呢?假如我們訪問乙個資源在伺服器上不存在或者不在我們對應訪問url下,而使用者又不知道我們新的url的情況下,我們...
DelegateAndEvent應用之回馬槍
應用 delegate 和event 實現函式的 在實際的開發中非常有用。它實現的實際上是一種依賴通知的效果。通常可以用在 子窗體資訊更新的結果反饋至母窗體 類的屬性值和 ui控制項值依賴時的相互 通知 等。現通過簡單的例子 demo 這兩種應用場景 一 類的屬性值發生變化時,反饋到和它關聯的控制項...
Docker應用之倉庫
倉庫是存放映象的地方 註冊伺服器是管理倉庫的具體伺服器,每個伺服器上可以有多個倉庫,每個倉庫也可以有多個映象 如 dl.dockerpool.com ubuntu dl.dockerpool.com就是註冊伺服器位址,ubuntu是倉庫名 一 docker hub公共映象市場 docker hub是...