共计 10347 个字符,预计需要花费 26 分钟才能阅读完成。
怎么实现 CloudStack High Availability 源码分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面丸趣 TV 小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
我们先来看 DirectAgentAttache 的内部类 PingTask, 首先我们要知道每一个注册到 CS 中的主机都有一个对应的 DirectAgentAttache, 这也就意味着每一个 HOST 都有一个 PingTask 线程在后台循环运行,时间间隔是由全局变量 ping.interval 来指定的,默认是 60s.
我们来看 PingTask 的代码
ServerResource resource = _resource;
if (resource != null) { PingCommand cmd = resource.getCurrentStatus(_id);
int retried = 0;
while (cmd == null ++retried = _HostPingRetryCount.value()) { Thread.sleep(1000*_HostPingRetryTimer.value());
cmd = resource.getCurrentStatus(_id);
}
if (cmd == null) { s_logger.warn( Unable to get current status on
_id 代表 host_id, 当 getCurrentStatus 能返回正确的 cmd 就说明能够 Ping 通该 host,那接下来就是执行_agentMgr.handleCommands
public void handleCommands(final AgentAttache attache, final long sequence, final Command[] cmds) { for (final Pair Integer, Listener listener : _cmdMonitors) { final boolean processed = listener.second().processCommands(attache.getId(), sequence, cmds);
}
}
其中我们关心 BehindOnPingListener,我们来看它的 processCommands 方法
@Override
public boolean processCommands(final long agentId, final long seq, final Command[] commands) {
final boolean processed = false;
for (final Command cmd : commands) { if (cmd instanceof PingCommand) { pingBy(agentId);
}
}
return processed;
}
接下来是 pingBy 方法
public void pingBy(final long agentId) {
// Update PingMap with the latest time if agent entry exists in the PingMap
if (_pingMap.replace(agentId, InaccurateClock.getTimeInSeconds()) == null) {
s_logger.info( PingMap for agent: + agentId + will not be updated because agent is no longer in the PingMap
}
}
这里重点就是这个_pingMap,我们看到它其实是一个 ConcurrentHashMap,key 是 agentId(比如 hostId),value 是一个时间戳,就是当我们这一次如果 Ping 通之后会把当前时间作为 value 插入到_pingMap 中。我们回顾一下上面说过 PingTask 是每 ping.interval 时间间隔执行一次,所以如果我们的主机是在正常运行的话那么_pingMap 就会几乎每 ping.interval 更新一次。(当然执行 getCurrentStatus 方法会有一定的延迟)那如果主机出现突然的故障导致网络无法连接的情况下,那_pingMap 中的时间就会一直停留在上一次 Ping 通的那个时间戳。
所以我们来总结一下 PingTask 的逻辑:就是每隔 ping.interval(默认 60s) 去 Ping 我们的主机,如果能够 Ping 通就更新_pingMap 中的 value 为当前时间戳,否则什么都不做。
接下来我们要看的另一个后台线程是 MonitorTask, 同样是每隔 ping.interval 执行一次,先是方法 findAgentsBehindOnPing
protected List Long findAgentsBehindOnPing() {
final List Long agentsBehind = new ArrayList Long
final long cutoffTime = InaccurateClock.getTimeInSeconds() - getTimeout();
for (final Map.Entry Long, Long entry : _pingMap.entrySet()) { if (entry.getValue() cutoffTime) { agentsBehind.add(entry.getKey());
}
}
return agentsBehind;
}
protected long getTimeout() { return (long) (PingTimeout.value() * PingInterval.value());
}
全局变量 ping.timeout 默认值是 2.5,这段代码的意思就是找出上一次 Ping 通的时间距离现在超过 ping.interval 的 2.5 倍的主机,简单讲就是 Ping 不通或者 Ping 通的延时超过我们认为的不合理时间的主机。正常情况下该方法返回的会是一个空的 List,这个时候 MonitorTask 就结束当前任务。但是如果出现网络延时或者主机故障的时候,就要执行接下来的代码。
final List Long behindAgents = findAgentsBehindOnPing();
for (final Long agentId : behindAgents) { final QueryBuilder HostVO sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getId(), Op.EQ, agentId);
final HostVO h = sc.find();
if (h != null) { final ResourceState resourceState = h.getResourceState();
if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance || resourceState == ResourceState.ErrorInMaintenance) { disconnectWithoutInvestigation(agentId, Event.ShutdownRequested);
} else { final HostVO host = _hostDao.findById(agentId);
if (host != null (host.getType() == Host.Type.ConsoleProxy || host.getType() == Host.Type.SecondaryStorageVM
|| host.getType() == Host.Type.SecondaryStorageCmdExecutor)) { disconnectWithoutInvestigation(agentId, Event.ShutdownRequested);
} else { disconnectWithInvestigation(agentId, Event.PingTimeout);
}
}
}
}
我们假设出问题的是一台计算节点,那么一路往下将要执行的将是 AgentManagerImpl 的 handleDisconnectWithInvestigation 方法
protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { final long hostId = attache.getId();
HostVO host = _hostDao.findById(hostId);
if (host != null) {
Status nextStatus = null;
nextStatus = host.getStatus().getNextStatus(event);
if (nextStatus == Status.Alert) { Status determinedState = investigate(attache);
if (determinedState == null) { if ((System.currentTimeMillis() 10) - host.getLastPinged() AlertWait.value()) {
determinedState = Status.Alert;
} else {
return false;
}
}
final Status currentStatus = host.getStatus();
if (determinedState == Status.Down) {
event = Status.Event.HostDown;
} else if (determinedState == Status.Up) { agentStatusTransitTo(host, Status.Event.Ping, _nodeId);
return false;
} else if (determinedState == Status.Disconnected) { if (currentStatus == Status.Disconnected) { if ((System.currentTimeMillis() 10) - host.getLastPinged() AlertWait.value()) {
event = Status.Event.WaitedTooLong;
} else {
return false;
}
} else if (currentStatus == Status.Up) {
event = Status.Event.AgentDisconnected;
}
}
}
}
handleDisconnectWithoutInvestigation(attache, event, true, true);
host = _hostDao.findById(hostId); // Maybe the host magically reappeared?
if (host != null host.getStatus() == Status.Down) { _haMgr.scheduleRestartForVmsOnHost(host, true);
}
return true;
}
我们先看一下该方法最后的那个 if, 就是在特定的条件下我们最终的目的就是重启该主机上的所有虚拟机,这才是 HA 的真正目的。但是我们要记住我们进入这个 handleDisconnectWithInvestigation 方法的前提其实是很简单的,就是只要我们发现距离上一次 Ping 通该主机的时间超过比如说 2 分半钟就会进入该方法,而我们要真正执行 HA 应该是要非常确定该主机确实是挂掉了的情况下才发生的。所以该方法前面一大堆都是在反复的确认主机的状态,就如方法名所示 Inverstigation(调查)。我们假设该主机的 currentStatus 是 UP,event 我们知道是 PingTimeout, 所以 nextStatus 就是 Alert。接下来就是执行 investigate 方法
protected Status investigate(final AgentAttache agent) { final Long hostId = agent.getId();
final HostVO host = _hostDao.findById(hostId);
if (host != null host.getType() != null !host.getType().isVirtual()) { final Answer answer = easySend(hostId, new CheckHealthCommand());
if (answer != null answer.getResult()) {
final Status status = Status.Up;
return status;
}
return _haMgr.investigate(hostId);
}
return Status.Alert;
}
该方法先会向该 hostId 发送一个 CheckHealthCommand,这个时候会有两种可能:
1、如果能够接受到应答说明此时该主机是正常的就直接返回 UP 状态,我们在回到 handleDisconnectWithInvestigation 就会发现此时该任务也就基本结束了意思就是触发该方法的仅仅是临时的网络不通或者什么情况现在主机已经恢复正常
2. 那另一种情况就是 CheckHealthCommand 没有得到应答,也就是说我直接从 management-server 去请求你主机你没有反应,那也不代表你就真的挂了,接下来怎么办呢,我们去找各种侦探(investigators)去调查你是否 alive
@Override
public Status investigate(final long hostId) { final HostVO host = _hostDao.findById(hostId);
if (host == null) {
return Status.Alert;
}
Status hostState = null;
for (Investigator investigator : investigators) { hostState = investigator.isAgentAlive(host);
if (hostState != null) {
return hostState;
}
}
return hostState;
}
那假如我们的主机是一台 XenServer 的主机的话,最重要的当然是 XenServerInvestigator,我们来看它的 isAgentAlive 方法
public Status isAgentAlive(Host agent) { CheckOnHostCommand cmd = new CheckOnHostCommand(agent);
List HostVO neighbors = _resourceMgr.listAllHostsInCluster(agent.getClusterId());
for (HostVO neighbor : neighbors) { Answer answer = _agentMgr.easySend(neighbor.getId(), cmd);
if (answer != null answer.getResult()) { CheckOnHostAnswer ans = (CheckOnHostAnswer)answer;
if (!ans.isDetermined()) {
continue;
}
return ans.isAlive() ? null : Status.Down;
}
}
return null;
}
逻辑也很简单就是我直接找不到你我就去找你同一个 Cluster 中的邻居,我向你的每一个邻居主机发送一个 CheckOnHostCommand 命令,看它们能不能知道你到底怎么了。关于 CheckOnHostCommand 命令的具体实现在开头那篇官网的文章里有详细的说明
If the network ping investigation returns that it cannot detect the status of the host, CloudStack HA then relies on the hypervisor specific investigation. For VmWare, there is no such investigation as the hypervisor host handles its own HA. For XenServer and KVM, CloudStack HA deploys a monitoring script that writes the current timestamp on to a heartbeat file on shared storage. If the timestamp cannot be written, the hypervisor host self-fences by rebooting itself. For these two hypervisors, CloudStack HA sends a CheckOnHostCommand to a neighboring hypervisor host that shares the same storage. The neighbor then checks on the heartbeat file on shared storage and see if the heartbeat is no longer being written. If the heartbeat is still being written, the host reports that the host in question is still alive. If the heartbeat file’s timestamp is lagging behind, after an acceptable timeout value, the host reports that the host in question is down and HA is started on the VMs on that host.
大致的意思是 CS 会在每一个 XenServer 和 KVM 的主机上运行一段监控脚本,这个脚本会将当前时间戳写入一个在共享存储的文件中。如果某一台主机发现自己无法往文件中写入数据将会强制自己重启。那上面那段代码的逻辑就是向与该被调查的主机共享存储的其他主机发送 CheckOnHostCommand 命令,邻居主机接受到命令就去查看文件中被调查主机有没有持续的更新时间戳,如果有它就返回相应说该主机 is still alive,否则就返回说该主机 is down. 这样只有主机确实出了故障无法连接的情况下,handleDisconnectWithInvestigation 方法中的 determinedState 才会是 Status.Down,那么 event 就变成了 Status.Event.HostDown,接下来就执行 HighAvailabilityManagerImpl 的 scheduleRestartForVmsOnHost 方法重起该主机上的所以虚拟机,然后是在数据库中出入一个 HaWorkVO,然后唤醒 CS 启动的时候初始化好的 WorkerThread, 到很重要的同样是 HighAvailabilityManagerImpl 的 restart 方法
protected Long restart(final HaWorkVO work) {
boolean isHostRemoved = false;
Boolean alive = null;
if (work.getStep() == Step.Investigating) { if (!isHostRemoved) {
Investigator investigator = null;
for (Investigator it : investigators) {
investigator = it;
try
{(1) alive = investigator.isVmAlive(vm, host);
break;
} catch (UnknownVM e) { s_logger.info(investigator.getName() + could not find + vm);
}
}
boolean fenced = false;
if (alive == null) { for (FenceBuilder fb : fenceBuilders) {(2) Boolean result = fb.fenceOff(vm, host);
if (result != null result) {
fenced = true;
break;
}
}
}(3) _itMgr.advanceStop(vm.getUuid(), true);
}
}
vm = _itMgr.findById(vm.getId());(4)if (!_forceHA !vm.isHaEnabled()) {
return null; // VM doesn t require HA
}
try {
HashMap VirtualMachineProfile.Param, Object params = new HashMap VirtualMachineProfile.Param, Object(5) if (_haTag != null) { params.put(VirtualMachineProfile.Param.HaTag, _haTag);
}
WorkType wt = work.getWorkType();
if (wt.equals(WorkType.HA)) { params.put(VirtualMachineProfile.Param.HaOperation, true);
}(6) try{ _itMgr.advanceStart(vm.getUuid(), params, null);
}catch (InsufficientCapacityException e){
s_logger.warn( Failed to deploy vm + vmId + with original planner, sending HAPlanner
_itMgr.advanceStart(vm.getUuid(), params, _haPlanners.get(0));
}
}
return (System.currentTimeMillis() 10) + _restartRetryInterval;
}
如上代码我所标记的有 5 个重点需要关注的。大致的流程如下:
(1)调用各个 investigator.isVmAlive 方法,如果 isAlive 则什么都不做,否则往下走
(2)调用 fb.fenceOff 方法
(3)执行_itMgr.advanceStop 方法
(4)关于_forceHA 变量,因为我在全局变量和数据库的 configuration 表中都没有找到,所以初始化的值为 FALSE,那么也就是说只有 vm.isHaEnabled 为 ture 的 VM 才会继续执行下去,否则直接 return 了
(5)_haTag 的值是由全局变量 ha.tag 来指定的,默认为空,如果指定了这个值对后面确定 VM 分配主机很重要,记住这行代码 params.put(VirtualMachineProfile.Param.HaTag, _haTag);
(6)这里有没有很熟悉,是的,凡是读过 CS 创建 VM 实例的过程代码的人都知道这个方法就是去分配一个 VM,那么到这里整个 CS 的 HA 执行代码就完成大部分了,接下来就是重启 VM, 至于该 VM 能否重启就要依赖各种条件了,比如该 Cluster 中有没有合适的主机、主机的物理资源是否充足、有没有设置 ha.tag、VM 有没有使用标签等等这里就不再详述了。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注丸趣 TV 行业资讯频道,感谢您对丸趣 TV 的支持。