Consider the following Hive table definition:
    create table topic_recommend_score (
        category_id int,
        topic_id bigint,
        score double,
        rank int
    );
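This table is a simplified version of the topic recommendation score table from our business. category_id is the category id, topic_id is the topic id, and score is the score value. rank is the position of each topic's score within its category, computed with a window function:
    row_number() over(partition by t.category_id order by t.score desc)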
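To make the meaning of rank concrete, here is a minimal Java sketch that emulates the window expression on an in-memory list. The TopicScore and RowNumberSketch classes are hypothetical names introduced only for this illustration; ties in score are broken by input order here, whereas row_number() makes no such promise.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for one row of topic_recommend_score.
final class TopicScore {
    final int categoryId;
    final long topicId;
    final double score;
    int rank; // filled in by RowNumberSketch.assignRanks

    TopicScore(int categoryId, long topicId, double score) {
        this.categoryId = categoryId;
        this.topicId = topicId;
        this.score = score;
    }
}

final class RowNumberSketch {
    // Emulates row_number() over(partition by category_id order by score desc).
    static void assignRanks(List<TopicScore> rows) {
        // partition by category_id
        Map<Integer, List<TopicScore>> partitions = new TreeMap<>();
        for (TopicScore row : rows) {
            partitions.computeIfAbsent(row.categoryId, k -> new ArrayList<>()).add(row);
        }
        for (List<TopicScore> partition : partitions.values()) {
            // order by score desc
            partition.sort(Comparator.comparingDouble((TopicScore r) -> r.score).reversed());
            // row_number(): 1, 2, 3, ... within each partition
            int n = 1;
            for (TopicScore row : partition) {
                row.rank = n++;
            }
        }
    }

    public static void main(String[] args) {
        List<TopicScore> rows = new ArrayList<>();
        rows.add(new TopicScore(1, 101L, 0.2));
        rows.add(new TopicScore(1, 102L, 0.9));
        rows.add(new TopicScore(2, 201L, 0.5));
        rows.add(new TopicScore(1, 103L, 0.4));
        assignRanks(rows);
        for (TopicScore r : rows) {
            System.out.println(r.categoryId + " " + r.topicId + " rank=" + r.rank);
        }
    }
}
```

In category 1 the topic with score 0.9 gets rank 1, the one with 0.4 gets rank 2, and the one with 0.2 gets rank 3; category 2 starts again at rank 1.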
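Suppose we want, for each category, the ids of the top 1000 topics joined into one comma-separated string in rank order. A first attempt:
    select category_id,
           concat_ws(',', collect_list(cast(topic_id as string)))
    from topic_recommend_score
    where rank >= 1 and rank <= 1000
    group by category_id;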
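Looks fine? It is actually wrong. In the output there are always some category_id values whose list comes out in the wrong order. For example, the batch of ids ranked at the top and the batch ranked at the bottom swap places, so the rank sequence becomes n-3, n-2, n-1, n, 5, 6, 7, ..., n-4, 1, 2, 3, 4. collect_list makes no guarantee about element order, so the order in which rows reach the aggregation shows through in the result.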
The following query produces the correct order:
    select category_id,
           regexp_replace(
               concat_ws(',',
                   sort_array(
                       collect_list(
                           concat_ws(':', lpad(cast(rank as string), 5, '0'), cast(topic_id as string))
                       )
                   )
               ),
               '\\d+\:', ''
           )
    from topic_recommend_score
    where rank >= 1 and rank <= 1000
    group by category_id;
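Here rank is placed in front of topic_id, separated by a colon, and sort_array is then applied to the result of collect_list (it only sorts in ascending order). Note that rank must be left-padded with enough zeros to align the digits, because what gets sorted is strings, not numbers. Without the padding, lexicographic order yields 1, 10, 11, 12, 13, 2, 3, 4..., which is wrong again.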
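The effect of the padding is easy to reproduce outside Hive. The sketch below sorts the same rank values as strings, once unpadded and once padded to a fixed width; PaddedSortSketch and sortAsStrings are hypothetical names, and Java's natural String ordering stands in for the string sort performed by sort_array.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

final class PaddedSortSketch {
    // Converts ranks to strings and sorts them lexicographically.
    // width <= 0 means "no padding"; otherwise each rank is zero-padded to width digits.
    static List<String> sortAsStrings(int[] ranks, int width) {
        List<String> keys = new ArrayList<>();
        for (int rank : ranks) {
            keys.add(width > 0
                    ? String.format(Locale.ROOT, "%0" + width + "d", rank)
                    : Integer.toString(rank));
        }
        Collections.sort(keys); // natural String order is lexicographic
        return keys;
    }

    public static void main(String[] args) {
        int[] ranks = {1, 2, 3, 4, 10, 11, 12, 13};
        System.out.println(sortAsStrings(ranks, 0)); // unpadded
        System.out.println(sortAsStrings(ranks, 5)); // padded to 5 digits
    }
}
```

Unpadded, the strings sort as 1, 10, 11, 12, 13, 2, 3, 4; padded to five digits, string order and numeric order coincide. The width only needs to cover the largest rank, so 5 is more than enough for ranks up to 1000.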
Once the sorted result has been joined together, regexp_replace strips the colon and the digits in front of it, and the job is done.
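The whole encode, sort, join, strip pipeline can be mirrored in a few lines of Java to check that the regular expression removes only the rank prefixes. This is a sketch under assumptions: OrderedConcatSketch and orderedTopicIds are hypothetical names, and the input rows are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

final class OrderedConcatSketch {
    // rows: each element is {rank, topicId}, given in arbitrary arrival order.
    static String orderedTopicIds(long[][] rows) {
        List<String> keyed = new ArrayList<>();
        for (long[] row : rows) {
            // mirrors concat_ws(':', lpad(cast(rank as string), 5, '0'), cast(topic_id as string))
            keyed.add(String.format(Locale.ROOT, "%05d:%d", row[0], row[1]));
        }
        Collections.sort(keyed);                 // mirrors sort_array(collect_list(...))
        String joined = String.join(",", keyed); // mirrors concat_ws(',', ...)
        // mirrors the regexp_replace step: drop every run of digits followed by a colon
        return joined.replaceAll("\\d+:", "");
    }

    public static void main(String[] args) {
        long[][] rows = {{3, 303}, {1, 101}, {10, 110}, {2, 202}};
        System.out.println(orderedTopicIds(rows));
    }
}
```

The pattern matches only digits that are immediately followed by a colon, so the topic ids themselves, which are followed by a comma or the end of the string, are left untouched.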
While we are at it, let's look at the logic behind the collect_list and collect_set functions in the Hive source code.
    public class GenericUDAFMkCollectionEvaluator extends GenericUDAFEvaluator
        implements Serializable {
      // For PARTIAL1 and COMPLETE: ObjectInspectors for original data
      private transient PrimitiveObjectInspector inputOI;
      // For PARTIAL2 and FINAL: ObjectInspectors for partial aggregations (list
      // of objs)
      private transient StandardListObjectInspector loi;
      private transient ListObjectInspector internalMergeOI;
      private BufferType bufferType;

      //needed by kyro
      public GenericUDAFMkCollectionEvaluator() { /* ... */ }

      public GenericUDAFMkCollectionEvaluator(BufferType bufferType) { /* ... */ }

      @Override
      public ObjectInspector init(Mode m, ObjectInspector[] parameters)
          throws HiveException { /* ... */ }

      class MkArrayAggregationBuffer extends AbstractAggregationBuffer {
        // ... else if (bufferType == BufferType.SET) ... else ...
      }

      @Override
      public void reset(AggregationBuffer agg) throws HiveException { /* ... */ }

      @Override
      public AggregationBuffer getNewAggregationBuffer() throws HiveException { /* ... */ }

      //mapside
      @Override
      public void iterate(AggregationBuffer agg, Object[] parameters)
          throws HiveException { /* ... */ }

      //mapside
      @Override
      public Object terminatePartial(AggregationBuffer agg) throws HiveException { /* ... */ }

      @Override
      public void merge(AggregationBuffer agg, Object partial)
          throws HiveException { /* ... */ }

      @Override
      public Object terminate(AggregationBuffer agg) throws HiveException { /* ... */ }

      private void putIntoCollection(Object p, MkArrayAggregationBuffer myagg) { /* ... */ }

      public BufferType getBufferType() { /* ... */ }

      public void setBufferType(BufferType bufferType) { /* ... */ }
    }
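The method names above (iterate, terminatePartial, merge, terminate) suggest a two-phase flow: each task builds a partial list, and the partial lists are combined afterwards. The standalone sketch below is not Hive code. It assumes that the rows of one category are spread over several tasks and that their partial lists arrive in no particular order, and shows how that alone would reproduce the block-swapped output described earlier. CollectListMergeSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CollectListMergeSketch {
    // Stands in for iterate + terminatePartial: one task appends the ranks
    // of its own split in the order it reads them.
    static List<Integer> partial(int fromRank, int toRank) {
        List<Integer> buffer = new ArrayList<>();
        for (int r = fromRank; r <= toRank; r++) {
            buffer.add(r);
        }
        return buffer;
    }

    // Stands in for merge + terminate: partial lists are appended to the
    // final list in whatever order they arrive (assumed, not taken from Hive).
    static List<Integer> mergeInArrivalOrder(List<List<Integer>> partials) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> p : partials) {
            result.addAll(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> a = partial(1, 4);
        List<Integer> b = partial(5, 8);
        List<Integer> c = partial(9, 12);
        System.out.println(mergeInArrivalOrder(Arrays.asList(a, b, c))); // arrival order a, b, c
        System.out.println(mergeInArrivalOrder(Arrays.asList(c, b, a))); // arrival order c, b, a
    }
}
```

With arrival order c, b, a and n = 12, the merged list is 9, 10, 11, 12, 5, 6, 7, 8, 1, 2, 3, 4: each block is internally ordered but the blocks are swapped, which is why an explicit sort_array on a padded key is needed.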