PySpark test run and notes

2021-08-22 11:54:19

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import logging
import os

import pyspark.sql.functions as fun
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

time_format = "%Y%m%d"
python_path = "/opt/anaconda2/bin/python2"

sql_template = """
select vender_id, store_id, search_keyword,
       has_result, event_code, user_id,
       search_source, action_type, page_no, dt
from db1.events
where dt >= '{}' and dt < '{}'
  and (event_code = '{}' or event_code = '{}')
  and length(vender_id) > 0
  and length(store_id) > 0
  and length(search_keyword) > 0
"""

# In the cluster environment, point PySpark at the desired Python interpreter.
os.environ["PYSPARK_PYTHON"] = python_path

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__file__)

# The event codes and the SparkConf definition were not shown in the original
# listing; the values below are placeholders so the script is self-contained.
tag_pv = "search_pv"        # placeholder event code for search page views
tag_click = "search_click"  # placeholder event code for search result clicks

conf = SparkConf().setAppName("test_events")
sc = SparkContext(conf=conf)

if __name__ == '__main__':
    now_time = datetime.datetime.now()
    now_str = now_time.strftime(time_format)
    yesterday = (now_time - datetime.timedelta(days=1)).strftime(time_format)

    hive_context = HiveContext(sc)
    df_hive = hive_context.sql(sql_template.format(yesterday, now_str, tag_pv, tag_click))
    df_pv = df_hive.filter(df_hive['event_code'] == tag_pv)
    logger.info("pv count:%s", df_pv.count())
    logger.info("uv count:%s", df_pv.select("user_id").distinct().count())

    logger.info("task1")
    # task1, stat pv/uv by store.
    tmp_pv_by_store = df_pv.groupBy("vender_id", "store_id").agg(
        fun.count("*").alias("pv"),
        fun.countDistinct("user_id").alias("uv"))
    logger.info("rdd_pv_by_store size:%s", tmp_pv_by_store.count())
    rdd_pv_by_store = tmp_pv_by_store.filter("pv >= 1000").orderBy('pv', ascending=False)
    for s in rdd_pv_by_store.collect():
        logger.info("pv_by_store:%s %s %d %d", s[0], s[1], s[2], s[3])

    logger.info("task2")
    # task2, stat pv/uv by keyword.
    tmp_pv_by_keyword = df_pv.groupBy("vender_id", "search_keyword").agg(
        fun.count("*").alias("pv"),
        fun.countDistinct("user_id").alias("uv"))
    logger.info("rdd_pv_by_keyword count:%s", tmp_pv_by_keyword.count())
    rdd_pv_by_keyword = tmp_pv_by_keyword.filter("pv >= 100").orderBy('pv', ascending=False)
    for keyword in rdd_pv_by_keyword.collect():
        logger.info("pv_by_kw: %s %s %d %d", keyword[0], keyword[1], keyword[2], keyword[3])

    logger.info("task3")
    # task3, stat searches with no result (has_result != 1) by keyword and vendor.
    tmp_result_status = df_pv.filter("has_result != 1").groupBy("search_keyword", "vender_id").agg(
        fun.count("*").alias("pv"),
        fun.countDistinct("user_id").alias("uv"))
    logger.info("rdd_result_status count:%s", tmp_result_status.count())
    rdd_result_status = tmp_result_status.filter("pv >= 30").orderBy('pv', ascending=False)
    for x in rdd_result_status.collect():
        logger.info("no_result:%s %s %d %d", x[0], x[1], x[2], x[3])

spark-submit --master yarn-client  --driver-memory 2g --executor-memory 3g test_events.py
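
For yarn-cluster mode (note 1 below), a submission along these lines should work; it is an assumption, not part of the original post. The --conf flags make the interpreter choice explicit for both the application master and the executors, independent of what os.environ does inside the script:

spark-submit --master yarn-cluster --driver-memory 2g --executor-memory 3g \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/anaconda2/bin/python2 \
    --conf spark.executorEnv.PYSPARK_PYTHON=/opt/anaconda2/bin/python2 \
    test_events.py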
1) This is a PySpark test script; in this form it runs under both yarn-client and yarn-cluster mode.

2) A later post will summarize SparkConf configuration and the question of run (deploy) modes; a sketch is given after this list.

3) It will also cover the Python environment problem in the cluster.
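
As a starting point for 2) and 3), the same configuration can also be done programmatically through SparkConf rather than os.environ. A minimal sketch, assuming standard Spark/YARN property names; the app name and exact values are not from the original script:

# Push the interpreter path to the executors and, for yarn-cluster mode, to the
# application master, instead of relying on os.environ on the submitting machine.
from pyspark import SparkConf, SparkContext

python_path = "/opt/anaconda2/bin/python2"

conf = (SparkConf()
        .setAppName("test_events")                                   # assumed app name
        .set("spark.executorEnv.PYSPARK_PYTHON", python_path)        # Python used by executor workers
        .set("spark.yarn.appMasterEnv.PYSPARK_PYTHON", python_path)  # Python used by the driver in yarn-cluster mode
        .set("spark.executor.memory", "3g"))                         # same value as the spark-submit example

sc = SparkContext(conf=conf)

Driver memory, in contrast, is best left to the spark-submit command line (--driver-memory), since the driver JVM is already up before this code runs.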
