1 Java层面探究 Java中的原子类Atomic
底层的实现原理是CAS,本文就让我们一起来深入探究CAS。 下面是AtomicInteger
的测试代码 ,执行完毕后会发现原子类实例ai
最终是精确的10000,而普通变量bi
的值是一个小于10000的不固定的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.hw.review2022.concurrent; import org.junit.Test; import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger; public class TestCAS { static AtomicInteger ai = new AtomicInteger (0 ); static int bi = 0 ; class Task implements Runnable { private int id; public Task (int i) { this .id = i; } @Override public void run () { for (int i = 0 ; i < 1000 ; i++) { try { if (i % 100 == 0 ) { System.out.println("Thread name : " + Thread.currentThread().getName() + ", Task id : " + id + ", loop : " + i); } Thread.sleep(2 ); ai.addAndGet(1 ); bi++; } catch (InterruptedException e) { e.printStackTrace(); } } } } @Test public void test () throws Exception { ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 10 ; i++) { executor.execute(new Task (i)); } executor.shutdown(); executor.awaitTermination(60 , TimeUnit.SECONDS); System.out.println("ai is right : " + ai); System.out.println("bi is wrong : " + bi); } }
上述代码的核心是 AtomicInteger.addAndGet
方法
1 2 3 4 5 6 7 8 9 10 11 12 private volatile int value;private static final long valueOffset; public final int addAndGet (int delta) { return unsafe.getAndAddInt(this , valueOffset, delta) + delta; }
首先我们看一下AtomicInteger
类实例对象的内存布局
this
指向了对象的起始地址,通过this + valueOffset(=12)
我们就可以获得value
字段的内存地址(即C/C++中的指向value
的指针),进而读写该value
值。
然后我们接着看Unsafe.getAndAddInt
方法
1 2 3 4 5 6 7 8 9 public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
var5
这个变量是通过ai
的起始地址 + valueOffset
偏移值获取到value
的内存地址,进而获取到的value
的值,我们把这个值称为oldValue
var5+var4
就是我们期望更新的值,我们把它叫做updateValue
接下来发现Unsafe.compareAndSwapInt
是一个 native 方法(就是在Java虚拟机中用 C/C++ 实现的方法)
1 public final native boolean compareAndSwapInt (Object var1, long var2, int var4, int var5) ;
2 C/C++层面探究 现在Java语言层面我们已经分析完了,完全看不到CAS具体是怎么实现的,接下来我们继续去探究JDK源码
Unsafe.compareAndSwapInt
的源码所在目录为hotspot/src/share/vm/prims/unsafe.cpp
这个方法的前两个参数不用去了解(JNI是Java调用C的方式),后四个参数和Unsafe.compareAndSwapInt
方法的一一对应。
1 2 3 4 5 6 7 8 9 10 11 UNSAFE_ENTRY (jboolean, Unsafe_CompareAndSwapInt (JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper ("Unsafe_CompareAndSwapInt" ); oop p = JNIHandles::resolve (obj); jint* addr = (jint *) index_oop_from_field_offset_long (p, offset); return (jint)(Atomic::cmpxchg (x, addr, e)) == e; UNSAFE_END
继续往下分析来到了最核心的部分
1 2 3 4 5 6 7 8 inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4 ) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc" , "memory" ) ; return exchange_value; }
cmpxchgl
是一个汇编指令(最后一个字符l
代表了cmpxchg
指令的参数类型为jint
,对于jlong
类型最后一个字符是q
,不同操作系统可能会不同),所以我们需要去理解cmpxchg
这条汇编指令。
3 汇编层面探究 下面给出我在C中实现的CAS来帮助大家理解CAS和cmpxchg
指令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 #include <unistd.h> #include "stdio.h" #include "pthread.h" int cmp (int oldValue, int * addr, int updateValue) ;void atomic_add (int * addr, int delta) ;void task () ;void atomic_task () ; int atomic_count = 0 , count = 0 ; int main () { pthread_t tid, tid2, tid3, tid4, tid5, tid6; pthread_create(&tid, NULL , (void *) task, NULL ); pthread_create(&tid2, NULL , (void *) task, NULL ); pthread_create(&tid3, NULL , (void *) task, NULL ); pthread_create(&tid4, NULL , (void *) atomic_task , NULL ); pthread_create(&tid5, NULL , (void *) atomic_task , NULL ); pthread_create(&tid6, NULL , (void *) atomic_task , NULL ); pthread_join(tid, NULL ); pthread_join(tid2, NULL ); pthread_join(tid3, NULL ); pthread_join(tid4, NULL ); pthread_join(tid5, NULL ); pthread_join(tid6, NULL ); printf ("count is %d\n" , count); printf ("atomic_count is %d\n" , atomic_count ); return 0 ; } int cmp (int oldValue, int * addr, int updateValue) { int resValue; __asm__ __volatile__ ("lock\n\t" "cmpxchgl %2, (%3)" : "=a" (resValue) : "a" (oldValue), "r" (updateValue), "r" (addr) : "cc" , "memory" ); return resValue; } void atomic_add (int * addr, int delta) { int oldValue; do { oldValue = *addr; } while ( cmp(oldValue, addr, oldValue + delta) != oldValue ); } void atomic_task () { for (int i = 0 ; i < 300000 ; i++) { atomic_add (&atomic_count , 1 ); usleep(60 ); } } void task () { for (int i = 0 ; i < 300000 ; i++) { count += 1 ; usleep(60 ); } }
重点讲解一下cmp
函数
1 2 3 4 5 6 7 8 9 int cmp (int oldValue, int * addr, int updateValue) { int resValue; __asm__ __volatile__ ("lock\n\t" "cmpxchgl %2, (%3)" : "=a" (resValue) : "a" (oldValue), "r" (updateValue), "r" (addr) : "cc" , "memory" ); return resValue; }
其实也就是我自己实现的JDK中的Atomic::cmpxchg
函数,只不过更容易看懂一些。
oldValue 就是我们先前获取到的value
值,addr
就是value
字段的内存地址,在cmpxchg
指令中会用到这个地址,updateValue 就是我们期望更新的value
值(oldValue +增量delta
)
以下关于内联汇编的知识大家可以去参考GCC-Inline-Assembly-HOWTO
输入列表中:
"a"(oldValue)
:a代表EAX寄存器,意思是将变量oldValue 的值输入到EAX寄存器
"r"(updateValue) "r"(addr)
:意思是将变量updateValue 和地址addr
也放到寄存器中,r(register)代表一组寄存器,也就是从这一组寄存器中随便选一个存updateValue ,随便选一个存addr
输出列表中:
"=a"(resValue)
:表示内联汇编执行完后,将EAX寄存器的值存到resValue
这个变量中。
我们在输出列表和输入列表中声明了许多变量,这些变量从%0
开始依次往下标号,所以%0
表示resValue
,%1
代表了oldValue ,%2
代表了updateValue ,%3
代表了addr
。
现在我们结合的cmpxchg
指令的功能,来理解一下这段汇编到底在做什么。汇编语言有Intel和AT&T两种语法,一般我们用的都是AT&T这种,下面的讲解也是基于该语法的:
cmpxchg 指令有两个操作数,同时还使用了EAX 寄存器。首先,它将第二个操作数和EAX寄存器相比较,如果相同则把第一个操作数赋值给第二个操作数,否则将第一个操作数赋值给EAX 寄存器
%2
是第一个操作数,即updateValue ,也就是无冲突的时候我们期望更新的值
(%3)
是第二个操作数,即(addr)
,()
表示取值操作(相当于C语言中的*
),因为addr
是value
变量的地址,所以该操作数是在取此刻value
的值curValue
首先将oldValue 存储到EAX寄存器中,然后用第二个操作数curValue
和oldValue 行比较,如果相等,则说明从得到oldValue 到现在执行cmpxchg
这条指令这段时间内,value
没有被其他线程改写(抛开ABA问题不谈),没有发生冲突,所以我们就可以直接把我们希望更新的新值updateValue 写入到value
中,那我们再来看看cmpxchg
这条指令干了啥,如果相同则把第一个操作数赋值给第二个操作数,也就是将updateValue 赋值给value
对象(成功更新value
的值)。现在这条指令就执行完毕了,EAX寄存器中存储的还是oldValue ,所以最终cmp
函数返回的也就是oldValue
1 2 3 4 5 6 void atomic_add (int * addr, int delta) { int oldValue; do { oldValue = *addr; } while ( cmp(oldValue, addr, oldValue + delta) != oldValue ); }
这个函数也就可以返回了,对应的就是Java中的Unsafe.getAndAddInt
方法
1 2 3 4 5 6 7 8 9 public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
如果EAX中保存的oldValue 和我们执行cmpxchg
指令时获取的curValue
不同,说明value
的值被其他线程改写了,那此刻将第一个操作数updateValue 赋值给 EAX 寄存器(我们并没有更新value
的值),所以cmp
函数返回的值为updateValue 。atomic_add
函数中cmp
返回的updateValue 和oldValue 不同,所以要重新执行do while
循环去自旋,直到没有冲突发生。
4 动画展示与总结 该动画使用Python manim
库制作,动画源码
最后我们再用高级语言解释一下cmpxchg
这条汇编指令的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 int value; int cmpxchg (int * addr, int oldValue, int updateValue) { int curValue = *addr; if (curValue == oldValue) { *addr == updateValue; return oldValue; } else { return updateValue; } }
5 引用