python和flink相關的網上例子比較少
先溫固下shell中這些環境變數吧。
python
flink shell變數
型別scalaflink shell變數型別
s_env
pyflink.datastream.stream_execution_environment.streamexecutionenvironment
senv
streamexecutionenvironment
st_env
pyflink.table.table_environment.streamtableenvironment
stenv
streamtableenvironmentimpl
b_env
class 'pyflink.dataset.execution_environment.executionenvironment
benv
executionenvironment
bt_env
pyflink.table.table_environment.batchtableenvironment
btenv
batchtableenvironmentimpl
注:scala獲取型別舉例:
scala> senv.getclass.get******name
res2: string = streamexecutionenvironment
啟動命令:
pyflink-shell.sh local
互動模式中輸入的**(python datastream api):
import tempfile
import os
import shutil
sink_path = tempfile.gettempdir() + '/streaming.csv'
if os.path.exists(sink_path):
if os.path.isfile(sink_path):
os.remove(sink_path)
else:
shutil.rmtree(sink_path)
s_env.set_parallelism(1)
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.connect(filesystem().path(sink_path)).with_format(oldcsv()
.field_delimiter(',')
.field("a", datatypes.bigint())
.field("b", datatypes.string())
.field("c", datatypes.string())).with_schema(schema()
.field("a", datatypes.bigint())
.field("b", datatypes.string())
.field("c", datatypes.string())).register_table_sink("stream_sink")
t.select("a + 1, b, c").insert_into("stream_sink")
st_env.execute("stream_job")
實驗結果:/tmp/streaming.csv
開啟後如下:
Spark部署模式(一) Local模式
目錄 1.官方求pi案例 直接執行已打成依賴jar包中的指定class 2.編寫scala語言實現功能 3.整個spark運算的流程 4.spark中的driver和executor 5.總結spark中各種組成部分的關係 local模式就是spark執行在單節點的模式,通常用於在本機上練手和測試,...
Nutch的local和deploy模式
local模式 1.將hbase安裝目錄下lib 下面的所有 jar 複製到nutch runtime local lib下2.nutch runtime local 下先建立urls目錄mkdir urls,目錄下建立seed.txt touch seed.txt,如果能正常執行,則萬事大吉,你會...
Spark的local模式環境搭建
簡介 部署模式 執行模式 spark可以在那些情況下執行,spark 框架編寫的應用程式可以執行在本地模式 local mode 集群模式 cluster mode 和雲服務 cloud 方便開發測試和生產部署。spark本地模式的安裝 1.上傳安裝包解壓安裝包 解壓軟體包 tar zxvf spa...