所謂二次排序,對第1個字段相同的資料,使用第2個字段進行排序。
舉個例子,電商平台記錄了每一使用者的每一筆訂單的訂單金額,現在要求屬於同乙個使用者的所有訂單金額作排序,並且輸出的使用者名稱也要排序。
賬戶訂單金額
hadoop@apache
200hive@apache
550yarn@apache
580hive@apache
159hadoop@apache
300hive@apache
258hadoop@apache
300yarn@apache
100hadoop@apache
150yarn@apache
560yarn@apache
260二次排序後的結果
賬戶訂單金額
hadoop@apache
150hadoop@apache
200hadoop@apache
300hadoop@apache
300hive@apache
159hive@apache
258hive@apache
550yarn@apache
100yarn@apache
260yarn@apache
560yarn@apache
580實現的思路是使用自定義key,key中實現按使用者名稱和訂單金額2個字段的排序,自定義分割槽和分組類,按使用者名稱進行分割槽和分組。自定義排序的比較器,分別用於在map端和reduce的合併排序。
因為hadoop預設使用的字串序列化j**a.io.dataoutputstream.writeutf(), 使用了"變種的utf編碼",序列化後的位元組流不能在rawcomparator使用。在實現中,用一種變通的方法,直接使用「賬戶」欄位的位元組流,並且把位元組流長度也一併序列化。rawcomparator得到的位元組流就是我們寫進去的位元組流。當然,在進行反序列化時,需要根據這個長度來讀出「賬戶」字段。
程式**
package com.hadoop;
import j**a.io.datainput;
import j**a.io.dataoutput;
import j**a.io.ioexception;
import j**a.nio.charset.charset;
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.conf.configured;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.doublewritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.io.writablecomparable;
import org.apache.hadoop.io.writablecomparator;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.partitioner;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
import org.apache.hadoop.security.usergroupinformation;
import org.apache.hadoop.util.tool;
import org.apache.hadoop.util.toolrunner;
public class secondarysortmapreduce extends configured implements tool
public string getaccount()
public double getcost()
@override
public void write(dataoutput out) throws ioexception
@override
public void readfields(datainput in) throws ioexception
@override
public int compareto(costbean o)
return account.compareto(o.account);
} @override
public string tostring() }
/*** 用於map端和reduce端排序的比較器:如果賬戶相同,則比較金額
* @author ivan
* */
public static class costbeancomparator extends writablecomparator else
} }/**
* 用於map端在寫磁碟使用的分割槽器
* @author ivan
* */
public static class costbeanpatitioner extends partitioner }
/*** 用於在reduce端分組的比較器根據account欄位分組,即相同account的作為一組
* @author ivan
* */
public static class groupcomparator extends writablecomparator }
/*** @author ivan
* */
private final costbean outputkey = new costbean();
private final doublewritable outputvalue = new doublewritable();
@override
protected void map(longwritable key, text value, context context)
throws ioexception, interruptedexception }
public static class secondarysortreducer extends reducer
} }public int run(string args) throws exception
/*** @param args
* @throws exception
*/public static void main(string args) throws exception
toolrunner.run(new configuration(), new secondarysortmapreduce(), args);
}}
執行環境
拿上面的例子作為測試資料
賬戶金額
hadoop@apache
200hive@apache
550yarn@apache
580hive@apache
159hadoop@apache
300hive@apache
258hadoop@apache
300yarn@apache
100hadoop@apache
150yarn@apache
560yarn@apache
260
MapReduce二次排序
預設情況下,map輸出的結果會對key進行預設的排序,但個別需求要求對key排序的同時還需要對value進行排序 這時候就要用到二次排序了。本章以hadoop權威指南中計算每年最大氣溫值為例,原始資料雜亂無章 2008 33 2008 23 2008 43 2008 24 2008 25 2008 ...
Map reduce二次排序
map reduce的流程切面 splitmapperpartitioncombinergroupreducer 這裡要解釋下 partition 和 group 它們都是shuffle的重要步驟 的區別.他們的作用都是為了reducer分配記錄去處理.但區別是partition是把記錄分給不同的r...
mapreduce二次排序案例
為什麼需要二次排序?在mapreduce操作時,我們知道傳遞的會按照key的大小進行排序,最後輸出的結果是按照key排過序的。有的時候我們在key排序的基礎上,對value也進行排序。這種需求就是二次排序 解決思路 我們可以把key和value聯合起來作為新的key,記作newkey。這時,newk...