1。同步和非同步的方式
public
static
void
main(string args) else
}// 非同步的方式接收訊息
consumer.setmessagelistener(new messagelistener()
}catch(exception e)
}}); //(非同步接收)
} catch (exception e) finally catch (throwable ignore)
}}
現在講講consumer.receive和consumer.setmessagelistener方式的不同;
1。 consumer.receive方式
public message receive() throws jm***ception
beforemessageisconsumed(md);
aftermessageisconsumed(md, false);
return createactivemqmessage(md);
}
可以看到sendpullcommand,其會先傳送乙個pull命令,然後然後從接收的訊息列表中dequeue一條訊息,dequeue的引數-1表示,如果沒有訊息返回,會阻塞在這兒,如果有訊息便會進行消費。
下面講解非同步消費訊息過程。
@override
public
void
setmessagelistener(messagelistener listener) throws jm***ception
if (listener != null)
this.messagelistener.set(listener);
session.redispatch(this, unconsumedmessages);
if (wasrunning)
} else
}
有一句session.start();然後進入activemqsession類,看下這個start方法
/**
* start this session.
**@throws jm***ception
*/protected
void
start() throws jm***ception
executor.start();
}
在看這句c.start();
public
void
start() throws jm***ception
started.set(true);
unconsumedmessages.start();
session.executor.wakeup();
}
在看activemqsessionexecutor中的wakeup方法
public
void
wakeup()
this.taskrunner = session.connection.getsessiontaskrunner().createtaskrunner(this,
"activemq session: " + session.getsessionid());
}taskrunner = this.taskrunner;}}
taskrunner.wakeup();
} catch (interruptedexception e)
} else }}
}
其建立了乙個執行緒執行的類;
public taskrunner createtaskrunner(task task, string name) else
}
看其構造方法
public
dedicatedtaskrunner(final task task, string name, int priority, boolean daemon) finally ", task);}}
};thread.setdaemon(daemon);
thread.setname(name);
thread.setpriority(priority);
thread.start();
}
在看runtask方法:
final
void runtask()
}log.trace("running task {}", task);
if (!task.iterate())
while (!pending) }}
}} catch (interruptedexception e) finally
}}
其中有這句話task.iterate()。。就是執行到activemqsessionexecutor的iterate方法
public
boolean
iterate()
}// no messages left queued on the listeners.. so now dispatch messages
// queued on the session
messagedispatch message = messagequeue.dequeuenowait();
if (message == null) else
}
檢視dispatch方法
void dispatch(messagedispatch message)
}}
然後看看consumer.dispatch(message);這句話。最終是執行到了activemqmessageconsumer的dispatch方法。。
@override
public
void
dispatch(messagedispatch md)
activemqmessage message = createactivemqmessage(md);
beforemessageisconsumed(md);
try
aftermessageisconsumed(md, expired);
} catch (runtimeexception e) exception while processing message: {}", getconsumerid(), md.getmessage().getmessageid(), e);
md.setrollbackcause(e);
if (isautoacknowledgebatch() || isautoacknowledgeeach() || session.isindividualacknowledge()) else
}} else
if (md.getmessage() == null) else
} else }}
}} else tracking transacted redelivery {}", getconsumerid(), md.getmessage());
if (transactedindividualack) else
} else
if ((consumerwithpendingtransaction = redeliverypendingincompetingtransaction(md)) != null) delivering duplicate {}, pending transaction completion on {} will rollback", getconsumerid(), md.getmessage(), consumerwithpendingtransaction);
session.getconnection().rollbackduplicate(this, md.getmessage());
dispatch(md);
} else suppressing duplicate delivery on connection, poison acking: {}", getconsumerid(), md);
posionack(md, "suppressing duplicate delivery on connection, consumer " + getconsumerid());}}
}}
if (++dispatchedcount % 1000 == 0)
} catch (exception e)
}
然後檢視上述方法。你會發現最終執行到了。consumer上的listener。 ActiveMq點對點模式傳送 接收訊息
訊息傳送流程 1 客戶機傳送訊息到jms訊息中介軟體 2 服務端負責監聽jms訊息目的地。3 發現jms裡面有訊息產生,服務就可以接受訊息。點對點訊息傳送服務 1 訊息只能被乙個服務接受 2 多個服務同時監聽訊息伺服器,遵循先來後到原則。3 訊息一旦被接受,訊息自動消失。4 如果訊息一直沒有被接受,...
ActiveMQ五種訊息的傳送 接收
1.生產者 連線工廠 connectionfactory connfactory new activemqconnectionfactory activemqconnection.default user,activemqconnection.default password,tcp localho...
突然不發訊息給ActiveMQ但能接收
今天專案在聯調過程中,activemq突然不好使了。在此之前一月內,專案組的人都沒有去修改 重啟過mq服務。雖然現在知道是由於許可權的問題導致只能收不能發 站在activemq角度是收不到,但可以發 只是到現在還不知道原來沒變過的 怎麼以前可以用,現在卻不行了?通過查詢示例 發現有connectio...