学习目标:解决多个线程的数据同步问题、以及线程之间的通信。
多线程的数据同步问题
一个程序中的多个线程可以看成是并发执行的,大多数线程在运行过程中需要读写数据。如果两个或两个以上的线程读写同一数据,可能就会造成数据混乱的问题。
例如电商系统就是多线程应用,每个消费购买商品的过程对电商系统来说都是启动了一个商品购买线程,每个商品购买线程都需要读取和修改商品的库存。
当消费者A准备要购买某一商品时,线程A会读取该商品的库存,假设当前商品库存量为1,线程A会给消费者显示库存1。此时消费者B的线程进入运行,消费者B付款完成,线程B修改商品的库存,这是商品的库存量为0,线程B执行完成后,线程A进入运行,因为此前线程A已读取了商品库存(库存为1,但实际已经为0),线程A会引导消费者进入付款流程,消费者A付款后,就会导致消费者A已付款但商品无货的问题。
消费者已付款但商品无货问题的主要原因是多个商品购买线程都会修改库存量,因为线程是并发执行的,很容易造成商品库存量显示和实际库存量不一致的问题。这类问题也称为多线程数据同步问题,多个线程读取和修改的同一数据也称为线程间的共享数据。共享数据的同步问题主要是发生在共享数据的修改上,如果仅是读取共享数据,就不会造成数据同步问题。
案例6:建立Commodity商品类,在商品类定义一个静态变量stock,用于存储商品库存量。建立BuyThread商品购买线程类,在该类run()方法中完成商品库存量的读取与修改。最后建立ThreadCommodityTest测试类,验证多个线程读写同一数据造成的数据同步问题。
在PUnit12项目新建synchro包,在synchro包下新建Commodity类。代码如下:
package synchro;
public class Commodity {
// 商品库存量
static int stock = 2;
public static int getStock() {
return stock;
}
public static void setStock(int stock) {
Commodity.stock = stock;
}
}
Commodity商品类在内部定义了一个静态变量stock,该变量存储商品的库存,商品库存默认为2,并提供了读取和写入商品库存的方法。
在synchro包下新建BuyThread商品购买线程类。代码如下:
package synchro;
public class BuyThread extends Thread{
String name;
public BuyThread(String name)
{
super(name);
this.name = name;
}
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
// 读取库存
System.out.printf("%s读取的库存为:%d\n",name,Commodity.getStock());
System.out.printf("%s当前库存为:%d\n",name,Commodity.getStock());
System.out.printf("%s付款完成\n",name);
int stock = Commodity.getStock() - 1;
Commodity.setStock(stock);
}
}
BuyThread商品购买线程类在内部定义了变量name,用于存储线程名称。在run()方法内部输出用于显示的库存和付款之前的库存,付款完成后修改库存。
在synchro包下新建ThreadCommodityTest测试类。代码如下:
package synchro;
public class ThreadCommodityTest {
public static void main(String[] args) {
// 实例化四个BuyThread线程对象
BuyThread threadA = new BuyThread("消费者A");
BuyThread threadB = new BuyThread("消费者B");
BuyThread threadC = new BuyThread("消费者C");
BuyThread threadD = new BuyThread("消费者D");
// 启动线程
threadA.start();
threadB.start();
threadC.start();
threadD.start();
}
}
ThreadCommodityTest程序实例化四个BuyThread线程对象,并启动线程。
程序执行结果如下图所示:

从执行结果可以看出,消费者C用于显示的库存为2,但在付款前库存已为0,如果此时消费者付款,就会出现已付款但无货的问题。
线程间保持数据同步
为了解决多个线程修改同一数据而发生数据不同步的问题,Java提供了synchronized关键字来进行数据的同步,以保证数据的安全。synchronized关键字对共享数据的同步有两种方式,一种是用于修饰代码块,一种是用于修饰方法。
修饰代码块是把线程体内执行的方法中会涉及到修改共享数据时的操作,通过{}封装起来,然后用synchronized关键字修饰这个代码块。使用方法如下:
synchronized (Object) {
//同步数据代码块
}
上面代码中的Object是指包含同步数据的对象,同步数据代码块一般是操作共享数据的代码。
synchronized同步数据的原理是Object对象会设置一个标志位,该标志位有0和1两个值,0表示有线程在使用Object对象的数据,1表示无线程使用Object对象的数据。当线程运行到synchronized关键字修饰的代码块时,JVM会检查Object的标志位,如果标志位为0,JVM会让该线程进入就绪状态,直至Object对象的标志位被设置为0后,JVM再调度该线程进入运行状态。
修饰方法就是在方法的声明中添加synchronized关键字,语法如下:
synchronized void method()
{
// 代码块
}
修饰方法同步的数据是调用该方法的对象内的数据,该方法需要在线程内的run()方法内调用。
案例7:建立CommoditySync商品类,该类定义了一个成员变量stock,用于存储商品库存量。建立BuyThreadSync商品购买线程类,在该类run()方法中完成商品库存量的读取与修改,并使用synchronized关键字同步CommoditySync对象的stock数据。最后建立BuyThreadSyncTest测试类,验证synchronized关键字同步数据的效果。
在synchro包下新建CommoditySync商品类。代码如下:
package synchro;
public class CommoditySync {
// 商品库存量
int stock = 2;
public int getStock() {
return this.stock;
}
public void setStock(int stock) {
this.stock = stock;
}
}
CommoditySync类在内部定义了成员变量stock,用于存储商品库存。在这里没有采用静态变量,是因为synchronized关键字同步的是实例对象的数据。
在synchro包下新建BuyThreadSync商品购买线程类。代码如下:
package synchro;
public class BuyThreadSync extends Thread{
// 线程名称
String name;
CommoditySync commodity;
public BuyThreadSync(String name)
{
super(name);
this.name = name;
}
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
// 使用synchronized关键字保持commodity对象的stock同步
synchronized (commodity)
{
// 消费者下订单显示库存
System.out.printf("%s读取的库存为:%d\n", name, commodity.getStock());
// 消费者付款前库存
System.out.printf("%s当前库存为:%d\n", name, commodity.getStock());
System.out.printf("%s付款完成\n", name);
// 付款完成后,修改库存
int stock = commodity.getStock() - 1;
commodity.setStock(stock);
}
}
public CommoditySync getCommodity() {
return commodity;
}
public void setCommodity(CommoditySync commodity) {
this.commodity = commodity;
}
}
CommoditySync类内部定义了成员变量commodity,用于存储CommoditySync类的实例化对象,并使用到synchronized关键字上,同步commodity对象的stock数据。读取和修改commodity对象stock数据的代码被放置在使用synchronized关键字修饰的代码块内。
在synchro包下新建BuyThreadSyncTest测试类。代码如下:
package synchro;
public class BuyThreadSyncTest {
public static void main(String[] args) {
// 实例化CommoditySync对象
CommoditySync commodity = new CommoditySync();
// 实例化四个BuyThread线程对象
BuyThreadSync threadA = new BuyThreadSync("消费者A");
threadA.setCommodity(commodity);
BuyThreadSync threadB = new BuyThreadSync("消费者B");
threadB.setCommodity(commodity);
BuyThreadSync threadC = new BuyThreadSync("消费者C");
threadC.setCommodity(commodity);
BuyThreadSync threadD = new BuyThreadSync("消费者D");
threadD.setCommodity(commodity);
// 启动线程
threadA.start();
threadB.start();
threadC.start();
threadD.start();
}
}
BuyThreadSyncTest程序实例化CommoditySync对象,并赋值给实例化BuyThreadSync对象的commodity成员变量,BuyThreadSync对象内部使用synchronized关键字修饰的代码块,对CommoditySync 实例对象的commodity成员变量进行数据同步。
程序执行结果如下图所示:

从程序执行结果可以看出,每个线程订单显示的库存量和付款前的库存量完全相同。消费者B线程出现-1的状况是因为程序没有对库存量判断。在实际编程时,应对商品库存量做出判断,针对不同的商品库存量做出不同的响应处理。
其数据同步原理是当消费者A线程进入同步代码块后,JVM会设置synchronized关键字后面的commodity对象标志位为0,此时commodity对象被锁定,如果其它线程需要操作该对象,只能进入就绪状态,等待该对象解锁。消费者A线程执行完成后,JVM会设置synchronized关键字后面的commodity对象标志位为1,此时commodity对象被解锁,等待该对象解锁并进入就绪状态的线程可以操作该对象了。
线程间的通信
线程间进行通信是多线程编程经常遇到的情况,线程间通信的目的是使线程间能够互相发送信号。例如线程B可以等待线程A的一个信号,这个信号会通知线程B,A线程已经把数据准备好了。
Java.lang.object类提供了wati()、notify()、notifyAll()方法用于线程间的通信,它们与synchronized关键字结合使用,可以建立很多优秀的数据同步模型。当synchronized修饰的方法或代码块中的wati()方法被调用时,当前线程将被中断运行,并且放弃该对象的锁。
当另外的线程执行了某个对象的notify()方法后,会唤醒在此对象等待池中的某个线程,使之成为可运行的线程。notifyAll()方法会唤醒所有等待这个对象的线程,使之成为可运行的线程。
下面来看一个比较经典的问题:生产者(Producer)和消费者(Consumer)问题。这个问题的解决就是通过灵活使用wati()、notify()、notifyAll()方法来实现的。
案例8:生产者将产品交给店铺,消费者从店铺取走产品,店铺一次只能存储固定数量的产品,如果生产者生产了过多产品,店铺会让生产者等一下,如果店中有空间存储产品了,再通知生产者继续生产;如果店中没有产品了,店铺会告诉消费者等一下,如果店中有产品了,再通知消费者来取走产品。
要实现上述案例要求,我们需要定义一个生产者线程类和消费者线程类。再建立一个全局数组作为存储产品的缓冲区。其控制过程是,生产者向缓冲区存入产品,消费者从缓冲区取走产品。当缓冲区满时,生产者必须阻塞,等待消费者取走产品后将其唤醒。当缓冲区空时,消费者被阻塞,等待生产者生产了产品后将其唤醒。
在PUnit12项目新建producer包,在producer包下新建Product类。代码如下:
package produce;
public class Product {
// 产品id
int id;
// 构造函数
public Product(int id)
{
this.id = id;
}
// 重写父类的toString()方法
public String toString()
{
return "Product" + id;
}
}
在Product产品类内部定义了一个产品标识的id成员变量,另外要在生产或消费时打印产品的详细内容,因此重写了父类的toString()方法。
在producer包下新建Shop店铺类。代码如下:
package produce;
public class Shop {
// 定义存储产品的数组
Product[] pro = new Product[10];
// 产品数组的索引,默认为0
int nIndex = 0;
/**
* @Title: addProduct
* @Description: 该方法用synchronized关键字修饰
* 生产者将产品交给店铺
* 如果产品已满,店铺通知生产者停止生产
* wait方法会导致调用该方法的线程进入等待
* @param @param pd 参数
* @return void 返回类型
* @throws
*/
public synchronized void addProduct(Product pd) {
while (nIndex == pro.length) {
try {
// 产品已满,稍后生产
System.out.println("产品已满,生产者停止生产");
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// 通知等待区的消费者可以取产品了,唤醒等待的消费者线程
this.notify();
// 产品添加到数组
pro[nIndex] = pd;
this.nIndex++;
}
/**
* @Title: getProduct
* @Description: 消费者取走产品
* 该方法用synchronized关键字修饰
* wait方法会导致调用该方法的线程进入等待
* @param @return 参数
* @return Product 返回类型
* @throws
*/
public synchronized Product getProduct() {
while (nIndex == 0) {
try {
this.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// 唤醒等待的生产者线程
this.notify();
this.nIndex--;
return pro[nIndex];
}
}
店铺一次只能持有10份产品,如果生产者生产的产品多余10分,则会让当前正在此对象上操作的线程等待。一个线程访问addProduct方法时,它已经拿到这个锁了,当遇到产品大于10份时,它会阻赛。如果没有大于10份,则继续生产产品,并且调用notify方法,叫醒一个正在当前这个对象上等待的线程。这里请注意,notify和wait一般是一一对应的。
在producer包下新建Producer生产者类。代码如下:
package produce;
public class Producer extends Thread{
// 定义shop成员变量,存储shop实例对象引用
Shop shop;
public Producer(Shop inShop) {
this.shop = inShop;
}
@Override
public void run() {
// TODO Auto-generated method stub
// 生产产品,生产数量要大于店员能够存储的数量
for (int i = 0; i < 15; i++) {
Product pro = new Product(i);
shop.addProduct(pro);
System.out.println("生产了:" + pro);
// 模拟生产一件产品花费的时间
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Producer为生产者线程类,生产者负责生产产品,每生产完一个产品,调用Thread类的sleep方法休眠一段时间,模拟生产过程。
在producer包下新建Consumer消费者类。代码如下:
package produce;
public class Consumer extends Thread{
// 定义shop成员变量,存储shop实例对象引用
Shop shop;
public Consumer(Shop inShop) {
this.shop = inShop;
}
@Override
public void run() {
for (int i = 0; i < 15; i++) {
Product pd = shop.getProduct();
System.out.println("消费了:" + pd);
// 模拟消费一件产品要花费的时间
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
Consumer为消费者线程类,消费者负责消费产品,消费者每消费完一个产品,调用Thread类的sleep方法休眠一段时间,模拟消费过程。
在producer包下新建MainTest主线程类。代码如下:
package produce;
public class MainTest {
public static void main(String[] args) {
// 实例化Shop对象
Shop shop = new Shop();
// 创建生产者线程
Thread t1 = new Thread(new Producer(shop));
t1.start();
// 创建消费者线程
Thread t2 = new Thread(new Consumer(shop));
t2.start();
}
}
主线程类首先实例化Shop对象,然后分别实例化和启动生产者线程和消费者线程。
程序执行结果如下图所示:

案例有两个线程类,分别是Producer类(生产者)和Consumer类(消费者),Shop类(店铺)负责产品的购进(存储产品的数量有限)与销售,因此Shop类需要协调Producer类和Consumer类,当Producer类生产过多产品时,Shop类需要调用wait方法让Producer类不要再继续生产,同时调用notify方法通知消费者来取走产品(如果有正在等待产品的消费者)。反之亦然,当无产品可取时,Shop类需要调用wait方法让Consumer类等待产品,同时调用notify方法通知生产者继续生产产品。