net.sf.jabb.util.thread
Class QueueConsumer<E>

java.lang.Object
  extended by net.sf.jabb.util.thread.QueueConsumer<E>
Type Parameters:
E - Type of the data in the queue.
队列中数据的类型
All Implemented Interfaces:
Runnable
Direct Known Subclasses:
QueueBatchUniqueProcessor, QueueProcessor

public abstract class QueueConsumer<E>
extends Object
implements Runnable

A template for consuming data from a queue.
一个从队列中取数据进行处理的模板。

One working thread will be created for each instance of this class when necessary.

本类的每个实例相应的会有一个工作线程在需要的时候被创建。

Author:
Zhengmao HU (James)

Field Summary
protected static ExecutorService defaultThreadPool
          If no ExecutorService is specified in constructor, then the working thread will come from this thread pool.
如果在构造方法中没有指定ExecutorService,则它的工作线程 将取自这个线程池。
protected  AtomicInteger mode
           
protected static int MODE_INIT
           
protected static int MODE_RUNNING
           
protected static int MODE_START
           
protected static int MODE_STOP_ASAP
           
protected static int MODE_STOP_WHEN_EMPTY
           
protected static int MODE_STOPPED
           
protected  String name
          Name of this instance, which determines the naming of working thread.
本个实例的名字,它决定了工作线程的名字。
protected  BlockingQueue<E> queue
           
protected  Thread thread
          The working thread.
工作线程。
protected  ExecutorService threadPool
          Effective thread pool of this instance.
本个实例实际使用的线程池。
 
Constructor Summary
QueueConsumer()
          Constructor to create an instance with default name and using default thread pool.
创建一个实例,使用缺省的名称和缺省的线程池。
QueueConsumer(BlockingQueue<E> workQueue)
          Constructor to create an instance with default name and using default thread pool.
创建一个实例,使用缺省的名称和缺省的线程池。
QueueConsumer(BlockingQueue<E> workQueue, String name)
          Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。
QueueConsumer(BlockingQueue<E> workQueue, String name, ExecutorService executorService)
          Constructor to create an instance.
创建一个实例。
QueueConsumer(String name)
          Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。
QueueConsumer(String name, ExecutorService executorService)
          Constructor to create an instance.
创建一个实例。
 
Method Summary
protected abstract  void consume()
          Consume the data in queue - this method should be overridden in subclass.
处理队列中的数据——这个方法应该在子类中被重载。
 String getName()
           
 BlockingQueue<E> getQueue()
          Get the work queue.
取得工作队列。
 void preStop(boolean afterQueueEmpty)
          Ask the working thread to stop; This method will return immediately.
要求工作线程停止;这个方法立即返回。
 void queue(E obj)
          Put data into the queue for processing, if the queue still has space this method will return immediately without waiting for the data to be actually processed.
把待处理数据放入队列,如果队列中还有空位置则这个方法会立即返回而不是等待实际处理完成。
 void run()
           
 void setExecutorService(ExecutorService threadPool)
           
 void setName(String name)
           
 void setQueue(BlockingQueue<E> workQueue)
          Use this method if you don't want the queue to be passed in from constructor.
如果你不想通过构造方法来传递队列对象,就用这个方法。
 void start()
          Start working thread; This method will return immediately without waiting for anything to be actually processed.
启动工作处理线程,这个方法立即返回,而不等待任何实际处理发生。
 void stop()
          Stop working thread after the queue is empty; This method will not return until working thread finishes.
等待队列处理空了之后停止处理线程,这个方法会等到工作处理线程结束才返回。
 void stop(boolean afterQueueEmpty)
          Stop working thread; This method will not return until working thread finishes.
停止处理线程,这个方法会等到工作处理线程结束才返回。
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

queue

protected BlockingQueue<E> queue

thread

protected Thread thread
The working thread.
工作线程。


mode

protected AtomicInteger mode

name

protected String name
Name of this instance, which determines the naming of working thread.
本个实例的名字,它决定了工作线程的名字。


MODE_INIT

protected static final int MODE_INIT
See Also:
Constant Field Values

MODE_START

protected static final int MODE_START
See Also:
Constant Field Values

MODE_RUNNING

protected static final int MODE_RUNNING
See Also:
Constant Field Values

MODE_STOP_ASAP

protected static final int MODE_STOP_ASAP
See Also:
Constant Field Values

MODE_STOP_WHEN_EMPTY

protected static final int MODE_STOP_WHEN_EMPTY
See Also:
Constant Field Values

MODE_STOPPED

protected static final int MODE_STOPPED
See Also:
Constant Field Values

defaultThreadPool

protected static ExecutorService defaultThreadPool
If no ExecutorService is specified in constructor, then the working thread will come from this thread pool.
如果在构造方法中没有指定ExecutorService,则它的工作线程 将取自这个线程池。

defaultThreadPool = Executors.newCachedThreadPool()


threadPool

protected ExecutorService threadPool
Effective thread pool of this instance.
本个实例实际使用的线程池。

Constructor Detail

QueueConsumer

public QueueConsumer(BlockingQueue<E> workQueue,
                     String name,
                     ExecutorService executorService)
Constructor to create an instance.
创建一个实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。

QueueConsumer

public QueueConsumer(String name,
                     ExecutorService executorService)
Constructor to create an instance.
创建一个实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
executorService - Thread pool that working thread will be get from.
指定让本实例从这里获得工作线程。

QueueConsumer

public QueueConsumer(BlockingQueue<E> workQueue,
                     String name)
Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。

QueueConsumer

public QueueConsumer(String name)
Constructor to create an instance using default thread pool.
创建一个使用缺省线程池的实例。

Parameters:
name - Name of this instance, which determines the naming of working thread.
本个实例的名称,会被用在工作线程名里。

QueueConsumer

public QueueConsumer(BlockingQueue<E> workQueue)
Constructor to create an instance with default name and using default thread pool.
创建一个实例,使用缺省的名称和缺省的线程池。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。

QueueConsumer

public QueueConsumer()
Constructor to create an instance with default name and using default thread pool.
创建一个实例,使用缺省的名称和缺省的线程池。

Method Detail

setName

public void setName(String name)

getName

public String getName()

setExecutorService

public void setExecutorService(ExecutorService threadPool)

setQueue

public void setQueue(BlockingQueue<E> workQueue)
Use this method if you don't want the queue to be passed in from constructor.
如果你不想通过构造方法来传递队列对象,就用这个方法。

Parameters:
workQueue - The queue that data for processing will be fetched from.
本实例将从这个队列取得待处理数据。

getQueue

public BlockingQueue<E> getQueue()
Get the work queue.
取得工作队列。

Returns:
The work queue.
工作队列。

queue

public void queue(E obj)
           throws InterruptedException
Put data into the queue for processing, if the queue still has space this method will return immediately without waiting for the data to be actually processed.
把待处理数据放入队列,如果队列中还有空位置则这个方法会立即返回而不是等待实际处理完成。

If the queue has no space left, this method will wait for the space then put data into the queue for processing, after that, this method will return immediately without waiting for the data to be actually processed.

如果队列中没有空位置了,则会等待队列空出位置来之后再把数据放进去,放完之后这个方法会立即返回而不是等待实际处理完成。

Parameters:
obj - Data need to be processed
待处理的数据。
Throws:
InterruptedException - if interrupted while waiting for space to become available.
如果队列已满而在等待空出位置的时候发生了中断。

start

public void start()
Start working thread; This method will return immediately without waiting for anything to be actually processed.
启动工作处理线程,这个方法立即返回,而不等待任何实际处理发生。


stop

public void stop()
Stop working thread after the queue is empty; This method will not return until working thread finishes.
等待队列处理空了之后停止处理线程,这个方法会等到工作处理线程结束才返回。


stop

public void stop(boolean afterQueueEmpty)
Stop working thread; This method will not return until working thread finishes.
停止处理线程,这个方法会等到工作处理线程结束才返回。

Parameters:
afterQueueEmpty - true if working thread should keep processing until the queue is empty;
false if working thread should stop after finished current work;
如果为true,则工作线程要等到队列处理空了才结束;
如果为false,则工作线程处理完当前数据就结束。

preStop

public void preStop(boolean afterQueueEmpty)
Ask the working thread to stop; This method will return immediately.
要求工作线程停止;这个方法立即返回。

Parameters:
afterQueueEmpty - true if working thread should keep processing until the queue is empty;
false if working thread should stop after finished current work;
如果为true,则工作线程要等到队列处理空了才结束;
如果为false,则工作线程处理完当前数据就结束。

run

public void run()
Specified by:
run in interface Runnable

consume

protected abstract void consume()
Consume the data in queue - this method should be overridden in subclass.
处理队列中的数据——这个方法应该在子类中被重载。

This method may be interrupted while running, so please note the following:
这个方法在运行过程中可能会遇到线程的interrupt,所以如果有以下情况要注意正确处理:

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread's interrupt status will be set, and the thread will receive a ClosedByInterruptException.

If this thread is blocked in a Selector then the thread's interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector's wakeup method were invoked.



Copyright © 2012. All Rights Reserved.