net.sf.jabb.util.thread
Class QueueBatchUniqueProcessor<E>

java.lang.Object
  extended by net.sf.jabb.util.thread.QueueConsumer<E>
      extended by net.sf.jabb.util.thread.QueueBatchUniqueProcessor<E>
Type Parameters:
E - Type of the data in the queue.
队列中数据的类型
All Implemented Interfaces:
Runnable

public abstract class QueueBatchUniqueProcessor<E>
extends QueueConsumer<E>

A template for processing data in batch from a queue.
一个从队列中取得数据并批量处理的模板,数据一批批被取走并处理。

One working thread will be created for each instance of this class when necessary.

本类的每个实例相应的会有一个工作线程在需要的时候被创建。

For each batch, data will be taken from the queue as much as possible. Maximum size of a batch and maximum time for waiting for new data can be configured. If the maximum size limit reached, or maximum wait time limit reached, all data in current batch will be processed, and further data taken will be put into later batches. Duplicated data will be discarded in a batch, which means, if there are duplicated data taken from the queue in a batch, only one instance of those duplicated will be processed.

每批会尽可能地多取一些数据,可以设定每批最大的数据量,以及最长的等待时间。 如果达到了这个量,或者是达到了这个时间,则当前批的数据就处理掉,然后开始下一批。 每批数据中如果有重复的,会被剔除掉,也就是说,在一批当中不会重复处理。

Author:
Zhengmao HU (James)

Field Summary
protected  int maxBatchSize
           
protected  long pollTimeout
           
protected  TimeUnit pollTimeoutUnit
           
 
Fields inherited from class net.sf.jabb.util.thread.QueueConsumer
defaultThreadPool, mode, MODE_INIT, MODE_RUNNING, MODE_START, MODE_STOP_ASAP, MODE_STOP_WHEN_EMPTY, MODE_STOPPED, name, queue, thread, threadPool
 
Constructor Summary
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize)
          Constructor to create an instance that uses default name - QueueConsumer.class.getSimpleName(), default thread pool and do not wait for new data.
创建一个使用缺省名称(QueueConsumer.class.getSimpleName())、缺省线程池且不等待新数据到来的实例。
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance with default name: QueueConsumer.class.getSimpleName()
创建一个实例,其名称使用缺省名称:QueueConsumer.class.getSimpleName()。
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize)
          Constructor to create an instance that do not wait for new data.
创建一个不等待新数据到来的实例。
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance.
创建一个实例。
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize)
          Constructor to create an instance that uses default thread pool and do not wait for new data.
创建一个使用缺省线程池且不等待新数据到来的实例。
QueueBatchUniqueProcessor(BlockingQueue<E> workQueue, String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。
QueueBatchUniqueProcessor(int batchSize)
          Constructor to create an instance that uses default name - QueueConsumer.class.getSimpleName(), default thread pool and do not wait for new data.
创建一个使用缺省名称(QueueConsumer.class.getSimpleName())、缺省线程池且不等待新数据到来的实例。
QueueBatchUniqueProcessor(int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance with default name: QueueConsumer.class.getSimpleName()
创建一个实例,其名称使用缺省名称:QueueConsumer.class.getSimpleName()。
QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize)
          Constructor to create an instance that do not wait for new data.
创建一个不等待新数据到来的实例。
QueueBatchUniqueProcessor(String name, ExecutorService executorService, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance.
创建一个实例。
QueueBatchUniqueProcessor(String name, int batchSize)
          Constructor to create an instance that uses default thread pool and do not wait for new data.
创建一个使用缺省线程池且不等待新数据到来的实例。
QueueBatchUniqueProcessor(String name, int batchSize, long batchWaitTimeout, TimeUnit timeoutUnit)
          Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。
 
Method Summary
protected  void consume()
          This method is overridden over parent class so that a batch of data is taken from the queue and process(Set) is invoked.
这个方法被重载了,从而队列中的一批数据会被取出并调用process(Set)方法。
abstract  void process(Set<E> batch)
          Process one piece of data - this method should be overridden in subclass.
处理一份数据——这个方法应该在子类中被重载。
 
Methods inherited from class net.sf.jabb.util.thread.QueueConsumer
getName, getQueue, preStop, queue, run, setExecutorService, setName, setQueue, start, stop, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

maxBatchSize

protected int maxBatchSize

pollTimeout

protected long pollTimeout

pollTimeoutUnit

protected TimeUnit pollTimeoutUnit
Constructor Detail

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 String name,
                                 ExecutorService executorService,
                                 int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance.
创建一个实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(String name,
                                 ExecutorService executorService,
                                 int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance.
创建一个实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 String name,
                                 int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(String name,
                                 int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance with default name: QueueConsumer.class.getSimpleName()
创建一个实例,其名称使用缺省名称:QueueConsumer.class.getSimpleName()。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(int batchSize,
                                 long batchWaitTimeout,
                                 TimeUnit timeoutUnit)
Constructor to create an instance with default name: QueueConsumer.class.getSimpleName()
创建一个实例,其名称使用缺省名称:QueueConsumer.class.getSimpleName()。

Parameters:
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
batchWaitTimeout - Maximum time period allowed for waiting for new data from the queue before current batch is processed, 0 means no waiting.
超过这个时段如果没有更多数据则留到下一批处理,0表示不等待。
timeoutUnit - Unit of the batchWaitTimeout parameter.
batchWaitTimeout的单位

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 String name,
                                 ExecutorService executorService,
                                 int batchSize)
Constructor to create an instance that do not wait for new data.
创建一个不等待新数据到来的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(String name,
                                 ExecutorService executorService,
                                 int batchSize)
Constructor to create an instance that do not wait for new data.
创建一个不等待新数据到来的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 String name,
                                 int batchSize)
Constructor to create an instance that uses default thread pool and do not wait for new data.
创建一个使用缺省线程池且不等待新数据到来的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(String name,
                                 int batchSize)
Constructor to create an instance that uses default thread pool and do not wait for new data.
创建一个使用缺省线程池且不等待新数据到来的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(BlockingQueue<E> workQueue,
                                 int batchSize)
Constructor to create an instance that uses default name - QueueConsumer.class.getSimpleName(), default thread pool and do not wait for new data.
创建一个使用缺省名称(QueueConsumer.class.getSimpleName())、缺省线程池且不等待新数据到来的实例。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理

QueueBatchUniqueProcessor

public QueueBatchUniqueProcessor(int batchSize)
Constructor to create an instance that uses default name - QueueConsumer.class.getSimpleName(), default thread pool and do not wait for new data.
创建一个使用缺省名称(QueueConsumer.class.getSimpleName())、缺省线程池且不等待新数据到来的实例。

Parameters:
batchSize - Maximum size allowed for a batch, remaining data will be put into later batches.
一批最大包含多少个数据,超过这个数量的就要留到下一批处理
Method Detail

consume

protected void consume()
This method is overridden over parent class so that a batch of data is taken from the queue and process(Set) is invoked.
这个方法被重载了,从而队列中的一批数据会被取出并调用process(Set)方法。

Specified by:
consume in class QueueConsumer<E>

process

public abstract void process(Set<E> batch)
Process one piece of data - this method should be overridden in subclass.
处理一份数据——这个方法应该在子类中被重载。

This method may be interrupted while running, so please note the following:
这个方法在运行过程中可能会遇到线程的interrupt,所以如果有以下情况要注意正确处理:

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread's interrupt status will be set, and the thread will receive a ClosedByInterruptException.

If this thread is blocked in a Selector then the thread's interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector's wakeup method were invoked.

Parameters:
batch - The data taken from queue, which needs to be processed
从队列中取出的待处理数据。


Copyright © 2012. All Rights Reserved.