多线程
4-线程同步-生产者和消费者简易模型
2021-07-07 602 0
简介 线程之间的通讯、简单的生产者和消费者模型的实现
1. 线程的通信
两个线程交替消费10条消息,并打印
涉及到的三个方法:
wait():一旦执行此方法,当前线程就进入阻塞状态,并释放锁(同步监视器)。
notify():一旦执行此方法,就会唤醒被wait的一个线程。如果有多个线程被wait,就唤醒优先级高的那个。
notifyAll():一旦执行此方法,就会唤醒所有被wait的线程。
说明:
1.wait(),notify(),notifyAll()三个方法必须使用在同步代码块或同步方法中。
2.wait(),notify(),notifyAll()三个方法的调用者必须是同步代码块或同步方法中的同步监视器。(都写为obj或都写为this)
否则,会出现IllegalMonitorStateException异常
3.wait(),notify(),notifyAll()三个方法是定义在java.lang.Object类中。
//WaitNotifyTest.java
package
com.ylaihui.thread1;
class
ProcMsg
implements
Runnable{
private
static
int
message =
10
;
private
Object obj =
new
Object();
@Override
public
void
run() {
while
(
true
){
//wait(),notify(),notifyAll()三个方法的调用者必须是同步代码块或同步方法中的同步监视器
// (都写为obj或都写为this) 否则报异常 IllegalMonitorStateException
//synchronized (obj){ // IllegalMonitorStateException
synchronized
(
this
){
// 一旦执行此方法,就会唤醒被wait的一个线程。如果有多个线程被wait,就唤醒优先级高的那个。
this
.notify();
if
(message >
0
)
{
System.out.println(Thread.currentThread().getName() +
":"
+ message);
message--;
try
{
// 一旦执行此方法,当前线程就进入阻塞状态,并释放锁(同步监视器)。
this
.wait();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
else
break
;
}
}
}
}
public
class
WaitNotifyTest {
public
static
void
main(String[] args) {
ProcMsg procMsg =
new
ProcMsg();
Thread t1 =
new
Thread(procMsg);
Thread t2 =
new
Thread(procMsg);
t1.start();
t2.start();
}
}
2. sleep和wait的区别
1.相同点:一旦执行方法,都可以使得当前的线程进入阻塞状态。
2.不同点:1)两个方法声明的位置不同:Thread类中声明sleep() , Object类中声明wait()
2)调用的要求不同:sleep()可以在任何需要的场景下调用。 wait()必须使用在同步代码块或同步方法中
3)关于是否释放同步监视器:如果两个方法都使用在同步代码块或同步方法中,sleep()不会释放锁,wait()会释放锁。
3. 生产者消费者模型
线程通信的应用: 生产者/消费者问题
生产者(Productor)将接收到的消息放入缓冲区(MsgQueue),而消费者(Customer)从缓冲区取出消息进行消费,
缓冲区只能存储固定数量的消息,比如20条消息,
如果生产者试图放入多余20条消息,缓冲区满,会停止生产者生产消息。
如果缓冲区有空位放消息,那么通知生产者继续生产;
如果缓冲区没有消息,那么消费者会等待,如果缓冲区有消息了,再通知消费者消费消息。
分析:
1. 是否是多线程问题?是,生产者线程,消费者线程
2. 是否有共享数据?是,缓冲区的消息
3. 如何解决线程的安全问题?同步机制,有三种方法(同步代码块、同步方法、Lock的方式)
4. 是否涉及线程的通信?是
//ProducterConsumerTest.java
package
com.ylaihui.thread1;
class
MsgQueue {
private
int
msgcount =
0
;
public
final
static
int
MSG_MAX =
200
;
MsgQueue(){}
public
int
getMsgcount(){
return
this
.msgcount;
}
public
void
addCount(){
this
.msgcount++;
}
public
void
reduceCount(){
this
.msgcount--;
}
}
class
Producter
implements
Runnable{
private
MsgQueue msgqueue;
Producter(){}
Producter(MsgQueue msgqueue){
this
.msgqueue = msgqueue;
}
@Override
public
void
run() {
while
(
true
){
synchronized
(msgqueue){
if
(msgqueue.getMsgcount() < MsgQueue.MSG_MAX){
msgqueue.addCount();
msgqueue.notify();
System.out.println(
"生产产品"
+ msgqueue.getMsgcount());
}
else
{
try
{
msgqueue.wait();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
class
Consumer
implements
Runnable{
private
MsgQueue msgqueue;
Consumer(){}
Consumer(MsgQueue msgqueue){
this
.msgqueue = msgqueue;
}
@Override
public
void
run() {
while
(
true
){
synchronized
(msgqueue){
if
(msgqueue.getMsgcount() >
0
) {
System.out.println(
"消费产品"
+ msgqueue.getMsgcount());
msgqueue.reduceCount();
msgqueue.notify();
}
else
{
try
{
msgqueue.wait();
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
public
class
ProducterConsumerTest {
public
static
void
main(String[] args) {
// init the message queue
MsgQueue msgq =
new
MsgQueue();
Producter p =
new
Producter(msgq);
Consumer c =
new
Consumer(msgq);
Thread t1 =
new
Thread(p);
Thread t2 =
new
Thread(c);
t1.start();
t2.start();
}
}