Java多线程篇(1)——深入分析synchronized

这篇具有很好参考价值的文章主要介绍了Java多线程篇(1)——深入分析synchronized。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

synchronized

原理概述

synchronized实现原理的关键字由浅入深依次为
字节码层面:monitorenter/monitorexit 指令
java对象层面: Mark Word 对象头
JVM层面: CAS、自旋 、 ObjectMonitor(MESA管层模型:cxq,entryList,wait三个队列)
操作系统层面: mutex 锁

其中 mark word 对象头如下图:
Java多线程篇(1)——深入分析synchronized,Java基础,java

锁升级

说到锁升级,我相信很多人都错误的认为升级过程是这样的:初始状态无锁,第一个线程进来升级成偏向锁,假如偏向锁还没释放又再有线程进来就会cas+自旋去获取轻量级锁,如果自旋超过一定次数就膨胀成重量级锁 。但其实这种说法是不正确的。

这其中有三个常见的误区:
误区一:初始状态不是无锁,而是偏向锁(匿名偏向锁)。
误区二:无锁不会升级成偏向锁,只能升级成轻量级锁或者重量级锁。
误区三:轻量级获锁没有自旋,只要一次CAS失败就会膨胀成重量级锁。自旋是重量级锁为了尽可能的不阻塞线程,在实际阻塞之前做的一些重试操作。

实际的锁升级操作是:初始为偏向锁(匿名偏向锁),A线程进来则偏向该线程(即使线程退出了对象头仍然偏向A线程)。后面B线程进来就会发现该对象锁已经偏向线程A了,就会撤销该对象锁的偏向并升级成轻量级锁,轻量级锁释放的时候又变成无锁状态。后续再有线程C进来,就由无锁直接变成轻量级锁,如果在C线程轻量级锁释放锁之前再有线程D进来就膨胀成重量级锁,直到最后都没有线程占用锁就恢复成无锁状态。

在理解上面单个对象锁升级过程后再来理解 JVM 对偏向锁做的一些优化(因为偏向锁撤销是有一定性能开销的,需要等到另一个线程到达安全点才能撤销):在多个对象锁的情况下,如果所有对象锁撤销偏向总数达到批量重偏向阈值(默认20)就会触发批量重偏向(将该类epoch +1,且当前正在生效的偏向锁epoch也同步+1,表示偏向锁进入下一代。之前旧的 epoch 则说明已过期,过期epoch的锁对象下次获锁时可以重偏向)。当撤销偏向总数达到批量撤销阈值(默认40)就会触发批量偏向撤销(将该类是否偏向锁标记为0,标记该 class 为不可偏向,并且撤销当前正在持有的偏向锁,后续new的对象锁也不再是偏向锁,而是无锁状态,表示对于该 class 直接执行轻/重量级锁的逻辑。)

口说无凭,下面就结合案例+源码来看看上面说的对不对。


初始状态

前面说到初始状态是匿名偏向锁,而不是无锁,这里来验证一下:
Java多线程篇(1)——深入分析synchronized,Java基础,java
可以看到锁标记是101,说明这是一个偏向锁,再观察仔细一点会发现这个偏向锁没有偏向任何一个线程。相信看到这里大家也明白了匿名偏向锁就是不偏向任何线程的偏向锁。

是否开启偏向锁是可以配置(jdk6之后默认开启):
XX:+UseBiasedLocking:开启偏向锁功能
XX:-UseBiasedLocking:关闭偏向锁功能
在一些老的jdk版本中(具体多老,我也没去研究,至少jdk8是),偏向锁存在4s延迟偏向——在JVM启动4s后创建出来的对象才会开启偏向,不过这个延迟也是可以通过JVM参数设置的:
-XX:BiasedLockingStartupDelay=0 将延迟改为0

偏向锁

偏向锁获取/重入

至此可以确认,锁初始状态是匿名偏向锁。
那么当第一个线程进来发生了啥,我们直接看到JVM处理 monitorenter 指令的源码

bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorenter)

CASE(_monitorenter): {
		//得到栈顶元素,其实就是锁记录的对象
        oop lockee = STACK_OBJECT(-1);
        CHECK_NULL(lockee);
        //找到一个该对象可用的锁记录
        BasicObjectLock* limit = istate->monitor_base();
        BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
        BasicObjectLock* entry = NULL;
        while (most_recent != limit ) {
          if (most_recent->obj() == NULL) entry = most_recent;
          else if (most_recent->obj() == lockee) break;
          most_recent++;
        }
        
		//一般都可以找到
        if (entry != NULL) {
          //绑定锁记录和对象
          entry->set_obj(lockee);
          int success = false;
          uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;

          markOop mark = lockee->mark();
          intptr_t hash = (intptr_t) markOopDesc::no_hash;
          // 判断是否为偏向模式,即 Mark Word 最后三位是否为 101
          if (mark->has_bias_pattern()) {
            uintptr_t thread_ident;
            uintptr_t anticipated_bias_locking_value;
            thread_ident = (uintptr_t)istate->thread();
            anticipated_bias_locking_value =
              (((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
              ~((uintptr_t) markOopDesc::age_mask_in_place);

			// 分支一:如果为0说明偏向当前线程,且 class 的 epoch 等于 Mark Word 的 epoch,则偏向锁重入
            if  (anticipated_bias_locking_value == 0) {
              if (PrintBiasedLockingStatistics) {
                (* BiasedLocking::biased_lock_entry_count_addr())++;
              }
              success = true;
            }
            // 分支二:如果class的最后三位不为101,说明class关闭了偏向模式(批量撤销导致),则sucess为false,后续会尝试撤销偏向锁
            else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
              markOop header = lockee->klass()->prototype_header();
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (*BiasedLocking::revoked_lock_entry_count_addr())++;
              }
            }
            // 分支三:如果epoch不相等,说明偏向锁已过期(批量重偏向导致),则尝试重偏向
            else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
              markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
              if (hash != markOopDesc::no_hash) {
                new_header = new_header->copy_set_hash(hash);
              }
              if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::rebiased_lock_entry_count_addr())++;
              }
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
            // 分支四:到这个分支要么是匿名偏向锁,要么是正在偏向别的线程
            else {
              markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
                                                              (uintptr_t)markOopDesc::age_mask_in_place |
                                                              epoch_mask_in_place));
              if (hash != markOopDesc::no_hash) {
                header = header->copy_set_hash(hash);
              }
              markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
              DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
              //如果是匿名偏向就直接偏向当前线程
              if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
              }
              //反之就调用InterpreterRuntime::monitorenter撤销当前偏向锁并升级成轻量级锁
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              success = true;
            }
          }

          //实际上只有分支2会进入到里面的代码,因为其他分支sucess都为true
          //这段代码的逻辑其实主要也是撤销偏向锁并升级成轻量级锁,那为什么不和分支4的撤销偏向锁写在一块?
          //我认为是因为分支4肯定是不同线程不需要考虑轻量级锁重入,而这个需要
          if (!success) {
            markOop displaced = lockee->mark()->set_unlocked();
            entry->lock()->set_displaced_header(displaced);
            bool call_vm = UseHeavyMonitors;
            if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
               // 如果是轻量级锁重入,将 Displaced Mark Word 设置为 NULL,标记这是一次重入,后续会对标记做轻量级锁的重入逻辑
              if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
                entry->lock()->set_displaced_header(NULL);
              }
              // 反之调用InterpreterRuntime::monitorenter撤销偏向锁并升级成轻量级锁
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
            }
          }
          UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
        } else {
          istate->set_msg(more_monitors);
          UPDATE_PC_AND_RETURN(0);
        }
      }

根据注释不难看出,匿名偏向锁到偏向锁的过程就在分支四。

			// 分支四:到这个分支要么是匿名偏向锁,要么是正在偏向别的线程
            else {
              //...
              //如果是匿名偏向就直接偏向当前线程
              if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
                if (PrintBiasedLockingStatistics)
                  (* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
              }
              //反之就调用InterpreterRuntime::monitorenter撤销当前偏向锁并升级
              else {
                CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
              }
              //...
            }
          }

其实就是CAS去替换mark work的thread id,只有原本是匿名偏向的情况下才会替换成功,如果替换失败就说明已偏向其他线程,就调用 InterpreterRuntime::monitorenter 撤销当前偏向锁并升级

偏向锁的撤销/重偏向和升级

interpreterRuntime.cpp#InterpreterRuntime::monitorenter

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
  //...
  //如果开启了偏向锁模式,就进入fast_enter
  if (UseBiasedLocking) { 
    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
  } 
  //反之直接进入slow_enter
  else {
    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
  }
  //...
IRT_END

什么是fast_enter ?什么是slow_enter?
slow_enter就是普通锁的加锁,这个后面再看。先看fast_enter。

这里的普通锁场景是我自己的术语,特指没有偏向锁情况下的锁场景

其实fast_enter最后也调用了slow_enter,只不过就是在调用之前多加了一层偏向锁的撤销/重偏向操作(同时会统计撤销次数,当达到阈值时触发批量重偏向或者批量撤销的逻辑)。只有成功重偏向了才不进入slow_enter,否则都说明偏向锁被撤销了,锁状态要么是无锁要么是轻量级锁(根据偏向线程是否存活来决定),都需进入slow_enter进行普通锁的获取(毕竟锁的获取还得继续下去)。

synchronizer.cpp#ObjectSynchronizer::fast_enter

void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 //再次判断是否开启了偏向锁模式
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      //撤销/重偏向
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}

biasedLocking.cpp#BiasedLocking::revoke_and_rebias

BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
  assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
  markOop mark = obj->mark();
  //如果是匿名偏向且attempt_rebias为false,就会进入到这个分支,撤销偏向锁,返回 BIAS_REVOKED
  //例如:调用了hashcode
  if (mark->is_biased_anonymously() && !attempt_rebias) {
    markOop biased_value       = mark;
    markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
    markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
    if (res_mark == biased_value) {
      return BIAS_REVOKED;
    }
  }
  // 如果开启了偏向模式会进入这个分支
  else if (mark->has_bias_pattern()) {
    Klass* k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //如果class关闭了偏向模式会进入这个分支,撤销偏向锁,返回 BIAS_REVOKED
    if (!prototype_header->has_bias_pattern()) {
      markOop biased_value       = mark;
      markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
      assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
      return BIAS_REVOKED;
    } 
    // 如果epoch已过期就会进入这个分支
    else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
      // 如果参数允许重偏向,就进行重偏向,返回 BIAS_REVOKED_AND_REBIASED
      if (attempt_rebias) {
        assert(THREAD->is_Java_thread(), "");
        markOop biased_value       = mark;
        markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED_AND_REBIASED;
        }
      }
      // 如果参数不允许重偏向,就还是撤销偏向锁,返回 BIAS_REVOKED 
      else {
        markOop biased_value       = mark;
        markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
        markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
        if (res_mark == biased_value) {
          return BIAS_REVOKED;
        }
      }
    }
  }

  //如果上述的cas失败了,就会更新class的撤销计数器并返回对应标识,根据标识判断是否需要批量重偏向或批量撤销
  HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
  if (heuristics == HR_NOT_BIASED) {
    return NOT_BIASED;
  } 
  // 分支一:撤销单个偏向锁的标识
  else if (heuristics == HR_SINGLE_REVOKE) {
    Klass *k = obj->klass();
    markOop prototype_header = k->prototype_header();
    //如果要撤销的偏向锁就是当前线程,直接调用 revoke_bias 方法撤销偏向锁,不需要等到 SafePoint
    if (mark->biased_locker() == THREAD && prototype_header->bias_epoch() == mark->bias_epoch()) {
      ResourceMark rm;
      if (TraceBiasedLocking) {
        tty->print_cr("Revoking bias by walking my own stack:");
      }
      EventBiasedLockSelfRevocation event;
      BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD, NULL);
      ((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
      assert(cond == BIAS_REVOKED, "why not?");
      if (event.should_commit()) {
        event.set_lockClass(k);
        event.commit();
      }
      return cond;
    }
    //反之,将撤销封装为任务,提交给 VM 线程执行,VM 线程达到 SafePoint 后会调用 revoke_bias 方法
    //到达安全点会检测偏向线程是否存活,如果存活就直接升级成轻量级锁,如果不存活就先撤销成无锁,再由竞争线程去升级成轻量级锁
    else {
      EventBiasedLockRevocation event;
      VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
      VMThread::execute(&revoke);
      if (event.should_commit() && (revoke.status_code() != NOT_BIASED)) {
        event.set_lockClass(k);
        event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);
        event.set_previousOwner(revoke.biased_locker());
        event.commit();
      }
      return revoke.status_code();
    }
  }

  // 分支二:批量重偏向与批量撤销的标识
  assert((heuristics == HR_BULK_REVOKE) ||
         (heuristics == HR_BULK_REBIAS), "?");
  EventBiasedLockClassRevocation event;
  VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
                                (heuristics == HR_BULK_REBIAS),
                                attempt_rebias);
  VMThread::execute(&bulk_revoke);
  if (event.should_commit()) {
    event.set_revokedClass(obj->klass());
    event.set_disableBiasing((heuristics != HR_BULK_REBIAS));
    event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);
    event.commit();
  }
  return bulk_revoke.status_code();
}

static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
  //...
  //返回批量撤销标识
  if (revocation_count == BiasedLockingBulkRevokeThreshold) {
    return HR_BULK_REVOKE;
  }
  //返回批量重偏向标识
  if (revocation_count == BiasedLockingBulkRebiasThreshold) {
    return HR_BULK_REBIAS;
  }
  //返回普通的单个撤销标识
  return HR_SINGLE_REVOKE;
}

案例验证:
Java多线程篇(1)——深入分析synchronized,Java基础,java

案例只演示了撤销和升级。重偏向的场景比较难实现…

批量重偏向和批量偏向撤销

试想这么一个场景,假如现在有100个对象锁已经全都偏向线程A,并且A线程已经退出了。后续B线程进来获取这100个锁的时发现全都偏向到了A,假如没有批量重偏向和批量撤销的话,就会老老实实撤销偏向100次。而偏向撤销是存在一定性能开销的(需要等到安全点才能撤销),这种大量撤销的情况下偏向锁的性能甚至还不如轻量级锁。所以JVM针对这种场景做了优化。

如果一定时间内(默认25s)撤销次数达到20,JVM就会认为自己偏向错了,将该类epoch +1,且当前正在生效的偏向锁epoch也同步+1,表示偏向锁进入下一代。之前旧的 epoch 则说明已过期,过期epoch的锁对象下次获锁时可以重偏向。 撤销次数达到40,JVM就会认为此时偏向锁不再适用,将该类是否偏向锁标记为0,标记该 class 为不可偏向,并且撤销当前正在持有的偏向锁,后续new的对象锁也不再是偏向锁,而是无锁状态,表示对于该 class 直接执行轻/重量级锁的逻辑。

biasedLocking.cpp#BiasedLocking::Condition::bulk_revoke_or_rebias_at_safepoint

class VM_BulkRevokeBias : public VM_RevokeBias {
	//...
    virtual void doit() {
        // 等待线程达到 SafePoint 后会调用 bulk_revoke_or_rebias_at_safepoint 方法
        // bulk_rebias 为 true 代表执行批量重偏向逻辑,为 false 表示执行批量撤销逻辑
        // attempt_rebias_of_object 代表是否允许重偏向,这里固定为 true
        _status_code = bulk_revoke_or_rebias_at_safepoint((*_obj)(), _bulk_rebias, _attempt_rebias_of_object, _requesting_thread);
        clean_up_cached_monitor_info();
    }
};

static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
                                                                   bool bulk_rebias,
                                                                   bool attempt_rebias_of_object,
                                                                   JavaThread* requesting_thread) {
  //...
  
  //批量重偏向
  if (bulk_rebias) {
    if (klass->prototype_header()->has_bias_pattern()) {
      // 更新当前 class 的 epoch
      int prev_epoch = klass->prototype_header()->bias_epoch();
      klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
      int cur_epoch = klass->prototype_header()->bias_epoch();
      // 遍历所有线程的栈,找出当前 class 对应的正处于锁定状态的对象,更新 epoch 值
      for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
        GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
        for (int i = 0; i < cached_monitor_info->length(); i++) {
          MonitorInfo* mon_info = cached_monitor_info->at(i);
          oop owner = mon_info->owner();
          markOop mark = owner->mark();
          //更新该类正在使用的偏向锁对象的 epoch 与 类的epoch 保持一致
          if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
            assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
            owner->set_mark(mark->set_bias_epoch(cur_epoch));
          }
        }
      }
    }
    // 对当前锁对象进行重偏向,第二个参数为 allow_rebias,表示是否允许重偏向,此时一般是 true
    revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
  }
  
  //批量撤销 
  else {
    if (TraceBiasedLocking) {
      ResourceMark rm;
      tty->print_cr("* Disabling biased locking for type %s", klass->external_name());
    }
    // 关闭当前 class 的偏向锁
    klass->set_prototype_header(markOopDesc::prototype());

    // 遍历所有线程的栈,找出当前 class 对应的正处于锁定状态的对象,撤销偏向锁
    for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
      GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
      for (int i = 0; i < cached_monitor_info->length(); i++) {
        MonitorInfo* mon_info = cached_monitor_info->at(i);
        oop owner = mon_info->owner();
        markOop mark = owner->mark();
        if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
          revoke_bias(owner, false, true, requesting_thread);
        }
      }
    }

    // 对当前锁对象进行撤销,第二个参数为 allow_rebias,表示是否允许重偏向,此处固定传 false
    revoke_bias(o, false, true, requesting_thread);
  }

  if (TraceBiasedLocking) {
    tty->print_cr("* Ending bulk revocation");
  }

 //如果满足偏向条件,则重偏向于当前线程
  BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;
  if (attempt_rebias_of_object &&
      o->mark()->has_bias_pattern() &&
      klass->prototype_header()->has_bias_pattern()) {
    markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
                                           klass->prototype_header()->bias_epoch());
    o->set_mark(new_mark);
    status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
    if (TraceBiasedLocking) {
      tty->print_cr("  Rebiased object toward thread " INTPTR_FORMAT, (intptr_t) requesting_thread);
    }
  }

  assert(!o->mark()->has_bias_pattern() ||
         (attempt_rebias_of_object && (o->mark()->biased_locker() == requesting_thread)),
         "bug in bulk bias revocation");

  return status_code;
}

来案例验证一下是否真的会批量重偏向和批量撤销。
批量重偏向:
Java多线程篇(1)——深入分析synchronized,Java基础,java
批量撤销:
Java多线程篇(1)——深入分析synchronized,Java基础,java
不贴结果了,太长了,结果注释在代码上了,有兴趣可以自己运行一下。

 public static void main(String[] args) throws InterruptedException {
        List<Object> list = new ArrayList<>();

        //100个锁对象偏向线程A(101个是因为不想用下标0)
        new Thread(() -> {
            for (int i = 1; i <= 101; i++) {
                Object o = new Object();
                synchronized (o) {
                    list.add(o);
                }
            }
            //保活线程A,防止JVM底层复用线程
            while (true) { }
        }).start();
        Thread.sleep(3000);
        //原本偏向线程A
        System.out.println("原本偏向线程" + ClassLayout.parseInstance(list.get(1)).toPrintable());

        //另一个线程获锁30次
        new Thread(() -> {
            for (int i = 1; i <= 30; i++) {
                Object o = list.get(i);
                synchronized (o) {
                    if (i == 18 || i == 19 || i == 20 || i == 21) {
                        //18-轻量级锁 19-偏向此线程 20-偏向此线程 21-偏向此线程
                        // 不是默认20吗,为什么第19个就重偏向了? 我不知道,估计也是性能的优化吧...
                        System.out.println("第" + i + "个" + ClassLayout.parseInstance(o).toPrintable());
                    }
                }
            }
        }).start();
        Thread.sleep(3000);

        //第31个没有被再次获锁,也就是说虽然epoch已经过期了,但是没有被重偏向,所以也就还是之前的偏向(过期偏向)
        System.out.println("第31个" + ClassLayout.parseInstance(list.get(31)).toPrintable());
        //new object 的锁对象也还是匿名偏向
        System.out.println("new Object" + ClassLayout.parseInstance(new Object()).toPrintable());
    }

public static void main(String[] args) throws InterruptedException {
        List<Object> list = new ArrayList<>();

        //101个锁对象偏向线程A,并一直持有下标0的object
        new Thread(() -> {
            for (int i = 1; i <= 101; i++) {
                Object o = new Object();
                synchronized (o) {
                    list.add(o);
                }
            }
            while (true) { synchronized (list.get(0)) { } }
        }).start();
        Thread.sleep(3000);
        //第0个偏向A
        System.out.println("第0个" + ClassLayout.parseInstance(list.get(0)).toPrintable());

        //B线程获锁40次(撤销18次:撤销1~18,19~40重偏向到此线程)
        new Thread(() -> {
            for (int i = 1; i <= 40; i++) { synchronized (list.get(i)) { } }
            //保活线程,防止JVM底层复用线程
            while (true) { }
        }).start();
        Thread.sleep(3000);
        //第0个还是偏向A
        System.out.println("第0个" + ClassLayout.parseInstance(list.get(0)).toPrintable());

        //C线程获锁40次(撤销22次:1~18是轻量锁,撤销19~40)
        new Thread(() -> {
            for (int i = 1; i <= 40; i++) { synchronized (list.get(i)) { } }
        }).start();
        Thread.sleep(3000);

        //第0个偏向被撤销,变成轻量级锁
        System.out.println("第0个" + ClassLayout.parseInstance(list.get(0)).toPrintable());
        //new object 也不再是匿名偏向锁而是无锁
        System.out.println("new Object" + ClassLayout.parseInstance(new Object()).toPrintable());
        //第41个没有被动过,所以还是过期偏向
        System.out.println("第41个" + ClassLayout.parseInstance(list.get(41)).toPrintable());
    }

偏向锁的释放

bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorexit)

CASE(_monitorexit): {
        //...
        // 遍历栈的锁记录
        while (most_recent != limit ) {
        
          // 判断锁记录关联的 obj 是否为 lockee
          if ((most_recent)->obj() == lockee) {
            BasicLock* lock = most_recent->lock();
            markOop header = lock->displaced_header();
            //设置锁记录的obj为null(没有修改到mark word的线程id)
            most_recent->set_obj(NULL);
            
            //如果不是偏向模式还需要轻/重量级锁的释放
            if (!lockee->mark()->has_bias_pattern()) {
              bool call_vm = UseHeavyMonitors;
              //如果 header != NULL 说明不是重入,需要真正解锁
              if (header != NULL || call_vm) {
                // CAS替换对象头的 Mark Word(轻量级锁才去替换,重量级锁直接进入分支)
                if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
                  // 将 obj 还原,然后调用 monitorexit 方法
                  most_recent->set_obj(lockee);
                  CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
                }
              }
            }

            UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
          }
          // 如果不是关联的 obj,继续判断下一个锁记录
          most_recent++;
        }
        //...
      }

可以发现偏向锁释放并没有清空mark word偏向的线程id。

至此,偏向锁就完了。接下来就是普通锁场景了。

再次重申,这里的普通锁场景是我自己的术语,特指没有偏向锁情况下的锁场景。


轻量级锁

轻量级锁获取/重入

衔接前面偏向锁的内容可以知道,轻量级锁的获取可以从 slow_enter 看起。

synchronizer.cpp#ObjectSynchronizer::slow_enter

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");

  //mark->is_neutral()为true表示是无锁,则cas无锁->轻量级锁(将对象头替换为指向当前线程栈中的锁记录)
  if (mark->is_neutral()) {
    lock->set_displaced_header(mark);
    //没有自旋!没有自旋!没有自旋!
    //一次cas失败就直接进入到最下面的膨胀重量级锁的语句了。
    if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
      TEVENT (slow_enter: release stacklock) ;
      return ;
    }
  }
  //否则判断是否轻量级锁重入
  else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    assert(lock != mark->locker(), "must not re-lock the same lock");
    assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
    lock->set_displaced_header(NULL);
    return;
  }
  
  //到这一步说明要膨胀成重量级锁了
  lock->set_displaced_header(markOopDesc::unused_mark());
  ObjectSynchronizer::inflate(THREAD,
                              obj(),
                              inflate_cause_monitor_enter)->enter(THREAD);
}

相对于fast_enter的逻辑简单多了,但是看到没有,轻量级获锁没有自旋!轻量级获锁没有自旋!轻量级获锁没有自旋!一次cas失败就直接进入到最下面的膨胀重量级锁的语句了。

轻量级锁膨胀

synchronizer.cpp#ObjectSynchronizer::inflate
这个方法其实就是为了得到一个ObjectMonitor对象对应一个重量级锁。通过调用 ObjectMonitor.enter/exit 实现重量级锁的获取/释放。

ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self,
                                                  oop object,
                                                  const InflateCause cause) {
  //...
  //自旋直至成功膨胀为重量级锁(这个是膨胀的自旋并不是获锁的自旋)
  for (;;) {
      const markOop mark = object->mark() ;
      assert (!mark->has_bias_pattern(), "invariant") ;
      //如果已经有一个 objectMonitor 直接返回即可
      if (mark->has_monitor()) {
          ObjectMonitor * inf = mark->monitor() ;
          assert (inf->header()->is_neutral(), "invariant");
          assert (inf->object() == object, "invariant") ;
          assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
          return inf ;
      }
      //如果正在膨胀,让出cpu16次来实现等待的效果,16次之后还没膨胀完就park阻塞
      if (mark == markOopDesc::INFLATING()) {
         TEVENT (Inflate: spin while INFLATING) ;
         ReadStableMark(object) ;
         continue ;
      }
      
	  //mark->has_locker()为true 说明是轻量级锁状态,则轻量级锁->重量级锁 
      if (mark->has_locker()) {
          //构建一个 ObjectMonitor 对象并初始化
          ObjectMonitor * m = omAlloc (Self) ;
          //...

		  //cas替换对象的mark为INFLATING
		  // 为什么使用一个INFLATING而不是直接设置monitor呢?
          // 这是防止轻量级锁膨胀的同时又解锁,这时设置一个INFLATING
          // 可以让它cas失败,进入重量级锁的释放流程,而不是直接还原对象头,造成hashcode值莫名其妙的改变
          markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
          //...
          
          //设置 ObjectMonitor 的header,owner,object
          markOop dmw = mark->displaced_mark_helper() ;
          assert (dmw->is_neutral(), "invariant") ;
          m->set_header(dmw) ;
          m->set_owner(mark->locker());
          m->set_object(object);
          
          // 替换对象的mark为monitor的地址(设置为重量级锁状态)
          guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
          object->release_set_mark(markOopDesc::encode(m)); 

          //...
          return m ;
      }
      
	  //mark->has_locker()为fasle 说明是无锁状态,则无锁->重量级锁
	  //构建一个 ObjectMonitor 对象并初始化和设置header,owner,object
	  assert (mark->is_neutral(), "invariant");
      ObjectMonitor * m = omAlloc (Self) ;
      //...
      
      // cas替换对象的mark为monitor地址(设置为重量级锁状态)
      if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
		  //...
      }

      //...
      return m ;
  }
}

总结:
1. 如果已经有ObjectMonitor直接返回
2. 如果正在膨胀则让出CPU16次实现等待膨胀完成的效果,16次之后阻塞
3. 如果上面两种情况都不是,则根据当前锁状态走轻量级锁->重量级锁还是无锁->重量级锁来创建ObjectMonitor

案例:
Java多线程篇(1)——深入分析synchronized,Java基础,java
Java多线程篇(1)——深入分析synchronized,Java基础,java

轻量级锁释放

锁的释放入口肯定是 bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorexit) 。上面偏向锁释放已分析过该方法,得知轻量级锁释放会来到 InterpreterRuntime::monitorexit (其实真正做事情的是 ObjectSynchronizer::fast_exit)。

interpreterRuntime.cpp#InterpreterRuntime::monitorexit

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
//...
  // 调用 ObjectSynchronizer::slow_exit 方法进行解锁
  ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
//...

synchronizer.cpp#ObjectSynchronizer::slow_exit

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  //实际上调用fast_exit
  fast_exit (object, lock, THREAD) ;
}

void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  //...
  // 如果 Displaced Mark Word 为空,说明可能是锁重入或锁膨胀中,直接return
  if (dhw == NULL) {
     //...
     return;
  }
  mark = object->mark() ;
  // 如果 Mark Word 指向当前线程锁指针,通过 CAS 操作恢复 Mark Word,即解锁操作
  if (mark == (markOop) lock) {
     assert (dhw->is_neutral(), "invariant") ;
     if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
        TEVENT (fast_exit: release stacklock) ;
        return;
     }
  }
  // 到这一步说明已经是重量级锁,要进行重量级锁解锁
  ObjectSynchronizer::inflate(THREAD,
                              object,
                              inflate_cause_vm_internal)->exit(true, THREAD);
}

轻量级锁释放最重要的一步就是恢复对象头的 mark word ,即恢复到无锁状态。
案例:
Java多线程篇(1)——深入分析synchronized,Java基础,java


重量级锁

在看锁膨胀的时候有提到,膨胀后会得到一个ObjectMonitor对象,通过ObjectMonitor.enter/exit 方法来实现重量级锁的获取/释放。

重量级锁获取/重入

objectMonitor.cpp#ObjectMonitor::enter

void ATTR ObjectMonitor::enter(TRAPS) {
  //...

  //CAS重量级锁owner指向当前线程
  cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
  if (cur == NULL) {
     //...
     return ;
  }
  //是否重入
  if (cur == Self) {
     _recursions ++ ;
     return ;
  }
  //是否由轻量级锁膨胀过来的,是的话 _recursions 置为1
  if (Self->is_lock_owned ((address)cur)) {
    //...
    return ;
  }

  //TrySpin 自适应自旋获取
  if (Knob_SpinEarly && TrySpin (Self) > 0) {
     //...
     return ;
  }

  //...
    for (;;) {
      //获锁失败 EnterI 阻塞线程(方法内实际阻塞前还是会多次尝试(自旋)获锁)
      EnterI (THREAD) ;
      //...
    }
  //...
}

objectMonitor.cpp#ObjectMonitor::EnterI

void ATTR ObjectMonitor::EnterI (TRAPS) {
    //...
    //TryLock 尝试获锁一次 
    if (TryLock (Self) > 0) {
        //...
        return;
    }
    //...
    //TrySpin 自适应自旋获锁
    if (TrySpin (Self) > 0) {
        //...
        return;
    }
    //...
    
    //封装成ObjectWaiter入队cxq 入队失败会再次尝试获锁
    //循环:{
    //      cas入队cxq
    //      TryLock 尝试获锁
    //      }
    ObjectWaiter node(Self) ;
    //...
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
        if (TryLock (Self) > 0) {
            //...
            return;
        }
    }

    //...
    
    //阻塞线程。阻塞前,唤醒后都会尝试获锁
    //循环:{
    //      TryLock 尝试获锁
    //      park阻塞线程(使用操作系统自带的mutex阻塞) 
    //      ...线程被唤醒
    //      TryLock 尝试获锁
    //      TrySpin 自适应自旋获锁
    //      内存屏障
    //      }
    for (;;) {	

        //TryLock 尝试获锁
        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;
        
        //...
        
        // 还是获锁失败,park 阻塞线程
        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }
        
        //...线程被唤醒,TryLock 尝试获锁一次
        if (TryLock(Self) > 0) break ;
        
        //TrySpin 自适应自旋获锁
        if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
        
        //...
        //内存屏障
        OrderAccess::fence() ;
    }
    //...
    // 跳出循环说明成功获锁,将当前线程的节点从 cxq 或 EntryList 
    UnlinkAfterAcquire (Self, &node) ;
    //...
    return ;
}

总结:

 1、cas重量级锁指向当前线程,是否重入,是否由轻量级膨胀
 2、TrySpin 自适应自旋获锁(获锁其实就是将重量级锁指向当前线程)
 3、EnterI {
           3.1、TryLock 获锁
           3.2、TrySpin 自适应自旋获锁
           3.3、封装成ObjectWait节点并入cxq队列  
           						for(;;) {
           							CAS入队cxq
                                    TryLock 获锁
								}
           3.4、调用pthread_mutex_lock阻塞线程
           						for(;;) { 
           							TryLock 获锁
           							park 
           							...唤醒后
           							TryLock 获锁
           							TrySpin 自适应自旋获锁
           							内存屏障
           						}
           	3.5、UnlinkAfterAcquire 将当前线程的节点从 cxq 或 EntryList 
       }

重量级锁释放

objectMonitor.cpp#ObjectMonitor::exit

void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;
   //如果锁不指向当前线程
   if (THREAD != _owner) {
     //如果当前线程是之前持有轻量级锁的线程,此时,owner 是指向 Lock Record 的指针
     if (THREAD->is_lock_owned((address) _owner)) {
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } 
     //其他线程占用锁,直接返回
     else {
       //...
       return;
     }
   }
   
   //判断是否重入
   if (_recursions != 0) {
     _recursions--;        // this is simple recursive enter
     TEVENT (Inflated exit - recursive) ;
     return ;
   }
   //... 
   
   for (;;) {
      assert (THREAD == _owner, "invariant") ;
      
      // 根据策略,选择不同的释放锁时机,默认为 0
      //优先释放锁放开自旋线程的策略(非公平锁)
      if (Knob_ExitPolicy == 0) {
         //先将 owner 设置为 NULL。此时正在CAS的线程就可以很快进入同步代码块就能获得锁
         OrderAccess::release_store_ptr (&_owner, NULL) ;
         OrderAccess::storeload() ;
         //  EntryList 和 cxq 都没有等待线程,说明没有线程需要被唤醒,直接返回
         // _succ 不为 NULL,说明存在继承人线程,也不需要唤醒,直接返回
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            TEVENT (Inflated exit - simple egress) ;
            return ;
         }
         TEVENT (Inflated exit - complex egress) ;
         //因为前面释放锁了,所以这里需要再次获锁(如果获锁失败,则直接返回,由新的owner来唤醒后续线程)
         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
            return ;
         }
         TEVENT (Exit - Reacquired) ;
      }
      //优先唤醒队列中线程的策略
      else {
         //跟上一个分支唯一的区别就是释放锁的时机不一样
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            OrderAccess::release_store_ptr (&_owner, NULL) ;
            OrderAccess::storeload() ;
            if (_cxq == NULL || _succ != NULL) {
                TEVENT (Inflated exit - simple egress) ;
                return ;
            }
            
            if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
               TEVENT (Inflated exit - reacquired succeeded) ;
               return ;
            }
            TEVENT (Inflated exit - reacquired failed) ;
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      //...
      
      //根据QMode选择不同的唤醒模式,默认为0
      // QMode == 0: 优先唤醒 EntryList头,如果为空,则将 cxq 中的节点移动到 EntryList 中,再去唤醒 EntryList头
      // QMode == 1: 流程同上,不同的是,移动节点的同时,会反转cxq链表
      // QMode == 2: 优先唤醒 cxq 的头部节点,如果为空,则唤醒EntryList头
      // QMode == 3: 优先将 cxq 的节点移动到 EntryList 尾部,然后去唤醒 EntryList 头
      // QMode == 4: 优先将 cxq 的节点移动到 EntryList 头部,然后去唤醒 EntryList 头
      if (QMode == 2 && _cxq != NULL) {
          w = _cxq ;
          assert (w != NULL, "invariant") ;
          assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      if (QMode == 3 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }
          ObjectWaiter * Tail ;
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;
          } else {
              Tail->_next = w ;
              w->_prev = Tail ;
          }
      }

      if (QMode == 4 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }
          if (_EntryList != NULL) {
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;
      }

      w = _EntryList  ;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

     
      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      TEVENT (Inflated exit - drain cxq into EntryList) ;

      assert (w != NULL              , "invariant") ;
      assert (_EntryList  == NULL    , "invariant") ;

      if (QMode == 1) {
        
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
         // QMode == 0 or QMode == 2
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   //...
   // 将 owner 设置为 NULL 释放锁
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   //内存屏障
   OrderAccess::fence() ;
   //...
   //unpark唤醒
   Trigger->unpark() ;
   //...
}

总结:

1、先判断是否owner指向当前线程,是否当前线程膨胀的轻量级锁,是否重入
2、根据不同的 Knob_ExitPolicy 释放锁时机策略,来决定优先放开自旋线程还是优先唤醒队列线程 
3、根据不同的 QMode 唤醒模型来决定具体唤醒哪一个线程(无论哪种模式唤醒前都会释放锁并加内存屏障)
      QMode = 0:优先唤醒 EntryList头,如果为空,则将 cxq 中的节点移动到 EntryList 中,再去唤醒 EntryList头
      QMode = 1: 流程同上,不同的是,移动节点的同时,会反转cxq链表
      QMode = 2: 优先唤醒 cxq 的头部节点,如果为空,则唤醒EntryList头
      QMode = 3: 优先将 cxq 的节点移动到 EntryList 尾部,然后去唤醒 EntryList 头
      QMode = 4: 优先将 cxq 的节点移动到 EntryList 头部,然后去唤醒 EntryList 头

为什么需要cxq和entryList两个队列?
我认为是因为如果只用一个队列的话出入队操作大概率会发生冲突。用两个队列从宏观上来看可以粗略的认为入队在cxq,出队在entryList。

重量级锁的降级

先看这么一个案例
Java多线程篇(1)——深入分析synchronized,Java基础,java
上面的案例验证了重量级锁释放后锁状态还是重量级锁(owner指向null),并没有降级到无锁。那为什么无竞争后会变成无锁呢?
因为JVM在全局安全点执行清理任务时会触发锁的降级来恢复闲置 ObjectMonitor 锁对象对应的 markword 对象头并重置 ObjectMonitor 等待复用。

safepoint.cpp#SafepointSynchronize::do_cleanup_tasks

//全局安全点的清理任务
void SafepointSynchronize::do_cleanup_tasks() {
    //...
    //触发重量级锁降级
    ObjectSynchronizer::deflate_idle_monitors();
    //...
}

synchronizer.cpp#ObjectSynchronizer::deflate_idle_monitors

void ObjectSynchronizer::deflate_idle_monitors() {
  //...
  
  // 遍历所有现存 ObjectMonitor
  else for (ObjectMonitor* block = gBlockList; block != NULL; block = next(block)) {
    assert(block->object() == CHAINMARKER, "must be a block header");
    nInCirculation += _BLOCKSIZE ;
    for (int i = 1 ; i < _BLOCKSIZE; i++) {
      ObjectMonitor* mid = &block[i];
      oop obj = (oop) mid->object();
      //obj为null说明还未分配,跳过
      if (obj == NULL) {
        guarantee (!mid->is_busy(), "invariant") ;
        continue ;
      }
      // 调用 ObjectSynchronizer::deflate_monitor 方法尝试降级
      deflated = deflate_monitor(mid, obj, &FreeHead, &FreeTail);
      //...
    }
  }
  //...
}

synchronizer.cpp#ObjectSynchronizer::deflate_monitor

bool ObjectSynchronizer::deflate_monitor(ObjectMonitor* mid, oop obj,
                                         ObjectMonitor** FreeHeadp, ObjectMonitor** FreeTailp) {
  //...
  if (mid->is_busy()) {
    //...
  } else {
     //...
     // 将锁对象的 Mark Word 设置为无锁状态(001)
     obj->release_set_mark(mid->header());
     //...
     // 将 monitor 放到空闲链表中,等待释放
     if (*FreeHeadp == NULL) *FreeHeadp = mid;
     if (*FreeTailp != NULL) {
       ObjectMonitor * prevtail = *FreeTailp;
       assert(prevtail->FreeNext == NULL, "cleaned up deflated?");
       prevtail->FreeNext = mid;
      }
     *FreeTailp = mid;
     deflated = true;
  }
  return deflated;
}

轻量级锁释放的时候也会变成无锁状态,但我个人认为这个过程不叫锁的降级,只是轻量级锁释放中的一个步骤而已。锁降级是指调用了 deflate_xxx 方法。毕竟 deflate 是可以是降低下降的意思,与之对立的是锁膨胀 inflate。

其他

锁粗化、锁消除

//锁粗化:因为是前后synchronized是lock对象,所以会粗化成一个synchronized来括住这两个同步块
public class LockCoarseningExample {
    private Object lock = new Object();

    public void doSomething() {
        synchronized (lock) { // 第一个同步块
            //...
        }
        synchronized (lock) { // 第二个同步块
            //..
        }
    }
}

//锁消除:这里的str拼接不会被其他线程访问(没有线程逃逸),可以进行锁消除
public class LockEliminationExample {
    public void doSomething() {
        StringBuilder str = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            str.append("Value " + i);
        }
    }
}

调用hashcode、wait/notify对Synchronized锁状态的影响

同步代码块内调用hashcode会立马变成重量级锁,同步代码块外调用会把偏向锁撤销变成无锁。
调用wait会变成重量级锁,调用notify会把偏向锁变成轻量级锁。

从源码上看,notify也调用了ObjectSynchronizer::inflate应该也会膨胀成重量级锁。
但是实际测试打印的结果是轻量级锁,我猜测可能是因为这种情况的重量级锁很快就被降级了。
Java多线程篇(1)——深入分析synchronized,Java基础,java

注:运行JDK版本是11,本文参考的JVM源码版本是jdk8u-hotspot文章来源地址https://www.toymoban.com/news/detail-700683.html

到了这里,关于Java多线程篇(1)——深入分析synchronized的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Java|多线程与高并发】线程安全问题以及synchronized使用实例

    Java多线程环境下,多个线程同时访问共享资源时可能出现的数据竞争和不一致的情况。 线程安全一直都是一个令人头疼的问题.为了解决这个问题,Java为我们提供了很多方式. synchronized、ReentrantLock类等。 使用线程安全的数据结构,例如ConcurrentHashMap、ConcurrentLinkedQueue等

    2024年02月09日
    浏览(47)
  • Java关键字之synchronized详解【Java多线程必备】

    点击   Mr.绵羊的知识星球  解锁更多优质文章。 目录 一、介绍 二、特性 1. 线程安全 2. 互斥访问 3. 可重入性 4. 内置锁 三、实现原理 四、和其他锁比较 1. 优点 2. 缺点 五、注意事项和最佳实践 六、使用案例 1. 案例一 2. 案例二     synchronized是Java中最基本的同步机制之一,

    2024年01月24日
    浏览(48)
  • Java并发编程(三)线程同步 上[synchronized/volatile]

    当使用多个线程来访问同一个数据时,将会导致数据不准确,相互之间产生冲突,非常容易出现线程安全问题,比如多个线程都在操作同一数据,都打算修改商品库存,这样就会导致数据不一致的问题。 所以我们通过线程同步机制来保证线程安全,加入同步锁以避免在该线程没有完成

    2024年02月13日
    浏览(44)
  • Java多线程(4)---死锁和Synchronized加锁流程

    目录 前言 一.synchronized 1.1概念  1.2Synchronized是什么锁? 1.3Synchronized加锁工作过程 1.4其他优化操作 二.死锁 2.1什么是死锁 2.2死锁的几个经典场景 2.3死锁产生的条件 2.4如何解决死锁 🎁个人主页:tq02的博客_CSDN博客-C语言,Java,Java数据结构领域博主 🎥 本文由 tq02 原创,首发于

    2024年02月13日
    浏览(40)
  • 【创作赢红包】Java多线程:synchronized锁方法块

    synchronized同步代码块 用synchronized声明方法在某些情况下是有弊端的,比如A线程调用同步方法执行一个较长时间的任务,那么B线程必须等待比较长的时间。这种情况下可以尝试使用synchronized同步语句块来解决问题。看一下例子:     运行结果,分两部分来看: synchr

    2023年04月09日
    浏览(38)
  • 【并发编程】深入理解Java并发之synchronized实现原理

    分析: 通过 new MyThread() 创建了一个对象 myThread ,这时候堆中就存在了共享资源 myThread ,然后对 myThread 对象创建两个线程,那么thread1线程和thread2线程就会共享 myThread 。 thread1.start() 和 thead2.start() 开启了两个线程,CPU会随机调度这两个线程。假如 thread1 先获得 synchronized 锁,

    2024年02月04日
    浏览(60)
  • java八股文面试[多线程]——synchronized锁升级过程

    速记:偏向-轻量-重量 上面讲到锁有四种状态,并且会因实际情况进行膨胀升级,其膨胀方向是: 无锁——偏向锁——轻量级锁——重量级锁 ,并且膨胀方向 不可逆 一.锁升级理论. 在synchronized锁升级过程中涉及到以下几种锁.先说一下这几种锁是什么意思. 偏向锁: 只有一个

    2024年02月10日
    浏览(44)
  • 【并发多线程】java并发中的Synchronized关键词

    如果在多线程的环境中,我们经常会遇到资源竞争的情况,比如多个线程要去同时修改同一个共享变量,这时候,就需要对资源的访问方法进行一定的处理,保证同一时间只有一个线程访问。 java提供了synchronized,方便我们实现上述操作。 我们举个例子,我们创建一个

    2023年04月13日
    浏览(44)
  • java八股文面试[多线程]——synchronized锁升级详细流程

    偏向锁是JDK6中的重要引进,因为HotSpot作者经过研究实践发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由 同一线程多次获得 ,为了让线程获得锁的代价更低,引进了偏向锁。 偏向锁是在 单线程 执行代码块时使用的机制,如果在多线程并发的环境下(即线程

    2024年02月10日
    浏览(34)
  • java八股文面试[多线程]——Synchronized的底层实现原理

    笔试:画出Synchronized 线程状态流转 实现原理图 synchronized解决的是多个线程之间访问资源的同步性,synchronized 翻译为中文的意思是 同步 ,也称之为”同步锁“。 synchronized的作用是保证在 同一时刻 , 被修饰的代码块或方法只会有一个线程执行,以达到保证并发安全的

    2024年02月10日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包