前幾天有dw使用者反饋,在往一張表(rcfile表)中用「insert overwrite table partition(xx) select ...」 插入資料的時候,會產生重複檔案。看了下這個作業log,發現map task 000005起了兩個task attempt ,第二個attempt是推測執行,並且這兩個attemp都在task close函式裡面重新命名temp檔案成正式檔案,而不是通過mapreduce框架的兩階段提交協議(two phrase commit protocol)在收到tasktracker發過來的committaskaction時再commit task來保證只有乙個attemp的結果成為正式結果。
task log中的輸出如下:
attempt_201304111550_268224_m_000005_0
renamed path hdfs: to hdfs: . file size is 666922
attempt_201304111550_268234_m_000005_1
renamed path hdfs: to hdfs: . file size is 666922
public void close() throws ioexception
outwriter.close();
outwriter = null;
if (!exception)
} else
}}
finally
if (rj != null)
hadoopjobexechelper.runningjobkilluris.remove(rj.getjobid());
jobid = rj.getid().tostring();
}} catch (exception e)
}
jobclose方法
public static void jobclose(string outputpath, boolean success, jobconf job,
loghelper console, dynamicpartitionctx dynpartctx) throws hiveexception, ioexception
}
mvfiletofinalpath方法
public static void mvfiletofinalpath(string specpath, configuration hconf,
boolean success, log log, dynamicpartitionctx dpctx, filesinkdesc conf) throws ioexception,
hiveexception
// step3: move to the file destination
log.info("moving tmp dir: " + intermediatepath + " to: " + finalpath);
utilities.renameormovefiles(fs, intermediatepath, finalpath);
}} else
fs.delete(tasktmppath, true);
}
removetemporduplicatefiles(filesystem fs, path path, dynamicpartitionctx dpctx),動態分割槽和非動態分割槽表不同處理邏輯
/**
* remove all temporary files and duplicate (double-committed) files from a given directory.
** @return a list of path names corresponding to should-be-created empty buckets.
*/public static arraylistremovetemporduplicatefiles(filesystem fs, path path,
dynamicpartitionctx dpctx) throws ioexception
arraylistresult = new arraylist();
if (dpctx != null)
}taskidtofile = removetemporduplicatefiles(items, fs);
// if the table is bucketed and enforce bucketing, we should check and generate all buckets
if (dpctx.getnumbuckets() > 0 && taskidtofile != null) }}
}} else
return result;
}
removetemporduplicatefiles(filestatus items, filesystem fs),對於每個目錄下相同taskid不同attemptid的檔案進行去重
public static hashmapremovetemporduplicatefiles(filestatus items,
filesystem fs) throws ioexception
hashmaptaskidtofile = new hashmap();
for (filestatus one : items)
} else else else
long len1 = todelete.getlen();
long len2 = taskidtofile.get(taskid).getlen();
if (!fs.delete(todelete.getpath(), true)) else }}
}return taskidtofile;
}
作業 合併果子 fruit
題目描述 在乙個果園裡,多多已經將所有的果子打了下來,而且按果子的不同種類分成了不同的堆。多多決定把所有的果子合成一堆。每一次合併,多多可以把兩堆果子合併到一起,消耗的體力等於兩堆果子的重量之和。可以看出,所有的果子經過n 1次合併之後,就只剩下一堆了。多多在合併果子時總共消耗的體力等於每次合併所耗...
C 作業6 陣列合併 矩陣求和
專案2 陣列合併 已知有兩個有序的陣列a,b,將這兩個陣列合併到陣列c中,陣列c依然有序,如a 5 b 5 則c 10 include using namespace std int main b 5 int c 10 i,j,t for i 0 i 5 i c i a i for i 5,j 0 ...
C 第六次作業 陣列合併,矩陣求和
一 問題及 專案二 陣列合併 檔名稱 ra.cpp 作 者 湯永泰 完成日期 2017 年 5 月 10 日 版 本 號 v1.0 對任務及求解方法的描述部分 輸入描述 無 問題描述 第六次作業 專案二 陣列合併 程式輸出 略 問題分析 略 演算法設計 略 includeusing namespac...