多线程编程模板:

1、判断 2、干活 3、通知

生产者消费者模型(synchronized实现)

代码:

package com.atguigu.thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareDataOne//资源类
{
  private int number = 0;//初始值为零的一个变量
  public synchronized void increment() throws InterruptedException 
  {
     //1判断
     if(number !=0 ) {
       this.wait();
     }
     //2干活
     ++number;
     System.out.println(Thread.currentThread().getName()+"\t"+number);
     //3通知
     this.notifyAll();
  }
  public synchronized void decrement() throws InterruptedException 
  {
     // 1判断
     if (number == 0) {
       this.wait();
     }
     // 2干活
     --number;
     System.out.println(Thread.currentThread().getName() + "\t" + number);
     // 3通知
     this.notifyAll();
  }
}
/**
 * 
 * @Description:
 *现在两个线程,
 * 可以操作初始值为零的一个变量,
 * 实现一个线程对该变量加1,一个线程对该变量减1,
 * 交替,来10轮。 
 * @author xialei
 *
 *  * 笔记:Java里面如何进行工程级别的多线程编写
 * 1 多线程变成模板(套路)-----上
 *     1.1  线程    操作    资源类  
 *     1.2  高内聚  低耦合
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 */
public class NotifyWaitDemoOne
{
  public static void main(String[] args)
  {
     ShareDataOne sd = new ShareDataOne();
     new Thread(() -> {
       for (int i = 1; i 
          try {
            sd.increment();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "A").start();
     new Thread(() -> {
       for (int i = 1; i 
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       }
     }, "B").start();
  }
}
/*
 * * 
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 * 3 防止虚假唤醒用while
 * 
 * 
 * */

以上代码如果有四个线程同时执行就会出错,原因是在Java多线程判断时,不能用if,程序问题出在了判断上,假如,突然有一添加的线程进到了if了,突然中断了交出控制权,没有进行验证二适直接走了下去,加了两次甚至多次,这就是虚假唤醒。

虚假唤醒问题解决办法

官方指出不能用if判断而是要用while,这样就不会导致进入判断后突然中断导致数据不合理

package com.atguigu.thread;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


import org.omg.IOP.Codec;




class ShareData//资源类
{
  private int number = 0;//初始值为零的一个变量


  public synchronized void increment() throws InterruptedException 
  {
     //判断
     while(number!=0) {
       this.wait();
     }
     //干活
     ++number;
     System.out.println(Thread.currentThread().getName()+" \t "+number);
     //通知
     this.notifyAll();;
  }
  public synchronized void decrement() throws InterruptedException 
  {
     //判断
     while(number!=1) {
       this.wait();
     }
     //干活
     --number;
     System.out.println(Thread.currentThread().getName()+" \t "+number);
     //通知
     this.notifyAll();
  }
}


/**
 * 
 * @Description:
 *现在两个线程,
 * 可以操作初始值为零的一个变量,
 * 实现一个线程对该变量加1,一个线程对该变量减1,
 * 交替,来10轮。 
 * @author xialei
 *
 *  * 笔记:Java里面如何进行工程级别的多线程编写
 * 1 多线程变成模板(套路)-----上
 *     1.1  线程    操作    资源类  
 *     1.2  高内聚  低耦合
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 */
public class NotifyWaitDemo
{
  public static void main(String[] args)
  {
     ShareData sd = new ShareData();
     new Thread(() -> {


       for (int i = 1; i 
          try {
            sd.increment();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "A").start();
     new Thread(() -> {


       for (int i = 1; i 
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "B").start();
     new Thread(() -> {


       for (int i = 1; i 
          try {
            sd.increment();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "C").start();
     new Thread(() -> {


       for (int i = 1; i 
          try {
            sd.decrement();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
       }
     }, "D").start();
  }
}










/*
 * * 
 * 2 多线程变成模板(套路)-----下
 *     2.1  判断
 *     2.2  干活
 *     2.3  通知
 * 3 防止虚假唤醒用while
 * 
 * 
 * */











Java8新版实现

对标实现

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。 Condition是个接口,基本的方法就是await()和signal()方法; Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用 Conditon中的await()对应Object的wait(); Condition中的signal()对应Object的notify(); Condition中的signalAll()对应Object的notifyAll()。

这是消费者生产者模型中的消费者demo

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 


   final Object[] items = new Object[100];
   int putptr, takeptr, count;


   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

Condition接口详解分析

condition可以通俗的理解为条件队列。当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。这种方式为线程提供了更加简单的等待/通知模式。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。 await() :造成当前线程在接到信号或被中断之前一直处于等待状态。 await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。 awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。 awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。 signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。 signal()All :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

一 condition实现分析:

Condition接口包含了多种await方式和两个通知方法 ConditionObject实现了Condition接口,是AbstractQueuedSynchronizer的内部类(因为Condition的操作都需要获取想关联的锁) Reentrantlock的newCondition方法返回与某个lock实例相关的Condition对象 public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 结合上面的类图,我们看到condition实现是依赖于aqs,而aqs是个抽象类。里面定义了同步器的基本框架,实现了基本的结构功能。只留有状态条件的维护由具体同步器根据具体场景来定制,如常见的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,

1.1 等待队列

Condition是AQS的内部类。每个Condition对象都包含一个队列(等待队列)。等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。AQS有一个同步队列和多个等待队列,节点都是Node。等待队列的基本结构如下所示。 等待分为首节点和尾节点。当一个线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。新增节点就是将尾部节点指向新增的节点。节点引用更新本来就是在获取锁以后的操作,所以不需要CAS保证。同时也是线程安全的操作。 当线程调用了Condition的await()方法以后。线程就作为队列中的一个节点被加入到等待队列中去了。同时会释放锁的拥有。当从await方法返回的时候。当前线程一定会获取condition相关联的锁。

1.2 等待

如果从队列(同步队列和等待队列)的角度去看await()方法,当调用await()方法时,相当于同步队列的首节点(获取锁的节点)移动到Condition的等待队列中。 调用该方法的线程成功的获取锁的线程,也就是同步队列的首节点,该方法会将当前线程构造成节点并加入到等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。 当等待队列中的节点被唤醒的时候,则唤醒节点的线程开始尝试获取同步状态。如果不是通过 其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException异常信息。

1.3 通知

调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到同步队列中。当前线程加入到等待队列中如图所示: 回到上面的demo,锁被释放后,线程Consumer开始沉睡,这个时候线程因为线程Consumer沉睡时,会唤醒AQS队列中的头结点,所所以线程Producer会开始竞争锁,并获取到,执行完后线程Producer会调用signal方法,“发出”signal信号

在调用signal()方法之前必须先判断是否获取到了锁(isHeldExclusively方法)。接着获取等待队列的首节点,将其移动到同步队列并且利用LockSupport唤醒节点中的线程。 被唤醒的线程将从await方法中的while循环中退出( while (!isOnSyncQueue(node)) { 方法返回true,节点已经在同步队列中)。随后调用同步器的acquireQueued()方法加入到同步状态的竞争当中去。成功获取到竞争的线程从先前调用await方法返回,此时该线程已经成功获取了锁。 AQS的同步队列与Condition的等待队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的: 注意: 1.线程producer调用signal方法,这个时候Condition的等待队列中只有线程Consumer一个节点,于是它被取出来,并被加入到AQS的等待队列中。 注意,这个时候,线程Consumer 并没有被唤醒。 2.Sync是AQS的抽象子类,实现可重入和互斥的大部分功能。在Sync的子类中有FairSync和NonfairSync两种代表公平锁策略和非公平锁策略。Sync lock方法留给子类去实现,NonfairSync的实现:

上面是理论概述下面是使用基本步骤

1、有顺序通知,需要有标识位 2、有一个锁Lock,3把钥匙Condition 3、判断标志位 4、输出线程名+第几次+第几轮 5、修改标志位,通知下一个

实战案例
/**
 *
 * @Description:
 * 多线程之间按顺序调用,实现A->B->C
 * 三个线程启动,要求如下:
 *
 * AA打印5次,BB打印10次,CC打印15次
 * 接着
 * AA打印5次,BB打印10次,CC打印15次
 * ......来1轮
 *
 */
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ShareResource
{
    private int number = 1;//1:A 2:B 3:C
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();
    public void print5(int totalLoopNumber)
    {
        lock.lock();
        try
        {
            //1 判断
            while(number != 1)
            {
                //A 就要停止
                c1.await();
            }
            //2 干活
            for (int i = 1; i<=5;i++)
            {
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
            }
            //3 通知
            number = 2;
            c2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void print10(int totalLoopNumber)
    {
        lock.lock();
        try
        {
            //1 判断
            while(number != 2)
            {
                //A 就要停止
                c2.await();
            }
            //2 干活
            for (int i = 1; i<=10;i++)
            {
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
            }
            //3 通知
            number = 3;
            c3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void print15(int totalLoopNumber)
    {
        lock.lock();
        try
        {
            //1 判断
            while(number != 3)
            {
                //A 就要停止
                c3.await();
            }
            //2 干活
            for (int i = 1; i<=15;i++)
            {
                System.out.println(Thread.currentThread().getName()+"\t"+i+"\t totalLoopNumber: "+totalLoopNumber);
            }
            //3 通知
            number = 1;
            c1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class ThreadOrderAccess
{
    public static void main(String[] args)
    {
        ShareResource sr = new ShareResource();
        new Thread(() -> {
            for (int i = 1; i<=1;i++)
            {
                sr.print5(i);
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 1; i<=1;i++)
            {
                sr.print10(i);
            }
        }, "BB").start();
        new Thread(() -> {
            for (int i = 1; i<=1;i++)
            {
                sr.print15(i);
            }
        }, "CC").start();
    }
}