package org.apache.servicemix.file;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import javax.jbi.JBIException;
import javax.jbi.management.DeploymentException;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
import org.apache.servicemix.common.DefaultComponent;
import org.apache.servicemix.common.ServiceUnit;
import org.apache.servicemix.common.endpoints.PollingEndpoint;
import org.apache.servicemix.common.locks.LockManager;
import org.apache.servicemix.common.locks.impl.SimpleLockManager;
import org.apache.servicemix.components.util.DefaultFileMarshaler;
import org.apache.servicemix.components.util.FileMarshaler;
import org.apache.servicemix.util.FileUtil;

/* loaded from: input_file:apache-servicemix-4.3.1-fuse-02-05/system/org/apache/servicemix/servicemix-file/2011.01.0-fuse-02-05/servicemix-file-2011.01.0-fuse-02-05.jar:org/apache/servicemix/file/FilePollerEndpoint.class */
public class FilePollerEndpoint extends PollingEndpoint implements FileEndpointType {
    private File file;
    private FileFilter filter;
    private boolean deleteFile;
    private boolean recursive;
    private boolean autoCreateDirectory;
    private File archive;
    private Comparator<File> comparator;
    private FileMarshaler marshaler;
    private LockManager lockManager;
    private ConcurrentMap<String, InputStream> openExchanges;
    private int maxConcurrent;
    private Object monitor;
    private AtomicLong throttleCounter;

    public FilePollerEndpoint() {
        this.deleteFile = true;
        this.recursive = true;
        this.autoCreateDirectory = true;
        this.marshaler = new DefaultFileMarshaler();
        this.openExchanges = new ConcurrentHashMap();
        this.maxConcurrent = -1;
        this.monitor = new Object();
        this.throttleCounter = new AtomicLong(0L);
    }

    public FilePollerEndpoint(ServiceUnit serviceUnit, QName qName, String str) {
        super(serviceUnit, qName, str);
        this.deleteFile = true;
        this.recursive = true;
        this.autoCreateDirectory = true;
        this.marshaler = new DefaultFileMarshaler();
        this.openExchanges = new ConcurrentHashMap();
        this.maxConcurrent = -1;
        this.monitor = new Object();
        this.throttleCounter = new AtomicLong(0L);
    }

    public FilePollerEndpoint(DefaultComponent defaultComponent, ServiceEndpoint serviceEndpoint) {
        super(defaultComponent, serviceEndpoint);
        this.deleteFile = true;
        this.recursive = true;
        this.autoCreateDirectory = true;
        this.marshaler = new DefaultFileMarshaler();
        this.openExchanges = new ConcurrentHashMap();
        this.maxConcurrent = -1;
        this.monitor = new Object();
        this.throttleCounter = new AtomicLong(0L);
    }

    @Override // org.apache.servicemix.common.endpoints.PollingEndpoint, org.apache.servicemix.common.endpoints.SimpleEndpoint, org.apache.servicemix.common.endpoints.AbstractEndpoint, org.apache.servicemix.common.Endpoint
    public synchronized void start() throws Exception {
        super.start();
        this.openExchanges = new ConcurrentHashMap();
    }

    @Override // org.apache.servicemix.common.endpoints.PollingEndpoint
    public void poll() throws Exception {
        if (!isThrottled()) {
            pollFileOrDirectory(this.file);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.info("Poller is throttled, skipping this cycle");
        }
    }

    @Override // org.apache.servicemix.common.endpoints.ConsumerEndpoint, org.apache.servicemix.common.endpoints.AbstractEndpoint, org.apache.servicemix.common.Endpoint
    public void validate() throws DeploymentException {
        super.validate();
        if (this.file == null) {
            throw new DeploymentException("You must specify a file property");
        }
        if (isAutoCreateDirectory() && !this.file.exists()) {
            this.file.mkdirs();
        }
        if (this.archive != null) {
            if (!this.deleteFile) {
                throw new DeploymentException("Archive shouldn't be specified unless deleteFile='true'");
            }
            if (isAutoCreateDirectory() && !this.archive.exists()) {
                this.archive.mkdirs();
            }
            if (!this.archive.isDirectory()) {
                throw new DeploymentException("Archive should refer to a directory");
            }
        }
        if (this.lockManager == null) {
            this.lockManager = createLockManager();
        }
    }

    protected LockManager createLockManager() {
        return new SimpleLockManager();
    }

    public void setMaxConcurrent(int i) {
        this.maxConcurrent = i;
    }

    public int getMaxConcurrent() {
        return this.maxConcurrent;
    }

    public void setFile(File file) {
        this.file = file;
    }

    public File getFile() {
        return this.file;
    }

    public void setLockManager(LockManager lockManager) {
        this.lockManager = lockManager;
    }

    public LockManager getLockManager() {
        return this.lockManager;
    }

    public void setFilter(FileFilter fileFilter) {
        this.filter = fileFilter;
    }

    public FileFilter getFilter() {
        return this.filter;
    }

    public void setDeleteFile(boolean z) {
        this.deleteFile = z;
    }

    public boolean isDeleteFile() {
        return this.deleteFile;
    }

    public void setRecursive(boolean z) {
        this.recursive = z;
    }

    public boolean isRecursive() {
        return this.recursive;
    }

    public void setAutoCreateDirectory(boolean z) {
        this.autoCreateDirectory = z;
    }

    public boolean isAutoCreateDirectory() {
        return this.autoCreateDirectory;
    }

    public void setMarshaler(FileMarshaler fileMarshaler) {
        this.marshaler = fileMarshaler;
    }

    public FileMarshaler getMarshaler() {
        return this.marshaler;
    }

    public void setComparator(Comparator<File> comparator) {
        this.comparator = comparator;
    }

    public void setArchive(File file) {
        this.archive = file;
    }

    public File getArchive() {
        return this.archive;
    }

    protected void pollFileOrDirectory(File file) {
        pollFileOrDirectory(file, true);
    }

    protected void pollFileOrDirectory(File file, boolean z) {
        if (!file.isDirectory()) {
            pollFile(file);
            return;
        }
        if (!z) {
            this.logger.debug("Skipping directory " + file);
            return;
        }
        this.logger.debug("Polling directory " + file);
        for (File file2 : sortPolledFiles(file.listFiles(getFilter()))) {
            pollFileOrDirectory(file2, isRecursive());
        }
    }

    private File[] sortPolledFiles(File[] fileArr) {
        if (this.comparator == null) {
            return fileArr;
        }
        Arrays.sort(fileArr, this.comparator);
        return fileArr;
    }

    protected void pollFile(final File file) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Scheduling file " + file + " for processing");
        }
        if (FileUtil.isFileFullyAvailable(file)) {
            checkThrottle();
            getExecutor().execute(new Runnable() { // from class: org.apache.servicemix.file.FilePollerEndpoint.1
                @Override // java.lang.Runnable
                public void run() {
                    if (FilePollerEndpoint.this.lockManager.getLock(FilePollerEndpoint.this.file.toURI().relativize(file.toURI()).toString()).tryLock()) {
                        FilePollerEndpoint.this.processFileNow(file);
                    } else if (FilePollerEndpoint.this.logger.isDebugEnabled()) {
                        FilePollerEndpoint.this.logger.debug("Unable to acquire lock on " + file);
                    }
                }
            });
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("The file " + file + " is still being copied. Skipping...");
        }
    }

    private void checkThrottle() {
        if (this.maxConcurrent <= 0 || this.openExchanges.size() < this.maxConcurrent) {
            return;
        }
        this.throttleCounter.addAndGet(1L);
        synchronized (this.monitor) {
            boolean z = false;
            while (this.openExchanges.size() >= this.maxConcurrent) {
                if (z) {
                    throw new IllegalStateException("Throttle block has been interrupted");
                }
                try {
                    this.monitor.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    z = true;
                }
            }
        }
        this.throttleCounter.decrementAndGet();
    }

    protected boolean isThrottled() {
        return this.throttleCounter.get() > 0;
    }

    protected void processFileNow(File file) {
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Processing file " + file);
            }
            if (file.exists()) {
                processFile(file);
            }
        } catch (Exception e) {
            this.logger.error("Failed to process file: " + file + ". Reason: " + e, e);
            unlockAsyncFile(file);
        }
    }

    protected void processFile(File file) throws Exception {
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
        InOnly createInOnlyExchange = getExchangeFactory().createInOnlyExchange();
        configureExchangeTarget(createInOnlyExchange);
        NormalizedMessage createMessage = createInOnlyExchange.createMessage();
        createInOnlyExchange.setInMessage(createMessage);
        this.marshaler.readMessage(createInOnlyExchange, createMessage, bufferedInputStream, file.getCanonicalPath());
        createInOnlyExchange.getInMessage().setProperty(FileComponent.FILE_PROPERTY, file);
        this.openExchanges.put(createInOnlyExchange.getExchangeId(), bufferedInputStream);
        send(createInOnlyExchange);
    }

    @Override // org.apache.servicemix.common.endpoints.ConsumerEndpoint
    public String getLocationURI() {
        return this.file.toURI().toString();
    }

    @Override // org.apache.servicemix.common.endpoints.AbstractEndpoint, org.apache.servicemix.common.Endpoint
    public void process(MessageExchange messageExchange) throws Exception {
        if (!this.openExchanges.containsKey(messageExchange.getExchangeId())) {
            this.logger.debug("Received unknown exchange. Will be ignored...");
            return;
        }
        InputStream inputStream = this.openExchanges.get(messageExchange.getExchangeId());
        File file = (File) messageExchange.getMessage("in").getProperty(FileComponent.FILE_PROPERTY);
        if (file == null) {
            throw new JBIException("Property org.apache.servicemix.file was removed from the exchange -- unable to delete/archive the file");
        }
        this.logger.debug("Releasing " + file.getAbsolutePath());
        inputStream.close();
        try {
            if (messageExchange.getStatus() == ExchangeStatus.DONE) {
                if (isDeleteFile()) {
                    if (this.archive != null) {
                        moveFile(file, this.archive);
                    } else if (!file.delete()) {
                        throw new IOException("Could not delete file " + file);
                    }
                }
            } else {
                if (messageExchange.getStatus() != ExchangeStatus.ERROR) {
                    throw new JBIException("Unexpectedly received an exchange with status ACTIVE");
                }
                Exception error = messageExchange.getError();
                if (error == null) {
                    throw new JBIException("Received an exchange with status ERROR, but no exception was set");
                }
                this.logger.warn("Message in file " + file + " could not be handled successfully: " + error.getMessage(), error);
            }
        } finally {
            notifyThrottledThreads();
            this.openExchanges.remove(messageExchange.getExchangeId());
            unlockAsyncFile(file);
        }
    }

    private void notifyThrottledThreads() {
        if (this.maxConcurrent > 0) {
            synchronized (this.monitor) {
                this.monitor.notifyAll();
            }
        }
    }

    private void unlockAsyncFile(File file) {
        String uri = this.file.toURI().relativize(file.toURI()).toString();
        Lock lock = this.lockManager.getLock(uri);
        if (lock != null) {
            try {
                lock.unlock();
            } catch (Exception e) {
                this.logger.error(e);
            }
            this.lockManager.removeLock(uri);
        }
    }

    public static void moveFile(File file, File file2) throws IOException {
        String name = file.getName();
        File file3 = new File(file2, name);
        if (file3.exists() && file3.isFile()) {
            name = String.format("%d_%s", Long.valueOf(System.currentTimeMillis()), file.getName());
        }
        if (!file.renameTo(new File(file2, name))) {
            throw new IOException("Failed to move " + file + " to " + file2 + " with new name " + name);
        }
    }
}
