001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.mail;
019
020 import org.apache.camel.Consumer;
021 import org.apache.camel.Processor;
022 import org.apache.camel.impl.ScheduledPollConsumer;
023 import org.apache.commons.logging.Log;
024 import org.apache.commons.logging.LogFactory;
025
026 import javax.mail.Flags;
027 import javax.mail.Folder;
028 import javax.mail.Message;
029 import javax.mail.MessagingException;
030 import javax.mail.Transport;
031 import javax.mail.event.MessageCountEvent;
032 import javax.mail.event.MessageCountListener;
033
034 /**
035 * A {@link Consumer} which consumes messages from JavaMail using a {@link Transport} and dispatches them
036 * to the {@link Processor}
037 *
038 * @version $Revision: 523430 $
039 */
040 public class MailConsumer extends ScheduledPollConsumer<MailExchange> implements MessageCountListener {
041 private static final transient Log log = LogFactory.getLog(MailConsumer.class);
042 private final MailEndpoint endpoint;
043 private final Folder folder;
044
045 public MailConsumer(MailEndpoint endpoint, Processor processor, Folder folder) {
046 super(endpoint, processor);
047 this.endpoint = endpoint;
048 this.folder = folder;
049 }
050
051 @Override
052 protected void doStart() throws Exception {
053 super.doStart();
054 ensureFolderIsOpen();
055 folder.addMessageCountListener(this);
056 }
057
058 @Override
059 protected void doStop() throws Exception {
060 folder.removeMessageCountListener(this);
061 folder.close(true);
062 super.doStop();
063 }
064
065 public void messagesAdded(MessageCountEvent event) {
066 Message[] messages = event.getMessages();
067 for (Message message : messages) {
068 try {
069 if (!message.getFlags().contains(Flags.Flag.DELETED)) {
070 processMessage(message);
071
072 flagMessageDeleted(message);
073 }
074 }
075 catch (MessagingException e) {
076 handleException(e);
077 }
078 }
079 }
080
081 public void messagesRemoved(MessageCountEvent event) {
082 Message[] messages = event.getMessages();
083 for (Message message : messages) {
084 if (log.isDebugEnabled()) {
085 try {
086 log.debug("Removing message: " + message.getSubject());
087 }
088 catch (MessagingException e) {
089 log.debug("Ignored: " + e);
090 }
091 }
092 }
093 }
094
095 protected void poll() throws Exception {
096 ensureFolderIsOpen();
097
098 int count = folder.getMessageCount();
099 if (count > 0) {
100 Message[] messages = folder.getMessages();
101 MessageCountEvent event = new MessageCountEvent(folder, MessageCountEvent.ADDED, true, messages);
102 messagesAdded(event);
103 }
104 else if (count == -1) {
105 throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
106 }
107
108 folder.close(true);
109 }
110
111 protected void processMessage(Message message) {
112 try {
113 MailExchange exchange = endpoint.createExchange(message);
114 getProcessor().process(exchange);
115 }
116 catch (Throwable e) {
117 handleException(e);
118 }
119 }
120
121 protected void ensureFolderIsOpen() throws MessagingException {
122 if (!folder.isOpen()) {
123 folder.open(Folder.READ_WRITE);
124 }
125 }
126
127 protected void flagMessageDeleted(Message message) throws MessagingException {
128 if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
129 message.setFlag(Flags.Flag.DELETED, true);
130 }
131 else {
132 message.setFlag(Flags.Flag.SEEN, true);
133 }
134 }
135 }