利用廣播變數來進行資料的傳輸

2021-09-29 18:11:43 字數 2978 閱讀 8845

package report

import config.confighelper

import org.apache.spark.broadcast.broadcast

import org.apache.spark.rdd.rdd

import org.apache.spark.sql.sparksession

import scalikejdbc.

import scalikejdbc.config.dbs

import utils.makeatpkpi

//利用廣播變數來進行資料的傳輸

object traintimebroadcastanalysis

)//轉換成rdd,因為只有rdd中才有reducebykey方法

.rdd

//進行資料聚合

//zip函式是把兩個list聚合

//開發中,需要聚合的文段

//舉個例子:

//rdd中有如下元素

//kv

//((a,b),list(7,8,9))

//((a,b),list(1,2,3))

//((a,b),list(4,5,6))

//((c,d),list(4,5,6))

//。。。。。。

//reducebykey後要對v進行操作

//原理將v中的list前後zip再map每個元素進行逐個元組元素(a,b)的累加。

//a代表v的前乙個元素,b代表v的後乙個元素

//如val a=list(7,8,9)

//val b=list (1,2,3)

//val k= a zip b =((7,1),(8,2)(9,3))

//k map (tp=>tp._1+tp._2) ----- > (8,10,12)

//如此迴圈 結果:

//新rdd中的元素:

//((a,b),list(12,15,18))

//((c,d),list(4,5,6))

//。。。。。。

.reducebykey

//寫入mysql中,用scalikejdbc寫

dbs.

setup()

//在driver端建立乙個mysql鏈結,傳送到從節點端

//乙個鏈結被n個從節點公用,排隊,十分影響效率

//在excutor端中每一條資料建立乙個鏈結,每次都獲取開執行緒,關執行緒,浪費時間

//每個excutor用乙個執行緒

rddresult.

foreachpartition

(partition=

>)}

})//釋放資源

session.

stop()

}}

樣例類

makeatpkpi.scala

package utils

import org.apache.commons.lang.stringutils

import org.apache.spark.sql.row

object makeatpkpi

else

if(atperror.

equals

("無線傳輸單元"))

else

if(atperror.

equals

("應答器資訊接收單元"))

else

if(atperror.

equals

("軌道電路資訊讀取器"))

else

if(atperror.

equals

("測速測距單元"))

else

if(atperror.

equals

("人機互動介面單元"))

else

if(atperror.

equals

("列車介面單元"))

else

if(atperror.

equals

("司法記錄單元"))

else

//兩個list的拼接要用++

list[int](1

)++ listerror

}else

//建立乙個容器用來存放標籤

//兩個list的拼接要用++

//這個是為了在整合以後統計總共多少條資料用,相當於數量

val list: list[int]

= list[int](1

)++ listatperror

list

}}

#配置檔案

#配置壓縮格式

#配置序列化方式

spark.serializer="org.apache.spark.serializer.kryoserializer"

#配置jdbc鏈結

jdbc.url="jdbc:mysql://localhost:3306/test?characterencoding=utf-8"

jdbc.driver="com.mysql.jdbc.driver"

jdbc.user="root"

jdbc.password="000000"

#配置scalikejdbc鏈結

db.default.url="jdbc:mysql://localhost:3306/test?characterencoding=utf-8"

db.default.driver="com.mysql.jdbc.driver"

db.default.user="root"

db.default.password="000000"

confighelper.scala

package config

import com.typesafe.config.

object confighelper

利用oradata進行資料恢復

之前因為現場對方工程師的原因,把oracle其他資料夾都給格了,只剩下oradata資料夾拷貝出來了,基於oracle9i版本 第一步 搭建乙個和之前一模一樣的資料庫系統環境 第三步 將oradata資料夾覆蓋過去 第四步 啟動服務,登入之後應該會發現普通使用者無法登入,會報錯ora 01033 o...

利用pandas進行資料清洗的方法

目錄 我們有下面的乙個資料,利用其做簡單的資料分析。這是一家服裝店統計的會員資料。最上面的一行是列座標,最左側一列是行座標。列座標中,第 0 列代表的是序號,第 1 列代表的會員的姓名,第 2 列代表年齡,第 3 列代表體重,第 4 6 列代表男性會員的三圍尺寸,第 7 9 列代表女性會員的三圍尺寸...

利用python進行資料分析

目錄 10 minutes to pandas 翻譯 pandas中loc iloc ix的區別 pandas dropna函式 pandas中dataframe的stack unstack 和pivot 方法的對比 pandas中關於set index和reset index的用法 python匿...