按卡口分組,獲取不同車速型別通過卡口的車輛數
val sql = "select * from traffic.monitor_flow_action"
val df = spark.sql(sql)
implicit val monitorflowactionencoder: encoder[monitorflowaction] = expressionencoder()
implicit val tupleencoder: encoder[tuple2[string, monitorflowaction]] = expressionencoder()
implicit val stringencoder: encoder[string] = expressionencoder()
implicit val speedsortkeyencoder: encoder[speedsortkey] = expressionencoder()
implicit val stringandspeedsortkeyencoder: encoder[tuple2[string, speedsortkey]] = expressionencoder()
val ds = df.as[monitorflowaction].map(action => (action.monitor_id, action))
ds.cache()
val ds1 = ds.groupbykey(tuple => tuple._1)
.mapgroups((i,x) => else if (speed >= 60 && speed < 90) else if (speed >= 90 && speed < 120) else if (speed >= 120)
}(monitorid, speedsortkey(lowspeed, normalspeed, mediumspeed, highspeed))
})按車速型別排序,獲取top5車輛高速通過的卡口
獲取top5卡口對應的監控記錄
//將top5卡口生成廣播變數
val broads = spark.sparkcontext.broadcast(list)
//從原始的監控資料中過濾出top5卡口資料
val ds2 = ds.filter(x => )
ds2.map(_._2).createorreplacetempview("top10_speed_tmp")
獲取top5卡口中每個卡口top10高速通過的車輛記錄
val df2 = spark.sql("select " + args(0) + " as task_id, date,monitor_id,camera_id,car,action_time,speed,road_id,area_id from (select *, row_number() over(partition by monitor_id order by speed desc) rank from top10_speed_tmp) t where t.rank<=10")
使用sql方式
select *,
if(speed>120,1,0) as highflag,
if(speed<60,1,0) as lowerflag,
#case when speed>120 then 1 else 0 end as highflag,
#case when speed<60 then 1 else 0 end as lowerflag,
from monitor_flow_action;
tmp1:
*,highflag:1,middelflag:0,normalflag:0,lowerflag:0
*,highflag:0,middelflag:1,normalflag:0,lowerflag:0
****************************************====
select monitor_id,sum(highflag) highcnt,sum(middelflag) middelcnt,....
group by monitor_id
from tmp1
order by highcnt desc,middelcnt desc,...
limit 5
// 遍歷速度,判斷並統計四個值
while (x.hasnext) else if (speed >= 60 && speed < 90) else if (speed >= 90 && speed < 120) else if (speed >= 120)
}(monitorid, speedsortkey(lowspeed, normalspeed, mediumspeed, highspeed))
})import spark.implicits._
val arr = ds1.sort($"_2".desc).take(5)
val list = arr.map(item => item._1)
val monitorids = list.map(item => highspeed(args(0), item))
val broads = spark.sparkcontext.broadcast(list)
val ds2 = ds.filter(x => )
ds2.map(_._2).createorreplacetempview("top10_speed_tmp")
val df2 = spark.sql("select " + args(0) + " as task_id, date,monitor_id,camera_id,car,action_time,speed,road_id,area_id from (select *, row_number() over(partition by monitor_id order by speed desc) rank from top10_speed_tmp) t where t.rank<=10")
spark.close()}}
//自定義排序
case class speedsortkey(lowspeed:int, normalspeed:int, mediumspeed:int, highspeed:int) extends ordered[speedsortkey] with serializableelse if (this.mediumspeed - that.mediumspeed != 0) else if (this.normalspeed - that.normalspeed != 0) else if (this.lowspeed - that.lowspeed != 0) 0}
override def tostring:string = "speedsortkey [lowspeed=" + lowspeed + ", normalspeed=" + normalspeed + ", mediumspeed=" + mediumspeed + ", highspeed=" + highspeed + "]"
}case class highspeed(task_id: string, monitor_id: string)
case class monitorflowaction(date:string, monitor_id:string, camera_id:string,car:string,action_time:string,speed:string,road_id:string,area_id:string)
普通指標到智慧型指標的轉換
普通指標到智慧型指標的轉換 int iptr new int 42 shared ptr int p iptr 智慧型指標到普通指標的轉換 int pi p.get 注意的地方 那就是不要將智慧型指標與普通指標混用。如果專案允許,堅持使用智慧型指標,避免原生指標。智慧型指標與普通指標需要特別特別特別...
C 智慧型指標和普通指標引數的使用問題
char str char pvargtocompletionroutine string ss str 記憶體洩漏 給乙個物件申請一塊記憶體空間,由於某種原因這塊記憶體未釋放掉,這塊記憶體被占用導致應用卡頓等。記憶體溢位 擁有一塊20位元組的記憶體空間,你將30位元組的檔案寫入其中,就會造成溢位。...
智慧型交通監控
智慧型交通監控 一.功能模組 高畫質交通攝像機,高畫質卡口解決方案,闖紅燈抓拍系統,平安城市建設解決方案,智慧型城市監控系統,抓拍車牌攝像機,停車場車牌識別系統,高速公路雷達測速,車牌識別攝像機,智慧型交通高畫質攝像機,電警攝像機,闖紅燈電警攝像機,治安卡口攝像機,道閘監控攝像機,收費站攝像機,雷達...