net.sf.jabb.util.thread
Class QueueConsumerGroup<E>

java.lang.Object
  extended by net.sf.jabb.util.thread.QueueConsumerGroup<E>
Type Parameters:
E - Type of the data in the queue.
队列中数据的类型

public class QueueConsumerGroup<E>
extends Object

A group of QueueConsumer(s) that work on on the same queue simultaneously.
一批并行处理同一个队列的QueueConsumer。

One working thread will be created for each QueueConsumer when necessary.

每个QueueConsumer相应的会有一个工作线程在需要的时候被创建。

Author:
Zhengmao HU (James)

Field Summary
protected  Map<String,QueueConsumer<E>> consumers
           
protected  BlockingQueue<E> queue
           
protected  ExecutorService threadPool
           
 
Constructor Summary
protected QueueConsumerGroup(BlockingQueue<E> workQueue)
          Internal constructor, without specifying thread pool.
(内部用)创建实例,不指定统一的线程池。
  QueueConsumerGroup(BlockingQueue<E> workQueue, Collection<? extends QueueConsumer<E>> queueConsumers)
          Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。
protected QueueConsumerGroup(BlockingQueue<E> workQueue, ExecutorService executorService)
          Internal constructor, specifying one thread pool for all QueueConsumers to use.
(内部用)创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(BlockingQueue<E> workQueue, ExecutorService executorService, Collection<? extends QueueConsumer<E>> queueConsumers)
          Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(BlockingQueue<E> workQueue, ExecutorService executorService, QueueConsumer<E>... queueConsumers)
          Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(BlockingQueue<E> workQueue, QueueConsumer<E>... queueConsumers)
          Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。
protected QueueConsumerGroup(int workQueueSize)
          Internal constructor, without specifying thread pool.
(内部用)创建实例,不指定统一的线程池。
  QueueConsumerGroup(int workQueueSize, Collection<? extends QueueConsumer<E>> queueConsumers)
          Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。
protected QueueConsumerGroup(int workQueueSize, ExecutorService executorService)
          Internal constructor, specifying one thread pool for all QueueConsumers to use.
(内部用)创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(int workQueueSize, ExecutorService executorService, Collection<? extends QueueConsumer<E>> queueConsumers)
          Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(int workQueueSize, ExecutorService executorService, QueueConsumer<E>... queueConsumers)
          Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。
  QueueConsumerGroup(int workQueueSize, QueueConsumer<E>... queueConsumers)
          Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。
 
Method Summary
 QueueConsumer<E> getConsumer(String name)
          Get QueueConsumer instance by its name.
按名称寻找得到QueueConsumer。
 Map<String,QueueConsumer<E>> getConsumers()
          Get the Map of all QueueConsumer.
获得含有全部QueueConsumer的Map。
 BlockingQueue<E> getQueue()
          Get the work queue.
取得工作队列。
 void queue(E obj)
          Put data into the queue for processing, if the queue still has space this method will return immediately without waiting for the data to be actually processed.
把待处理数据放入队列,如果队列中还有空位置则这个方法会立即返回而不是等待实际处理完成。
 void start()
          Start all QueueConsumer(s) one by one.
逐个启动所有Consumer。
 void stop()
          Stop working threads after the queue is empty; This method will not return until working thread finishes.
让所有处理线程在队列处理空了之后停止,这个方法会等到所有工作处理线程结束才返回。
 void stop(boolean afterQueueEmpty)
          Stop all the working threads one by one; This method will not return until all threads are stopped.
逐个停止所工作线程,这个方法会等到所有工作线程结束才返回。
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

queue

protected BlockingQueue<E> queue

consumers

protected Map<String,QueueConsumer<E>> consumers

threadPool

protected ExecutorService threadPool
Constructor Detail

QueueConsumerGroup

protected QueueConsumerGroup(BlockingQueue<E> workQueue)
Internal constructor, without specifying thread pool.
(内部用)创建实例,不指定统一的线程池。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。

QueueConsumerGroup

protected QueueConsumerGroup(int workQueueSize)
Internal constructor, without specifying thread pool.
(内部用)创建实例,不指定统一的线程池。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。

QueueConsumerGroup

protected QueueConsumerGroup(BlockingQueue<E> workQueue,
                             ExecutorService executorService)
Internal constructor, specifying one thread pool for all QueueConsumers to use.
(内部用)创建实例,让所有的QueueConsumer统一使用指定的线程池。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。

QueueConsumerGroup

protected QueueConsumerGroup(int workQueueSize,
                             ExecutorService executorService)
Internal constructor, specifying one thread pool for all QueueConsumers to use.
(内部用)创建实例,让所有的QueueConsumer统一使用指定的线程池。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。

QueueConsumerGroup

public QueueConsumerGroup(BlockingQueue<E> workQueue,
                          ExecutorService executorService,
                          QueueConsumer<E>... queueConsumers)
Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(int workQueueSize,
                          ExecutorService executorService,
                          QueueConsumer<E>... queueConsumers)
Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(BlockingQueue<E> workQueue,
                          QueueConsumer<E>... queueConsumers)
Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(int workQueueSize,
                          QueueConsumer<E>... queueConsumers)
Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(BlockingQueue<E> workQueue,
                          ExecutorService executorService,
                          Collection<? extends QueueConsumer<E>> queueConsumers)
Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(int workQueueSize,
                          ExecutorService executorService,
                          Collection<? extends QueueConsumer<E>> queueConsumers)
Constructor, specifying one thread pool for all QueueConsumers to use.
创建实例,让所有的QueueConsumer统一使用指定的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。
executorService - Thread pool that working threads will be get from.
指定让本实例从这里获得所有工作线程。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(BlockingQueue<E> workQueue,
                          Collection<? extends QueueConsumer<E>> queueConsumers)
Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。

QueueConsumerGroup

public QueueConsumerGroup(int workQueueSize,
                          Collection<? extends QueueConsumer<E>> queueConsumers)
Constructor, without specifying thread pool.
创建实例,不指定统一的线程池。

Duplicated names of QueueConsumer(s) will be renamed automatically when adding to this QueueConsumerGroup.

当被加入这个QueueConsumerGroup的时候,QueueConsumer如果有名称重复,会被自动改名。

Parameters:
workQueueSize - Size of the ArrayBlockingQueue to be created from which data for processing will be fetched.
将被创建的ArrayBlockingQueue队列的大小,本实例将从这个队列取得待处理数据。
queueConsumers - QueueConsumer(s) that will work together.
会一起工作的QueueConsumer。
Method Detail

getConsumer

public QueueConsumer<E> getConsumer(String name)
Get QueueConsumer instance by its name.
按名称寻找得到QueueConsumer。

Parameters:
name - Name of the QueueConsumer
Returns:
The instance with the name specified

getConsumers

public Map<String,QueueConsumer<E>> getConsumers()
Get the Map of all QueueConsumer.
获得含有全部QueueConsumer的Map。

Returns:
A Map, its key is the name of QueueConsumer, its value is QueueConsumer itself.
一个Map,其key是QueueConsumer的名称,值是QueueConsumer本身。

getQueue

public BlockingQueue<E> getQueue()
Get the work queue.
取得工作队列。

Returns:
The work queue.
工作队列。

start

public void start()
Start all QueueConsumer(s) one by one.
逐个启动所有Consumer。


queue

public void queue(E obj)
           throws InterruptedException
Put data into the queue for processing, if the queue still has space this method will return immediately without waiting for the data to be actually processed.
把待处理数据放入队列,如果队列中还有空位置则这个方法会立即返回而不是等待实际处理完成。

If the queue has no space left, this method will wait for the space then put data into the queue for processing, after that, this method will return immediately without waiting for the data to be actually processed.

如果队列中没有空位置了,则会等待队列空出位置来之后再把数据放进去,放完之后这个方法会立即返回而不是等待实际处理完成。

Parameters:
obj - Data need to be processed
待处理的数据。
Throws:
InterruptedException - if interrupted while waiting for space to become available.
如果队列已满而在等待空出位置的时候发生了中断。

stop

public void stop(boolean afterQueueEmpty)
Stop all the working threads one by one; This method will not return until all threads are stopped.
逐个停止所工作线程,这个方法会等到所有工作线程结束才返回。

Parameters:
afterQueueEmpty - true if working thread should keep processing until the queue is empty;
false if working thread should stop after finished current work;
如果为true,则工作线程要等到队列处理空了才结束;
如果为false,则工作线程处理完当前数据就结束。

stop

public void stop()
Stop working threads after the queue is empty; This method will not return until working thread finishes.
让所有处理线程在队列处理空了之后停止,这个方法会等到所有工作处理线程结束才返回。



Copyright © 2012. All Rights Reserved.