001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.queue;
019
020 import org.apache.camel.AlreadyStoppedException;
021 import org.apache.camel.Consumer;
022 import org.apache.camel.Exchange;
023 import org.apache.camel.Processor;
024 import org.apache.camel.impl.ServiceSupport;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027
028 import java.util.concurrent.TimeUnit;
029
030 /**
031 * @version $Revision: 553477 $
032 */
033 public class QueueConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable {
034 private static final Log log = LogFactory.getLog(QueueConsumer.class);
035
036 private QueueEndpoint<E> endpoint;
037 private Processor processor;
038 private Thread thread;
039
040 public QueueConsumer(QueueEndpoint<E> endpoint, Processor processor) {
041 this.endpoint = endpoint;
042 this.processor = processor;
043 }
044
045 @Override
046 public String toString() {
047 return "QueueConsumer: " + endpoint.getEndpointUri();
048 }
049
050 public void run() {
051 while (!isStopping()) {
052 E exchange;
053 try {
054 exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
055 }
056 catch (InterruptedException e) {
057 break;
058 }
059 if (exchange != null && !isStopping()) {
060 try {
061 processor.process(exchange);
062 }
063 catch (AlreadyStoppedException e) {
064 log.debug("Ignoring failed message due to shutdown: " + e, e);
065 break;
066 }
067 catch (Throwable e) {
068 log.error(e);
069 }
070 }
071 }
072 }
073
074 protected void doStart() throws Exception {
075 thread = new Thread(this, getThreadName(endpoint.getEndpointUri()));
076 thread.setDaemon(true);
077 thread.start();
078 }
079
080 protected void doStop() throws Exception {
081 thread.join();
082 }
083
084 }