前言:
Zookeeper中的节点主要分为临时节点和持久节点。
持久节点在创建之后,除非主动发起删除,否则节点会一直存在;
而临时节点则不同,创建该节点的Session过期后,则该Session创建的所有临时节点都会被删除。
本文主要来从源码的角度来分析下临时节点删除的全过程。
1.SessionTrackImpl的心跳检测
既然当Session过期后,Zookeeper会删除该Session创建的所有临时节点,那么我们就可以从Session的管理器SessionTrackImpl入手。
1.1 SessionTrackImpl.run()
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
synchronized public void run() {
try {
while (running) {
currentTime = Time.currentElapsedTime();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
// 直接删除到期时间的所有Session
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// 设置Session isClosing=true
setSessionClosing(s.sessionId);
// 设置session过期处理,重点在这里,具体见1.2
expirer.expire(s);
}
}
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
}
SessionTrackImpl本质上还是session桶管理的模式,所以针对到期的session桶,则清理桶中的全部session。
1.2 ZooKeeperServer.expire() 处理session过期信息
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
// 关闭session
close(sessionId);
}
private void close(long sessionId) {
// 提交一个关闭请求
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
// 请求主要就是sessionId和操作类型 closeSession
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
public void submitRequest(Request si) {
...
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// 直接交由firstProcessor处理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} ...
}
}
从代码分析中可以看出,closeSession也被当做一个事务请求,请求体主要包含sessionId和操作类型。
然后交由firstProcessor来处理。
2.Processor处理closeSession请求
2.1 PrepRequestProcessor.pRequest2Txn() 处理事务请求
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
RequestProcessor {
final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
// 处理请求
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
...
switch (type) {
case OpCode.create:
...
case OpCode.closeSession:
// 获取当前session创建的所有临时节点
HashSet<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
// 将临时节点删除事件包装成ChangeRecord对象放入outstandingChanges
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
LOG.info("Processed session termination for sessionid: 0x"
+ Long.toHexString(request.sessionId));
break;
}
}
void addChangeRecord(ChangeRecord c) {
synchronized (zks.outstandingChanges) {
zks.outstandingChanges.add(c);
zks.outstandingChangesForPath.put(c.path, c);
}
}
PrepRequestProcessor只是对当前session创建的临时节点进行预处理,将这些临时节点的包装成ChangeRecord对象,并添加到zks.outstandingChanges、zks.outstandingChangesForPath两个集合中,用于后续processor处理
2.2 FinalRequestProcessor.processRequest() 最终处理请求
public class FinalRequestProcessor implements RequestProcessor {
public void processRequest(Request request) {
...
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
// 重要处理在这里
// 交由ZookeeperServer处理
rc = zks.processTxn(hdr, txn);
}
}
}
2.2.1 ZooKeeperServer.processTxn() 处理事务请求
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
ProcessTxnResult rc;
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
// 这里交由ZKDatabase处理,具体见2.2.2
rc = getZKDatabase().processTxn(hdr, txn);
if (opCode == OpCode.createSession) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addSession(sessionId, cst
.getTimeOut());
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}
}
2.2.2 ZKDatabase.processTxn()
public class ZKDatabase {
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
// 交由DataTree处理
return dataTree.processTxn(hdr, txn);
}
}
2.2.3 DataTree.processTxn() 处理事务请求
public class DataTree {
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
...
case OpCode.closeSession:
killSession(header.getClientId(), header.getZxid());
break;
}
}
}
void killSession(long session, long zxid) {
// 获取当前session所创建的临时节点
HashSet<String> list = ephemerals.remove(session);
if (list != null) {
for (String path : list) {
try {
// 具体处理
deleteNode(path, zxid);
if (LOG.isDebugEnabled()) {
...
}
} catch (NoNodeException e) {
LOG.warn("Ignoring NoNodeException for path " + path
+ " while removing ephemeral for dead session 0x"
+ Long.toHexString(session));
}
}
}
}
public void deleteNode(String path, long zxid)
throws KeeperException.NoNodeException {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
DataNode node = nodes.get(path);
if (node == null) {
throw new KeeperException.NoNodeException();
}
nodes.remove(path);
synchronized (node) {
aclCache.removeUsage(node.acl);
}
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
// 删除父节点下该子节点信息
parent.removeChild(childName);
parent.stat.setPzxid(zxid);
long eowner = node.stat.getEphemeralOwner();
if (eowner != 0) {
HashSet<String> nodes = ephemerals.get(eowner);
if (nodes != null) {
// 删除该临时节点
synchronized (nodes) {
nodes.remove(path);
}
}
}
node.parent = null;
}
...
// 触发该临时节点的watch监听
Set<Watcher> processed = dataWatches.triggerWatch(path,
EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
EventType.NodeChildrenChanged);
}
}
总结:
最终在FinalRequestProcessor中删除该session创建所有的临时节点。
删除临时节点包含三个步骤:
1.清理其父节点下当前节点信息
2.删除当前临时节点信息文章来源:https://www.toymoban.com/news/detail-611349.html
3.触发当前节点的所有监听文章来源地址https://www.toymoban.com/news/detail-611349.html
到了这里,关于Zookeeper临时节点删除时机解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!