/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network.jms;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.naming.NamingException;
import org.apache.activemq.Service;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.network.jms.JmsMesageConvertor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a one-way bridge that receives JMS messages from a consumer,
 * converts them with a {@link JmsMesageConvertor} and forwards them through
 * {@link #sendMessage(Message)}.
 *
 * <p>Subclasses supply the consumer, the producer and the two connections.
 * Failed forwards are retried in {@link #onMessage(Message)}; see
 * {@link #setMaximumRetries(int)} for the retry limit.
 */
public abstract class DestinationBridge implements Service, MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(DestinationBridge.class);

    /** Pause applied before the producer connection is rebuilt after a failed forward. */
    private static final long PRODUCER_RESTART_DELAY_MS = 1000L;

    /**
     * Consumer exposed through {@link #getConsumer()}. Within this class it is
     * only written by {@link #setConsumer(MessageConsumer)}; presumably
     * subclasses populate it in {@link #createConsumer()} - confirm.
     */
    protected MessageConsumer consumer;

    /** True between a successful {@link #start()} and the next {@link #stop()}. */
    protected AtomicBoolean started = new AtomicBoolean(false);

    /** Converter applied to every inbound message before it is forwarded. */
    protected JmsMesageConvertor jmsMessageConvertor;

    /** When false, the reply-to of each inbound message is cleared before conversion. */
    protected boolean doHandleReplyTo = true;

    /** Connector used to bridge reply-to destinations and to restart the producer connection. */
    protected JmsConnector jmsConnector;

    /** Failed attempts after which the bridge stops itself; values {@code <= 0} never stop it. */
    private int maximumRetries = 10;

    /**
     * @return the value of the {@code consumer} field, possibly {@code null}
     */
    public MessageConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * @param consumer the consumer to store in the {@code consumer} field
     */
    public void setConsumer(MessageConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * @param connector the connector this bridge delegates to
     */
    public void setJmsConnector(JmsConnector connector) {
        this.jmsConnector = connector;
    }

    /**
     * @return the converter applied to inbound messages
     */
    public JmsMesageConvertor getJmsMessageConvertor() {
        return this.jmsMessageConvertor;
    }

    /**
     * @param jmsMessageConvertor the converter applied to inbound messages
     */
    public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
        this.jmsMessageConvertor = jmsMessageConvertor;
    }

    /**
     * @return the number of failed forwarding attempts after which the bridge stops itself
     */
    public int getMaximumRetries() {
        return this.maximumRetries;
    }

    /**
     * Sets the retry limit used by {@link #onMessage(Message)}.
     *
     * @param maximumRetries failed attempts after which the bridge stops
     *        itself; a value {@code <= 0} means retries continue for as long
     *        as the bridge is started
     */
    public void setMaximumRetries(int maximumRetries) {
        this.maximumRetries = maximumRetries;
    }

    /**
     * Maps a reply-to destination by delegating to
     * {@code jmsConnector.createReplyToBridge}, passing the consumer-side and
     * producer-side connections.
     *
     * @param destination the reply-to destination taken from the inbound message
     * @return whatever destination the connector returns
     */
    protected Destination processReplyToDestination(Destination destination) {
        return this.jmsConnector.createReplyToBridge(
            destination, this.getConnnectionForConsumer(), this.getConnectionForProducer());
    }

    /**
     * Starts the bridge: creates the consumer, registers this object as its
     * listener, then creates the producer. Does nothing if already started.
     *
     * <p>If any step fails, the started flag is rolled back and the listener
     * is detached (best effort) so that a later {@code start()} can retry and
     * no half-initialised bridge keeps receiving messages.
     *
     * @throws Exception whatever consumer/producer creation or listener registration throws
     */
    @Override
    public void start() throws Exception {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        MessageConsumer created = null;
        boolean ready = false;
        try {
            created = this.createConsumer();
            created.setMessageListener(this);
            this.createProducer();
            ready = true;
        } finally {
            if (!ready) {
                // Undo the optimistic flag flip; otherwise every later start() is a no-op.
                this.started.set(false);
                this.detachListenerQuietly(created);
            }
        }
    }

    /**
     * Marks the bridge as stopped. Only the started flag is cleared; this
     * method does not close the consumer or any connection.
     */
    @Override
    public void stop() throws Exception {
        this.started.set(false);
    }

    /**
     * Converts and forwards one message, retrying on failure.
     *
     * <p>Each retry first calls {@link #restartProducer()}. The message is
     * acknowledged only after a successful send. The method returns without
     * acknowledging when the bridge is not started, when {@code message} is
     * {@code null}, when the retry limit stops the bridge, or when the calling
     * thread is interrupted.
     *
     * @param message the inbound message; {@code null} is ignored
     */
    @Override
    public void onMessage(Message message) {
        int attempt = 0;
        while (this.started.get() && message != null) {
            try {
                if (attempt > 0) {
                    this.restartProducer();
                }
                this.sendMessage(this.convertForForwarding(message));
                message.acknowledge();
                return;
            } catch (Exception e) {
                attempt++;
                LOG.error("failed to forward message on attempt: " + attempt + " reason: " + e + " message: " + message, e);
                if (Thread.currentThread().isInterrupted()) {
                    // restartProducer() keeps the interrupt status set, so its delay would no
                    // longer block; retrying further would spin without any back-off.
                    LOG.warn("Interrupted while forwarding message, abandoning retries");
                    return;
                }
                if (this.maximumRetries > 0 && attempt >= this.maximumRetries) {
                    try {
                        // Clearing the started flag ends the loop on the next condition check.
                        this.stop();
                    } catch (Exception e1) {
                        LOG.warn("Failed to stop cleanly", e1);
                    }
                }
            }
        }
    }

    /**
     * @return whether reply-to destinations are bridged rather than cleared
     */
    protected boolean isDoHandleReplyTo() {
        return this.doHandleReplyTo;
    }

    /**
     * @param doHandleReplyTo {@code true} to bridge reply-to destinations,
     *        {@code false} to clear them before conversion
     */
    protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
        this.doHandleReplyTo = doHandleReplyTo;
    }

    /**
     * Creates the consumer that {@link #start()} registers this bridge on.
     *
     * @return the consumer to listen on
     * @throws JMSException if the consumer cannot be created
     */
    protected abstract MessageConsumer createConsumer() throws JMSException;

    /**
     * Creates the producer; called from {@link #start()} and again from
     * {@link #restartProducer()}.
     *
     * @return the created producer (the return value is not used by this class)
     * @throws JMSException if the producer cannot be created
     */
    protected abstract MessageProducer createProducer() throws JMSException;

    /**
     * Sends an already converted message.
     *
     * @param var1 the converted message to send
     * @throws JMSException if the send fails; {@link #onMessage(Message)} then retries
     */
    protected abstract void sendMessage(Message var1) throws JMSException;

    /**
     * @return the connection passed as the consumer side when bridging reply-to destinations
     */
    protected abstract Connection getConnnectionForConsumer();

    /**
     * @return the connection passed as the producer side when bridging reply-to
     *         destinations, and closed by {@link #restartProducer()}
     */
    protected abstract Connection getConnectionForProducer();

    /**
     * Rebuilds the producer side after a failed forward: waits briefly, closes
     * the current producer connection (failures are logged and ignored), asks
     * the connector to restart the producer connection, then recreates the
     * producer.
     *
     * <p>If the wait is interrupted, the interrupt status is restored and the
     * remaining steps still run.
     *
     * @throws JMSException if the producer cannot be recreated
     * @throws NamingException declared for the connector restart
     */
    protected void restartProducer() throws JMSException, NamingException {
        try {
            Thread.sleep(PRODUCER_RESTART_DELAY_MS);
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        try {
            this.getConnectionForProducer().close();
        } catch (Exception e) {
            LOG.debug("Ignoring failure to close producer connection: " + e, e);
        }
        this.jmsConnector.restartProducerConnection();
        this.createProducer();
    }

    /** Produces the outbound message, bridging or clearing the reply-to per {@code doHandleReplyTo}. */
    private Message convertForForwarding(Message message) throws Exception {
        if (!this.doHandleReplyTo) {
            message.setJMSReplyTo(null);
            return this.jmsMessageConvertor.convert(message);
        }
        Destination replyTo = message.getJMSReplyTo();
        if (replyTo == null) {
            return this.jmsMessageConvertor.convert(message);
        }
        return this.jmsMessageConvertor.convert(message, this.processReplyToDestination(replyTo));
    }

    /** Best-effort removal of this bridge as listener after a failed start; never throws. */
    private void detachListenerQuietly(MessageConsumer target) {
        if (target == null) {
            return;
        }
        try {
            target.setMessageListener(null);
        } catch (Exception ignored) {
            LOG.debug("Ignoring failure to detach listener after failed start", ignored);
        }
    }
}

