Spark之根據日誌對IP位置統計

2021-09-19 01:38:15 字數 2538 閱讀 1573

根據ip計算訪問者歸屬地,按照省計算訪問次數,將計算好的結果寫入mysql中

日誌檔案型別:20090121000133586141000|117.101.219.241|12.zgwow.com|/launcher/index.htm|mozilla/4.0 (compatible; msie 7.0; windows nt 5.1)||

ip規則:1.15.128.0|1.15.159.255|17793024|17801215|亞洲|中國|北京|北京||方正寬頻|110100|china|cn|116.405285|39.904989

1.整理資料切分出ip欄位,ip位址轉換成十進位制

2.載入規則,整理規則取出有用的字段快取到記憶體中(executor)

3.將訪問log與ip規則匹配(如何更快?因為ip有序,二分法查詢)

4.取出對應身份名稱,將其和1組合在一起

5.按省份名進行聚合

6.將聚合後的資料寫入到mysql 

package com.thy.day3

import scala.io.

object myutils

ipnum

} //讀ip規則,返回(起始ip,結束ip,省份)

def readrules(path:string): array[(long, long, string)] =).toarray

rules

} //二分查詢,將ip求出long後與對應ip規則中查詢

def binarysearch(lines: array[(long, long, string)], ip: long) : int = }-1

} /**

def main(args: array[string]): unit =

*/}

package com.thy.day3

import org.apache.spark.broadcast.broadcast

import org.apache.spark.rdd.rdd

import org.apache.spark.

object iplocation

(province,1)

}//將日誌檔案傳入func函式整理

val provinceandone: rdd[(string, int)] = accesslines.map(func)

val reduced: rdd[(string, int)] = provinceandone.reducebykey(_+_)

val r = reduced.collect()

println(r.tobuffer)

sc.stop()

}}

package com.thy.day3

import org.apache.spark.rdd.rdd

import org.apache.spark.

/** * 從hdfs中讀取ip規則。如果規則很大也會分片儲存,直接拿到executor中可能只能拿到部分,所以先收集到driver端,在廣播到executor

*/object iplocation2 )

//將分散多個executor中的部分ip規則收集到driver

val rulesindriver: array[(long, long, string)] = rules.collect()

//將driver端的ip規則廣播到executor

val refbroadcast = sc.broadcast(rulesindriver)

//建立rdd,讀取訪問日誌資料

val accesslog: rdd[string] = sc.textfile("e:\\hdfsdemo\\access.log")

//對日誌資料進行整理,(與1區別不是傳入函式了,相當於匿名內部類)

val provinceandone: rdd[(string, int)] = accesslog.map(log =>

(province, 1)

})val reduced: rdd[(string, int)] = provinceandone.reducebykey(_+_)

println(reduced.collect().tobuffer)

}}

將統計出來的資料寫入到mysql

/**

寫入大量資料時存在問題,寫一次拿一次連線,資源消耗大

*/reduced.foreach(tp => )

稍微優化:利用foreachpartition,一次拿乙個分割槽,封裝函式 

/**

一次拿出乙個分割槽(乙個分割槽用乙個連線,可以將乙個分割槽中的多條資料寫完在釋放jdbc連線,這樣更節省資源)

*/reduced.foreachpartition(it => )

pstm.close()

conn.close()

})

根據ip查詢地理位置

圍繞純真資料庫的一系列小工具 文件,文件 文件純真資料庫utf 8版本。轉換工具也以開源形式發布,用php實現 nali,名字取自中文 的拼音。nali包含一組命令列程式,其主要功能就是把一些網路工具的輸出的ip字串,附加上地理位置資訊 使用純真資料庫 例如218.65.137.1會變成218.65...

根據IP判斷地理位置

將資料檔案按行讀入記憶體,ip資料檔案的格式如下 起始ip 結束ip國家省 市區未知運營商 10000000 20000000 中國北京市 北京市海淀區 未知電信 讀取檔案 listlines null lines files.readlines ipfile,charsets.utf 8 遍歷每一...

Spark案例之根據ip位址計算歸屬地二

之前的是單機版的根據ip位址計算歸屬地,當資料量小的時候還可以,但是在大資料實際生產中是不行的,必須將它改造成乙個spark程式,然後在spark集群上執行 spark程式和單機版的程式不一樣,下面來仔細分析一下spark程式的執行流程 首先是乙個spark集群,集群中有master和worker,...