package org.hornetq.core.server.cluster.impl;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;

/* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/server/cluster/impl/BridgeImpl.class */
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler {
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
    private static final boolean isTrace = log.isTraceEnabled();
    private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString(ResourceNames.JMS_QUEUE);
    private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString(ResourceNames.JMS_TOPIC);
    protected final ServerLocatorInternal serverLocator;
    private final UUID nodeUUID;
    private final SimpleString name;
    private final Queue queue;
    protected final Executor executor;
    protected final ScheduledExecutorService scheduledExecutor;
    protected ScheduledFuture<?> futureScheduledReconnection;
    private final Filter filter;
    private final SimpleString forwardingAddress;
    private final Transformer transformer;
    private volatile ClientSessionFactoryInternal csf;
    protected volatile ClientSessionInternal session;
    private volatile ClientProducer producer;
    private volatile boolean started;
    private final boolean useDuplicateDetection;
    private volatile boolean active;
    private final String user;
    private final String password;
    private boolean activated;
    private final int reconnectAttempts;
    private final long retryInterval;
    private final double retryMultiplier;
    private final long maxRetryInterval;
    private NotificationService notificationService;
    private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue();
    private int retryCount = 0;
    private boolean stopping = false;
    private int reconnectAttemptsInUse = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/server/cluster/impl/BridgeImpl$ConnectRunnable.class */
    public class ConnectRunnable implements Runnable {
        private ConnectRunnable() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            if (BridgeImpl.this.stopping) {
                return;
            }
            BridgeImpl.this.connect();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/server/cluster/impl/BridgeImpl$FutureConnectRunnable.class */
    public class FutureConnectRunnable implements Runnable {
        private FutureConnectRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            BridgeImpl.this.executor.execute(new ConnectRunnable());
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/server/cluster/impl/BridgeImpl$PauseRunnable.class */
    private class PauseRunnable implements Runnable {
        private PauseRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                synchronized (BridgeImpl.this) {
                    BridgeImpl.this.started = false;
                    BridgeImpl.this.active = false;
                }
                BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
                BridgeImpl.this.internalCancelReferences();
                BridgeImpl.log.info("paused bridge " + ((Object) BridgeImpl.this.name));
            } catch (Exception e) {
                BridgeImpl.log.error("Failed to pause bridge", e);
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/server/cluster/impl/BridgeImpl$StopRunnable.class */
    private class StopRunnable implements Runnable {
        private StopRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BridgeImpl.log.debug("stopping bridge " + BridgeImpl.this);
                if (BridgeImpl.this.session != null) {
                    BridgeImpl.log.debug("Cleaning up session " + BridgeImpl.this.session);
                    BridgeImpl.this.session.removeFailureListener(BridgeImpl.this);
                    try {
                        BridgeImpl.this.session.close();
                        BridgeImpl.this.session = null;
                    } catch (Exception e) {
                    }
                }
                if (BridgeImpl.this.csf != null) {
                    BridgeImpl.this.csf.cleanup();
                }
                BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
                BridgeImpl.this.internalCancelReferences();
                synchronized (BridgeImpl.this) {
                    BridgeImpl.log.debug("Closing Session for bridge " + ((Object) BridgeImpl.this.name));
                    BridgeImpl.this.started = false;
                    BridgeImpl.this.active = false;
                }
                if (BridgeImpl.isTrace) {
                    BridgeImpl.log.trace("Removing consumer on stopRunnable " + this + " from queue " + BridgeImpl.this.queue);
                }
                BridgeImpl.log.info("stopped bridge " + ((Object) BridgeImpl.this.name));
            } catch (Exception e2) {
                BridgeImpl.log.error("Failed to stop bridge", e2);
            }
        }
    }

    public BridgeImpl(ServerLocatorInternal serverLocatorInternal, int i, long j, double d, long j2, UUID uuid, SimpleString simpleString, Queue queue, Executor executor, SimpleString simpleString2, SimpleString simpleString3, ScheduledExecutorService scheduledExecutorService, Transformer transformer, boolean z, String str, String str2, boolean z2, StorageManager storageManager) throws Exception {
        this.reconnectAttempts = i;
        this.retryInterval = j;
        this.retryMultiplier = d;
        this.maxRetryInterval = j2;
        this.serverLocator = serverLocatorInternal;
        this.nodeUUID = uuid;
        this.name = simpleString;
        this.queue = queue;
        this.executor = executor;
        this.scheduledExecutor = scheduledExecutorService;
        this.filter = FilterImpl.createFilter(simpleString2);
        this.forwardingAddress = simpleString3;
        this.transformer = transformer;
        this.useDuplicateDetection = z;
        this.user = str;
        this.password = str2;
        this.activated = z2;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void setNotificationService(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        this.stopping = false;
        if (this.activated) {
            activate();
        }
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString("name"), this.name);
            this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), NotificationType.BRIDGE_STARTED, typedProperties));
        }
    }

    @Override // org.hornetq.core.server.Consumer
    public String debug() {
        return toString();
    }

    private void cancelRefs() {
        LinkedList linkedList = new LinkedList();
        while (true) {
            MessageReference poll = this.refs.poll();
            if (poll == null) {
                break;
            }
            if (isTrace) {
                log.trace("Cancelling reference " + poll + " on bridge " + this);
            }
            linkedList.addFirst(poll);
        }
        if (isTrace && linkedList.isEmpty()) {
            log.trace("didn't have any references to cancel on bridge " + this);
        }
        long currentTimeMillis = System.currentTimeMillis();
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            MessageReference messageReference = (MessageReference) it.next();
            try {
                messageReference.getQueue().cancel(messageReference, currentTimeMillis);
            } catch (Exception e) {
                log.error("Couldn't cancel reference " + messageReference, e);
            }
        }
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void flushExecutor() {
        Future future = new Future();
        this.executor.execute(future);
        if (future.await(10000L)) {
            return;
        }
        log.warn("Timed out waiting to stop");
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void disconnect() {
        this.executor.execute(new Runnable() { // from class: org.hornetq.core.server.cluster.impl.BridgeImpl.1
            @Override // java.lang.Runnable
            public void run() {
                if (BridgeImpl.this.session != null) {
                    try {
                        BridgeImpl.this.session.cleanUp(false);
                    } catch (Exception e) {
                        BridgeImpl.log.debug(e.getMessage(), e);
                    }
                    BridgeImpl.this.session = null;
                }
            }
        });
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public boolean isConnected() {
        return this.session != null;
    }

    public Executor getExecutor() {
        return this.executor;
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public void stop() throws Exception {
        if (this.stopping) {
            return;
        }
        this.stopping = true;
        if (log.isDebugEnabled()) {
            log.debug("Bridge " + ((Object) this.name) + " being stopped");
        }
        if (this.futureScheduledReconnection != null) {
            this.futureScheduledReconnection.cancel(true);
        }
        this.executor.execute(new StopRunnable());
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString("name"), this.name);
            try {
                this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, typedProperties));
            } catch (Exception e) {
                log.warn("unable to send notification when broadcast group is stopped", e);
            }
        }
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void pause() throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Bridge " + ((Object) this.name) + " being paused");
        }
        this.executor.execute(new PauseRunnable());
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString("name"), this.name);
            try {
                this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, typedProperties));
            } catch (Exception e) {
                log.warn("unable to send notification when broadcast group is stopped", e);
            }
        }
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void resume() throws Exception {
        this.queue.addConsumer(this);
        this.queue.deliverAsync();
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public boolean isStarted() {
        return this.started;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public synchronized void activate() {
        this.activated = true;
        this.executor.execute(new ConnectRunnable());
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public SimpleString getName() {
        return this.name;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public Queue getQueue() {
        return this.queue;
    }

    @Override // org.hornetq.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public SimpleString getForwardingAddress() {
        return this.forwardingAddress;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public Transformer getTransformer() {
        return this.transformer;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public boolean isUseDuplicateDetection() {
        return this.useDuplicateDetection;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public RemotingConnection getForwardingConnection() {
        if (this.session == null) {
            return null;
        }
        return this.session.getConnection();
    }

    @Override // org.hornetq.api.core.client.SendAcknowledgementHandler
    public void sendAcknowledged(Message message) {
        try {
            MessageReference poll = this.refs.poll();
            if (poll != null) {
                if (isTrace) {
                    log.trace(this + " Acking " + poll + " on queue " + poll.getQueue());
                }
                poll.getQueue().acknowledge(poll);
            }
        } catch (Exception e) {
            log.error("Failed to ack", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ServerMessage beforeForward(ServerMessage serverMessage) {
        if (this.useDuplicateDetection) {
            serverMessage.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, getDuplicateBytes(this.nodeUUID, serverMessage.getMessageID()));
        }
        if (this.transformer == null) {
            return serverMessage;
        }
        ServerMessage transform = this.transformer.transform(serverMessage);
        if (transform != serverMessage && log.isDebugEnabled()) {
            log.debug("The transformer " + this.transformer + " made a copy of the message " + serverMessage + " as transformedMessage");
        }
        return transform;
    }

    public static byte[] getDuplicateBytes(UUID uuid, long j) {
        byte[] bArr = new byte[24];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        wrap.put(uuid.asBytes());
        wrap.putLong(j);
        return bArr;
    }

    @Override // org.hornetq.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        if (this.filter != null && !this.filter.match(messageReference.getMessage())) {
            return HandleStatus.NO_MATCH;
        }
        synchronized (this) {
            if (!this.active) {
                if (log.isDebugEnabled()) {
                    log.debug(this + "::Ignoring reference on bridge as it is set to iniactive ref=" + messageReference);
                }
                return HandleStatus.BUSY;
            }
            if (isTrace) {
                log.trace("Bridge " + this + " is handling reference=" + messageReference);
            }
            messageReference.handled();
            ServerMessage message = messageReference.getMessage();
            this.refs.add(messageReference);
            ServerMessage beforeForward = beforeForward(message);
            SimpleString address = this.forwardingAddress != null ? this.forwardingAddress : beforeForward.getAddress();
            if (log.isTraceEnabled()) {
                log.trace("going to send message " + beforeForward);
            }
            try {
                this.producer.send(address, beforeForward);
                return HandleStatus.HANDLED;
            } catch (HornetQException e) {
                log.warn("Unable to send message " + messageReference + ", will try again once bridge reconnects", e);
                this.refs.remove(messageReference);
                this.executor.execute(new Runnable() { // from class: org.hornetq.core.server.cluster.impl.BridgeImpl.2
                    @Override // java.lang.Runnable
                    public void run() {
                        BridgeImpl.this.connectionFailed(e, false);
                    }
                });
                return HandleStatus.BUSY;
            }
        }
    }

    @Override // org.hornetq.core.remoting.FailureListener
    public void connectionFailed(HornetQException hornetQException, boolean z) {
        log.warn(this + "::Connection failed with failedOver=" + z + "-" + hornetQException, hornetQException);
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            this.csf.cleanup();
        } catch (Throwable th) {
        }
        try {
            this.session.cleanUp(false);
        } catch (Throwable th2) {
        }
        fail(hornetQException.getCode() == 4);
        tryScheduleRetryReconnect(hornetQException.getCode());
    }

    protected void tryScheduleRetryReconnect(int i) {
        scheduleRetryConnect();
    }

    @Override // org.hornetq.api.core.client.SessionFailureListener
    public void beforeReconnect(HornetQException hornetQException) {
    }

    public String toString() {
        return getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)) + " [name=" + ((Object) this.name) + ", queue=" + this.queue + " targetConnector=" + this.serverLocator + "]";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fail(boolean z) {
        log.debug(this + "\n\t::fail being called, permanently=" + z);
        if (this.queue != null) {
            try {
                if (isTrace) {
                    log.trace("Removing consumer on fail " + this + " from queue " + this.queue);
                }
                this.queue.removeConsumer(this);
            } catch (Exception e) {
                log.debug(e);
            }
        }
        cancelRefs();
        if (this.queue != null) {
            this.queue.deliverAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void afterConnect() throws Exception {
        this.retryCount = 0;
        this.reconnectAttemptsInUse = this.reconnectAttempts;
        if (this.futureScheduledReconnection != null) {
            this.futureScheduledReconnection.cancel(true);
            this.futureScheduledReconnection = null;
        }
    }

    protected ClientSessionFactoryInternal getCurrentFactory() {
        return this.csf;
    }

    protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
        ClientSessionFactoryInternal clientSessionFactoryInternal = (ClientSessionFactoryInternal) this.serverLocator.createSessionFactory();
        clientSessionFactoryInternal.setReconnectAttempts(0);
        return clientSessionFactoryInternal;
    }

    protected void connect() {
        log.debug("Connecting  " + this + " to its destination [" + this.nodeUUID.toString() + "], csf=" + this.csf);
        this.retryCount++;
        try {
            if (this.csf == null || this.csf.isClosed()) {
                this.csf = createSessionFactory();
                if (this.csf == null) {
                    scheduleRetryConnect();
                    return;
                }
                this.session = (ClientSessionInternal) this.csf.createSession(this.user, this.password, false, true, true, true, 1);
            }
            if (this.forwardingAddress != null) {
                try {
                    ClientSession.BindingQuery bindingQuery = this.session.bindingQuery(this.forwardingAddress);
                    if (this.forwardingAddress.startsWith(JMS_QUEUE_ADDRESS_PREFIX) || this.forwardingAddress.startsWith(JMS_TOPIC_ADDRESS_PREFIX)) {
                        if (!bindingQuery.isExists()) {
                            log.warn("Address " + ((Object) this.forwardingAddress) + " doesn't have any bindings yet, retry #(" + this.retryCount + ")");
                            scheduleRetryConnect();
                            return;
                        }
                    } else if (!bindingQuery.isExists()) {
                        log.info("Bridge " + ((Object) getName()) + " connected to fowardingAddress=" + ((Object) getForwardingAddress()) + ". " + ((Object) getForwardingAddress()) + " doesn't have any bindings what means messages will be ignored until a binding is created.");
                    }
                } catch (Throwable th) {
                    log.warn("Error on querying binding on bridge " + ((Object) this.name) + ". Retrying in 100 milliseconds", th);
                    this.retryCount--;
                    scheduleRetryConnectFixedTimeout(100L);
                    return;
                }
            }
            this.producer = this.session.createProducer();
            this.session.addFailureListener(this);
            this.session.setSendAcknowledgementHandler(this);
            afterConnect();
            this.active = true;
            this.queue.addConsumer(this);
            this.queue.deliverAsync();
            log.info("Bridge " + this + " is connected");
        } catch (HornetQException e) {
            if (e.getCode() == 112) {
                log.warn("Server is starting, retry to create the session for bridge " + ((Object) this.name));
                this.retryCount--;
                scheduleRetryConnectFixedTimeout(this.retryInterval);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Bridge " + this + " is unable to connect to destination. Retrying", e);
                }
                scheduleRetryConnect();
            }
        } catch (Exception e2) {
            log.warn("Bridge " + this + " is unable to connect to destination. It will be disabled.", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void scheduleRetryConnect() {
        if (this.serverLocator.isClosed()) {
            log.warn("ServerLocator was shutdown, can't retry on opening connection for bridge");
            return;
        }
        if (this.stopping) {
            log.info("Bridge is stopping, will not retry");
            return;
        }
        if (this.reconnectAttemptsInUse >= 0 && this.retryCount > this.reconnectAttempts) {
            log.warn("Bridge " + ((Object) this.name) + " achieved " + this.retryCount + " maxattempts=" + this.reconnectAttempts + " it will stop retrying to reconnect");
            fail(true);
            return;
        }
        long pow = (long) (this.retryInterval * Math.pow(this.retryMultiplier, this.retryCount));
        if (pow == 0) {
            pow = this.retryInterval;
        }
        if (pow > this.maxRetryInterval) {
            pow = this.maxRetryInterval;
        }
        log.debug("Bridge " + this + " retrying connection #" + this.retryCount + ", maxRetry=" + this.reconnectAttemptsInUse + ", timeout=" + pow);
        scheduleRetryConnectFixedTimeout(pow);
    }

    protected void scheduleRetryConnectFixedTimeout(long j) {
        if (this.csf != null) {
            try {
                this.csf.cleanup();
            } catch (Throwable th) {
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Scheduling retry for bridge " + ((Object) this.name) + " in " + j + " milliseconds");
        }
        this.futureScheduledReconnection = this.scheduledExecutor.schedule(new FutureConnectRunnable(), j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void internalCancelReferences() {
        cancelRefs();
        if (this.queue != null) {
            this.queue.deliverAsync();
        }
    }
}
