`

j2se------多线程--队与信号量

    博客分类:
  • J2SE
阅读更多
下面介绍的是对wait()和notify()以及synchronized()的封装


等待队列的等待和唤醒机制与常用的信号量机制有些相似之处。没错,实际上信号量机制是等待队列机制是同步机制在更高层次上的封装和改良,因为信号量的操作在底层实际上就是通过等待队列的同步机制实现。只不过在在使用信号量时,我们通常强调是资源共享,因而把信号量预先设置为或者,表示资源可用与否。而使用等待队列则意味着,我们在执行某个操作时,强调某个条件满足与否,从而决定当前进程的休眠


1。BlockingQueue

阻塞队列提供了可以阻塞的put和take方法,以及与之等价的可以指定超时的offer和 poll。如果Queue是空的,那么take方法会一直阻塞,直到有元素可用。如果Queue是有线长度的,队列满的时候put方法也会阻塞。 BlockingQueue可以很好的支持生产者和消费者模式,生产者往队列里put,消费者从队列里get,两者能够得好很好的同步。 BlockingQueue的实现类LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue是一个按优先级排序的队列。使用BlockingQueue构建的一个生产者与消费例子:

消费者:
public class Consumer implements Runnable {

	private BlockingQueue<Food> queue;
	private ExecutorService exec;

	public Consumer(BlockingQueue<Food> queue, ExecutorService exec) {
		this.queue = queue;
		this.exec = exec;
	}

	@Override
	public void run() {
		while (!exec.isShutdown()) {
			try {
				Thread.sleep(2000);
				Food food = queue.take();
				System.out.println("Consumer " + food);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (RejectedExecutionException e) {

			}
		}
	}
}
生产者:
public class Producer implements Runnable {

	private BlockingQueue<Food> queue;
	private ExecutorService exec;

	public Producer(BlockingQueue<Food> queue, ExecutorService exec) {
		this.queue = queue;
		this.exec = exec;
	}

	@Override
	public void run() {
		while (!exec.isShutdown()) {
			Food food = new Food();
			try {
				Thread.sleep(4000);
				queue.put(food);
				System.out.println("Produce " + food);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (RejectedExecutionException e) {

			}
		}
	}
}
Main:
BlockingQueue<Food> queue = new ArrayBlockingQueue<Food>(5);
ExecutorService exec = Executors.newFixedThreadPool(3);
Producer p1 = new Producer(queue, exec);
Producer p2 = new Producer(queue, exec);

Consumer c1 = new Consumer(queue, exec);
	
exec.execute(p1);
exec.execute(p2);
exec.execute(c1);
try {
	Thread.sleep(10000);
} catch (InterruptedException ignored) {
}
exec.shutdown();

2。Semaphore,信号量
使用信号量进行同步和互斥的控制是最经典的并发模型,java中也提高支持。一个Semaphore管理一个有效的许可 集,许可基的数量通过构造函数传入,通过acquire方法申请一个许可,许可数为0则阻塞线程,否则许可数减一,使用release方法释放一个许个,许可数加一。一个技术量为1的Semaphore为二元信号量,相当于一个互斥锁,表示不可重入的锁。一个使用信号量控制并发容器上届的例子:
public class BoundedHashSet<T> {
	private final Set<T> set;
	private final Semaphore sem;

	public BoundedHashSet(int bound) {
		set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound);
	}

	public boolean add(T o) throws InterruptedException {
		sem.acquire();
		boolean wasAdded = false;
		try {
			wasAdded = set.add(o);
			return wasAdded;
		} finally {
			if (!wasAdded)
				sem.release();
		}
	}

	public boolean remove(Object o) {
		boolean wasRemoved = set.remove(o);
		if (wasRemoved)
			sem.release();
		return wasRemoved;
	}
}




分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics