package org.apache.activemq.ra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.command.MessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-ra-5.11.0.redhat-630329-07.jar:org/apache/activemq/ra/ServerSessionPoolImpl.class */
public class ServerSessionPoolImpl implements ServerSessionPool {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ServerSessionPoolImpl.class);
    private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
    private final int maxSessions;
    private final List<ServerSessionImpl> idleSessions = new ArrayList();
    private final List<ServerSessionImpl> activeSessions = new ArrayList();
    private final Lock sessionLock = new ReentrantLock();
    private final AtomicBoolean closing = new AtomicBoolean(false);

    public ServerSessionPoolImpl(ActiveMQEndpointWorker activeMQEndpointWorker, int i) {
        this.activeMQAsfEndpointWorker = activeMQEndpointWorker;
        this.maxSessions = i;
    }

    private ServerSessionImpl createServerSessionImpl() throws JMSException {
        MessageActivationSpec activationSpec = this.activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
        int acknowledgeModeForSession = this.activeMQAsfEndpointWorker.transacted ? 0 : activationSpec.getAcknowledgeModeForSession();
        ActiveMQConnection connection = this.activeMQAsfEndpointWorker.getConnection();
        if (connection == null) {
            return null;
        }
        ActiveMQSession activeMQSession = (ActiveMQSession) connection.createSession(this.activeMQAsfEndpointWorker.transacted, acknowledgeModeForSession);
        try {
            int i = 0;
            if (activationSpec.getEnableBatchBooleanValue()) {
                i = activationSpec.getMaxMessagesPerBatchIntValue();
            }
            if (activationSpec.isUseRAManagedTransactionEnabled()) {
                return new ServerSessionImpl(this, activeMQSession, this.activeMQAsfEndpointWorker.workManager, createEndpoint(null), true, i);
            }
            return new ServerSessionImpl(this, activeMQSession, this.activeMQAsfEndpointWorker.workManager, createEndpoint(new LocalAndXATransaction(activeMQSession.getTransactionContext())), false, i);
        } catch (UnavailableException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Could not create an endpoint.", e);
            }
            activeMQSession.close();
            return null;
        }
    }

    private MessageEndpoint createEndpoint(LocalAndXATransaction localAndXATransaction) throws UnavailableException {
        return new MessageEndpointProxy(this.activeMQAsfEndpointWorker.endpointFactory.createEndpoint(localAndXATransaction));
    }

    public ServerSession getServerSession() throws JMSException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("ServerSession requested.");
        }
        if (this.closing.get()) {
            throw new JMSException("Session Pool Shutting Down.");
        }
        this.sessionLock.lock();
        try {
            ServerSessionImpl existingServerSession = getExistingServerSession(false);
            this.sessionLock.unlock();
            if (existingServerSession != null) {
                return existingServerSession;
            }
            ServerSessionImpl createServerSessionImpl = createServerSessionImpl();
            this.sessionLock.lock();
            try {
                if (createServerSessionImpl != null) {
                    this.activeSessions.add(createServerSessionImpl);
                } else {
                    if (this.activeSessions.isEmpty() && this.idleSessions.isEmpty()) {
                        throw new JMSException("Endpoint factory did not allow creation of any endpoints.");
                    }
                    createServerSessionImpl = getExistingServerSession(true);
                }
                this.sessionLock.unlock();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Created a new session: " + createServerSessionImpl);
                }
                return createServerSessionImpl;
            } finally {
            }
        } finally {
        }
    }

    private ServerSessionImpl getExistingServerSession(boolean z) {
        ServerSessionImpl serverSessionImpl = null;
        if (this.idleSessions.size() > 0) {
            serverSessionImpl = this.idleSessions.remove(this.idleSessions.size() - 1);
        }
        if (serverSessionImpl != null) {
            this.activeSessions.add(serverSessionImpl);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Using idle session: " + serverSessionImpl);
            }
        } else if (z || this.activeSessions.size() >= this.maxSessions) {
            serverSessionImpl = getExistingActiveServerSession();
        }
        return serverSessionImpl;
    }

    private ServerSessionImpl getExistingActiveServerSession() {
        ServerSessionImpl serverSessionImpl = null;
        if (!this.activeSessions.isEmpty()) {
            if (this.activeSessions.size() > 1) {
                serverSessionImpl = this.activeSessions.remove(0);
                this.activeSessions.add(serverSessionImpl);
            } else {
                serverSessionImpl = this.activeSessions.get(0);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reusing an active session: " + serverSessionImpl);
        }
        return serverSessionImpl;
    }

    public void returnToPool(ServerSessionImpl serverSessionImpl) {
        this.sessionLock.lock();
        this.activeSessions.remove(serverSessionImpl);
        try {
            if (serverSessionImpl.isStale()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Discarding stale ServerSession to be returned to pool: " + serverSessionImpl);
                }
                serverSessionImpl.close();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ServerSession returned to pool: " + serverSessionImpl);
                }
                this.idleSessions.add(serverSessionImpl);
            }
            synchronized (this.closing) {
                this.closing.notify();
            }
        } finally {
            this.sessionLock.unlock();
        }
    }

    public void removeFromPool(ServerSessionImpl serverSessionImpl) {
        this.sessionLock.lock();
        try {
            this.activeSessions.remove(serverSessionImpl);
            this.sessionLock.unlock();
            try {
                ActiveMQSession activeMQSession = (ActiveMQSession) serverSessionImpl.getSession();
                List<MessageDispatch> unconsumedMessages = activeMQSession.getUnconsumedMessages();
                if (!unconsumedMessages.isEmpty()) {
                    ActiveMQConnection connection = this.activeMQAsfEndpointWorker.getConnection();
                    if (connection != null) {
                        for (MessageDispatch messageDispatch : unconsumedMessages) {
                            if (connection.hasDispatcher(messageDispatch.getConsumerId())) {
                                dispatchToSession(messageDispatch);
                                LOG.trace("on remove of {} redispatch of {}", activeMQSession, messageDispatch);
                            } else {
                                LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", messageDispatch, activeMQSession.getConnection());
                            }
                        }
                    } else {
                        LOG.trace("on remove of {} not redispatching while disconnected", activeMQSession);
                    }
                }
            } catch (Throwable th) {
                LOG.error("Error redispatching unconsumed messages from stale server session {}", serverSessionImpl, th);
            }
            serverSessionImpl.close();
            synchronized (this.closing) {
                this.closing.notify();
            }
        } catch (Throwable th2) {
            this.sessionLock.unlock();
            throw th2;
        }
    }

    private void dispatchToSession(MessageDispatch messageDispatch) throws JMSException {
        ServerSession serverSession = getServerSession();
        Session session = serverSession.getSession();
        ActiveMQSession activeMQSession = null;
        if (session instanceof ActiveMQSession) {
            activeMQSession = (ActiveMQSession) session;
        } else if (session instanceof ActiveMQQueueSession) {
            activeMQSession = (ActiveMQSession) session;
        } else if (session instanceof ActiveMQTopicSession) {
            activeMQSession = (ActiveMQSession) session;
        } else {
            this.activeMQAsfEndpointWorker.getConnection().onAsyncException(new JMSException("Session pool provided an invalid session type: " + session.getClass()));
        }
        activeMQSession.dispatch(messageDispatch);
        serverSession.start();
    }

    public void close() {
        this.closing.set(true);
        int closeSessions = closeSessions();
        while (true) {
            int i = closeSessions;
            if (i <= 0) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Active Sessions = " + i);
            }
            try {
                synchronized (this.closing) {
                    this.closing.wait(250L);
                }
                closeSessions = closeSessions();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int closeSessions() {
        this.sessionLock.lock();
        try {
            for (ServerSessionImpl serverSessionImpl : this.activeSessions) {
                try {
                    ActiveMQSession activeMQSession = (ActiveMQSession) serverSessionImpl.getSession();
                    if (!activeMQSession.isClosed()) {
                        activeMQSession.close();
                    }
                } catch (JMSException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Failed to close active running server session {}, reason:{}", serverSessionImpl, e.toString(), e);
                    }
                }
            }
            Iterator<ServerSessionImpl> it = this.idleSessions.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.idleSessions.clear();
            int size = this.activeSessions.size();
            this.sessionLock.unlock();
            return size;
        } catch (Throwable th) {
            this.sessionLock.unlock();
            throw th;
        }
    }

    public boolean isClosing() {
        return this.closing.get();
    }

    public void setClosing(boolean z) {
        this.closing.set(z);
    }
}
