今天从以上几个方面说说怎么保障线程安全(理解的数据一致性、可读性)。
Synchronized
来段synchronized 代码
public static int a;
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
synchronized (Main.class) {
System.out.println(++a);
}
}
}
}).start();
}
}
我们用javap -c看下汇编后的
monitorexit指令出现了两次,第1次为同步正常退出释放锁;第2次为发生异步退出释放锁;
monitorexit这个锁也不是凭空出现的,在我们计算机硬件就有这样的功能。
总线锁:就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。
缓存锁:所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行数据时,会使缓存行无效。
不管是总线锁还是缓存锁,都跟北桥有关。北桥用于CPU和内存、显卡、PCI交换数据
ReentrantLock
Lock lock = new ReentrantLock();
CountDownLatch countDownLatch = new CountDownLatch(100000);
Long st = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
try {
lock.lock();
//synchronized (Main.class) {
System.out.println(++a);
countDownLatch.countDown();
//}
} finally {
lock.unlock();
}
}
}
}).start();
}
ReadWriteLock
public static void main(String[] args) throws Exception {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(500);
readLock.lock();
System.out.println("readLock:" + a);
readLock.unlock();
} catch (Exception ex) {
}
}
}
}).start();
Thread.sleep(1000);
new Thread(new Runnable() {
@Override
public void run() {
writeLock.lock();
for (int i = 0; i < 3; i++) {
try {
Thread.sleep(500);
System.out.println("writeLock:" + (++a));
} catch (Exception ex) {
}
}
writeLock.unlock();
}
}).start();
}
readLock:0
readLock:0
writeLock:1
writeLock:2
writeLock:3
readLock:3
很漂亮的读写锁。写的时候不允许读,是不是可以提高平常的性能
自定义自旋锁 当出现高并发的时候,系统内核态就需要不断的去挂起线程和恢复线程,频繁的此类操作会对我们系统的并发性能有一定影响。在程序的执行过程中锁定“共享资源“的时间片是极短的,如果仅仅是为了这点时间而去不断挂起、恢复线程的话,消耗的时间可能会更长,这是能不能让当前线程执行一个忙循环(自旋操作)。
class SimpleSpinningLock implements Lock {
/**
* 持有锁的线程,null表示锁未被线程持有
*/
private AtomicReference<Thread> owner = new AtomicReference<>();
private int count = 0;
@Override
public void lock() {
Thread t = Thread.currentThread();
// if re-enter, increment the count.
if (t == owner.get()) {
++count;
return;
}
//spin
while (owner.compareAndSet(null, t)) {
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
Thread t = Thread.currentThread();
//only the owner could do unlock;
if (t == owner.get()) {
if (count > 0) {
// reentrant count not zero, just decrease the counter.
--count;
} else {
// compareAndSet is not need here, already checked
owner.set(null);
}
}
}
@Override
public Condition newCondition() {
return null;
}
}
public class Main {
static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(100);
CountDownLatch countDownLatch = new CountDownLatch(100000);
Long st = System.currentTimeMillis();
SimpleSpinningLock simpleSpinningLock = new SimpleSpinningLock();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
simpleSpinningLock.lock();
System.out.println(++count);
countDownLatch.countDown();
simpleSpinningLock.unlock();
}
}
});
}
countDownLatch.await();
System.out.println("耗时:" + (System.currentTimeMillis() - st));
}
}
TicketLock
public class TicketLock implements Lock {
private AtomicInteger serviceNum = new AtomicInteger(0);
private AtomicInteger ticketNum = new AtomicInteger(0);
private final ThreadLocal<Integer> myNum = new ThreadLocal<>();
@Override
public void lock() {
myNum.set(ticketNum.getAndIncrement());
while (serviceNum.get() != myNum.get()) {
}
}
@Override
public void unlock() {
serviceNum.compareAndSet(myNum.get(), myNum.get() + 1);
myNum.remove();
}
}
CLHLock
public class CLHLock implements Lock {
/**
* 锁等待队列的尾部
*/
private AtomicReference<QNode> tail;
private ThreadLocal<QNode> preNode;
private ThreadLocal<QNode> myNode;
public CLHLock() {
tail = new AtomicReference<>(null);
myNode = ThreadLocal.withInitial(QNode::new);
preNode = ThreadLocal.withInitial(() -> null);
}
@Override
public void lock() {
QNode qnode = myNode.get();
//设置自己的状态为locked=true表示需要获取锁
qnode.locked = true;
//链表的尾部设置为本线程的qNode,并将之前的尾部设置为当前线程的preNode
QNode pre = tail.getAndSet(qnode);
preNode.set(pre);
if(pre != null) {
//当前线程在前驱节点的locked字段上旋转,直到前驱节点释放锁资源
while (pre.locked) {
}
}
}
@Override
public void unlock() {
QNode qnode = myNode.get();
//释放锁操作时将自己的locked设置为false,可以使得自己的后继节点可以结束自旋
qnode.locked = false;
//回收自己这个节点,从虚拟队列中删除
//将当前节点引用置为自己的preNode,那么下一个节点的preNode就变为了当前节点的preNode,这样就将当前节点移出了队列
myNode.set(preNode.get());
}
private class QNode {
/**
* true表示该线程需要获取锁,且不释放锁,为false表示线程释放了锁,且不需要锁
*/
private volatile boolean locked = false;
}
}
MCSLock
public class MCSLock implements Lock {
private AtomicReference<QNode> tail;
private ThreadLocal<QNode> myNode;
public MCSLock() {
tail = new AtomicReference<>(null);
myNode = ThreadLocal.withInitial(QNode::new);
}
@Override
public void lock() {
QNode qnode = myNode.get();
QNode preNode = tail.getAndSet(qnode);
if (preNode != null) {
qnode.locked = false;
preNode.next = qnode;
//wait until predecessor gives up the lock
while (!qnode.locked) {
}
}
qnode.locked = true;
}
@Override
public void unlock() {
QNode qnode = myNode.get();
if (qnode.next == null) {
//后面没有等待线程的情况
if (tail.compareAndSet(qnode, null)) {
//真的没有等待线程,则直接返回,不需要通知
return;
}
//wait until predecessor fills in its next field
// 突然有人排在自己后面了,可能还不知道是谁,下面是等待后续者
while (qnode.next == null) {
}
}
//后面有等待线程,则通知后面的线程
qnode.next.locked = true;
qnode.next = null;
}
private class QNode {
/**
* 是否被qNode所属线程锁定
*/
private volatile boolean locked = false;
/**
* 与CLHLock相比,多了这个真正的next
*/
private volatile QNode next = null;
}
}
volatile
参考: https://www.wglbk.cn/post/43
ThreadLocalMap
public static void main(String[] args) throws Exception {
final ThreadLocal<Integer> local = new ThreadLocal<>();
local.set(10);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " local: " + local.get());
}
});
t.start();
System.out.println("Main local: " + local.get());
}
输出结果:
Thread-0 local: null
Main local: 10
原因是在主线程中set了值,在子线程中没有set。threadLocal保存的是每个线程的值。
解析:
threadLocal类的set方法:
key为当前threadLocal,value为要设置的值。
如果为空时,getMap方法:
t为当前线程,t.threadLocals的意思是,在每个线程的内部,都有一个threadLocals,这个threadLocals是一个threadLocalMap,源码:
也就是说,每个线程内部都有一个threadLocalMap,这个map没有继承Map,而是一个ThreadLocal的内部类,自己实现的map,key规定了只能是threadLocal。具体关于threadLocalMap的解释:threadLocalMap
为什么key为threadLocal呢?
因为每个线程都有一个threadLocalMap,里面可以存放多个threadLocal,这样就可以区分每个线程各自存放的值。
(这里起初很容易误解成多个线程共享同一个threadLocalMap,key为线程,这样的话每个线程只能存放一个值,实际不是这样的。)
一个threadLocal对于一个线程只能存放一个值,如果要一个线程要存放多个值就要创建多个threadLocal。
get方法:
也是用当前的threadLocal为key来get值的。源码中的getMap方法就是拿的当前thread的threadLocals。
ps:threadLocalMap中的key为弱引用,在下一次GC时会被回收掉,但是value是强引用,是不会回收掉的,这里有发生内存泄漏的可能。最好的是在get完之后调用remove,断开引用链,这样value就可以被gc回收掉了。
注意:本文归作者所有,未经作者允许,不得转载