package org.apache.servicemix.jbi.messaging;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jbi.JBIException;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.id.IdGenerator;
import org.apache.servicemix.jbi.ExchangeTimeoutException;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.event.ExchangeEvent;
import org.apache.servicemix.jbi.event.ExchangeListener;
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.listener.MessageExchangeListener;

/* loaded from: input_file:org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.class */
public class DeliveryChannelImpl implements DeliveryChannel {
    private static final Log LOG = LogFactory.getLog(DeliveryChannelImpl.class);
    private JBIContainer container;
    private ComponentContextImpl context;
    private ComponentMBeanImpl component;
    private BlockingQueue<MessageExchangeImpl> queue;
    private MessageExchangeFactory inboundFactory;
    private int intervalCount;
    private TransactionManager transactionManager;
    private IdGenerator idGenerator = new IdGenerator();
    private AtomicBoolean closed = new AtomicBoolean(false);
    private Map<Thread, Boolean> waiters = new ConcurrentHashMap();
    private Map<String, MessageExchangeImpl> exchangesById = new ConcurrentHashMap();

    public DeliveryChannelImpl(ComponentMBeanImpl componentMBeanImpl) {
        this.component = componentMBeanImpl;
        this.container = componentMBeanImpl.getContainer();
        this.queue = new ArrayBlockingQueue(componentMBeanImpl.getInboundQueueCapacity());
        this.transactionManager = (TransactionManager) this.container.getTransactionManager();
    }

    public int getQueueSize() {
        return this.queue.size();
    }

    public void close() throws MessagingException {
        if (this.closed.compareAndSet(false, true)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Closing DeliveryChannel " + this);
            }
            ArrayList<MessageExchangeImpl> arrayList = new ArrayList(this.queue.size());
            this.queue.drainTo(arrayList);
            for (MessageExchangeImpl messageExchangeImpl : arrayList) {
                if (messageExchangeImpl.getTransactionContext() != null && messageExchangeImpl.getMirror().getSyncState() == 1) {
                    notifyExchange(messageExchangeImpl.getMirror(), messageExchangeImpl.getMirror(), "close");
                }
            }
            for (Thread thread : (Thread[]) this.waiters.keySet().toArray(new Thread[this.waiters.size()])) {
                thread.interrupt();
            }
            for (ServiceEndpoint serviceEndpoint : this.container.getRegistry().getEndpointsForComponent(this.component.getComponentNameSpace())) {
                try {
                    this.component.getContext().deactivateEndpoint(serviceEndpoint);
                } catch (JBIException e) {
                    LOG.error("Error deactivating endpoint", e);
                }
            }
        }
    }

    protected void checkNotClosed() throws MessagingException {
        if (this.closed.get()) {
            throw new MessagingException(this + " has been closed.");
        }
    }

    public MessageExchangeFactory createExchangeFactory() {
        MessageExchangeFactoryImpl createMessageExchangeFactory = createMessageExchangeFactory();
        createMessageExchangeFactory.setContext(this.context);
        ActivationSpec activationSpec = this.context.getActivationSpec();
        if (activationSpec != null) {
            String name = this.context.getComponentNameSpace().getName();
            QName destinationService = activationSpec.getDestinationService();
            if (destinationService != null) {
                createMessageExchangeFactory.setServiceName(destinationService);
                LOG.debug("default destination serviceName for " + name + " = " + destinationService);
            }
            QName destinationInterface = activationSpec.getDestinationInterface();
            if (destinationInterface != null) {
                createMessageExchangeFactory.setInterfaceName(destinationInterface);
                LOG.debug("default destination interfaceName for " + name + " = " + destinationInterface);
            }
            QName destinationOperation = activationSpec.getDestinationOperation();
            if (destinationOperation != null) {
                createMessageExchangeFactory.setOperationName(destinationOperation);
                LOG.debug("default destination operationName for " + name + " = " + destinationOperation);
            }
            String destinationEndpoint = activationSpec.getDestinationEndpoint();
            if (destinationEndpoint != null) {
                boolean z = false;
                LOG.debug("default destination endpointName for " + name + " = " + destinationEndpoint);
                if (destinationService != null && destinationEndpoint != null) {
                    destinationEndpoint = destinationEndpoint.trim();
                    ServiceEndpoint endpoint = this.container.getRegistry().getEndpoint(destinationService, destinationEndpoint);
                    if (endpoint != null) {
                        createMessageExchangeFactory.setEndpoint(endpoint);
                        LOG.info("Set default destination endpoint for " + name + " to " + endpoint);
                        z = true;
                    }
                }
                if (!z) {
                    LOG.warn("Could not find destination endpoint for " + name + " service(" + destinationService + ") with endpointName " + destinationEndpoint);
                }
            }
        }
        return createMessageExchangeFactory;
    }

    public MessageExchangeFactory createExchangeFactory(QName qName) {
        MessageExchangeFactoryImpl createMessageExchangeFactory = createMessageExchangeFactory();
        createMessageExchangeFactory.setInterfaceName(qName);
        return createMessageExchangeFactory;
    }

    public MessageExchangeFactory createExchangeFactoryForService(QName qName) {
        MessageExchangeFactoryImpl createMessageExchangeFactory = createMessageExchangeFactory();
        createMessageExchangeFactory.setServiceName(qName);
        return createMessageExchangeFactory;
    }

    public MessageExchangeFactory createExchangeFactory(ServiceEndpoint serviceEndpoint) {
        MessageExchangeFactoryImpl createMessageExchangeFactory = createMessageExchangeFactory();
        createMessageExchangeFactory.setEndpoint(serviceEndpoint);
        return createMessageExchangeFactory;
    }

    protected MessageExchangeFactoryImpl createMessageExchangeFactory() {
        MessageExchangeFactoryImpl messageExchangeFactoryImpl = new MessageExchangeFactoryImpl(this.idGenerator, this.closed);
        messageExchangeFactoryImpl.setContext(this.context);
        return messageExchangeFactoryImpl;
    }

    public MessageExchange accept() throws MessagingException {
        return accept(Long.MAX_VALUE);
    }

    public MessageExchange accept(long j) throws MessagingException {
        try {
            checkNotClosed();
            MessageExchangeImpl poll = this.queue.poll(j, TimeUnit.MILLISECONDS);
            if (poll != null) {
                if (poll.getPacket().isAborted()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Aborted " + poll.getExchangeId() + " in " + this);
                    }
                    poll = null;
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Accepting " + poll.getExchangeId() + " in " + this);
                    }
                    if (poll.getTxLock() != null && poll.getStatus() != ExchangeStatus.ACTIVE) {
                        notifyExchange(poll.getMirror(), poll.getTxLock(), "acceptFinishedExchangeWithTxLock");
                        poll.handleAccept();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Accepted: " + poll);
                        }
                    } else if (!poll.isTransacted() || poll.getStatus() == ExchangeStatus.ACTIVE) {
                        resumeTx(poll);
                        poll.handleAccept();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Accepted: " + poll);
                        }
                    } else {
                        poll.handleAccept();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Accepted: " + poll);
                        }
                    }
                }
            }
            if (poll != null) {
                ExchangeListener[] exchangeListenerArr = (ExchangeListener[]) this.container.getListeners(ExchangeListener.class);
                ExchangeEvent exchangeEvent = new ExchangeEvent(poll, 1);
                for (ExchangeListener exchangeListener : exchangeListenerArr) {
                    try {
                        exchangeListener.exchangeAccepted(exchangeEvent);
                    } catch (Exception e) {
                        LOG.warn("Error calling listener: " + e.getMessage(), e);
                    }
                }
            }
            return poll;
        } catch (InterruptedException e2) {
            throw new MessagingException("accept failed", e2);
        }
    }

    protected void autoSetPersistent(MessageExchangeImpl messageExchangeImpl) {
        if (messageExchangeImpl.getPersistent() == null) {
            messageExchangeImpl.setPersistent(this.context.getActivationSpec().getPersistent() != null ? this.context.getActivationSpec().getPersistent() : Boolean.valueOf(this.context.getContainer().isPersistent()));
        }
    }

    protected void throttle() {
        if (this.component.isExchangeThrottling()) {
            if (this.component.getThrottlingInterval() > this.intervalCount) {
                this.intervalCount = 0;
                try {
                    Thread.sleep(this.component.getThrottlingTimeout());
                } catch (InterruptedException e) {
                    LOG.warn("throttling failed", e);
                }
            }
            this.intervalCount++;
        }
    }

    protected void doSend(MessageExchangeImpl messageExchangeImpl, boolean z) throws MessagingException {
        MessageExchangeImpl mirror = messageExchangeImpl.getMirror();
        boolean z2 = messageExchangeImpl.getStatus() != ExchangeStatus.ACTIVE;
        try {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Sent: " + messageExchangeImpl);
                }
                if (messageExchangeImpl.getPacket().isAborted()) {
                    throw new ExchangeTimeoutException(messageExchangeImpl);
                }
                autoEnlistInTx(messageExchangeImpl);
                autoSetPersistent(messageExchangeImpl);
                throttle();
                if (messageExchangeImpl.getRole() == MessageExchange.Role.CONSUMER) {
                    messageExchangeImpl.setSourceId(this.component.getComponentNameSpace());
                }
                ExchangeListener[] exchangeListenerArr = (ExchangeListener[]) this.container.getListeners(ExchangeListener.class);
                ExchangeEvent exchangeEvent = new ExchangeEvent(messageExchangeImpl, 0);
                for (ExchangeListener exchangeListener : exchangeListenerArr) {
                    try {
                        exchangeListener.exchangeSent(exchangeEvent);
                    } catch (Exception e) {
                        LOG.warn("Error calling listener: " + e.getMessage(), e);
                    }
                }
                messageExchangeImpl.handleSend(z);
                mirror.setTxState(0);
                if (z2 && messageExchangeImpl.getTxLock() == null && messageExchangeImpl.getTxState() == 2 && !messageExchangeImpl.isPushDelivery() && messageExchangeImpl.getRole() == MessageExchange.Role.CONSUMER) {
                    messageExchangeImpl.setTransactionContext(null);
                }
                this.container.sendExchange(mirror);
                if (messageExchangeImpl.getTxLock() != null) {
                    if (mirror.getTxState() == 1) {
                        suspendTx(mirror);
                    }
                    synchronized (messageExchangeImpl.getTxLock()) {
                        notifyExchange(messageExchangeImpl, messageExchangeImpl.getTxLock(), "doSendWithTxLock");
                    }
                }
            } catch (MessagingException e2) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Exception processing: " + messageExchangeImpl.getExchangeId() + " in " + this);
                }
                throw e2;
            }
        } catch (Throwable th) {
            if (messageExchangeImpl.getTxLock() != null) {
                if (mirror.getTxState() == 1) {
                    suspendTx(mirror);
                }
                synchronized (messageExchangeImpl.getTxLock()) {
                    notifyExchange(messageExchangeImpl, messageExchangeImpl.getTxLock(), "doSendWithTxLock");
                }
            }
            throw th;
        }
    }

    public void send(MessageExchange messageExchange) throws MessagingException {
        checkNotClosed();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Send " + messageExchange.getExchangeId() + " in " + this);
        }
        messageExchange.setProperty(JbiConstants.SEND_SYNC, (Object) null);
        doSend((MessageExchangeImpl) messageExchange, false);
    }

    public boolean sendSync(MessageExchange messageExchange) throws MessagingException {
        return sendSync(messageExchange, 0L);
    }

    public boolean sendSync(MessageExchange messageExchange, long j) throws MessagingException {
        boolean z;
        checkNotClosed();
        if (LOG.isDebugEnabled()) {
            LOG.debug("SendSync " + messageExchange.getExchangeId() + " in " + this);
        }
        messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
        MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
        String key = messageExchangeImpl.getKey();
        try {
            try {
                try {
                    this.exchangesById.put(key, messageExchangeImpl);
                    synchronized (messageExchangeImpl) {
                        doSend(messageExchangeImpl, true);
                        if (messageExchangeImpl.getSyncState() != 2) {
                            waitForExchange(messageExchangeImpl, messageExchangeImpl, j, "sendSync");
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Exchange " + messageExchange.getExchangeId() + " has already been answered (no need to wait)");
                        }
                    }
                    if (messageExchangeImpl.getSyncState() == 2) {
                        messageExchangeImpl.handleAccept();
                        resumeTx(messageExchangeImpl);
                        ExchangeListener[] exchangeListenerArr = (ExchangeListener[]) this.container.getListeners(ExchangeListener.class);
                        ExchangeEvent exchangeEvent = new ExchangeEvent(messageExchangeImpl, 1);
                        for (ExchangeListener exchangeListener : exchangeListenerArr) {
                            try {
                                exchangeListener.exchangeAccepted(exchangeEvent);
                            } catch (Exception e) {
                                LOG.warn("Error calling listener: " + e.getMessage(), e);
                            }
                        }
                        z = true;
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Exchange " + messageExchange.getExchangeId() + " has been aborted");
                        }
                        messageExchangeImpl.getPacket().setAborted(true);
                        messageExchangeImpl.getPacket().setError(new RuntimeException("sendSync timeout for " + messageExchange.getExchangeId()));
                        z = false;
                    }
                    return z;
                } catch (InterruptedException e2) {
                    throw new MessagingException(e2);
                }
            } catch (RuntimeException e3) {
                throw e3;
            }
        } finally {
            this.exchangesById.remove(key);
        }
    }

    public JBIContainer getContainer() {
        return this.container;
    }

    public void setContainer(JBIContainer jBIContainer) {
        this.container = jBIContainer;
    }

    public ComponentMBeanImpl getComponent() {
        return this.component;
    }

    public ComponentContextImpl getContext() {
        return this.context;
    }

    public void setContext(ComponentContextImpl componentContextImpl) {
        this.context = componentContextImpl;
    }

    /* JADX WARN: Finally extract failed */
    public void processInBound(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing inbound exchange: " + messageExchangeImpl);
        }
        checkNotClosed();
        MessageExchangeImpl messageExchangeImpl2 = this.exchangesById.get(messageExchangeImpl.getKey());
        if (messageExchangeImpl2 != null && messageExchangeImpl != messageExchangeImpl2) {
            messageExchangeImpl2.copyFrom(messageExchangeImpl);
            messageExchangeImpl = messageExchangeImpl2;
        }
        if (messageExchangeImpl.getSyncState() == 1) {
            suspendTx(messageExchangeImpl2);
            messageExchangeImpl.setSyncState(2);
            notifyExchange(messageExchangeImpl2, messageExchangeImpl2, "processInboundSynchronousExchange");
            return;
        }
        MessageExchangeListener exchangeListener = getExchangeListener();
        if (exchangeListener != null) {
            messageExchangeImpl.handleAccept();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received: " + messageExchangeImpl);
            }
            ExchangeListener[] exchangeListenerArr = (ExchangeListener[]) this.container.getListeners(ExchangeListener.class);
            ExchangeEvent exchangeEvent = new ExchangeEvent(messageExchangeImpl, 1);
            for (ExchangeListener exchangeListener2 : exchangeListenerArr) {
                try {
                    exchangeListener2.exchangeAccepted(exchangeEvent);
                } catch (Exception e) {
                    LOG.warn("Error calling listener: " + e.getMessage(), e);
                }
            }
            messageExchangeImpl.setPushDeliver(true);
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(this.component.getComponent().getClass().getClassLoader());
                exchangeListener.onMessageExchange(messageExchangeImpl);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                return;
            } catch (Throwable th) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                throw th;
            }
        }
        if (!messageExchangeImpl.isTransacted() || messageExchangeImpl.getStatus() != ExchangeStatus.ACTIVE) {
            try {
                this.queue.put(messageExchangeImpl);
                return;
            } catch (InterruptedException e2) {
                LOG.debug("Exchange " + messageExchangeImpl.getExchangeId() + " aborted due to thread interruption", e2);
                messageExchangeImpl.getPacket().setAborted(true);
                return;
            }
        }
        if (messageExchangeImpl.getTxState() == 2) {
            try {
                suspendTx(messageExchangeImpl);
                this.queue.put(messageExchangeImpl);
                return;
            } catch (InterruptedException e3) {
                LOG.debug("Exchange " + messageExchangeImpl.getExchangeId() + " aborted due to thread interruption", e3);
                messageExchangeImpl.getPacket().setAborted(true);
                return;
            }
        }
        Object obj = new Object();
        synchronized (obj) {
            try {
                try {
                    messageExchangeImpl.setTxLock(obj);
                    suspendTx(messageExchangeImpl);
                    this.queue.put(messageExchangeImpl);
                    waitForExchange(messageExchangeImpl, obj, 0L, "processInboundTransactionalExchange");
                    messageExchangeImpl.setTxLock(null);
                    resumeTx(messageExchangeImpl);
                } catch (InterruptedException e4) {
                    LOG.debug("Exchange " + messageExchangeImpl.getExchangeId() + " aborted due to thread interruption", e4);
                    messageExchangeImpl.getPacket().setAborted(true);
                    messageExchangeImpl.setTxLock(null);
                    resumeTx(messageExchangeImpl);
                }
            } catch (Throwable th2) {
                messageExchangeImpl.setTxLock(null);
                resumeTx(messageExchangeImpl);
                throw th2;
            }
        }
    }

    protected MessageExchangeListener getExchangeListener() {
        MessageExchangeListener component = this.component.getComponent();
        if (component instanceof MessageExchangeListener) {
            return component;
        }
        MessageExchangeListener lifeCycle = this.component.getLifeCycle();
        if (lifeCycle instanceof MessageExchangeListener) {
            return lifeCycle;
        }
        return null;
    }

    protected void waitForExchange(MessageExchangeImpl messageExchangeImpl, Object obj, long j, String str) throws InterruptedException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waiting for exchange " + messageExchangeImpl.getExchangeId() + " (" + Integer.toHexString(messageExchangeImpl.hashCode()) + ") to be answered in " + this + " from " + str);
        }
        Thread currentThread = Thread.currentThread();
        try {
            this.waiters.put(currentThread, Boolean.TRUE);
            obj.wait(j);
            this.waiters.remove(currentThread);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Notified: " + messageExchangeImpl.getExchangeId() + "(" + Integer.toHexString(messageExchangeImpl.hashCode()) + ") in " + this + " from " + str);
            }
        } catch (Throwable th) {
            this.waiters.remove(currentThread);
            throw th;
        }
    }

    protected void notifyExchange(MessageExchangeImpl messageExchangeImpl, Object obj, String str) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notifying exchange " + messageExchangeImpl.getExchangeId() + "(" + Integer.toHexString(messageExchangeImpl.hashCode()) + ") in " + this + " from " + str);
        }
        synchronized (obj) {
            obj.notify();
        }
    }

    public MessageExchangeFactory getInboundFactory() {
        if (this.inboundFactory == null) {
            this.inboundFactory = createExchangeFactory();
        }
        return this.inboundFactory;
    }

    protected void suspendTx(MessageExchangeImpl messageExchangeImpl) {
        if (this.transactionManager == null || this.container.isUseNewTransactionModel()) {
            return;
        }
        try {
            Transaction transactionContext = messageExchangeImpl.getTransactionContext();
            if (transactionContext != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Suspending transaction for " + messageExchangeImpl.getExchangeId() + " in " + this);
                }
                if (this.transactionManager.suspend() != transactionContext) {
                    throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
                }
            }
        } catch (Exception e) {
            LOG.info("Exchange " + messageExchangeImpl.getExchangeId() + " aborted due to transaction exception", e);
            messageExchangeImpl.getPacket().setAborted(true);
        }
    }

    protected void resumeTx(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        if (this.transactionManager == null || this.container.isUseNewTransactionModel()) {
            return;
        }
        try {
            Transaction transactionContext = messageExchangeImpl.getTransactionContext();
            if (transactionContext != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Resuming transaction for " + messageExchangeImpl.getExchangeId() + " in " + this);
                }
                this.transactionManager.resume(transactionContext);
            }
        } catch (Exception e) {
            throw new MessagingException(e);
        }
    }

    protected void autoEnlistInTx(MessageExchangeImpl messageExchangeImpl) throws MessagingException {
        if (this.transactionManager == null || !this.container.isAutoEnlistInTransaction() || this.container.isUseNewTransactionModel()) {
            return;
        }
        try {
            Transaction transaction = this.transactionManager.getTransaction();
            if (transaction != null && transaction.getStatus() == 0) {
                Transaction transactionContext = messageExchangeImpl.getTransactionContext();
                if (transactionContext == null) {
                    messageExchangeImpl.setTransactionContext(transaction);
                } else if (transactionContext != transaction) {
                    throw new IllegalStateException("the transaction context set in the messageExchange is not bound to the current thread");
                }
            }
        } catch (Exception e) {
            throw new MessagingException(e);
        }
    }

    public String toString() {
        return "DeliveryChannel{" + this.component.getName() + "}";
    }

    public void cancelPendingExchanges() {
        Iterator<String> it = this.exchangesById.keySet().iterator();
        while (it.hasNext()) {
            MessageExchangeImpl messageExchangeImpl = this.exchangesById.get(it.next());
            synchronized (messageExchangeImpl) {
                messageExchangeImpl.notifyAll();
            }
        }
    }
}
