/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TransactionContext;
import org.apache.activemq.ra.InboundContext;
import org.apache.activemq.ra.InboundContextSupport;
import org.apache.activemq.ra.ServerSessionPoolImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerSessionImpl
implements ServerSession,
InboundContext,
Work,
ActiveMQSession.DeliveryListener {
    public static final Method ON_MESSAGE_METHOD;
    private static int nextLogId;
    private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class);
    private ActiveMQSession session;
    private WorkManager workManager;
    private MessageEndpoint endpoint;
    private MessageProducer messageProducer;
    private final ServerSessionPoolImpl pool;
    private Object runControlMutex = new Object();
    private boolean runningFlag;
    private boolean stale;
    private final boolean useRAManagedTx;
    private final int batchSize;
    private int currentBatchSize;

    public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
        this.pool = pool;
        this.session = session;
        this.workManager = workManager;
        this.endpoint = endpoint;
        this.useRAManagedTx = useRAManagedTx;
        this.session.setMessageListener((MessageListener)endpoint);
        this.session.setDeliveryListener((ActiveMQSession.DeliveryListener)this);
        this.batchSize = batchSize;
    }

    private static synchronized int getNextLogId() {
        return nextLogId++;
    }

    @Override
    public Session getSession() throws JMSException {
        return this.session;
    }

    protected boolean isStale() {
        return this.stale || !this.session.isRunning();
    }

    @Override
    public MessageProducer getMessageProducer() throws JMSException {
        if (this.messageProducer == null) {
            this.messageProducer = this.getSession().createProducer(null);
        }
        return this.messageProducer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws JMSException {
        Object object = this.runControlMutex;
        synchronized (object) {
            if (this.runningFlag) {
                this.log.debug("Start request ignored, already running.");
                return;
            }
            this.runningFlag = true;
        }
        this.log.debug("Starting run.");
        try {
            this.workManager.scheduleWork((Work)this, Long.MAX_VALUE, null, new WorkListener(){

                public void workAccepted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug("Work accepted: " + event);
                }

                public void workRejected(WorkEvent event) {
                    ServerSessionImpl.this.log.debug("Work rejected: " + event);
                }

                public void workStarted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug("Work started: " + event);
                }

                public void workCompleted(WorkEvent event) {
                    ServerSessionImpl.this.log.debug("Work completed: " + event);
                }
            });
        }
        catch (WorkException e) {
            throw (JMSException)new JMSException("Start failed: " + (Object)((Object)e)).initCause((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        this.log.debug("{} Running", (Object)this);
        this.currentBatchSize = 0;
        while (true) {
            this.log.debug("{} run loop", (Object)this);
            try {
                InboundContextSupport.register(this);
                if (this.session.isClosed()) {
                    this.stale = true;
                    continue;
                }
                if (this.session.isRunning()) {
                    this.session.run();
                    continue;
                }
                this.log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", (Object)this.session, (Object)this.session.getUnconsumedMessages().size());
                this.stale = true;
                continue;
            }
            catch (Throwable e) {
                this.stale = true;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Endpoint {} failed to process message.", (Object)this, (Object)e);
                    continue;
                }
                if (!this.log.isInfoEnabled()) continue;
                this.log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), (Object)this);
                continue;
            }
            finally {
                InboundContextSupport.unregister(this);
                this.log.debug("run loop end");
                Object object = this.runControlMutex;
                synchronized (object) {
                    if (this.stale) {
                        this.log.debug("Session {} stale, removing from pool", (Object)this);
                        this.runningFlag = false;
                        this.pool.removeFromPool(this);
                        break;
                    }
                    if (!this.session.hasUncomsumedMessages()) {
                        this.runningFlag = false;
                        this.log.debug("Session {} has no unconsumed message, returning to pool", (Object)this);
                        this.pool.returnToPool(this);
                        break;
                    }
                    this.log.debug("Session has session has more work to do b/c of unconsumed", (Object)this);
                }
                continue;
            }
            break;
        }
        this.log.debug("{} Run finished", (Object)this);
    }

    public void beforeDelivery(ActiveMQSession session, Message msg) {
        if (this.currentBatchSize == 0) {
            try {
                this.endpoint.beforeDelivery(ON_MESSAGE_METHOD);
            }
            catch (Throwable e) {
                throw new RuntimeException("Endpoint before delivery notification failure", e);
            }
        }
    }

    public void afterDelivery(ActiveMQSession session, Message msg) {
        if (++this.currentBatchSize >= this.batchSize || !session.hasUncomsumedMessages()) {
            this.currentBatchSize = 0;
            try {
                this.endpoint.afterDelivery();
            }
            catch (Throwable e) {
                throw new RuntimeException("Endpoint after delivery notification failure: " + e, e);
            }
            finally {
                TransactionContext transactionContext = session.getTransactionContext();
                if (transactionContext != null && transactionContext.isInLocalTransaction()) {
                    if (!this.useRAManagedTx) {
                        this.log.warn("Local transaction had not been commited. Commiting now.");
                    }
                    try {
                        session.commit();
                    }
                    catch (JMSException e) {
                        this.log.info("Commit failed:", (Throwable)e);
                    }
                }
            }
        }
    }

    public void release() {
        this.log.debug("release called");
    }

    public String toString() {
        return "ServerSessionImpl:{" + this.session + "}";
    }

    public void close() {
        try {
            this.endpoint.release();
        }
        catch (Throwable e) {
            this.log.debug("Endpoint did not release properly: " + e.getMessage(), e);
        }
        try {
            this.session.close();
        }
        catch (Throwable e) {
            this.log.debug("Session did not close properly: " + e.getMessage(), e);
        }
    }

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        }
        catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

