package org.apache.activemq.ra;

import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionConsumer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:activemq-ra-5.11.0.redhat-630284-04.jar:org/apache/activemq/ra/ActiveMQEndpointWorker.class */
public class ActiveMQEndpointWorker {
    public static final Method ON_MESSAGE_METHOD;
    private static final long INITIAL_RECONNECT_DELAY = 1000;
    private static final long MAX_RECONNECT_DELAY = 30000;
    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    protected final boolean transacted;
    private final ActiveMQDestination dest;
    private final Work connectWork;
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    private final Object shutdownMutex = new String("shutdownMutex");
    private ActiveMQConnection connection;
    private ActiveMQConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private boolean running;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ActiveMQEndpointWorker.class);
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<>();

    /* JADX INFO: Access modifiers changed from: protected */
    public ActiveMQEndpointWorker(final MessageResourceAdapter messageResourceAdapter, ActiveMQEndpointActivationKey activeMQEndpointActivationKey) throws ResourceException {
        this.endpointActivationKey = activeMQEndpointActivationKey;
        this.endpointFactory = this.endpointActivationKey.getMessageEndpointFactory();
        this.workManager = messageResourceAdapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = this.endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
            this.connectWork = new Work() { // from class: org.apache.activemq.ra.ActiveMQEndpointWorker.1
                long currentReconnectDelay = ActiveMQEndpointWorker.INITIAL_RECONNECT_DELAY;

                public void release() {
                }

                public void run() {
                    this.currentReconnectDelay = ActiveMQEndpointWorker.INITIAL_RECONNECT_DELAY;
                    MessageActivationSpec activationSpec = ActiveMQEndpointWorker.this.endpointActivationKey.getActivationSpec();
                    if (ActiveMQEndpointWorker.LOG.isInfoEnabled()) {
                        ActiveMQEndpointWorker.LOG.info("Establishing connection to broker [" + messageResourceAdapter.getInfo().getServerUrl() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                    }
                    while (ActiveMQEndpointWorker.this.connecting.get() && ActiveMQEndpointWorker.this.running) {
                        try {
                            ActiveMQEndpointWorker.this.connection = messageResourceAdapter.makeConnection(activationSpec);
                            ActiveMQEndpointWorker.this.connection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.ra.ActiveMQEndpointWorker.1.1
                                public void onException(JMSException jMSException) {
                                    if (ActiveMQEndpointWorker.this.serverSessionPool.isClosing()) {
                                        return;
                                    }
                                    ActiveMQEndpointWorker.LOG.error("Connection to broker failed: " + jMSException.getMessage(), (Throwable) jMSException);
                                    if (!ActiveMQEndpointWorker.this.connecting.compareAndSet(false, true)) {
                                        ActiveMQEndpointWorker.LOG.info("Connection attempt already in progress, ignoring connection exception");
                                        return;
                                    }
                                    synchronized (ActiveMQEndpointWorker.this.connectWork) {
                                        ActiveMQEndpointWorker.this.disconnect();
                                        ActiveMQEndpointWorker.this.serverSessionPool.closeSessions();
                                        ActiveMQEndpointWorker.this.connect();
                                    }
                                }
                            });
                            ActiveMQEndpointWorker.this.connection.start();
                            if (activationSpec.isDurableSubscription()) {
                                ActiveMQEndpointWorker.this.consumer = (ActiveMQConnectionConsumer) ActiveMQEndpointWorker.this.connection.createDurableConnectionConsumer((Topic) ActiveMQEndpointWorker.this.dest, activationSpec.getSubscriptionName(), ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), ActiveMQEndpointWorker.this.serverSessionPool, ActiveMQEndpointWorker.this.connection.getPrefetchPolicy().getDurableTopicPrefetch(), activationSpec.getNoLocalBooleanValue());
                            } else {
                                ActiveMQEndpointWorker.this.consumer = (ActiveMQConnectionConsumer) ActiveMQEndpointWorker.this.connection.createConnectionConsumer(ActiveMQEndpointWorker.this.dest, ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), ActiveMQEndpointWorker.this.serverSessionPool, getPrefetch(activationSpec, ActiveMQEndpointWorker.this.connection, ActiveMQEndpointWorker.this.dest), activationSpec.getNoLocalBooleanValue());
                            }
                            if (!ActiveMQEndpointWorker.this.connecting.compareAndSet(true, false)) {
                                ActiveMQEndpointWorker.LOG.error("Could not release connection lock");
                            } else if (ActiveMQEndpointWorker.LOG.isInfoEnabled()) {
                                ActiveMQEndpointWorker.LOG.info("Successfully established connection to broker [" + messageResourceAdapter.getInfo().getServerUrl() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
                            }
                            if (ActiveMQEndpointWorker.this.consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
                                ActiveMQEndpointWorker.LOG.error("Endpoint " + ActiveMQEndpointWorker.this.endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + ActiveMQEndpointWorker.this.dest);
                            }
                        } catch (JMSException e) {
                            if (ActiveMQEndpointWorker.LOG.isDebugEnabled()) {
                                ActiveMQEndpointWorker.LOG.debug("Failed to connect: " + e.getMessage(), e);
                            }
                            ActiveMQEndpointWorker.this.disconnect();
                            pause(e);
                        }
                    }
                }

                private int getPrefetch(MessageActivationSpec messageActivationSpec, ActiveMQConnection activeMQConnection, ActiveMQDestination activeMQDestination) {
                    return activeMQDestination.isTopic() ? activeMQConnection.getPrefetchPolicy().getTopicPrefetch() : activeMQDestination.isQueue() ? activeMQConnection.getPrefetchPolicy().getQueuePrefetch() : messageActivationSpec.getMaxMessagesPerSessionsIntValue() * messageActivationSpec.getMaxSessionsIntValue();
                }

                private void pause(JMSException jMSException) {
                    if (this.currentReconnectDelay == 30000) {
                        ActiveMQEndpointWorker.LOG.error("Failed to connect to broker [" + messageResourceAdapter.getInfo().getServerUrl() + "]: " + jMSException.getMessage(), (Throwable) jMSException);
                        ActiveMQEndpointWorker.LOG.error("Endpoint will try to reconnect to the JMS broker in 30 seconds");
                    }
                    try {
                        synchronized (ActiveMQEndpointWorker.this.shutdownMutex) {
                            ActiveMQEndpointWorker.this.shutdownMutex.wait(this.currentReconnectDelay);
                        }
                    } catch (InterruptedException e) {
                        Thread.interrupted();
                    }
                    this.currentReconnectDelay *= 2;
                    if (this.currentReconnectDelay > 30000) {
                        this.currentReconnectDelay = 30000L;
                    }
                }
            };
            MessageActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
            if (activationSpec.isUseJndi()) {
                try {
                    this.dest = (ActiveMQDestination) new InitialContext().lookup(activationSpec.getDestination());
                } catch (NamingException e) {
                    throw new ResourceException("JNDI lookup failed for " + activationSpec.getDestination());
                }
            } else if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
                this.dest = new ActiveMQQueue(activationSpec.getDestination());
            } else {
                if (!"javax.jms.Topic".equals(activationSpec.getDestinationType())) {
                    throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
                }
                this.dest = new ActiveMQTopic(activationSpec.getDestination());
            }
        } catch (NoSuchMethodException e2) {
            throw new ResourceException("Endpoint does not implement the onMessage method.");
        }
    }

    public static void safeClose(Connection connection) {
        if (connection != null) {
            try {
                LOG.debug("Closing connection to broker");
                connection.close();
            } catch (JMSException e) {
                LOG.trace("failed to close c {}", connection, e);
            }
        }
    }

    public static void safeClose(ConnectionConsumer connectionConsumer) {
        if (connectionConsumer != null) {
            try {
                LOG.debug("Closing ConnectionConsumer");
                connectionConsumer.close();
            } catch (JMSException e) {
                LOG.trace("failed to close cc {}", connectionConsumer, e);
            }
        }
    }

    public void start() throws ResourceException {
        synchronized (this.connectWork) {
            if (this.running) {
                return;
            }
            this.running = true;
            if (this.connecting.compareAndSet(false, true)) {
                LOG.info("Starting");
                this.serverSessionPool = new ServerSessionPoolImpl(this, this.endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
                connect();
            } else {
                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
            }
        }
    }

    public void stop() throws InterruptedException {
        synchronized (this.shutdownMutex) {
            if (this.running) {
                this.running = false;
                LOG.info("Stopping");
                this.shutdownMutex.notifyAll();
                this.serverSessionPool.close();
                disconnect();
            }
        }
    }

    private boolean isRunning() {
        return this.running;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect() {
        synchronized (this.connectWork) {
            if (this.running) {
                try {
                    this.workManager.scheduleWork(this.connectWork, Long.MAX_VALUE, (ExecutionContext) null, (WorkListener) null);
                } catch (WorkException e) {
                    this.running = false;
                    LOG.error("Work Manager did not accept work: ", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void disconnect() {
        synchronized (this.connectWork) {
            safeClose(this.consumer);
            this.consumer = null;
            safeClose(this.connection);
            this.connection = null;
        }
    }

    protected void registerThreadSession(Session session) {
        THREAD_LOCAL.set(session);
    }

    protected void unregisterThreadSession(Session session) {
        THREAD_LOCAL.set(null);
    }

    public void setConnection(ActiveMQConnection activeMQConnection) {
        this.connection = activeMQConnection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ActiveMQConnection getConnection() {
        ActiveMQConnection activeMQConnection;
        synchronized (this.connectWork) {
            activeMQConnection = this.connection;
        }
        return activeMQConnection;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String emptyToNull(String str) {
        if (str == null || str.length() == 0) {
            return null;
        }
        return str;
    }

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
