#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import logging
import os
import pyspark.sql.functions as fun
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
time_format = "%Y%m%d"
python_path = "/opt/anaconda2/bin/python2"
# Event codes for search page views and clicks. The real values are not shown
# in the original script; these are placeholders.
tag_pv = "search_pv"
tag_click = "search_click"
sql_template = """
select vender_id,store_id,search_keyword,
has_result,event_code,user_id,
search_source,action_type,page_no,dt
from db1.events
where dt>='{}' and dt<'{}'
and (event_code='{}' or event_code='{}')
and length(vender_id)>0
and length(store_id)>0
and length(search_keyword)>0
"""
# In a cluster environment, point PySpark at the desired Python interpreter.
os.environ["PYSPARK_PYTHON"] = python_path
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__file__)
conf = SparkConf()  # minimal conf; cluster-specific settings are discussed in the notes below
sc = SparkContext(conf=conf)
if __name__ == '__main__':
    # Query yesterday's partition for page-view and click events.
    now_time = datetime.datetime.now()
    now_str = now_time.strftime(time_format)
    yesterday = (now_time - datetime.timedelta(days=1)).strftime(time_format)
    hive_context = HiveContext(sc)
    df_hive = hive_context.sql(sql_template.format(yesterday, now_str, tag_pv, tag_click))
    # Keep only page-view events for the pv/uv statistics below.
    df_pv = df_hive.filter(df_hive['event_code'] == tag_pv)
    logger.info("pv count:%s", df_pv.count())
    logger.info("uv count:%s", df_pv.select("user_id").distinct().count())
logger.info("task1")
# task1, stat pv by store.
tmp_pv_by_store = df_pv.groupby("vender_id", "store_id").agg(fun.count("*").alias("pv"),
fun.countdistinct("user_id").alias("uv"))
logger.info("rdd_pv_by_store size:%s", tmp_pv_by_store.count())
rdd_pv_by_store = tmp_pv_by_store.filter("pv>=1000").orderby('pv', ascending=false)
for s in rdd_pv_by_store.collect():
logger.info("pv_by_store:%s %s %d %d", s[0], s[1], s[2], s[3])
logger.info("task2")
# task2, stat pv by keyword.
tmp_pv_by_keyword = df_pv.groupby("vender_id", "search_keyword").agg(fun.count("*").alias("pv"),
fun.countdistinct("user_id").alias("uv"))
logger.info("rdd_pv_by_keyword count:%s", tmp_pv_by_keyword.count())
rdd_pv_by_keyword = tmp_pv_by_keyword.filter("pv>=100").orderby('pv', ascending=false)
for keyword in rdd_pv_by_keyword.collect():
logger.info("pv_by_kw: %s %s %d %d", keyword[0], keyword[1], keyword[2], keyword[3])
logger.info("task3")
# task3, stat pv by has_result.
tmp_result_status = df_pv.filter("has_result != 1").groupby("search_keyword", "vender_id").agg(
fun.count("*").alias("pv"),
fun.countdistinct("user_id").alias("uv"))
logger.info("rdd_result_status count:%s", tmp_result_status.count())
rdd_result_status = tmp_result_status.filter("pv>=30").orderby('pv', ascending=false)
for x in rdd_result_status.collect():
logger.info("no_result:%s %s %d %d", x[0], x[1], x[2], x[3])
spark-submit --master yarn-client --driver-memory 2g --executor-memory 3g test_events.py
1) This is a PySpark test script; with the fixes above it runs in both yarn-client and yarn-cluster mode.
2) A later article will summarize SparkConf configuration and the differences between the run modes.
3) It will also cover the Python environment problem on the cluster (see the sketch after these notes).
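As a rough sketch of notes 2) and 3): besides setting os.environ before the SparkContext is created, the interpreter used on the YARN side can be pinned through SparkConf (or the matching --conf flags at submit time). This is a minimal sketch assuming Spark 1.x on YARN; the property values are examples, not the exact configuration used by this script.

# Sketch only: pin the Python interpreter for the application master (yarn-cluster driver)
# and for the executors; python_path is the same constant defined at the top of the script.
conf = (SparkConf()
        .setAppName("test_events")
        .set("spark.yarn.appMasterEnv.PYSPARK_PYTHON", python_path)
        .set("spark.executorEnv.PYSPARK_PYTHON", python_path))
sc = SparkContext(conf=conf)

For yarn-cluster mode (note 1), the same script would then be submitted with --master yarn-cluster instead of yarn-client, keeping the rest of the spark-submit command above unchanged.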