CAS相关知识剖析:疑问与应对之策
何为CAS?
CAS的全称是Compare And Swap
,即比较与交换,是乐观锁的关键实现形式。CAS能在无需使用锁的状况下达成多线程间的变量同步。ReentrantLock
内部的AQS以及原子类内部均运用了CAS。
CAS算法涉及三个操作数:
- 需读写的内存值V。
- 用于比较的值A。
- 要写入的新值B。
仅当V的值与A相等时,才会以原子方式用新值B对V的值进行更新,不然便会持续重试直至成功完成更新。
在Java中,CAS操作主要借助java.util.concurrent.atomic包中的类来实现。例如AtomicInteger、AtomicBoolean、AtomicReference等。通过这些类的操作,Java应用能够在多线程环境下安全地对基本数据类型进行操作,无需开展显式锁定。
以AtomicInteger为例,AtomicInteger的getAndIncrement()方法底层由CAS实现,关键代码为compareAndSwapInt(obj, offset, expect, update)
,其含义是:若obj内的value和expect一致,表明无其他线程对该变量进行过改动,于是将其更新为update;若不一致,则会持续重试直至成功完成值的更新。
CAS的优势?存在的问题?解决办法是什么?
优势:
- 无锁机制:CAS允许众多线程尝试对同一个变量进行更新,无需采用锁定方式,极大地减少了锁竞争。
- 性能提升:在无锁的情境下,通常具备较好的性能表现,适用于高并发场景。
存在的问题:
- ABA问题:CAS在操作值时需检查内存值是否发生变化,仅在未变化时才会更新内存值。然而,若内存值原本为A,后变为B,接着又变回A,那么CAS在检查时会发现值未发生变化,但实际上是有变动的。解决ABA问题的思路是在变量前添加版本号,每次变量更新时将版本号加一,如此变化过程便从
A-B-A
转变为1A-2B-3A
。
解决办法:JDK从1.5开始提供AtomicStampedReference类来解决ABA问题,该类能够原子更新带有版本号的引用类型。 - 循环时间长开销大:若CAS操作长时间未能成功,会致使其持续自旋,给CPU带来极大的开销。
解决办法:可运用Java8中的LongAdder,它采用分段CAS和自动分段迁移的方式。 - 仅能保障一个共享变量的原子操作:当对单个共享变量进行操作时,CAS能够保障原子操作,但对于多个共享变量的操作,CAS无法保障操作的原子性。
解决办法:可使用AtomicReference,它用于封装自定义对象,多个变量可放置在一个自定义对象中,然后它会对该对象的引用是否相同进行检查。若多个线程同时对一个对象变量的引用进行赋值,利用AtomicReference的CAS操作可解决并发冲突问题。
怎样实现无锁并发?
CAS能够保障对共享变量操作的原子性,而volatile能够实现可见性和有序性,将CAS与volatile相结合可实现无锁并发,适用于竞争不激烈、多核CPU的场景。
CAS效率较高的原因在于其内部未使用synchronized关键字,CAS不会使线程进入阻塞状态,从而避免了synchronized中用户态和内核态切换所带来的性能消耗问题,也规避了线程挂起等状况。若竞争极为激烈,那么CAS会出现线程大量重试的情况,由于多个线程进行竞争,很可能众多线程设置取值失败,进而需进行while循环重试,也就是大量线程开展重试操作,成功存储的线程相对较少,如此反而会使性能大幅降低。所以若竞争过于激烈仍采用CAS机制,其性能可能比synchronized还要低。
扩展知识
AtomicInteger
问题
public class Demo01 {
private static int num = 0;
public static void main(String[] args) throws InterruptedException {
Runnable mr = () -> {
for (int i = 0; i < 10000; i++) {
num++;
}
};
ArrayList<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(mr);
t.start();
ts.add(t);
}
for (Thread t : ts) {
t.join();
}
System.out.println("num = " + num);
}
}
改为原子类:
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger();
Runnable mr = () -> {
for (int i = 0; i < 10000; i++) {
atomicInteger.incrementAndGet();
}
};
ArrayList<Thread> ts = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Thread t = new Thread(mr);
t.start();
ts.add(t);
}
for (Thread t : ts) {
t.join();
}
System.out.println("number = " + atomicInteger.get());
}
}
底层源码
AtomicInteger类内部包含一个UnSafe类,该类可保障变量赋值时的原子操作;
/* AtomicInteger.java */
private volatile int value;
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
其中,变量valueOffset表示该变量值在内存中的偏移地址,Unsafe依据内存偏移地址获取数据;变量value采用volatile修饰,确保了多线程之间的内存可见性。
/* Unsafe.class */
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
变量解释:var5是从主内存拷贝到工作内存中的值;val1是AtomicInteger对象自身;var2是该对象值的valueOffset;var4是需变动的数量;var5是通过var1和var2找到的内存中的真实值。compareAndSwapInt(var1, var2, var5, var5 + var4)表示用该对象当前的值与var5进行比较,若相同,便更新为var5 + var4并返回true;若不同,就持续取值并再次比较,直至更新完成。需对工作内存中的值和主内存中的值进行比较。若执行compareAndSwapInt返回false,便持续执行while方法,直至期望的值与真实值一致。
假设线程A和线程B同时执行getAndInt操作(分别运行在不同的CPU上):
1. AtomicInteger里的value原始值为3,即主内存中AtomicInteger的value为3,依据JMM模型,线程A和线程B各自持有一份值为3的副本,分别存储在各自的工作内存;
2. 线程A通过getIntVolatile(var1 , var2)获取到value值3,此时线程A被挂起(该线程失去CPU执行权);
3. 线程B也通过getIntVolatile(var1, var2)方法获取到value值也是3,此时线程B未被挂起,执行compareAndSwapInt方法,比较内存的值也是3,成功将内存值修改为4,线程B完成操作;
4. 此时线程A恢复,执行CAS方法,发现自身持有的数字3与主内存中的数字4不一致,表明该值已被其他线程抢先修改,于是线程A本次修改失败,只能重新读取后再次尝试,即继续执行do while;
5. 线程A重新获取value值,由于变量value被volatile修饰,其他线程对它的修改,线程A能够察觉,线程A继续执行compareAndSwapInt进行比较替换,直至成功。
不过AtomicInteger存在CAS循环开销大的问题,因此JDK8引入LongAdder来解决此问题。
LongAdder
LongAdder主要通过分段CAS以及自动分段迁移的方式,大幅提升多线程高并发执行CAS操作的性能。
实现过程:
1. 在LongAdder的底层实现中,起初有一个base值,刚开始多线程不断累加数值时,均对base进行累加,例如刚开始累加成base = 5。
2. 接着若发现并发更新的线程数量过多,便开始施行分段CAS机制,也就是内部会创建一个Cell数组,每个数组为一个数值分段。
3. 这时,让大量线程分别对不同Cell内部的value值进行CAS累加操作,从而将CAS计算压力分散到不同的Cell分段数值中;
4. 这样便能大幅降低多线程并发更新同一个数值时出现的无限循环问题,显著提升多线程并发更新数值的性能和效率;
5. 内部实现了自动分段迁移机制,即若某个Cell的value执行CAS失败,会自动查找另一个Cell分段内的value值进行CAS操作。这也解决了线程空旋转、自旋不停等待执行CAS操作的问题,使一个线程前来执行CAS时能快速完成该操作。
最后,若要从LongAdder中获取当前累加的总值,会将base值和所有Cell分段数值相加后返回。总体而言,LongAdder减少了乐观锁的重试次数。
add源码
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
longAccumulate方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
collide = false;
} else if (!wasUncontended) {
wasUncontended = true;
} else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) {
break;
} else if (n >= NCPU || cells != as) {
collide = false;
} else if (!collide) {
collide = true;
} else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);
} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try {
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
} else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) {
break;
}
}
}
AtomicStampedReference
AtomicStampedReference主要维护包含一个对象引用以及一个可自动更新的整数"stamp"的pair对象来解决ABA问题。
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
若元素值和版本号均无变化,且与新值也相同,返回true;若元素值和版本号均无变化,但与新值不完全相同,则构造一个新的Pair对象并执行CAS更新pair。可以看出,java中的实现与我们上述讲解的ABA解决方法一致。首先,利用版本号进行控制;其次,不重复使用节点(Pair)的引用,每次均新建一个新的Pair作为CAS比较的对象,而非复用旧的;最后,外部传入元素值及版本号,而非节点(Pair)的引用。
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12617.html