最近在做乙個小功能,具體邏輯是通過**讀取datahub資料並把獲取到的資料產出成資料檔案。因為資料量較大,所以考慮到使用多執行緒來執行。下面是主要**
執行緒啟動方法
public
void
start() throws exception
讀取datahub**
/**
* 每個檔案的最大資料條數
** @param maxsize
*/public void start(integer maxsize, string shardid) throws exception
boolean bexit = false;
gettopicresult topicresult = client.gettopic(projectname, topicname);
// 首先初始化offset上下文
offsetcontext offsetctx = client.initoffsetcontext(projectname, topicname, subid, shardid);
string cursor = null; // 開始消費的cursor
if (!offsetctx.hasoffset()) else
// logger.info("start consume records, begin offset context:" + offsetctx.toobjectnode().tostring()
// + ", cursor:" + cursor);
long recordnum = 0l;
int limit = 30000
; while (!bexit)
// 將最後一次消費點位上報
client.commitoffset(offsetctx);
// logger.info("commit offset suc! offset context: " + offsetctx.toobjectnode().tostring());
// 可以先休眠一會,再繼續消費新記錄
// thread.sleep(1000);
logger.info("sleep 1s and continue consume records! shard id:" + shardid);
} else
// 上報點位,該示例是每處理100條記錄上報一次點位
offsetctx.setoffset(record.getoffset());
recordnum++;
if (recordnum % 100 == 0) }}
cursor = recordresult.getnextcursor();
}} catch (subscriptionofflineexception e) catch (offsetresetedexception e) catch (offsetsessionchangedexception e) catch (invalidcursorexception ex) catch (exception e)
}}
產出資料檔案**
/**
* 寫入txt檔案
**@param result
*@param filename
*@return
*/public
static
boolean
writetext(listresult, string filename,string filepath)
string relfilepath = null;
if(filepath.endswith(file.separator))else
file file = new file(relfilepath);
if (!file.exists())
out = new bufferedwriter(new outputstreamwriter(new fileoutputstream(file), "gbk"));
for (string info : result)
flag = true;
}if (out != null) catch (ioexception e)
}}catch(exception e)finally
}
然後資料執行起來後就發現生成的檔案裡面的資料有些是不對的,有些行多幾個字段,有些行少了幾個字段。
後來查了諸多資料,然後就知道了多個執行緒訪問同乙個方法時,為了保證資料的一致性,需要對共同訪問的方法加同步鎖,這個很重要!
於是就把生成檔案的**修改了成了
/**
* 寫入txt檔案
**@param result
*@param filename
*@return
*/public
static
synchronized
boolean
writetext(listresult, string filename,string filepath)
string relfilepath = null;
if(filepath.endswith(file.separator))else
file file = new file(relfilepath);
if (!file.exists())
out = new bufferedwriter(new outputstreamwriter(new fileoutputstream(file), "gbk"));
for (string info : result)
flag = true;
}if (out != null) catch (ioexception e)
}}catch(exception e)finally
}
然後問題就解決了。 多個執行緒呼叫同乙個執行緒函式
多個執行緒呼叫同乙個執行緒函式 如題,能這樣嗎?因為有很多個操作,但是這些操作都是一樣的,所以想用相同的執行緒函式,但是感覺執行時執行緒還是乙個乙個執行,並沒有提高速度,應該是我理解錯了!老大些幫幫忙,給個建議問題補充 我的意思是執行緒處理函式,剛剛找了點資料,覺得這想法沒有錯,只要執行緒裡面不用全...
同乙個頁面多個div,ajax呼叫
最近在做專案的時候發現兩個問題,第乙個問題是在主頁面有多個div,在div裡面有input元素的onclick事件,在第一次開啟div的時候通過onclick呼叫ajax可以被執行,當關掉開啟的div重新再開啟的時候onclick事件不生效。第二個問題是兩個div呼叫ajax時會相互影響,當開啟第乙...
Objective C 乙個方法如何傳遞多個引數?
乙個方法可以包含多個引數,不過後面的引數都要寫名字。多個引數的寫法 方法的資料型別 函式名 引數1資料型別 引數1值的名字 引數2的名字 引數2資料型別 引數2值的名字 舉個例子,乙個方法的定義 void setkids nsstring myoldestkidname secondkid nsst...