Zookeeper临时节点删除时机解析

这篇具有很好参考价值的文章主要介绍了Zookeeper临时节点删除时机解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言:

Zookeeper中的节点主要分为临时节点和持久节点。

持久节点在创建之后,除非主动发起删除,否则节点会一直存在;

而临时节点则不同,创建该节点的Session过期后,则该Session创建的所有临时节点都会被删除。

本文主要来从源码的角度来分析下临时节点删除的全过程。

1.SessionTrackImpl的心跳检测

既然当Session过期后,Zookeeper会删除该Session创建的所有临时节点,那么我们就可以从Session的管理器SessionTrackImpl入手。

1.1 SessionTrackImpl.run()

public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
 
    synchronized public void run() {
        try {
            while (running) {
                currentTime = Time.currentElapsedTime();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                // 直接删除到期时间的所有Session
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        // 设置Session isClosing=true
                        setSessionClosing(s.sessionId);
                        // 设置session过期处理,重点在这里,具体见1.2
                        expirer.expire(s);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }
}

SessionTrackImpl本质上还是session桶管理的模式,所以针对到期的session桶,则清理桶中的全部session。

1.2 ZooKeeperServer.expire() 处理session过期信息

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
	public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        // 关闭session
        close(sessionId);
    }
    
    private void close(long sessionId) {
        // 提交一个关闭请求
        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    }
    
    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        // 请求主要就是sessionId和操作类型 closeSession
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }
    
    public void submitRequest(Request si) {
        ...
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                // 直接交由firstProcessor处理
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } ...
    }
}

从代码分析中可以看出,closeSession也被当做一个事务请求,请求体主要包含sessionId和操作类型。

然后交由firstProcessor来处理。

2.Processor处理closeSession请求

2.1 PrepRequestProcessor.pRequest2Txn() 处理事务请求

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
        RequestProcessor {
    
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
    
    // 处理请求
 	protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException {
        
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    Time.currentWallTime(), type);
		...
        switch (type) {
            case OpCode.create:
         	...
            case OpCode.closeSession:
                // 获取当前session创建的所有临时节点
                HashSet<String> es = zks.getZKDatabase()
                        .getEphemerals(request.sessionId);
                synchronized (zks.outstandingChanges) {
                    for (ChangeRecord c : zks.outstandingChanges) {
                        if (c.stat == null) {
                            // Doing a delete
                            es.remove(c.path);
                        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                            es.add(c.path);
                        }
                    }
                    // 将临时节点删除事件包装成ChangeRecord对象放入outstandingChanges
                    for (String path2Delete : es) {
                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                path2Delete, null, 0, null));
                    }

                    zks.sessionTracker.setSessionClosing(request.sessionId);
                }

                LOG.info("Processed session termination for sessionid: 0x"
                        + Long.toHexString(request.sessionId));
                break;
        }     
    }
    
    void addChangeRecord(ChangeRecord c) {
        synchronized (zks.outstandingChanges) {
            zks.outstandingChanges.add(c);
            zks.outstandingChangesForPath.put(c.path, c);
    }
}

PrepRequestProcessor只是对当前session创建的临时节点进行预处理,将这些临时节点的包装成ChangeRecord对象,并添加到zks.outstandingChanges、zks.outstandingChangesForPath两个集合中,用于后续processor处理

2.2 FinalRequestProcessor.processRequest() 最终处理请求

public class FinalRequestProcessor implements RequestProcessor {
    public void processRequest(Request request) {
     	...
        if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
			   // 重要处理在这里
               // 交由ZookeeperServer处理
               rc = zks.processTxn(hdr, txn);
        }
    }
}

2.2.1 ZooKeeperServer.processTxn() 处理事务请求

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
	public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        // 这里交由ZKDatabase处理,具体见2.2.2
        rc = getZKDatabase().processTxn(hdr, txn);
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got "
                        + txn.getClass() + " "
                        + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
        return rc;
    }
}

2.2.2 ZKDatabase.processTxn() 

public class ZKDatabase {
	public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        // 交由DataTree处理
        return dataTree.processTxn(hdr, txn);
    }
}

2.2.3 DataTree.processTxn() 处理事务请求

public class DataTree {
	public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
                case OpCode.create:
                ...
                case OpCode.closeSession:
                    killSession(header.getClientId(), header.getZxid());
                    break;
            }
        }
	}
    
    void killSession(long session, long zxid) {
        // 获取当前session所创建的临时节点
        HashSet<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) {
                try {
                    // 具体处理
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        ...
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path
                            + " while removing ephemeral for dead session 0x"
                            + Long.toHexString(session));
                }
            }
        }
    }
    
    public void deleteNode(String path, long zxid)
            throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = nodes.get(path);
        if (node == null) {
            throw new KeeperException.NoNodeException();
        }
        nodes.remove(path);
        synchronized (node) {
            aclCache.removeUsage(node.acl);
        }
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            // 删除父节点下该子节点信息
            parent.removeChild(childName);
            parent.stat.setPzxid(zxid);
            long eowner = node.stat.getEphemeralOwner();
            if (eowner != 0) {
                HashSet<String> nodes = ephemerals.get(eowner);
                if (nodes != null) {
                    // 删除该临时节点
                    synchronized (nodes) {
                        nodes.remove(path);
                    }
                }
            }
            node.parent = null;
        }
        ...

        // 触发该临时节点的watch监听    
        Set<Watcher> processed = dataWatches.triggerWatch(path,
                EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                EventType.NodeChildrenChanged);
    }
}

总结:

最终在FinalRequestProcessor中删除该session创建所有的临时节点。

删除临时节点包含三个步骤:

1.清理其父节点下当前节点信息

2.删除当前临时节点信息

3.触发当前节点的所有监听文章来源地址https://www.toymoban.com/news/detail-611349.html

到了这里,关于Zookeeper临时节点删除时机解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Zookeeper专题】Zookeeper特性与节点数据类型详解

    对于我们这些JavaCoder来说,市面上有各式各样,功能相似的中间件供我们使用。我想大家应该都清楚,要认识一个中间件,最好的方式应该是从它的创造背景开始说起。 ( PS:Zookeeper主要用来解决分布式集群中应用系统的一致性问题 ) ( PS:个人认为ZK最重要,或者说最有

    2024年02月06日
    浏览(43)
  • zookeeper第一课-Zookeeper特性与节点数据类型详解

    ZooKeeper 是一个开源的分布式协调框架,是Apache Hadoop 的一个子项目,主要用来解决分布式集群中应用系统的一致性问题。 Zookeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

    2024年02月13日
    浏览(47)
  • Java操作Zookeeper节点

    引入jar包: zookeeper的权限: ZooKeeper提供了如下几种验证模式(scheme): • digest:Client端由用户名和密码验证,譬如user:password,digest的密码生成方式是Sha1摘要的base64形式 • auth:不使用任何id,代表任何已确认用户。 • ip:Client端由IP地址验证,譬如172.2.0.0/24 • world:固定

    2024年02月10日
    浏览(40)
  • 四、Zookeeper节点类型

    目录 1、临时节点 2、永久节点 Znode有两种,分别为 临时节点 和 永久节点 。 节点的类型在创建时即被确定,并且不能改变。 临时节点 的生命周期依赖于创建它们的会话。一旦 会话结束,临时节点将被自动删除 ,

    2024年02月19日
    浏览(31)
  • Zookeeper篇——深入认识和学习Zookeeper节点-Znode,涵盖概念以及详细操作节点命令,保姆级教程,超详细、超全面!!!

    Zookeeper是一个分布式的协调服务,它通过维护一个分层的数据结构来存储和管理数据。这个数据结构被称为znode节点。每个znode节点在Zookeeper的命名空间中都有一个唯一的路径,类似于文件系统中的路径。 Zookeeper中的znode节点有以下几种类型: 永久节点(Persistent znode):永久

    2024年02月03日
    浏览(51)
  • Zookeeper(一)特性与节点数据

            1、 一致性(Consistency): 在分布式环境中,一致性是指数据在多个副本之间是否能够保持一直的特性;         2、 可用性(Availability): 每次请求都能获得正确的响应,但不保证获取的数据为最新数据;         3、 分区容错性(Partition tolerance): 分布式系统

    2024年02月04日
    浏览(46)
  • zookeeper之节点基本操作(头歌)

    开启ZooKeeper服务器。 使用客户端(zkCli.sh)连接客户端(IP:127.0.0.1,端口号:2181)。 创建/enode临时节点(节点数据为空)。 创建/spnode持久节点(节点数据为空)。 断开客户端(zkCli.sh)与客服端连接。 本关任务是使用命令行,进行以下操作: 开启ZooKeeper服务器。 使用客

    2024年02月03日
    浏览(51)
  • ZooKeeper数据模型/znode节点深入

    1、Znode的数据模型 1.1 Znode是什么? Znode维护了一个stat结构,这个stat包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让Zookeeper验证缓存和协调更新。每次znode的数据发生了变化,版本号就增加。 1.2 ZooKeeper的Stat结构体 例: (1)创建新的子

    2024年02月10日
    浏览(30)
  • 使用Zookeeper对集群节点进行管理

    Zookeeper是Hadoop生态系统中分布式的服务管理框架,负责存储和管理集群中的公共数据如配置信息等,并且对节点进行注册和通知管理。它具有如下几个特点: 集群由一个领导者(Leader),多个跟随者(Follower)组成 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。

    2024年02月06日
    浏览(39)
  • zookeeper中节点信息的查看方式

    1.先到zookeeper的bin目录下 2、输入命令./zkCli.sh 3.输入命令 ls / 4、输入命令 ls /(节点名称) 即可查看想查看的节点信息

    2024年02月11日
    浏览(39)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包