spark版本2.4.0。
spark中的排序實現也是通過timsort類實現,實現具體方式與jdk略有區別。
具體實現,在timsort類的sort()方法的sort()方法中。
if (nremaining < min_merge)
當被排序的陣列長度小於32時,具體的排序流程分為兩步,首先通過countrunandmakeascending()方法標出從當前陣列排序起點開始最長的一段單調區間並調均將這段區間調整為增序。
private int countrunandmakeascending(buffer a, int lo, int hi, comparator<? super k> c) else
return runhi - lo;
}
這裡的**可以看到,先從起點下標lo開始與其下乙個下標位置runhi=lo+1進行比較,如果lo+1位置的資料小於lo,那麼此時確認lo位置開始尋找lo位置開始的第一段遞減區間,直到該遞減區間結束,之後逆置這段區間為增序,記下區間結束下標,該下標與lo之差即為該段遞增區間的長度。而如果一開始就為遞增區間,那麼就一直記錄到該段遞增結束即可,並按照上述操作返回該段區間總長度。
private void binarysort(buffer a, int lo, int hi, int start, comparator<? super k> c)
assert left == right;
/** the invariants still hold: pivot >= all in [lo, left) and
* pivot < all in [left, start), so pivot belongs at left. note
* that if there are elements equal to pivot, left points to the
* first slot after them -- that's why this sort is stable.
* slide elements over to make room for pivot.
*/int n = start - left; // the number of elements to move
// switch is just an optimization for arraycopy in default case
switch (n)
s.copyelement(pivotstore, 0, a, left);
}}
之後就會通過binarysort()方法進行二分插入排序,由於已經通過之前的方式得到了從開始位置的一段遞增區間,那麼就可以以此為基礎開始從遞增區間後的下乙個元素開始往前插入,通過二分法確定插入位置,直到全部插入完畢,這是被排序陣列小於32時候的情況。
接下來是大於32的情況,會採用歸併排序。
在這裡的歸併排序,會通過乙個sortstate的內部類來輔助完成歸併排序,在這裡開始會初始化乙個sortstate。
runbase = new int[stacklen];
runlen = new int[stacklen];
sortstate實際維護了兩個堆疊,乙個堆疊用來儲存乙個遞增區間在當前陣列中的起始下標,另乙個則代表該區間的長度,通過堆疊的下標來定義乙個在被維護陣列中的有序區間。用來歸併排序。
sortstate sortstate = new sortstate(a, c, hi - lo);
int minrun = minrunlength(nremaining);
通過minrunlength()方法確認,以被排序的陣列長度得到一次最小的排序長度。
private int minrunlength(int n)
return n + r;
}
最小數量根據具體的長度在16到32的區間。
do
// push run onto pending-run stack, and maybe merge
sortstate.pushrun(lo, runlen);
sortstate.mergecollapse();
// advance to find next run
lo += runlen;
nremaining -= runlen;
} while (nremaining != 0);
之後按照小於32時候的場景,首先根據countrunandmakeascending()方法得到排序當前其實位置下標下的一段遞增區間,如果該區間小於最小排序長度那麼先得到剩陣列未排序餘排序長度和最小排序的最小者,先根據之前的binartsort()進行二分插入排序,使得在接下來的排序中,下一段遞增區間為此次發生的二分插入排序的排序長度。
在得到上述的乙個有序遞增區間之後,則將該區間的起始位置和長度分別壓入堆疊,並通過mergecollapse()方法進行歸併排序。
private void mergecollapse() else if (runlen[n] > runlen[n + 1])
mergeat(n);
}}
是否採取要開始排序在這裡判斷,如果當sortstate中的遞增區間數量為1,也沒有必要進行排序,否則只有在堆疊數量為2,且棧底長度大於棧頂的時候才不會進行歸併。
歸併排序主要分為三個步驟。
int k = gallopright(s.getkey(a, base2, key0), a, base1, len1, 0, c);
assert k >= 0;
base1 += k;
len1 -= k;
if (len1 == 0)
return;
/* * find where the last element of run1 goes in run2. subsequent elements
* in run2 can be ignored (because they're already in place).
*/len2 = gallopleft(s.getkey(a, base1 + len1 - 1, key0), a, base2, len2, len2 - 1, c);
assert len2 >= 0;
if (len2 == 0)
return;
// merge remaining runs, using tmp array with min(len1, len2) elements
if (len1 <= len2)
mergelo(base1, len1, base2, len2);
else
mergehi(base1, len1, base2, len2);
首先,在gallopright()方法中,為第一步。
int maxofs = len - hint;
while (ofs < maxofs && c.compare(key, s.getkey(a, base + hint + ofs, key1)) >= 0)
if (ofs > maxofs)
ofs = maxofs;
// make offsets relative to b
lastofs += hint;
ofs += hint;
在這裡的一步,主要是從以第二個區間第乙個元素為基準找到第乙個遞增區間中首個大於該元素的下標,由於區間都是遞增,可以保證該元素之前都是小於第二個區間的第乙個元素的,所以第乙個區間的這一段元素不需要參與接下來的歸併排序。
同樣的第二步,gallopleft()的操作類似,選取第乙個區間的最後乙個元素插入到從第二個區間左邊最大值開始尋找首個小於該元素的值,這樣可以保證左端都是大於第乙個區間最後乙個元素的,同樣的第二個區間的這一段不用參與歸併排序。
最後參與到第三步,歸併排序的只剩下第乙個區間的右邊和第二個區間的左邊,根據這兩端的長度確定是採用mergelo()還是mergehi()方法進行排序。
在排序的過程中還有一些細節,比如在歸併中,如果一次性將超過7個的第乙個區間插入到結果陣列,那麼說明此時第乙個區間中元素很有可能在接下來的一段時間都過小,重複之前的第一步操作可以使得效能大大提高。
在完成sortstate的歸併排序之後,回到timsort的迴圈中,會繼續按照之前的規則尋找下一段遞增區間放入堆疊中準備排序直到陣列被全部劃分為遞增區間存在堆疊中,並在結束後呼叫sortstate的mergeforcecollapse()方法將堆疊中剩餘的所有區間歸併排序完畢,一次排序也宣告結束。
Spark實現排序
question 用spark對資料進行排序,首先按照顏值的從高到低進行排序,如果顏值相等,在根據年齡的公升序排序 1.user類繼承ordered,並且序列化 package cn.edu360.spark.day06 import org.apache.log4j.import org.apac...
Spark實現排序
question 用spark對資料進行排序,首先按照顏值的從高到低進行排序,如果顏值相等,在根據年齡的公升序排序 1.user類繼承ordered,並且序列化 package cn.edu360.spark.day06 import org.apache.log4j.import org.apac...
Spark的二次排序
1 資料樣本 1 52 4 3 61 3 2 11 14 2 45 4 11 3 23 5 12 6 13 2 排序規則 先按照第乙個字元排序,如果第乙個相同,再按照第二個字元排序 3 排序後的結果 1 31 5 1 14 2 12 4 2 45 3 63 23 4 11 5 12 6 13 4 s...