001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file;
018
019 import org.apache.camel.Processor;
020 import org.apache.camel.component.file.strategy.FileProcessStrategy;
021 import org.apache.camel.impl.ScheduledPollConsumer;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024
025 import java.io.File;
026
027 /**
028 * @version $Revision: 523016 $
029 */
030 public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
031 private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
032 private final FileEndpoint endpoint;
033 private boolean recursive = true;
034 private String regexPattern = "";
035 private long lastPollTime;
036
037 public FileConsumer(final FileEndpoint endpoint, Processor processor) {
038 super(endpoint, processor);
039 this.endpoint = endpoint;
040 }
041
042 protected void poll() throws Exception {
043 pollFileOrDirectory(endpoint.getFile(), isRecursive());
044 lastPollTime = System.currentTimeMillis();
045 }
046
047 protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
048 if (!fileOrDirectory.isDirectory()) {
049 pollFile(fileOrDirectory); // process the file
050 }
051 else if (processDir) {
052 if (isValidFile(fileOrDirectory)) {
053 LOG.debug("Polling directory " + fileOrDirectory);
054 File[] files = fileOrDirectory.listFiles();
055 for (int i = 0; i < files.length; i++) {
056 pollFileOrDirectory(files[i], isRecursive()); // self-recursion
057 }
058 }
059 }
060 else {
061 LOG.debug("Skipping directory " + fileOrDirectory);
062 }
063 }
064
065 protected void pollFile(final File file) {
066 if (!file.exists()) {
067 return;
068 }
069 if (isValidFile(file)) {
070 // we only care about file modified times if we are not deleting/moving files
071 if (endpoint.isNoop()) {
072 long fileModified = file.lastModified();
073 if (fileModified <= lastPollTime) {
074 if (LOG.isDebugEnabled()) {
075 LOG.debug("Ignoring file: " + file + " as modified time: " + fileModified + " less than last poll time: " + lastPollTime);
076 }
077 return;
078 }
079 }
080
081 FileProcessStrategy processStrategy = endpoint.getFileStrategy();
082 FileExchange exchange = endpoint.createExchange(file);
083
084 try {
085 if (LOG.isDebugEnabled()) {
086 LOG.debug("About to process file: " + file + " using exchange: " + exchange);
087 }
088 if (processStrategy.begin(endpoint, exchange, file)) {
089 getProcessor().process(exchange);
090 processStrategy.commit(endpoint, exchange, file);
091 }
092 else {
093 if (LOG.isDebugEnabled()) {
094 LOG.debug(endpoint + " cannot process file: " + file);
095 }
096 }
097 }
098 catch (Throwable e) {
099 handleException(e);
100 }
101 }
102 }
103
104 protected boolean isValidFile(File file) {
105 boolean result = false;
106 if (file != null && file.exists()) {
107 if (isMatched(file)) {
108 result = true;
109 }
110 }
111 return result;
112 }
113
114 protected boolean isMatched(File file) {
115 String name = file.getName();
116 if (regexPattern != null && regexPattern.length() > 0) {
117 if (!name.matches(getRegexPattern())) {
118 return false;
119 }
120 }
121 String[] prefixes = endpoint.getExcludedNamePrefixes();
122 if (prefixes != null) {
123 for (String prefix : prefixes) {
124 if (name.startsWith(prefix)) {
125 return false;
126 }
127 }
128 }
129 String[] postfixes = endpoint.getExcludedNamePostfixes();
130 if (postfixes != null) {
131 for (String postfix : postfixes) {
132 if (name.endsWith(postfix)) {
133 return false;
134 }
135 }
136 }
137 return true;
138 }
139
140 /**
141 * @return the recursive
142 */
143 public boolean isRecursive() {
144 return this.recursive;
145 }
146
147 /**
148 * @param recursive the recursive to set
149 */
150 public void setRecursive(boolean recursive) {
151 this.recursive = recursive;
152 }
153
154 /**
155 * @return the regexPattern
156 */
157 public String getRegexPattern() {
158 return this.regexPattern;
159 }
160
161 /**
162 * @param regexPattern the regexPattern to set
163 */
164 public void setRegexPattern(String regexPattern) {
165 this.regexPattern = regexPattern;
166 }
167 }