package org.fusesource.fabric.bridge.internal;

import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.fusesource.fabric.bridge.model.BridgeDestinationsConfig;
import org.fusesource.fabric.bridge.model.BridgedDestination;
import org.fusesource.fabric.bridge.model.BrokerConfig;
import org.fusesource.fabric.bridge.model.DispatchPolicy;
import org.springframework.beans.BeanUtils;
import org.springframework.jms.JmsException;
import org.springframework.jms.UncategorizedJmsException;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/* loaded from: input_file:org/fusesource/fabric/bridge/internal/SourceConnector.class */
public class SourceConnector extends AbstractConnector {
    private BrokerConfig localBrokerConfig;
    private BrokerConfig remoteBrokerConfig;
    private BridgeDestinationsConfig outboundDestinations;
    private boolean reuseSession;
    private boolean reuseMessage;
    private Destination stagingQueue;
    private ConnectionFactory localConnectionFactory;
    private ConnectionFactory remoteConnectionFactory;
    private Map<BridgedDestination, AbstractMessageListenerContainer> listenerMap = new HashMap();

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector
    public void afterPropertiesSet() throws Exception {
        if (this.outboundDestinations == null || this.localBrokerConfig == null) {
            throw new IllegalArgumentException("Both outboundDestinations and localBrokerConfig properties must be set");
        }
        if (this.remoteBrokerConfig == null && !this.outboundDestinations.isUseStagingQueue()) {
            throw new IllegalArgumentException("Property remoteBrokerConfig is missing but property useStagingQueue is false");
        }
        if (this.remoteBrokerConfig == null && this.outboundDestinations.isDefaultStagingLocation()) {
            throw new IllegalArgumentException("Property remoteBrokerConfig is missing but property defaultStagingLocation is true");
        }
        if (this.remoteBrokerConfig != null && !this.outboundDestinations.isDefaultStagingLocation()) {
            throw new IllegalArgumentException("Property remoteBrokerConfig is set but property defaultStagingLocation is false");
        }
        if ((this.localBrokerConfig.getBrokerUrl() == null && this.localBrokerConfig.getConnectionFactory() == null) || (this.localBrokerConfig.getBrokerUrl() != null && this.localBrokerConfig.getConnectionFactory() != null)) {
            throw new IllegalArgumentException("Either a local broker url or connection factory must be provided");
        }
        if (this.remoteBrokerConfig != null) {
            if ((this.remoteBrokerConfig.getBrokerUrl() == null && this.remoteBrokerConfig.getConnectionFactory() == null) || (this.remoteBrokerConfig.getBrokerUrl() != null && this.remoteBrokerConfig.getConnectionFactory() != null)) {
                throw new IllegalArgumentException("Either a remote broker url or connection factory must be provided");
            }
        }
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector
    protected void doInitialize() {
        this.localConnectionFactory = getConnectionFactory(this.localBrokerConfig);
        this.LOG.debug("Using local connection factory " + this.localConnectionFactory);
        if (this.remoteBrokerConfig == null || !this.outboundDestinations.isDefaultStagingLocation()) {
            this.LOG.warn("Using local broker for staging queue");
            this.outboundDestinations.setDefaultStagingLocation(false);
            this.remoteConnectionFactory = this.localConnectionFactory;
            this.reuseSession = true;
        } else {
            this.remoteConnectionFactory = getConnectionFactory(this.remoteBrokerConfig);
            this.LOG.debug("Using remote connection factory " + this.remoteConnectionFactory);
            this.reuseSession = false;
        }
        this.reuseMessage = false;
        Connection connection = null;
        Connection connection2 = null;
        try {
            try {
                connection = this.localConnectionFactory.createConnection();
                Message createMessage = connection.createSession(false, 1).createMessage();
                connection2 = this.remoteConnectionFactory.createConnection();
                Session createSession = connection2.createSession(false, 1);
                if (createMessage.getClass().isInstance(createSession.createMessage())) {
                    this.reuseMessage = true;
                }
                this.stagingQueue = getStagingQueue(createSession);
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                    }
                }
                if (connection2 != null) {
                    try {
                        connection2.close();
                    } catch (JMSException e2) {
                    }
                }
                for (BridgedDestination bridgedDestination : this.outboundDestinations.getDestinations()) {
                    try {
                        this.listenerMap.put(bridgedDestination, createListenerContainer(bridgedDestination));
                    } catch (Exception e3) {
                        String str = "Error creating listener for destination: " + bridgedDestination;
                        this.LOG.error(str, e3);
                        throw new IllegalStateException(str, e3);
                    }
                }
                this.LOG.info("Initialized");
            } catch (JMSException e4) {
                String str2 = "Error checking whether local and remote broker providers are the same: " + e4.getMessage();
                this.LOG.error(str2, e4);
                throw new IllegalStateException(str2, e4);
            }
        } catch (Throwable th) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e5) {
                }
            }
            if (connection2 != null) {
                try {
                    connection2.close();
                } catch (JMSException e6) {
                }
            }
            throw th;
        }
    }

    private Destination getStagingQueue(Session session) throws JMSException {
        if (this.outboundDestinations.isUseStagingQueue()) {
            return (this.remoteBrokerConfig != null ? this.remoteBrokerConfig.getDestinationResolver() : this.localBrokerConfig.getDestinationResolver()).resolveDestinationName(session, this.outboundDestinations.getStagingQueueName(), false);
        }
        return null;
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector
    protected void doStart() {
        JmsException jmsException = null;
        for (Map.Entry<BridgedDestination, AbstractMessageListenerContainer> entry : this.listenerMap.entrySet()) {
            BridgedDestination key = entry.getKey();
            AbstractMessageListenerContainer value = entry.getValue();
            if (!value.isRunning()) {
                try {
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Starting listener for " + key);
                    }
                    value.start();
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Listener started for " + key);
                    }
                } catch (JmsException e) {
                    jmsException = e;
                    this.LOG.error("Error stopping connector for listener " + value + ": " + e.getMessage(), e);
                }
            }
        }
        if (jmsException != null) {
            stop();
            throw jmsException;
        }
        this.LOG.info("Started");
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector
    protected void doStop() {
        for (Map.Entry<BridgedDestination, AbstractMessageListenerContainer> entry : this.listenerMap.entrySet()) {
            BridgedDestination key = entry.getKey();
            AbstractMessageListenerContainer value = entry.getValue();
            if (value.isRunning()) {
                try {
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Stopping listener for " + key);
                    }
                    value.stop();
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Listener stopped for " + key);
                    }
                } catch (JmsException e) {
                    this.LOG.error("Error stopping connector for listener " + value + ": " + e.getMessage(), e);
                }
            }
        }
        this.LOG.info("Stopped");
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector
    protected void doDestroy() {
        for (Map.Entry<BridgedDestination, AbstractMessageListenerContainer> entry : this.listenerMap.entrySet()) {
            BridgedDestination key = entry.getKey();
            AbstractMessageListenerContainer value = entry.getValue();
            if (value.isActive()) {
                try {
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Destroying listener for " + key);
                    }
                    value.destroy();
                    if (this.LOG.isDebugEnabled()) {
                        this.LOG.debug("Listener destroyed for " + key);
                    }
                } catch (JmsException e) {
                    this.LOG.error("Error destroying listener for destination " + key.getName() + " : " + e.getMessage(), e);
                }
            }
        }
        this.listenerMap.clear();
        if (this.localBrokerConfig.getConnectionFactory() == null && this.localConnectionFactory != null) {
            this.localConnectionFactory.stop();
        }
        if (this.remoteBrokerConfig != null && this.remoteBrokerConfig.getConnectionFactory() == null && this.remoteConnectionFactory != null) {
            this.remoteConnectionFactory.stop();
        }
        this.LOG.info("Destroyed");
    }

    protected AbstractMessageListenerContainer createListenerContainer(BridgedDestination bridgedDestination) {
        DefaultMessageListenerContainer defaultMessageListenerContainer;
        DispatchPolicy resolvedPolicy = getResolvedPolicy(bridgedDestination.getDispatchPolicy(), this.outboundDestinations.getDispatchPolicy());
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("Resolved policy for destination " + bridgedDestination + " is : " + resolvedPolicy);
        }
        AbstractDeliveryHandler createDeliveryHandler = createDeliveryHandler(resolvedPolicy, bridgedDestination.getTargetName() != null ? bridgedDestination.getTargetName() : bridgedDestination.getName(), bridgedDestination.isPubSubDomain());
        if (resolvedPolicy.getBatchSize() <= 0 || resolvedPolicy.getBatchTimeout() <= 0) {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Creating default message listener container for " + bridgedDestination.getName());
            }
            defaultMessageListenerContainer = new DefaultMessageListenerContainer();
            configureListenerContainer(defaultMessageListenerContainer, resolvedPolicy, true, this.localBrokerConfig.getDestinationResolver());
            defaultMessageListenerContainer.setMessageListener(createDeliveryHandler);
        } else {
            if (this.LOG.isDebugEnabled()) {
                this.LOG.debug("Creating batch message listener container for " + bridgedDestination.getName());
            }
            defaultMessageListenerContainer = new BatchMessageListenerContainer();
            configureListenerContainer((BatchMessageListenerContainer) defaultMessageListenerContainer, resolvedPolicy, true, this.localBrokerConfig.getDestinationResolver());
            ((BatchMessageListenerContainer) defaultMessageListenerContainer).setBatchMessageListener(createDeliveryHandler);
        }
        defaultMessageListenerContainer.setConnectionFactory(this.localConnectionFactory);
        defaultMessageListenerContainer.setDestinationName(bridgedDestination.getName());
        defaultMessageListenerContainer.setPubSubDomain(bridgedDestination.isPubSubDomain());
        defaultMessageListenerContainer.setClientId(this.localBrokerConfig.getClientId());
        defaultMessageListenerContainer.setDurableSubscriptionName(bridgedDestination.getDurableSubscriptionName());
        defaultMessageListenerContainer.setSubscriptionDurable(bridgedDestination.isSubscriptionDurable());
        defaultMessageListenerContainer.afterPropertiesSet();
        return defaultMessageListenerContainer;
    }

    protected DispatchPolicy getResolvedPolicy(DispatchPolicy dispatchPolicy, DispatchPolicy dispatchPolicy2) {
        if (dispatchPolicy == null) {
            return dispatchPolicy2;
        }
        DispatchPolicy dispatchPolicy3 = new DispatchPolicy();
        BeanUtils.copyProperties(dispatchPolicy2, dispatchPolicy3);
        Set<String> propertiesSet = dispatchPolicy.getPropertiesSet();
        PropertyDescriptor[] propertyDescriptors = BeanUtils.getPropertyDescriptors(DispatchPolicy.class);
        HashSet hashSet = new HashSet();
        for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
            if (!propertiesSet.contains(propertyDescriptor.getName())) {
                hashSet.add(propertyDescriptor.getName());
            }
        }
        BeanUtils.copyProperties(dispatchPolicy, dispatchPolicy3, (String[]) hashSet.toArray(new String[hashSet.size()]));
        return dispatchPolicy3;
    }

    protected AbstractDeliveryHandler createDeliveryHandler(DispatchPolicy dispatchPolicy, String str, boolean z) {
        SourceDeliveryHandler sourceDeliveryHandler = new SourceDeliveryHandler();
        sourceDeliveryHandler.setDispatchPolicy(dispatchPolicy);
        sourceDeliveryHandler.setDestinationNameHeader(this.outboundDestinations.getDestinationNameHeader());
        sourceDeliveryHandler.setDestinationTypeHeader(this.outboundDestinations.getDestinationTypeHeader());
        sourceDeliveryHandler.setDestinationName(str);
        sourceDeliveryHandler.setPubSubDomain(z);
        sourceDeliveryHandler.setReuseSession(this.reuseSession);
        sourceDeliveryHandler.setReuseMessage(this.reuseMessage);
        sourceDeliveryHandler.setTargetConnectionFactory(this.remoteConnectionFactory);
        if (this.stagingQueue != null) {
            sourceDeliveryHandler.setStagingDestination(this.stagingQueue);
        } else {
            Connection connection = null;
            try {
                try {
                    connection = this.remoteConnectionFactory.createConnection();
                    sourceDeliveryHandler.setStagingDestination(this.remoteBrokerConfig.getDestinationResolver().resolveDestinationName(connection.createSession(false, 1), str, z));
                    try {
                        connection.close();
                    } catch (JMSException e) {
                    }
                } catch (JMSException e2) {
                    String str2 = "Error resolving remote destination " + str + " : " + e2.getMessage();
                    this.LOG.error(str2, e2);
                    throw new IllegalStateException(str2, e2);
                }
            } catch (Throwable th) {
                try {
                    connection.close();
                } catch (JMSException e3) {
                }
                throw th;
            }
        }
        return sourceDeliveryHandler;
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector, org.fusesource.fabric.bridge.DestinationsConfigManager
    public BridgeDestinationsConfig getDestinationsConfig() {
        BridgeDestinationsConfig bridgeDestinationsConfig;
        synchronized (this.lifecycleMonitor) {
            bridgeDestinationsConfig = this.outboundDestinations;
        }
        return bridgeDestinationsConfig;
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector, org.fusesource.fabric.bridge.DestinationsConfigManager
    public void setDestinationsConfig(BridgeDestinationsConfig bridgeDestinationsConfig) {
        synchronized (this.lifecycleMonitor) {
            if (bridgeDestinationsConfig != null) {
                if (bridgeDestinationsConfig.getDestinations() != null) {
                    boolean isRunning = isRunning();
                    try {
                        destroy();
                    } catch (Exception e) {
                        this.LOG.error("Error destorying connector: " + e.getMessage(), e);
                    }
                    this.outboundDestinations = bridgeDestinationsConfig;
                    try {
                        afterPropertiesSet();
                        if (isRunning) {
                            start();
                        }
                    } catch (Exception e2) {
                        throw new UncategorizedJmsException(e2.getMessage(), e2);
                    }
                }
            }
            throw new UncategorizedJmsException("Invalid destinations config");
        }
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector, org.fusesource.fabric.bridge.DestinationsConfigManager
    public void addDestinations(List<BridgedDestination> list) {
        synchronized (this.lifecycleMonitor) {
            Set<String> outboundDestinationNames = getOutboundDestinationNames();
            ArrayList<BridgedDestination> arrayList = new ArrayList();
            for (BridgedDestination bridgedDestination : list) {
                if (outboundDestinationNames.contains(bridgedDestination.getName())) {
                    this.LOG.warn("Ignoring destination " + bridgedDestination.getName() + " as it already exists as an outbound destination");
                } else {
                    arrayList.add(bridgedDestination);
                }
            }
            if (isInitialized()) {
                HashMap hashMap = new HashMap();
                for (BridgedDestination bridgedDestination2 : arrayList) {
                    try {
                        hashMap.put(bridgedDestination2, createListenerContainer(bridgedDestination2));
                    } catch (Exception e) {
                        String str = "Error creating listener for new destination: " + bridgedDestination2;
                        this.LOG.error(str, e);
                        for (Map.Entry entry : hashMap.entrySet()) {
                            try {
                                ((AbstractMessageListenerContainer) entry.getValue()).destroy();
                            } catch (Exception e2) {
                                this.LOG.warn("Error destorying listener for " + entry.getKey(), e2);
                            }
                        }
                        hashMap.clear();
                        arrayList.clear();
                        throw new IllegalStateException(str, e);
                    }
                }
                this.outboundDestinations.getDestinations().addAll(arrayList);
                this.listenerMap.putAll(hashMap);
                if (isRunning()) {
                    start();
                }
            } else {
                this.outboundDestinations.getDestinations().addAll(arrayList);
            }
        }
    }

    @Override // org.fusesource.fabric.bridge.internal.AbstractConnector, org.fusesource.fabric.bridge.DestinationsConfigManager
    public void removeDestinations(List<BridgedDestination> list) {
        synchronized (this.lifecycleMonitor) {
            Set<String> outboundDestinationNames = getOutboundDestinationNames();
            ArrayList<BridgedDestination> arrayList = new ArrayList();
            for (BridgedDestination bridgedDestination : list) {
                if (outboundDestinationNames.contains(bridgedDestination.getName())) {
                    arrayList.add(bridgedDestination);
                } else {
                    this.LOG.warn("Ignoring destination " + bridgedDestination + " as it does not exist as an outbound destination");
                }
            }
            if (isInitialized()) {
                for (BridgedDestination bridgedDestination2 : arrayList) {
                    try {
                        this.listenerMap.remove(bridgedDestination2).destroy();
                    } catch (Exception e) {
                        this.LOG.warn("Error destroying listener for destination: " + bridgedDestination2, e);
                    }
                }
                this.outboundDestinations.getDestinations().removeAll(arrayList);
            } else {
                this.outboundDestinations.getDestinations().removeAll(arrayList);
            }
        }
    }

    private Set<String> getOutboundDestinationNames() {
        HashSet hashSet = new HashSet();
        Iterator<BridgedDestination> it = this.outboundDestinations.getDestinations().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getName());
        }
        return hashSet;
    }

    public BrokerConfig getLocalBrokerConfig() {
        return this.localBrokerConfig;
    }

    public void setLocalBrokerConfig(BrokerConfig brokerConfig) {
        this.localBrokerConfig = brokerConfig;
    }

    public BrokerConfig getRemoteBrokerConfig() {
        return this.remoteBrokerConfig;
    }

    public void setRemoteBrokerConfig(BrokerConfig brokerConfig) {
        this.remoteBrokerConfig = brokerConfig;
    }

    public BridgeDestinationsConfig getOutboundDestinations() {
        return this.outboundDestinations;
    }

    public void setOutboundDestinations(BridgeDestinationsConfig bridgeDestinationsConfig) {
        this.outboundDestinations = bridgeDestinationsConfig;
    }
}
