001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.camel.component.file;
019
020 import org.apache.camel.Processor;
021 import org.apache.camel.impl.ScheduledPollConsumer;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024
025 import java.io.File;
026 import java.io.IOException;
027 import java.io.RandomAccessFile;
028 import java.nio.channels.FileChannel;
029
030 /**
031 * @version $Revision: 523016 $
032 */
033 public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
034 private static final transient Log log = LogFactory.getLog(FileConsumer.class);
035 private final FileEndpoint endpoint;
036 private boolean recursive = true;
037 private boolean attemptFileLock = false;
038 private String regexPattern = "";
039 private long lastPollTime = 0l;
040
041 public FileConsumer(final FileEndpoint endpoint, Processor processor) {
042 super(endpoint, processor);
043 this.endpoint = endpoint;
044 }
045
046 protected void poll() throws Exception {
047 pollFileOrDirectory(endpoint.getFile(), isRecursive());
048 lastPollTime = System.currentTimeMillis();
049 }
050
051 protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
052 if (!fileOrDirectory.isDirectory()) {
053 pollFile(fileOrDirectory); // process the file
054 }
055 else if (processDir) {
056 log.debug("Polling directory " + fileOrDirectory);
057 File[] files = fileOrDirectory.listFiles();
058 for (int i = 0; i < files.length; i++) {
059 pollFileOrDirectory(files[i], isRecursive()); // self-recursion
060 }
061 }
062 else {
063 log.debug("Skipping directory " + fileOrDirectory);
064 }
065 }
066
067 protected void pollFile(final File file) {
068 if (file.exists() && file.lastModified() > lastPollTime) {
069 if (isValidFile(file)) {
070 processFile(file);
071 }
072 }
073 }
074
075 protected void processFile(File file) {
076 try {
077 getProcessor().process(endpoint.createExchange(file));
078 } catch (Throwable e) {
079 handleException(e);
080 }
081 }
082
083 protected boolean isValidFile(File file) {
084 boolean result = false;
085 if (file != null && file.exists()) {
086 if (isMatched(file)) {
087 if (isAttemptFileLock()) {
088 FileChannel fc = null;
089 try {
090 fc = new RandomAccessFile(file, "rw").getChannel();
091 fc.lock();
092 result = true;
093 }
094 catch (Throwable e) {
095 log.debug("Failed to get the lock on file: " + file, e);
096 }
097 finally {
098 if (fc != null) {
099 try {
100 fc.close();
101 }
102 catch (IOException e) {
103 }
104 }
105 }
106 }
107 else {
108 result = true;
109 }
110 }
111 }
112 return result;
113 }
114
115 protected boolean isMatched(File file) {
116 boolean result = true;
117 if (regexPattern != null && regexPattern.length() > 0) {
118 result = file.getName().matches(getRegexPattern());
119 }
120 return result;
121 }
122
123 /**
124 * @return the recursive
125 */
126 public boolean isRecursive() {
127 return this.recursive;
128 }
129
130 /**
131 * @param recursive the recursive to set
132 */
133 public void setRecursive(boolean recursive) {
134 this.recursive = recursive;
135 }
136
137 /**
138 * @return the attemptFileLock
139 */
140 public boolean isAttemptFileLock() {
141 return this.attemptFileLock;
142 }
143
144 /**
145 * @param attemptFileLock the attemptFileLock to set
146 */
147 public void setAttemptFileLock(boolean attemptFileLock) {
148 this.attemptFileLock = attemptFileLock;
149 }
150
151 /**
152 * @return the regexPattern
153 */
154 public String getRegexPattern() {
155 return this.regexPattern;
156 }
157
158 /**
159 * @param regexPattern the regexPattern to set
160 */
161 public void setRegexPattern(String regexPattern) {
162 this.regexPattern = regexPattern;
163 }
164 }