001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.component.file.remote;
018
019 import java.io.ByteArrayOutputStream;
020 import java.io.IOException;
021 import java.util.concurrent.ScheduledExecutorService;
022
023 import org.apache.camel.Processor;
024 import org.apache.commons.net.ftp.FTPClient;
025 import org.apache.commons.net.ftp.FTPFile;
026
027 public class FtpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
028 private boolean recursive = true;
029 private String regexPattern = "";
030 private long lastPollTime;
031 private final FtpEndpoint endpoint;
032 private FTPClient client;
033
034 public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client) {
035 super(endpoint, processor);
036 this.endpoint = endpoint;
037 this.client = client;
038 }
039
040 public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client, ScheduledExecutorService executor) {
041 super(endpoint, processor, executor);
042 this.endpoint = endpoint;
043 this.client = client;
044 }
045
046 protected void poll() throws Exception {
047 final String fileName = endpoint.getConfiguration().getFile();
048 if (endpoint.getConfiguration().isDirectory()) {
049 pollDirectory(fileName);
050 } else {
051 client.changeWorkingDirectory(fileName.substring(0, fileName.lastIndexOf('/')));
052 final FTPFile[] files = client.listFiles(fileName.substring(fileName.lastIndexOf('/') + 1));
053 pollFile(files[0]);
054 }
055 lastPollTime = System.currentTimeMillis();
056 }
057
058 protected void pollDirectory(String dir) throws Exception {
059 client.changeWorkingDirectory(dir);
060 for (FTPFile ftpFile : client.listFiles()) {
061 if (ftpFile.isFile()) {
062 pollFile(ftpFile);
063 } else if (ftpFile.isDirectory()) {
064 if (isRecursive()) {
065 pollDirectory(getFullFileName(ftpFile));
066 }
067 } else {
068 throw new RuntimeException("");
069 }
070 }
071 }
072
073 protected String getFullFileName(FTPFile ftpFile) throws IOException {
074 return client.printWorkingDirectory() + "/" + ftpFile.getName();
075 }
076
077 private void pollFile(FTPFile ftpFile) throws Exception {
078 if (ftpFile.getTimestamp().getTimeInMillis() > lastPollTime) { // TODO
079 // do we
080 // need
081 // to
082 // adjust
083 // the
084 // TZ?
085 // can
086 // we?
087 if (isMatched(ftpFile)) {
088 final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
089 client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
090 getProcessor().process(endpoint.createExchange(getFullFileName(ftpFile), byteArrayOutputStream));
091 }
092 }
093 }
094
095 protected boolean isMatched(FTPFile file) {
096 boolean result = true;
097 if (regexPattern != null && regexPattern.length() > 0) {
098 result = file.getName().matches(getRegexPattern());
099 }
100 return result;
101 }
102
103 public boolean isRecursive() {
104 return recursive;
105 }
106
107 public void setRecursive(boolean recursive) {
108 this.recursive = recursive;
109 }
110
111 public long getLastPollTime() {
112 return lastPollTime;
113 }
114
115 public void setLastPollTime(long lastPollTime) {
116 this.lastPollTime = lastPollTime;
117 }
118
119 public String getRegexPattern() {
120 return regexPattern;
121 }
122
123 public void setRegexPattern(String regexPattern) {
124 this.regexPattern = regexPattern;
125 }
126 }