[size=large]1、普通事務spout[/size]
/*** 普通事務spout
*/public class mytxspout implements itransactionalspout;
string session_id = ;
string time = ;
for (long i = 0; i < 100; i++)
}public void declareoutputfields(outputfieldsdeclarer declarer)
public mapgetcomponentconfiguration()
public org.apache.storm.transactional.itransactionalspout.coordinatorgetcoordinator(map conf,
topologycontext context)
public org.apache.storm.transactional.itransactionalspout.emittergetemitter(map conf,
topologycontext context)
}
[size=large]2、事務spout建立乙個新的事務(元資料)metadata[/size]
[b]2、1元資料定義[/b]
public class mymata implements serializable
public long getbeginpoint()
public void setbeginpoint(long beginpoint)
public int getnum()
public void setnum(int num)
}
[b]2、2 獲得(元資料)metadata,逐個發射實際batch的tuple[/b]
public class myemitter implements itransactionalspout.emitter
//逐個發射實際batch的tuple
public void emitbatch(transactionattempt tx, mymata coordinatormeta, batchoutputcollector collector)
collector.emit(new values(tx, dbmap.get(i)));}}
public void cleanupbefore(biginteger txid)
public void close()
}
[size=x-large]3、事務bolt,會從emitter接收資料處理,處理完成,提交給finishbatch方法處理。[/size]
/*** 事務bolt
*/public class mytransactionbolt extends basetransactionalbolt
integer count = 0;
batchoutputcollector collector;
transactionattempt tx;
// 會從emitter接收資料處理,處理完成,提交給finishbatch方法處理。
public void execute(tuple tuple)
}// 批處理提交
public void finishbatch()
public void declareoutputfields(outputfieldsdeclarer declarer)
}
[size=large]4、icommitter,batch之間強制按照順序進行提交[/size]
public class mycommitter extends basetransactionalbolt implements icommitter
public void finishbatch() else
dbmap.put(global_key, newvalue);
} else
system.err.println("total*************************=:" + dbmap.get(global_key).count);
// collector.emit(tuple)
}public void prepare(map conf, topologycontext context, batchoutputcollector collector, transactionattempt id)
public void declareoutputfields(outputfieldsdeclarer declarer)
public static class dbvalue
}
[size=large]5、topo類[/size]
public class mytopo catch (alreadyaliveexception e) catch (invalidtopologyexception e) catch (authorizationexception e)
} else }}
[size=large]6、測試結果[/size]
[quote]
啟動乙個事務:0----10
mytransactionbolt prepare 1 attemptid3005464965348344518
mytransactionbolt prepare 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt prepare 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
finishbatch 3
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
mytransactionbolt transactionattempt 1 attemptid3005464965348344518
finishbatch 4
finishbatch 3
total*************************=:10
啟動乙個事務:10----10
mytransactionbolt prepare 2 attemptid4420908201582652570
mytransactionbolt prepare 2 attemptid4420908201582652570
mytransactionbolt prepare 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
finishbatch 3
mytransactionbolt transactionattempt 2 attemptid4420908201582652570
finishbatch 4
finishbatch 3
total*************************=:20[/quote]
日誌傳送是否可以與普通事務日誌備份並存
2016 03 18 15 18 整理,未發布 日誌傳送實際是備份日誌 拷貝日誌 還原日誌。我們可以將備份好的日誌檔案拷貝乙份到其他儲存,再結合必要的完整備份,就可以用來還原資料。如果做了日誌傳送,還需對其做普通的日誌備份。那就要考慮清楚,普通的日誌備份不能截斷事務日誌,不然日誌傳送中的日誌鏈就會中...
SQL server從入門精通 事務
事務 我的理解 執行幾條語句時,只要有一條語句執行不成功,其他的語句都不夠被執行 事務 將多個操作當做乙個獨立的邏輯單元的執行方式為事務 特點 多個操作只有在都執行成功時才算成功,只要有乙個執行失敗那應該整體就屬於失敗,成功了可以提交,失敗了可以回滾 語法begin transaction tr i...
關於SQLSERVER 事物的運用 1 普通事物
關於sqlserver 事物的運用 概述 以往在sql2000下處理異常通常的方式比較繁瑣,sql2005版本以上加入了begin try end try begin cath end catch 是異常的捕獲稍微顯得簡單一些,根據我的測試,將我對sql事物的處理做一下整理 本文分為三個部分來描述,...