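Apache RocketMQ Source Code Analysis: The Controller High-Availability Switchover Architecture

May 29, 2025

1. Principles and Core Concepts: A Brief Overview

1.1 Core Architecture

1.2 Core Concepts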

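  1. controller: manages the master/slave relationships between brokers. It can be embedded in the NameServer without affecting the NameServer's own capabilities, and it can also be deployed standalone.
  2. master/slave: the master and standby roles.
  3. syncStateSet: literally, the "synchronization state set". A slave that keeps up with the master in time is included in the syncStateSet.
  4. epoch: records the state at each master/slave switchover, so that a switchover does not cause data loss or inconsistency.

To make some of the processes easier to follow, think of the controller as the class teacher, the master as a group leader, and the slaves as group members. Replication is the group members copying the group leader's homework, and the members in the syncStateSet are the ones whose homework is "excellent", meaning their copies are up to date.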
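A small model makes the epoch's role concrete. RocketMQ's auto-switch HA keeps a record on each replica of the commitlog offset at which every epoch began. The hypothetical `EpochLog` class below is a simplified sketch of that idea, not RocketMQ's implementation. When an old master rejoins as a slave, the two logs are compared from the newest epoch backwards. The first epoch both logs share with the same start offset bounds the data they agree on, and anything after that offset is truncated.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical per-replica epoch log (not a RocketMQ class):
// epoch -> offset at which that epoch's data starts.
class EpochLog {
    private final NavigableMap<Integer, Long> startOffsets = new TreeMap<>();
    private long maxOffset; // end of the local log

    // A new master term begins at the current end of the log.
    void startEpoch(int epoch) {
        startOffsets.put(epoch, maxOffset);
    }

    // Data written during the latest epoch.
    void append(long bytes) {
        maxOffset += bytes;
    }

    Long startOffset(int epoch) {
        return startOffsets.get(epoch);
    }

    // An epoch ends where the next one starts, or at the end of the log.
    long endOffset(int epoch) {
        Map.Entry<Integer, Long> next = startOffsets.higherEntry(epoch);
        return next != null ? next.getValue() : maxOffset;
    }

    long maxOffset() {
        return maxOffset;
    }

    // Newest epoch first: the first epoch both logs share with the same start
    // offset bounds the common prefix; data after it may have diverged.
    long findConsistentPoint(EpochLog other) {
        for (int epoch : startOffsets.descendingKeySet()) {
            Long otherStart = other.startOffset(epoch);
            if (otherStart != null && otherStart.equals(startOffsets.get(epoch))) {
                return Math.min(endOffset(epoch), other.endOffset(epoch));
            }
        }
        return -1; // no shared history
    }
}
```

In the scenario below, the old master wrote 100 bytes in epoch 1. The slave had replicated only 80 of them when it was promoted in epoch 2. The consistent point is therefore 80, so the old master would drop its last 20 bytes before replicating from the new master.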

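2. Related Source Files

The core consists of three parts: controller, broker, and the replication process. Each is described in turn below.

2.1 Controller

This code lives mainly in the rocketmq-controller module. The main files are:

  • **ControllerManager**: manages the controller. It holds many controller-related configuration items and is responsible for core functions such as heartbeat management. (The class teacher's rulebook)
  • **DLedgerController**: the DLedger-based implementation of Controller, containing the controller's basic functions. It implements core features such as replica information management, broker liveness detection, and master election. (One kind of class teacher)
  • **DefaultBrokerHeartbeatManager**: manages broker heartbeats. It contains the broker liveness table and the listeners that fire when a broker goes offline. When a replica drops out, it triggers a new election. (The attendance register)
  • **ReplicasInfoManager**: handles events inside the controller, such as election events and syncStateSet-change events. (The group registry)
  • **ControllerRequestProcessor**: processes requests sent to the controller. Examples include asking the controller to elect a master, registering a broker with the controller, heartbeats, and syncStateSet changes. (The class teacher's mailbox)
  • **DefaultElectPolicy**: the master election policy. It can elect from in-sync replicas only, and it also supports "unclean" election from all replicas, whether or not they are in sync. (The class rules)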

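2.2 Broker

This code lives mainly in the rocketmq-broker module, under org/apache/rocketmq/broker/controller:

  • **ReplicasManager**: carries out the broker's duties as a replica. These duties are finding the controller, managing the broker's role, and, when the broker is master, updating (expanding/shrinking) the SyncStateSet.

2.3 Replication Module

This code lives mainly in the ha directory of the rocketmq-store module:

  • **HAService**: the service every replica has. It manages the replication tasks of the replica acting as master or as slave.
  • **HAClient**: the client inside every slave's HAService. It handles the read and write operations of the replication task.
  • **HAConnection**: an HA connection on the master. In principle, each connection corresponds to one slave. The connection class stores much of the transfer state, including the channel, the connection state, and the current transfer offset.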

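3. Core Flows

3.1 Heartbeat

Core request code: BROKER_HEARTBEAT

Broker side:

This part is simple. The broker sends a request carrying this code to the controller, so we won't go into detail:

BrokerController.sendHeartbeat() -> brokerOuterAPI.sendHeartbeat()

Controller side:

  1. **ControllerRequestProcessor** first receives the request code and enters the handling logic: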

```java
private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
    final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class);
    if (requestHeader.getBrokerId() == null) {
        return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId");
    }
    this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(),
        requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority());
    return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success");
}
```
  2. onBrokerHeartbeat() then mainly updates the **brokerLiveTable** held by the controller's brokerHeartbeatManager:

```java
public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
    Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
    BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId);
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo);
    ......
    if (null == prev) {
        // First heartbeat from this broker: register it
        this.brokerLiveTable.put(...);
        log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId);
    } else {
        // Known broker: refresh its liveness information
        prev.setXXX(......);
    }
}
```
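Here is a minimal, hypothetical model of what the brokerLiveTable is for. Each heartbeat stores its arrival time together with the broker's own timeout. A periodic scan then removes brokers whose timeout has elapsed, so that the caller can trigger re-election; this is the first trigger listed in section 3.2. `LiveTable`, `BrokerKey` and `scanExpired` are illustrative names, not the DefaultBrokerHeartbeatManager API. The current time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical identity of a broker replica: cluster, broker group, broker id.
record BrokerKey(String cluster, String brokerName, long brokerId) {}

class LiveTable {
    // Last heartbeat time and the timeout the broker asked for.
    private record Live(long lastUpdateMillis, long timeoutMillis) {}

    private final Map<BrokerKey, Live> table = new ConcurrentHashMap<>();

    // Insert on the first heartbeat, overwrite on later ones.
    void onHeartbeat(BrokerKey key, long nowMillis, long timeoutMillis) {
        table.put(key, new Live(nowMillis, timeoutMillis));
    }

    boolean isAlive(BrokerKey key, long nowMillis) {
        Live live = table.get(key);
        return live != null && nowMillis - live.lastUpdateMillis() <= live.timeoutMillis();
    }

    // Remove expired brokers and return them so the caller can trigger
    // re-election for their broker groups.
    List<BrokerKey> scanExpired(long nowMillis) {
        List<BrokerKey> expired = new ArrayList<>();
        table.forEach((key, live) -> {
            if (nowMillis - live.lastUpdateMillis() > live.timeoutMillis()) {
                expired.add(key);
            }
        });
        expired.forEach(table::remove);
        return expired;
    }
}
```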

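3.2 Master Election

Related request code: CONTROLLER_ELECT_MASTER

An election can be triggered in the following situations:

  1. Initiated by the controller itself, via triggerElectMaster():
    1. The HeartbeatManager detects that a broker's heartbeat has expired. (The class teacher notices that a group member has dropped out.)
    2. The controller detects that a replica set has no master. (The class teacher notices that a group leader is still on the roster but has gone missing.)
  2. Initiated by a broker asking to elect itself as master, via ReplicasManager.brokerElect():
    1. When the broker queries the controller for metadata, it finds no master information. (A student checking on the group asks the class teacher why there is no group leader.)
    2. After registering with the controller, the broker still has not obtained master information from it. (A student reports in, finds there is no group leader, and raises it.)
  3. Initiated through the tools:
    1. Via the election command ReElectMasterSubCommand. (The principal appoints a leader directly.)

All of the above eventually call:

controller.electMaster() -> replicasInfoManager.electMaster()

In other words, every group leader must be appointed by the class teacher.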

```java
public ControllerResult<ElectMasterResponseHeader> electMaster(final ElectMasterRequestHeader request,
    final ElectPolicy electPolicy) {
    ...
    // Extract information from the request
    ...
    if (syncStateInfo.isFirstTimeForElect()) {
        // First election for this group (never registered before): appoint the requester directly
        newMaster = brokerId;
    }
    // Otherwise elect a master according to the election policy
    if (newMaster == null) {
        // we should assign this assignedBrokerId when the brokerAddress need to be elected by force
        Long assignedBrokerId = request.getDesignateElect() ? brokerId : null;
        newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId);
    }
    if (newMaster != null && newMaster.equals(oldMaster)) {
        // The elected master is the old master
        // old master still valid, change nothing
        String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName());
        LOGGER.warn("{}", err);
        // the master still exist
        response.setXXX(...);
        result.setBody(new ElectMasterResponseBody(syncStateSet).encode());
        result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err);
        return result;
    }
    // a new master is elected
    if (newMaster != null) {
        // A different broker becomes the master
        final int masterEpoch = syncStateInfo.getMasterEpoch();
        final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch();
        final HashSet<Long> newSyncStateSet = new HashSet<>();
        // The new syncStateSet starts with only the new master
        newSyncStateSet.add(newMaster);
        response.setXXX(...);
        ElectMasterResponseBody responseBody =
            new ElectMasterResponseBody(newSyncStateSet);
        result.setBody(responseBody.encode());
        final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster);
        result.addEvent(event);
        return result;
    }
    // Reaching this point means no master could be elected: the election failed
    // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress),
    // we still need to apply an ElectMasterEvent to tell the statemachine
    // that the master was shutdown and no new master was elected.
    if (request.getBrokerId() == null) {
        final ElectMasterEvent event = new ElectMasterEvent(false, brokerName);
        result.addEvent(event);
        result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master");
    } else {
        result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master");
    }
    return result;
}
```
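`electPolicy.elect(...)` is where DefaultElectPolicy comes in. As section 2.1 describes, it first tries the in-sync replicas and falls back to all replicas only when unclean election is allowed. The sketch below is a hypothetical, simplified policy in that spirit, not the real class. It keeps the old master if that broker is still valid and no other broker was designated. It honors a designated broker only if that broker is a valid candidate. Otherwise, it picks the replica with the highest (epoch, maxOffset) reported in heartbeats. That tie-breaking order and the names `ElectSketch` and `Candidate` are assumptions.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;

class ElectSketch {
    // Hypothetical per-replica data reported through heartbeats.
    record Candidate(long brokerId, int epoch, long maxOffset) {}

    private final boolean enableUnclean;
    private final LongPredicate alive;            // liveness check per broker id
    private final Map<Long, Candidate> reported;  // latest heartbeat info per broker id

    ElectSketch(boolean enableUnclean, LongPredicate alive, Map<Long, Candidate> reported) {
        this.enableUnclean = enableUnclean;
        this.alive = alive;
        this.reported = reported;
    }

    Long elect(Set<Long> syncStateSet, Set<Long> allReplicas, Long oldMaster, Long designated) {
        Long chosen = tryElect(syncStateSet, oldMaster, designated);   // clean election first
        if (chosen == null && enableUnclean) {
            chosen = tryElect(allReplicas, oldMaster, designated);     // may lose unsynced data
        }
        return chosen;
    }

    private Long tryElect(Set<Long> brokers, Long oldMaster, Long designated) {
        Set<Long> valid = brokers.stream().filter(id -> alive.test(id)).collect(Collectors.toSet());
        if (valid.isEmpty()) {
            return null;
        }
        // Keep a still-valid old master unless another broker was designated.
        if (oldMaster != null && valid.contains(oldMaster) && (designated == null || designated.equals(oldMaster))) {
            return oldMaster;
        }
        // A designated broker wins only if it is a valid candidate.
        if (designated != null) {
            return valid.contains(designated) ? designated : null;
        }
        // Prefer the most up-to-date replica: highest epoch, then highest maxOffset.
        Optional<Long> best = valid.stream()
            .filter(reported::containsKey)
            .map(reported::get)
            .max(Comparator.comparingInt(Candidate::epoch).thenComparingLong(Candidate::maxOffset))
            .map(Candidate::brokerId);
        return best.orElse(valid.iterator().next());
    }
}
```

With unclean election disabled, an empty set of live in-sync replicas yields `null`. That matches the "election failed" branch of electMaster() above.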

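3.3 Updating the SyncStateSet

Core request code: CONTROLLER_ALTER_SYNC_STATE_SET

  1. The master initiates the change by asking the controller to replace the syncStateSet (the equivalent of a group leader reporting whose homework is excellent).
  2. ControllerRequestProcessor receives the request to replace the syncStateSet and enters handleAlterSyncStateSet():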
```java
private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx,
    RemotingCommand request) throws Exception {
    final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class);
    final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class);
    final CompletableFuture<RemotingCommand> future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet);
    if (future != null) {
        return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS);
    }
    return RemotingCommand.createResponseCommand(null);
}
```
  3. The call then goes into Controller.alterSyncStateSet() -> replicasInfoManager.alterSyncStateSet():

```java
public ControllerResult<AlterSyncStateSetResponseHeader> alterSyncStateSet(
    final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet,
    final BrokerValidPredicate brokerAlivePredicate) {
    final String brokerName = request.getBrokerName();
    ...
    final Set<Long> newSyncStateSet = syncStateSet.getSyncStateSet();
    final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName);
    final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName);
    // Check whether the syncStateSet actually changed
    final Set<Long> oldSyncStateSet = syncStateInfo.getSyncStateSet();
    if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) {
        String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet";
        ...
    }
    // Check that the request comes from the current master
    if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) {
        String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}",
            syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId());
        ...
    }
    // Check that the master's epoch matches
    if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) {
        String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}",
            syncStateInfo.getMasterEpoch(), request.getMasterEpoch());
        ...
    }
    // Check the syncStateSet epoch
    if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) {
        String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}",
            syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch());
        ...
    }
    // Validate every member of the new syncStateSet
    for (Long replica : newSyncStateSet) {
        // The replica must exist
        if (!brokerReplicaInfo.isBrokerExist(replica)) {
            String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica);
            ...
        }
        // The broker must be alive
        if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) {
            String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica);
            ...
        }
    }
    // The new set must contain the master
    if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) {
        String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId());
        ...
    }
    // Bump the syncStateSet epoch
    int epoch = syncStateInfo.getSyncStateSetEpoch() + 1;
    ...
    // Generate the event that replaces the syncStateSet
    final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet);
    ...
}
```
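  4. Finally, syncStateInfo.updateSyncStateSetInfo() updates the syncStateInfo obtained from syncStateSetInfoTable.get(brokerName). (Think of the class teacher finding the group leader's name in the class roster, pulling out the member list, and updating it.)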
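The checks in alterSyncStateSet() can be restated as a single validation function. The following is a hypothetical condensation (`AlterCheck.validate` is not a RocketMQ method). It applies the same checks in the same order and returns a rejection reason, or `null` when the request may be applied. In the real flow, a successful request then bumps the syncStateSet epoch and emits an AlterSyncStateSetEvent.

```java
import java.util.Set;
import java.util.function.LongPredicate;

// Hypothetical, simplified restatement of the checks in alterSyncStateSet().
class AlterCheck {
    // Returns null if the request may be applied, otherwise the rejection reason.
    static String validate(long currentMaster, int currentMasterEpoch, int currentSetEpoch,
                           Set<Long> oldSet, Set<Long> registered, LongPredicate alive,
                           long reqMaster, int reqMasterEpoch, int reqSetEpoch, Set<Long> newSet) {
        if (oldSet.equals(newSet)) return "syncStateSet unchanged";
        if (reqMaster != currentMaster) return "not sent by the current master";
        if (reqMasterEpoch != currentMasterEpoch) return "stale master epoch";
        if (reqSetEpoch != currentSetEpoch) return "stale syncStateSet epoch";
        for (long replica : newSet) {
            if (!registered.contains(replica)) return "replica " + replica + " does not exist";
            if (!alive.test(replica)) return "replica " + replica + " is not alive";
        }
        if (!newSet.contains(currentMaster)) return "new set must contain the master";
        return null;
    }
}
```

The epoch checks are what stop a deposed master, or a master working from an outdated view, from overwriting a newer decision.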

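3.4 Replication

This part is more complex. HAService, HAClient, HAConnection, and the various Services, Readers, and Writers inside them are easy to confuse, which makes the code hard to read. The diagram below was therefore drawn to help (it is easier to follow after a first pass over the source):

The HA replication process is broken down and explained step by step below:

  1. Each replica's DefaultMessageStore registers an HAService, which manages HA replication.
  2. The master's HAService contains an **AcceptSocketService**, which accepts connections from slaves: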
```java
protected abstract class AcceptSocketService extends ServiceThread {
    ...
    /**
     * Starts listening to slave connections.
     *
     * @throws Exception If fails.
     */
    public void beginAccept() throws Exception {
        ...
    }

    @Override
    public void shutdown(final boolean interrupt) {
        ...
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                Set<SelectionKey> selected = this.selector.selectedKeys();
                if (selected != null) {
                    for (SelectionKey k : selected) {
                        if (k.isAcceptable()) {
                            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                            if (sc != null) {
                                DefaultHAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());
                                try {
                                    // One HAConnection per accepted slave
                                    HAConnection conn = createConnection(sc);
                                    conn.start();
                                    DefaultHAService.this.addConnection(conn);
                                } catch (Exception e) {
                                    log.error("new HAConnection exception", e);
                                    sc.close();
                                }
                            }
                        }
                    }
                    ...
                }
            } catch (Exception e) {
                ...
            }
        }
        ...
    }
}
```
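AcceptSocketService is an ordinary Java NIO accept loop. The hypothetical `MiniAcceptService` below reproduces one iteration of that loop using only standard `java.nio.channels` classes. It collects accepted channels in a list at the point where DefaultHAService would create and start an HAConnection. NIO requires each key to be removed from the selected-key set, and the sketch does so explicitly; how the original clears the set falls in the part elided above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, stripped-down accept loop in the style of AcceptSocketService.
class MiniAcceptService implements AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;
    final List<SocketChannel> connections = new ArrayList<>(); // stands in for HAConnection objects

    MiniAcceptService() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    InetSocketAddress address() throws IOException {
        return (InetSocketAddress) server.getLocalAddress();
    }

    // One iteration of run(): wait up to timeoutMillis, then accept every pending slave.
    void pollOnce(long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // selected keys are not cleared automatically
            if (key.isAcceptable()) {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                if (sc != null) {
                    connections.add(sc);
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (SocketChannel sc : connections) {
            sc.close();
        }
        server.close();
        selector.close();
    }
}
```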

  3. Each slave's HAService contains an **HAClient**, which connects to the master and issues transfer requests.
```java
public class AutoSwitchHAClient extends ServiceThread implements HAClient {
    ...
}

public interface HAClient {
    void start();
    void shutdown();
    void wakeup();
    void updateMasterAddress(String newAddress);
    void updateHaMasterAddress(String newAddress);
    String getMasterAddress();
    String getHaMasterAddress();
    long getLastReadTimestamp();
    long getLastWriteTimestamp();
    HAConnectionState getCurrentState();
    void changeCurrentState(HAConnectionState haConnectionState);
    void closeMaster();
    long getTransferredByteInSecond();
}
```
  4. When the master receives a connection request from a slave, it creates an **HAConnection** responsible for sending and receiving data.
```java
public interface HAConnection {
    void start();
    void shutdown();
    void close();
    SocketChannel getSocketChannel();
    HAConnectionState getCurrentState();
    String getClientAddress();
    long getTransferredByteInSecond();
    long getTransferFromWhere();
    long getSlaveAckOffset();
}
```
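Both interfaces expose an HAConnectionState. The server-side reader decodes the slave's state with `HAConnectionState.values()[byteBufferRead.getInt(readPosition)]`, which means a connection state travels over the wire as its enum ordinal stored in an int. The sketch below shows that encoding with a hypothetical `LinkState` enum and an assumed header layout (an int state followed by a long offset). The real HAConnectionState constants and header fields may differ.

```java
import java.nio.ByteBuffer;

// Hypothetical connection states; the real HAConnectionState may differ.
enum LinkState { READY, HANDSHAKE, TRANSFER, SUSPEND, SHUTDOWN }

class StateHeader {
    static final int SIZE = 4 + 8; // assumed layout: int state ordinal + long offset

    static ByteBuffer encode(LinkState state, long offset) {
        ByteBuffer buf = ByteBuffer.allocate(SIZE);
        buf.putInt(state.ordinal());
        buf.putLong(offset);
        buf.flip(); // ready to be written to a channel
        return buf;
    }

    // Absolute reads, as in the excerpt: decode without moving the position.
    static LinkState stateAt(ByteBuffer buf, int pos) {
        return LinkState.values()[buf.getInt(pos)];
    }

    static long offsetAt(ByteBuffer buf, int pos) {
        return buf.getLong(pos + 4);
    }
}
```

Encoding by ordinal is compact, but both sides must declare the enum constants in the same order.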
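  5. The master's HAConnection connects to the slave's HAClient. Both sides write to the socket through HAWriter (simple and not covered here; see the HAWriter class). Both read from the socket through an HAReader, HAServerReader on the master side and HAClientReader on the slave side: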
```java
public abstract class AbstractHAReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    protected final List<HAReadHook> readHookList = new ArrayList<>();

    public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) {
        int readSizeZeroTimes = 0;
        while (byteBufferRead.hasRemaining()) {
            ...
            boolean result = processReadResult(byteBufferRead);
            ...
        }
        ...
    }
    ...
    protected abstract boolean processReadResult(ByteBuffer byteBufferRead);
}
```
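The elided body of `read()` can be pictured as follows. The hypothetical `SketchReader` is an assumption about the loop's shape, not a copy of AbstractHAReader. It reads while the buffer has room and passes each successful read to `processReadResult()`. It stops after a few consecutive empty reads (the threshold of 3 is assumed), and it reports failure on end-of-stream or an I/O error. It accepts any `ReadableByteChannel`, so it can be exercised without sockets.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical reader loop in the spirit of AbstractHAReader.read().
abstract class SketchReader {
    static final int MAX_ZERO_READS = 3; // assumed threshold

    boolean read(ReadableByteChannel channel, ByteBuffer buf) {
        int zeroReads = 0;
        while (buf.hasRemaining()) {
            try {
                int n = channel.read(buf);
                if (n > 0) {
                    zeroReads = 0;
                    if (!processReadResult(buf)) {
                        return false;  // the subclass rejected the data
                    }
                } else if (n == 0) {
                    if (++zeroReads >= MAX_ZERO_READS) {
                        break;         // nothing more for now; try again on the next wakeup
                    }
                } else {
                    return false;      // end of stream: the peer closed the connection
                }
            } catch (IOException e) {
                return false;
            }
        }
        return true;
    }

    // Parses whatever is buffered; returns false if the data is invalid.
    protected abstract boolean processReadResult(ByteBuffer buf);
}
```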
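  6. Both HAReaders implement processReadResult(), which processes the data read from the socket. The client's version deserves a closer look, because it is where the received data is written into the commitlog. The client's processReadResult():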
```java
@Override
protected boolean processReadResult(ByteBuffer byteBufferRead) {
    int readSocketPos = byteBufferRead.position();
    try {
        while (true) {
            ... // elided: header parsing; it opens a block that is closed after `continue` below
                switch (AutoSwitchHAClient.this.currentState) {
                    case HANDSHAKE: {
                        ...
                        // Handshake phase: first check the commitlog's consistency, then truncate it
                    }
                    break;
                    case TRANSFER: {
                        // Transfer phase: write the body into the commitlog
                        ...
                        byte[] bodyData = new byte[bodySize];
                        ...
                        if (bodySize > 0) {
                            // Append the received body at masterOffset
                            AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
                        }
                        haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
                        ...
                        break;
                    }
                    default:
                        break;
                }
                if (isComplete) {
                    continue;
                }
            } // closes the block opened in the elided code above
            // Check whether the buffer still holds data; if so, compact() it
            ...
            break;
        }
    }
    ... // catch block and the rest of the method elided
}
```
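The TRANSFER branch comes down to two steps. First, append the body at `masterOffset`. Then set the confirm offset to `min(confirmOffset, maxPhyOffset)`, so the slave never confirms data it has not stored. The hypothetical `SlaveLog` below models those steps with an in-memory buffer instead of a commitlog. Its contiguity check, which requires a frame to start exactly at the local end of the log, is an assumption of this sketch.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical in-memory stand-in for the slave's commitlog.
class SlaveLog {
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private long confirmOffset;

    long maxPhyOffset() {
        return data.size();
    }

    long confirmOffset() {
        return confirmOffset;
    }

    // Apply one TRANSFER frame: the body starts at masterOffset on the master,
    // and masterConfirmOffset is the master's confirm offset.
    boolean applyTransfer(long masterOffset, byte[] body, long masterConfirmOffset) {
        // Assumed contiguity check: a frame must continue exactly where the local log ends.
        if (masterOffset != maxPhyOffset()) {
            return false;
        }
        data.write(body, 0, body.length);
        // Never confirm data this replica has not stored yet.
        confirmOffset = Math.min(masterConfirmOffset, maxPhyOffset());
        return true;
    }
}
```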
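  7. The server's processReadResult() mainly receives the client's handshake and similar requests, and it is simple. What needs more explanation is how the server's WriteSocketService calls HAWriter to write data into the socket: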
```java
abstract class AbstractWriteSocketService extends ServiceThread {
    ...
    private void transferToSlave() throws Exception {
        ...
        int size = this.getNextTransferDataSize();
        if (size > 0) {
            ...
            buildTransferHeaderBuffer(this.transferOffset, size);
            this.lastWriteOver = this.transferData(size);
        } else {
            // Nothing to transfer: just record the time this slave caught up
            AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis());
            haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    }

    @Override
    public void run() {
        AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                this.selector.select(1000);
                switch (currentState) {
                    case HANDSHAKE:
                        // Wait until the slave send it handshake msg to master.
                        // i.e. wait for the slave's handshake request and reply to it
                        break;
                    case TRANSFER:
                        ...
                        transferToSlave();
                        break;
                    default: ...
                }
            } catch (Exception e) {
                ...
            }
        }
        ...
        // Clean-up work after the service stops
    }
    ...
}
```
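`transferToSlave()` does one of two things. It either sends the next chunk, preceded by a header built from `transferOffset` and the chunk size, or, when there is nothing to send, it records the slave's caught-up time. The hypothetical `TransferStep` below mirrors that decision. The batch limit and the header layout (a long offset followed by an int size) are assumptions, and the body bytes themselves are not modeled.

```java
import java.nio.ByteBuffer;

// Hypothetical model of one transferToSlave() step on the master side.
class TransferStep {
    static final int HEADER_SIZE = 8 + 4; // assumed layout: long offset + int body size

    private final int maxBatchBytes;
    private long transferOffset;          // next master offset to send to this slave
    private long lastCaughtUpMillis;

    TransferStep(long startOffset, int maxBatchBytes) {
        this.transferOffset = startOffset;
        this.maxBatchBytes = maxBatchBytes;
    }

    // Returns the header of the next frame, or null when the slave is caught up.
    ByteBuffer next(long masterMaxOffset, long nowMillis) {
        long pending = masterMaxOffset - transferOffset;
        int size = (int) Math.min(pending, maxBatchBytes);
        if (size <= 0) {
            lastCaughtUpMillis = nowMillis; // nothing left to send: record the caught-up time
            return null;
        }
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(transferOffset).putInt(size).flip();
        transferOffset += size;             // the body would be written right after the header
        return header;
    }

    long transferOffset() {
        return transferOffset;
    }

    long lastCaughtUpMillis() {
        return lastCaughtUpMillis;
    }
}
```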

For completeness, here is the server's implementation of processReadResult(), which reads data from the socket:

```java
@Override
protected boolean processReadResult(ByteBuffer byteBufferRead) {
    while (true) {
        ...
        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
        switch (slaveState) {
            case HANDSHAKE:
                // Handshake received from the client
                ...
                LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}",
                    AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner);
                break;
            case TRANSFER:
                // The client reports that it is in the TRANSFER state
                ...
                // Update the client's state information
                break;
            default:
                ...
        }
        ...
    }
}
```
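The caught-up time recorded in `transferToSlave()` lets the master decide when to shrink the syncStateSet. A slave that has acknowledged enough data can be added back; this is the Expand/Shrink mentioned for ReplicasManager in section 2.2. The hypothetical `SyncSetTracker` below sketches that decision. Two rules are assumptions: a slave expands into the set once its acked offset reaches the confirm offset, and it is shrunk out after a fixed maximum lag. When either method returns true, the master would report the new set through CONTROLLER_ALTER_SYNC_STATE_SET as described in section 3.3.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical master-side tracker deciding how the syncStateSet should change.
class SyncSetTracker {
    private final long masterId;
    private final long maxLagMillis;                    // allowed time without catching up
    private final Map<Long, Long> lastCaughtUp = new HashMap<>();
    private final Set<Long> syncSet = new HashSet<>();

    SyncSetTracker(long masterId, long maxLagMillis) {
        this.masterId = masterId;
        this.maxLagMillis = maxLagMillis;
        syncSet.add(masterId);
    }

    // Called when a slave has received everything (cf. updateConnectionLastCaughtUpTime).
    void onCaughtUp(long slaveId, long nowMillis) {
        lastCaughtUp.put(slaveId, nowMillis);
    }

    // Expand: a slave whose acked offset reached the confirm offset joins the set.
    boolean maybeExpand(long slaveId, long slaveAckOffset, long confirmOffset) {
        return slaveAckOffset >= confirmOffset && syncSet.add(slaveId);
    }

    // Shrink: drop slaves that have not caught up within maxLagMillis; never drop the master.
    boolean maybeShrink(long nowMillis) {
        return syncSet.removeIf(id -> id != masterId
            && nowMillis - lastCaughtUp.getOrDefault(id, 0L) > maxLagMillis);
    }

    Set<Long> syncSet() {
        return Set.copyOf(syncSet);
    }
}
```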

3.5 Electing the Active Controller

This election is implemented with DLedger. In DLedgerController, RoleChangeHandler.handle() updates the controller's own role:

```java
class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
    private final String selfId;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
    private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;

    public RoleChangeHandler(final String selfId) {
        this.selfId = selfId;
    }

    @Override
    public void handle(long term, MemberState.Role role) {
        Runnable runnable = () -> {
            switch (role) {
                case CANDIDATE:
                    this.currentRole = MemberState.Role.CANDIDATE;
                    // Stop the task that scans for inactive brokers
                    ...
                case FOLLOWER:
                    this.currentRole = MemberState.Role.FOLLOWER;
                    // Stop the task that scans for inactive brokers
                    ...
                case LEADER: {
                    log.info("Controller {} change role to leader, try process a initial proposal", this.selfId);
                    int tryTimes = 0;
                    while (true) {
                        // Start scanning for inactive brokers
                        ...
                        break;
                    }
                }
            }
        };
        this.executorService.submit(runnable);
    }
    ...
}
```
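Only the leader controller runs the inactive-broker scan, and role changes start or stop it. The hypothetical `RoleSketch` below captures just that switch using a `ScheduledExecutorService`. Unlike the real handler, it applies changes synchronously rather than on a single-thread executor. It also skips the initial DLedger proposal that the real LEADER branch attempts first.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical controller role handling: only the leader scans for inactive brokers.
class RoleSketch {
    enum Role { CANDIDATE, FOLLOWER, LEADER }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> scanTask;
    final AtomicInteger scans = new AtomicInteger(); // stands in for the actual scan work
    private volatile Role role = Role.FOLLOWER;

    // Synchronized so that role changes never interleave.
    synchronized void onRoleChange(Role newRole) {
        role = newRole;
        if (newRole == Role.LEADER) {
            if (scanTask == null) {
                scanTask = scheduler.scheduleAtFixedRate(scans::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
            }
        } else if (scanTask != null) {
            scanTask.cancel(false); // CANDIDATE or FOLLOWER: stop scanning
            scanTask = null;
        }
    }

    synchronized boolean isScanning() {
        return scanTask != null;
    }

    Role role() {
        return role;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Because only the leader scans, two controllers never trigger elections for the same dead broker at the same time.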