package org.apache.servicemix.eip.support;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.util.MessageUtil;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.apache.servicemix.timers.Timer;
import org.apache.servicemix.timers.TimerListener;

/* loaded from: input_file:apache-servicemix-4.3.1-fuse-02-05/system/org/apache/servicemix/servicemix-eip/2011.01.0-fuse-02-05/servicemix-eip-2011.01.0-fuse-02-05.jar:org/apache/servicemix/eip/support/AbstractAggregator.class */
public abstract class AbstractAggregator extends EIPEndpoint {
    private static final Log LOG = LogFactory.getLog(AbstractAggregator.class);
    private ExchangeTarget target;
    private boolean rescheduleTimeouts;
    private boolean synchronous;
    private Store closedAggregates;
    private StoreFactory closedAggregatesStoreFactory;
    private boolean reportTimeoutAsErrors;
    private boolean copyProperties = true;
    private boolean copyAttachments = true;
    private boolean reportErrors = false;
    private boolean reportClosedAggregatesAsErrors = false;
    private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap();

    /* loaded from: input_file:apache-servicemix-4.3.1-fuse-02-05/system/org/apache/servicemix/servicemix-eip/2011.01.0-fuse-02-05/servicemix-eip-2011.01.0-fuse-02-05.jar:org/apache/servicemix/eip/support/AbstractAggregator$ClosedAggregateException.class */
    public static class ClosedAggregateException extends Exception {
    }

    public boolean isSynchronous() {
        return this.synchronous;
    }

    public void setSynchronous(boolean z) {
        this.synchronous = z;
    }

    public boolean isRescheduleTimeouts() {
        return this.rescheduleTimeouts;
    }

    public void setRescheduleTimeouts(boolean z) {
        this.rescheduleTimeouts = z;
    }

    public ExchangeTarget getTarget() {
        return this.target;
    }

    public void setTarget(ExchangeTarget exchangeTarget) {
        this.target = exchangeTarget;
    }

    public boolean isCopyProperties() {
        return this.copyProperties;
    }

    public void setCopyProperties(boolean z) {
        this.copyProperties = z;
    }

    public boolean isCopyAttachments() {
        return this.copyAttachments;
    }

    public void setCopyAttachments(boolean z) {
        this.copyAttachments = z;
    }

    public boolean isReportErrors() {
        return this.reportErrors;
    }

    public void setReportErrors(boolean z) {
        this.reportErrors = z;
    }

    public boolean isReportClosedAggregatesAsErrors() {
        return this.reportClosedAggregatesAsErrors;
    }

    public void setReportClosedAggregatesAsErrors(boolean z) {
        this.reportClosedAggregatesAsErrors = z;
    }

    public void setReportTimeoutAsErrors(boolean z) {
        this.reportTimeoutAsErrors = z;
    }

    public boolean isReportTimeoutAsErrors() {
        return this.reportTimeoutAsErrors;
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint
    protected void processSync(MessageExchange messageExchange) throws Exception {
        throw new IllegalStateException();
    }

    public StoreFactory getClosedAggregatesStoreFactory() {
        return this.closedAggregatesStoreFactory;
    }

    public void setClosedAggregatesStoreFactory(StoreFactory storeFactory) {
        this.closedAggregatesStoreFactory = storeFactory;
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint
    protected void processAsync(MessageExchange messageExchange) throws Exception {
        throw new IllegalStateException();
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint, org.apache.servicemix.common.endpoints.SimpleEndpoint, org.apache.servicemix.common.endpoints.AbstractEndpoint, org.apache.servicemix.common.Endpoint
    public void start() throws Exception {
        super.start();
        if (this.closedAggregatesStoreFactory == null) {
            this.closedAggregatesStoreFactory = new MemoryStoreFactory();
        }
        this.closedAggregates = this.closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
        if (this.reportTimeoutAsErrors && !this.reportErrors) {
            throw new IllegalArgumentException("ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
        }
    }

    @Override // org.apache.servicemix.eip.EIPEndpoint, org.apache.servicemix.common.endpoints.ProviderEndpoint, org.apache.servicemix.common.endpoints.AbstractEndpoint, org.apache.servicemix.common.Endpoint
    public void process(MessageExchange messageExchange) throws Exception {
        if (messageExchange.getRole() == MessageExchange.Role.PROVIDER) {
            if (messageExchange.getStatus() != ExchangeStatus.ACTIVE) {
                return;
            }
            if ((messageExchange instanceof InOnly) || (messageExchange instanceof RobustInOnly)) {
                processProvider(messageExchange);
                return;
            } else {
                fail(messageExchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
                return;
            }
        }
        if (messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
            throw new IllegalStateException("Unexpected active consumer exchange received");
        }
        if (this.reportErrors) {
            List<MessageExchange> list = (List) this.store.load(((String) messageExchange.getProperty(getService().toString() + ":" + getEndpoint() + ":correlation")) + "-exchanges");
            if (list != null) {
                for (MessageExchange messageExchange2 : list) {
                    if (messageExchange.getStatus() == ExchangeStatus.ERROR) {
                        messageExchange2.setError(messageExchange.getError());
                    }
                    messageExchange2.setStatus(messageExchange.getStatus());
                    send(messageExchange2);
                }
            }
        }
    }

    private void processProvider(MessageExchange messageExchange) throws Exception {
        final String str = (String) messageExchange.getProperty(JbiConstants.CORRELATION_ID);
        NormalizedMessage copyIn = MessageUtil.copyIn(messageExchange);
        final String correlationID = getCorrelationID(messageExchange, copyIn);
        if (correlationID == null || correlationID.length() == 0) {
            throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
        }
        Lock lock = getLockManager().getLock(correlationID);
        lock.lock();
        boolean z = true;
        try {
            Object load = this.store.load(correlationID);
            Date date = null;
            if (load == null) {
                if (!isAggregationClosed(correlationID)) {
                    load = createAggregation(correlationID);
                    date = getTimeout(load);
                }
            } else if (isRescheduleTimeouts()) {
                date = getTimeout(load);
            }
            if (load != null) {
                if (this.reportErrors) {
                    List list = (List) this.store.load(correlationID + "-exchanges");
                    if (list == null) {
                        list = new ArrayList();
                    }
                    list.add(messageExchange);
                    this.store.store(correlationID + "-exchanges", list);
                    z = false;
                }
                if (addMessage(load, copyIn, messageExchange)) {
                    sendAggregate(str, correlationID, load, false, isSynchronous(messageExchange));
                } else {
                    this.store.store(correlationID, load);
                    if (date != null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Scheduling timeout at " + date + " for aggregate " + correlationID);
                        }
                        this.timers.put(correlationID, getTimerManager().schedule(new TimerListener() { // from class: org.apache.servicemix.eip.support.AbstractAggregator.1
                            @Override // org.apache.servicemix.timers.TimerListener
                            public void timerExpired(Timer timer) {
                                AbstractAggregator.this.onTimeout(str, correlationID, timer);
                            }
                        }, date));
                    }
                    z = false;
                }
                if (!this.reportErrors) {
                    done(messageExchange);
                }
            } else if (this.reportClosedAggregatesAsErrors) {
                fail(messageExchange, new ClosedAggregateException());
            } else {
                done(messageExchange);
            }
            try {
                lock.unlock();
            } catch (Exception e) {
                LOG.info("Caught exception while attempting to release aggregation lock", e);
            }
            if (z) {
                this.lockManager.removeLock(correlationID);
            }
        } catch (Throwable th) {
            try {
                lock.unlock();
            } catch (Exception e2) {
                LOG.info("Caught exception while attempting to release aggregation lock", e2);
            }
            if (1 != 0) {
                this.lockManager.removeLock(correlationID);
            }
            throw th;
        }
    }

    protected void sendAggregate(String str, String str2, Object obj, boolean z, boolean z2) throws Exception {
        InOnly createInOnlyExchange = getExchangeFactory().createInOnlyExchange();
        if (str != null) {
            createInOnlyExchange.setProperty(JbiConstants.CORRELATION_ID, str);
        }
        createInOnlyExchange.setProperty(getService().toString() + ":" + getEndpoint() + ":correlation", str2);
        this.target.configureTarget(createInOnlyExchange, getContext());
        NormalizedMessage createMessage = createInOnlyExchange.createMessage();
        createInOnlyExchange.setInMessage(createMessage);
        buildAggregate(obj, createMessage, createInOnlyExchange, z);
        closeAggregation(str2);
        if (z2) {
            sendSync(createInOnlyExchange);
        } else {
            send(createInOnlyExchange);
        }
    }

    protected void onTimeout(String str, String str2, Timer timer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Timeout expired for aggregate " + str2);
        }
        Lock lock = getLockManager().getLock(str2);
        lock.lock();
        try {
            try {
                Timer timer2 = this.timers.get(str2);
                if (timer2 == null || !timer2.equals(timer)) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        LOG.info("Caught exception while attempting to release timeout aggregation lock", e);
                    }
                    this.lockManager.removeLock(str2);
                    return;
                }
                this.timers.remove(str2);
                Object load = this.store.load(str2);
                if (load == null) {
                    if (!isAggregationClosed(str2)) {
                        throw new IllegalStateException("Aggregation is not closed, but can not be retrieved from the store");
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Aggregate " + str2 + " is closed");
                    }
                } else if (this.reportTimeoutAsErrors) {
                    List<MessageExchange> list = (List) this.store.load(str2 + "-exchanges");
                    if (list != null) {
                        TimeoutException timeoutException = new TimeoutException();
                        for (MessageExchange messageExchange : list) {
                            messageExchange.setError(timeoutException);
                            messageExchange.setStatus(ExchangeStatus.ERROR);
                            send(messageExchange);
                        }
                    }
                    closeAggregation(str2);
                } else {
                    sendAggregate(str, str2, load, true, isSynchronous());
                }
                try {
                    lock.unlock();
                } catch (Exception e2) {
                    LOG.info("Caught exception while attempting to release timeout aggregation lock", e2);
                }
                this.lockManager.removeLock(str2);
            } catch (Exception e3) {
                LOG.info("Caught exception while processing timeout aggregation", e3);
                try {
                    lock.unlock();
                } catch (Exception e4) {
                    LOG.info("Caught exception while attempting to release timeout aggregation lock", e4);
                }
                this.lockManager.removeLock(str2);
            }
        } catch (Throwable th) {
            try {
                lock.unlock();
            } catch (Exception e5) {
                LOG.info("Caught exception while attempting to release timeout aggregation lock", e5);
            }
            this.lockManager.removeLock(str2);
            throw th;
        }
    }

    protected boolean isAggregationClosed(String str) throws Exception {
        Object load = this.closedAggregates.load(str);
        if (load != null) {
            this.closedAggregates.store(str, load);
        }
        return load != null;
    }

    protected void closeAggregation(String str) throws Exception {
        this.closedAggregates.store(str, Boolean.TRUE);
    }

    private boolean isSynchronous(MessageExchange messageExchange) {
        return isSynchronous() || (messageExchange.isTransacted() && Boolean.TRUE.equals(messageExchange.getProperty("javax.jbi.messaging.sendSync")));
    }

    protected abstract String getCorrelationID(MessageExchange messageExchange, NormalizedMessage normalizedMessage) throws Exception;

    protected abstract Object createAggregation(String str) throws Exception;

    protected abstract Date getTimeout(Object obj);

    protected abstract boolean addMessage(Object obj, NormalizedMessage normalizedMessage, MessageExchange messageExchange) throws Exception;

    protected abstract void buildAggregate(Object obj, NormalizedMessage normalizedMessage, MessageExchange messageExchange, boolean z) throws Exception;
}
