[Netty] FastThreadLocal (十四)

这篇具有很好参考价值的文章主要介绍了[Netty] FastThreadLocal (十四)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1.FastThreadLocal介绍

FastThreadLocal是Netty中常用的一个工具类, FastThreadLocal所使用的InternalThreadLocalMap内部不是采用哈希表, 而是直接通过数组索引的方式返回object, 省去了哈希表的查找过程, 因此效率相比于JDK的ThreadLocal更高。

2.FastThreadLocal分析

每个FastThreadLocal带有一个类型为int的index, 该属性在整个JVM中是全局唯一的, JVM中第一个实例化的FastThreadLocal的index为0, 第二个为1。

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}
 
public static int nextVariableIndex() {
    //nextIndex是一个静态变量,每次调用nextVariableIndex()都会自增1,让后赋给FastThreadLocal的index属性
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        nextIndex.decrementAndGet();
        throw new IllegalStateException("too many thread-local indexed variables");
    }
    return index;
}
 
static final AtomicInteger nextIndex = new AtomicInteger();

InternalThreadLocalMap可以通过FastThreadLocal的index值直接通过数据下标拿到相应的object。

public final V get(InternalThreadLocalMap threadLocalMap) {
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
 
    return initialize(threadLocalMap);
}
 
public Object indexedVariable(int index) {
    Object[] lookup = indexedVariables;
    return index < lookup.length? lookup[index] : UNSET;
}

如果要使用FastThreadLocal, 线程应该为FastThreadLocalThread, 内部使用InternalThreadLocalMap替换了JDK的ThreadLocalMap。

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        //如果是FastThreadLocalThread,那么可以直接获取该FastThreadLocalThread的InternalThreadLocalMap
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}
 
 
private static InternalThreadLocalMap slowGet() {
    //如果是普通的Thread,会先通过ThreadLocal找到Thread对应的InternalThreadLocalMap,该ThreadLocal是一个静态变量,在JVM中是唯一的
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}

FastThreadLocal中有一个特殊的index

private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

这个值在整个JVM中是唯一且不变的, 并且该值也是通过InternalThreadLocalMap.nextVariableIndex()来取值的, 意味着这个值永远是0。
正常的FastThreadLocal的index是从1开始的, 因为InternalThreadLocalMap中index为0的object是一个特殊的object。

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }
 
    variablesToRemove.add(variable);
}

每个FastThreadLocalThread的InternalThreadLocalMap中index为0的object是一个Set<FastThreadLocal<?>>, 这个set保存了FastThreadLocalThread所用到的所有的FastThreadLocal, 如果要删除FastThreadLocalThread中的所有Object, 直接删除set即可

public static void removeAll() {
    //获取FastThreadLocalThread的InternalThreadLocalMap
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }
 
    try {
        //获取index为variablesToRemoveIndex的object,也就是上面提到的index为0的特殊的object,他是一个Set
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            //将object强转为Set
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            //获取该FastThreadLocalThread的所有的FastThreadLocal
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                //依次调用这些FastThreadLocal的remove方法
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        //最后将该FastThreadLocal的InternalThreadLocalMap置为null
        InternalThreadLocalMap.remove();
    }
}

FastThreadLocal.removeAll()方法会在DefaultThreadFactory中被调用, 通过DefaultThreadFactory这个工厂类new出来的Thread都是FastThreadLocalThread。

public void run() {
    try {
        r.run();
    } finally {
        FastThreadLocal.removeAll();
    }
}

每个线程在结束后都会调用FastThreadLocal.removeAll(), 这样该线程所有通过FastThreadLocal设置的Object在线程结束后都会被置为null, 避免了内存泄露。

3.FastThreadLocal结构分析

[Netty] FastThreadLocal (十四)

  1. InternalThreadLocalMap中并不是Entry的key-value结构, 而是Object数组
  2. 索引0位置存放FastThreadLocal的Set集合, 其他索引位置初始化为UNSET, 数据存入的时候更新为具体的Object
  3. FastThreadLocal中包含一个自增的index表示在InternalThreadLocalMap的数组中的索引位置
  4. Set<FastThreadLocal<?>>结构中存放FastThreadLocal的引用, 更容易解决内存泄漏的问题

4.FastThreadLocal方法分析

public class FastThreadLocalTest {
    private static FastThreadLocal<Object> threadLocal =
            new FastThreadLocal<Object>(){
                @Override
                protected Object initialValue() throws Exception {
                    return new Object();
                }
            };
    // 每个线程拿到的对象都是线程独享
    // 线程对对象的修改不会影响其他线程
    public static void main(String[] args) {
        new Thread(() -> {
            // 1.获取ThreadLocalMap
            // 2.直接通过索引取出对象
            // 3.初始化对象, 如果没有对象的话
            Object o = threadLocal.get();
            System.out.println(o);

            while (true){
                // 1.获取ThreadLocalMap
                // 2.直接通过索引set对象
                // 3.remove
                threadLocal.set(new Object());
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(() -> {
            Object o = threadLocal.get();
            System.out.println(o);

            while (true){
                System.out.println(threadLocal.get() == o);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
  • get方法
  • set方法
4.1 FastThreadLocal.get()
    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        return initialize(threadLocalMap);
    }

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    }
    return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 
    InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
    if (ret == null) {
        ret = new InternalThreadLocalMap();
        slowThreadLocalMap.set(ret);
    }
    return ret;
}
  1. 获取ThreadLocalMap

    • FastThreadLocalThread: fastGet() 方法获取 FastThreadLocalThread 的threadLocalMap 属性。
    • ThreadLocal: slowGet() 方法获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。
  2. 直接通过索引取出对象
    [Netty] FastThreadLocal (十四)

  3. 初始化对象, 如果没有对象的话
    [Netty] FastThreadLocal (十四)

4.2 FastThreadLocal.set()
public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) { // 1\. value 是否为缺省值
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2\. 获取当前线程的 InternalThreadLocalMap
        setKnownNotUnset(threadLocalMap, value); // 3\. 将 InternalThreadLocalMap 中数据替换为新的 value
    } else {
        remove();
    }
}
  1. 判断 value 是否为缺省值
  2. 获取当前线程的InternalThreadLocalMap
  3. 将 InternalThreadLocalMap 中对应数据替换为新的 value
  4. remove

setKnownNotUnset(): 将数据添加到 InternalThreadLocalMap

    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }

    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
        	// 扩容
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }

    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;

        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 创建 FastThreadLocal 类型的 Set 集合
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v; // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
    }
    variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中
}
  1. 找到数组下标 index 位置

    1. 如果数组容量大于 FastThreadLocal 的 index 索引, 直接找到数组下标 index 位置将新 value 设置进去
    2. 在设置新的 value 之前, 将index 位置的元素取出, 如果旧元素还是UNSET缺省对象, 返回成功
  2. expandIndexedVariableTableAndSet(): 扩容

  3. addToVariablesToRemove(): 向 InternalThreadLocalMap 添加完数据之后, 将 FastThreadLocal 对象保存到待清理的 Set 中

remove():文章来源地址https://www.toymoban.com/news/detail-416819.html

public final void remove() {
    remove(InternalThreadLocalMap.getIfSet());
}
public static InternalThreadLocalMap getIfSet() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return ((FastThreadLocalThread) thread).threadLocalMap();
    }
    return slowThreadLocalMap.get();
}
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    Object v = threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 value
    removeFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocal
    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v); // 空方法,用户可以继承实现
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}
  1. 调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap
    • 如果是 FastThreadLocalThread 类型, 直接取 FastThreadLocalThread 中 threadLocalMap 属性
    • 如果是普通Thread, 从 ThreadLocal 类型的 slowThreadLocalMap 中获取
  2. InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素, 覆盖为缺省对象 UNSET
  3. 清理当前的 FastThreadLocal 对象, InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合, 删除当前FastThreadLocal

到了这里,关于[Netty] FastThreadLocal (十四)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Netty中NioEventLoop介绍

            Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。         Netty 是一个基于NIO的客户、

    2024年02月08日
    浏览(26)
  • 网络编程的无冕之王-Netty入门和核心组件介绍

    最近我在研究Netty,之前只是经常听说,并没有实际做过研究,为什么突然要好好研究一下它,主要是因为前段时间,我在看RocketMQ底层原理的时候发现它的底层的网络通信都是基于Netty,然后网上一查,果然,大家太多的耳熟能详的工具组件,都是基于Netty做的开发。大家看

    2024年02月10日
    浏览(49)
  • LINUX基础培训十四之系统参数介绍

    前言、本章学习目标  了解LINUX中常见系统内核参数 掌握常见系统参数优化方法 Linux内核有非常多的参数,而对这些内核参数的修改会尽可能的提高内核的稳定性,并且,在业务高峰期的时候,可以保证内核尽可能的稳定高效。 Linux的内核参数是可以按照它们的功能进行分类

    2024年01月25日
    浏览(34)
  • 大数据Flink(六十四):Flink运行时架构介绍

    文章目录 Flink运行时架构介绍 一、系统架构 二、​​​​​​​​​​​​​​整体构成 三、作业管理器(JobManager) 四、任务管理器(TaskManager) 我们已经对 Flink 的主要特性和部署提交有了基本的了解,那它的内部又是怎样工作的,集群配置设置的一些参数又到底有什么

    2024年02月11日
    浏览(52)
  • elasticSearch核心概念的介绍(十四):ES集群索引分片管理

    上一章节我们对ES的集群进行了搭建,有兴趣的朋友可以参考一下elasticSearch核心概念的介绍(十三):docker搭建ES集群 这里我们来介绍了ES集群索引的分片管理 ES集群索引分片管理 介绍 分片(shard):因为ES是个分布式的搜索引擎,所以索引通常都会分解成不同部分,而这些

    2023年04月27日
    浏览(56)
  • 【C++从0到王者】第十四站:list基本使用及其介绍

    如下所示,是库里面对list的基本介绍 链表是序列容器,允许在序列内的任何位置进行常量时间的插入和擦除操作,以及两个方向的迭代。 链表容器被实现为双链表;双链表可以将它们包含的每个元素存储在不同且不相关的存储位置。排序是通过与前面元素的链接和后面元素的

    2024年02月14日
    浏览(46)
  • 【springboot+vue项目(十四)】基于Oauth2的SSO单点登录(一)整体流程介绍

    场景:现在有一个前后端分离的系统,前端框架使用vue-element-template,后端框架使用springboot+springSecurity+JWT+Redis(登录部分)现在需要接入到 已经存在的第三方基于oauth2.0的非标准接口 统一认证系统。  温馨提示:如果是接入到 基于oauth2.0的 标准接口的认证服务系统,可以直

    2024年02月19日
    浏览(44)
  • Linux内核(十四)Input 子系统详解 I —— 子系统介绍以及相关结构体解析

    input子系统就是管理输入的子系统 ,和Linux其他子系统一样,都是Linux内核针对某一类设备而创建的框架。 鼠标、键盘、触摸屏等都属于输入设备,Linux将这些设备的共同特性抽象出来,这就形成了input子系统的框架。 Linux内核只需要通过input框架向用户层上报输入事件 (如:

    2024年02月05日
    浏览(44)
  • 【Netty】Netty 解码器(十二)

    回顾Netty系列文章: Netty 概述(一) Netty 架构设计(二) Netty Channel 概述(三) Netty ChannelHandler(四) ChannelPipeline源码分析(五) 字节缓冲区 ByteBuf (六)(上) 字节缓冲区 ByteBuf(七)(下) Netty 如何实现零拷贝(八) Netty 程序引导类(九) Reactor 模型(十) 工作原理

    2024年02月07日
    浏览(54)
  • 【netty基础四】netty与nio

    阻塞I/O在调用InputStream.read()方法时是 阻塞的,它会一直等到数据到来 (或超时)时才会返回; 同样,在调用ServerSocket.accept()方法时,也会一直 阻塞到有客户端连接 才会返回,每个客户端连接成功后,服务端都会启动一个线程去处理该客户端的请求。 阻塞I/O的通信模型示意

    2024年02月10日
    浏览(87)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包