package org.jboss.messaging.core.server.cluster.impl;

import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.SendAcknowledgementHandler;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.client.impl.ClientSessionInternal;
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.management.ResourceNames;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.BindingType;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.FailureListener;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.cluster.Bridge;
import org.jboss.messaging.core.server.cluster.MessageFlowRecord;
import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.utils.Future;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;

/* loaded from: input_file:org/jboss/messaging/core/server/cluster/impl/BridgeImpl.class */
public class BridgeImpl implements Bridge, FailureListener, SendAcknowledgementHandler {
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
    // Identifier of the local node; written into generated duplicate-detection ids and the notification queue name.
    private final UUID nodeUUID;
    // Bridge name; also used as the suffix of idsHeaderName and in replicated acknowledgements.
    private final SimpleString name;
    // Source queue this bridge registers itself on as a consumer; replaceable through setQueue().
    private Queue queue;
    // Runs the create/fail/stop tasks and is handed to queue.deliverAsync().
    private final Executor executor;
    // Optional filter expression; null means no filtering.
    private final SimpleString filterString;
    // Filter built from filterString, or null when filterString is null.
    private final Filter filter;
    // When non-null, every message is sent to this address; otherwise the message's own destination is used.
    private final SimpleString forwardingAddress;
    // References that have been sent and are awaiting a send acknowledgement: added in handle(),
    // polled one per sendAcknowledged() call, drained by cancelRefs().
    private final java.util.Queue<MessageReference> refs;
    // Optional transformer applied to each message just before it is sent; may be null.
    private final Transformer transformer;
    // Session factory, session and producer are (re)created by createObjects().
    private volatile ClientSessionFactory csf;
    private volatile ClientSessionInternal session;
    private volatile ClientProducer producer;
    // Set by start(); cleared by the stop task or when a connection attempt fails.
    private volatile boolean started;
    // When true, handle() adds a duplicate-detection id to messages that do not already carry one.
    private final boolean useDuplicateDetection;
    // True only after createObjects() has completed successfully; handle() answers BUSY while false.
    private volatile boolean active;
    // The two transport configurations passed to the session factory (presumably live and backup - confirm).
    private final Pair<TransportConfiguration, TransportConfiguration> connectorPair;
    // Reconnection settings copied onto the session factory in createObjects().
    private final long retryInterval;
    private final double retryIntervalMultiplier;
    private final int reconnectAttempts;
    private final boolean failoverOnServerShutdown;
    // MessageImpl.HDR_ROUTE_TO_IDS concatenated with the bridge name; read in handle() when flowRecord is set.
    private final SimpleString idsHeaderName;
    // Optional; when non-null, handle() rewrites the route-to-ids headers and createObjects()
    // subscribes this record to management notifications.
    private MessageFlowRecord flowRecord;
    // Address that receives the "sendQueueInfoToQueue" management request.
    private final SimpleString managementAddress;
    // Address the notification queue is bound to.
    private final SimpleString managementNotificationAddress;
    // Password used together with SecurityStoreImpl.CLUSTER_ADMIN_USER when creating the session.
    private final String clusterPassword;
    // When non-null, acknowledgements are replicated through this channel before being applied; cleared by activate().
    private Channel replicatingChannel;
    // Whether start() should connect immediately; set to true by activate().
    private boolean activated;

    /* loaded from: input_file:org/jboss/messaging/core/server/cluster/impl/BridgeImpl$CreateObjectsRunnable.class */
    /**
     * Executor task that connects the bridge through {@link BridgeImpl#createObjects()}.
     * If the attempt does not succeed, the bridge is flagged as neither active nor started.
     */
    private class CreateObjectsRunnable implements Runnable {
        private CreateObjectsRunnable() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            final boolean connected = BridgeImpl.this.createObjects();
            if (!connected) {
                BridgeImpl.this.active = false;
                BridgeImpl.this.started = false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jboss/messaging/core/server/cluster/impl/BridgeImpl$FailRunnable.class */
    /**
     * Executor task run after a connection failure. While holding the bridge monitor it
     * deactivates the bridge, tears down the current consumer registration, session and
     * session factory, cancels all outstanding references and then tries to reconnect.
     * If reconnecting fails the bridge is marked as not started.
     */
    public class FailRunnable implements Runnable {
        private FailRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            final BridgeImpl bridge = BridgeImpl.this;
            synchronized (bridge) {
                if (!bridge.started) {
                    // Bridge was stopped in the meantime; nothing to recover.
                    return;
                }
                bridge.active = false;
                try {
                    bridge.queue.removeConsumer(bridge);
                    bridge.session.cleanUp();
                    bridge.cancelRefs();
                    bridge.csf.close();
                } catch (Exception e) {
                    // Tear-down is best effort; a reconnect is still attempted below.
                    BridgeImpl.log.error("Failed to stop", e);
                }
                if (!bridge.createObjects()) {
                    bridge.started = false;
                }
            }
        }
    }

    /* loaded from: input_file:org/jboss/messaging/core/server/cluster/impl/BridgeImpl$StopRunnable.class */
    /**
     * Executor task that shuts the bridge down. While holding the bridge monitor it closes
     * the session (if any), clears the started/active flags, unregisters the bridge from
     * its queue and cancels all outstanding references. Any failure is logged, not rethrown.
     */
    private class StopRunnable implements Runnable {
        private StopRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            final BridgeImpl bridge = BridgeImpl.this;
            try {
                synchronized (bridge) {
                    if (!bridge.started) {
                        return;
                    }
                    final ClientSessionInternal openSession = bridge.session;
                    if (openSession != null) {
                        openSession.close();
                    }
                    bridge.started = false;
                    bridge.active = false;
                    bridge.queue.removeConsumer(bridge);
                    bridge.cancelRefs();
                }
            } catch (Exception e) {
                BridgeImpl.log.error("Failed to stop bridge", e);
            }
        }
    }

    /**
     * Convenience constructor for a bridge without a message flow record.
     * All arguments are forwarded unchanged to the full constructor, which additionally
     * receives {@code null} for both the flow record and the messaging server.
     *
     * @throws Exception if the full constructor fails
     */
    public BridgeImpl(UUID uuid, SimpleString simpleString, Queue queue, Pair<TransportConfiguration, TransportConfiguration> pair, Executor executor, SimpleString simpleString2, SimpleString simpleString3, ScheduledExecutorService scheduledExecutorService, Transformer transformer, long j, double d, int i, boolean z, boolean z2, SimpleString simpleString4, SimpleString simpleString5, String str, Channel channel, boolean z3, StorageManager storageManager) throws Exception {
        this(uuid, simpleString, queue, pair, executor, simpleString2, simpleString3, scheduledExecutorService, transformer, j, d, i, z, z2, simpleString4, simpleString5, str, null, channel, z3, storageManager, null);
    }

    /**
     * Creates a bridge. The bridge does not connect here; see {@link #start()} and {@link #activate()}.
     *
     * @param uuid                     id of the local node
     * @param simpleString             bridge name
     * @param queue                    source queue the bridge will consume from
     * @param pair                     transport configurations handed to the session factory
     * @param executor                 executor that runs the bridge's create/fail/stop tasks
     * @param simpleString2            optional filter expression, or {@code null} for no filter
     * @param simpleString3            optional forwarding address, or {@code null} to keep each message's destination
     * @param scheduledExecutorService not used by this constructor
     * @param transformer              optional message transformer, may be {@code null}
     * @param j                        retry interval applied to the session factory
     * @param d                        retry interval multiplier applied to the session factory
     * @param i                        reconnect attempts applied to the session factory
     * @param z                        failover-on-server-shutdown flag applied to the session factory
     * @param z2                       whether duplicate-detection ids are added to forwarded messages
     * @param simpleString4            management address
     * @param simpleString5            management notification address
     * @param str                      cluster password used when creating the session
     * @param messageFlowRecord        optional flow record, may be {@code null}
     * @param channel                  optional replicating channel, may be {@code null}
     * @param z3                       whether the bridge is already activated
     * @param storageManager           not used by this constructor
     * @param messagingServer          not used by this constructor
     * @throws Exception if the filter expression cannot be turned into a filter
     */
    public BridgeImpl(UUID uuid, SimpleString simpleString, Queue queue, Pair<TransportConfiguration, TransportConfiguration> pair, Executor executor, SimpleString simpleString2, SimpleString simpleString3, ScheduledExecutorService scheduledExecutorService, Transformer transformer, long j, double d, int i, boolean z, boolean z2, SimpleString simpleString4, SimpleString simpleString5, String str, MessageFlowRecord messageFlowRecord, Channel channel, boolean z3, StorageManager storageManager, MessagingServer messagingServer) throws Exception {
        // sendAcknowledged() polls this queue without holding the bridge monitor, while handle() adds to it
        // under the monitor. An unsynchronized LinkedList is not safe for that access pattern, so a
        // concurrent FIFO queue is used instead.
        this.refs = new java.util.concurrent.ConcurrentLinkedQueue<MessageReference>();
        this.nodeUUID = uuid;
        this.name = simpleString;
        this.queue = queue;
        this.executor = executor;
        this.filterString = simpleString2;
        this.filter = simpleString2 != null ? new FilterImpl(simpleString2) : null;
        this.forwardingAddress = simpleString3;
        this.transformer = transformer;
        this.useDuplicateDetection = z2;
        this.connectorPair = pair;
        this.retryInterval = j;
        this.retryIntervalMultiplier = d;
        this.reconnectAttempts = i;
        this.failoverOnServerShutdown = z;
        this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(simpleString);
        this.managementAddress = simpleString4;
        this.managementNotificationAddress = simpleString5;
        this.clusterPassword = str;
        this.flowRecord = messageFlowRecord;
        this.replicatingChannel = channel;
        this.activated = z3;
    }

    /**
     * Marks the bridge as started. If the bridge has already been activated, a connection
     * task is queued on the executor; otherwise connecting is left to {@link #activate()}.
     * Calling this on an already started bridge does nothing.
     */
    @Override // org.jboss.messaging.core.server.MessagingComponent
    public synchronized void start() throws Exception {
        if (!this.started) {
            this.started = true;
            if (this.activated) {
                this.executor.execute(new CreateObjectsRunnable());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Empties the queue of outstanding references and cancels each one on its owning queue.
     * References are cancelled in the reverse of the order in which they were queued.
     *
     * @throws Exception if cancelling a reference fails
     */
    public void cancelRefs() throws Exception {
        // Push each drained reference to the front so iteration below visits the newest first.
        final LinkedList<MessageReference> newestFirst = new LinkedList<MessageReference>();
        for (MessageReference pending = this.refs.poll(); pending != null; pending = this.refs.poll()) {
            newestFirst.addFirst(pending);
        }
        for (MessageReference pending : newestFirst) {
            pending.getQueue().cancel(pending);
        }
    }

    /**
     * Stops the bridge. The session factory is closed first, on the calling thread and
     * without taking the bridge monitor; the actual shutdown then runs as a task on the
     * executor, and this method waits for the executor to drain.
     *
     * @throws Exception if closing the session factory fails
     */
    @Override // org.jboss.messaging.core.server.MessagingComponent
    public void stop() throws Exception {
        if (this.started) {
            if (this.csf != null) {
                this.csf.close();
            }
        }
        final Runnable shutdownTask = new StopRunnable();
        this.executor.execute(shutdownTask);
        waitForRunnablesToComplete();
    }

    /**
     * @return the current value of the started flag
     */
    @Override // org.jboss.messaging.core.server.MessagingComponent
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Activates the bridge: drops the replicating channel so acknowledgements are applied
     * directly, records the activation and queues a connection task on the executor.
     * The connection task itself does nothing useful unless the bridge has been started.
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public synchronized void activate() {
        this.replicatingChannel = null;
        this.activated = true;
        this.executor.execute(new CreateObjectsRunnable());
    }

    /**
     * @return the bridge name given at construction
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public SimpleString getName() {
        return this.name;
    }

    /**
     * @return the queue this bridge currently consumes from
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public Queue getQueue() {
        return this.queue;
    }

    /**
     * Replaces the source queue. Only the field is updated; the bridge is not
     * unregistered from the previous queue nor registered on the new one here.
     *
     * @param queue the new source queue
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    /**
     * @return the filter built from the filter expression, or {@code null} when none was configured
     */
    @Override // org.jboss.messaging.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    /**
     * @return the configured forwarding address, or {@code null} when messages keep their own destination
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public SimpleString getForwardingAddress() {
        return this.forwardingAddress;
    }

    /**
     * @return the configured transformer, or {@code null} when none was configured
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public Transformer getTransformer() {
        return this.transformer;
    }

    /**
     * @return whether the bridge adds duplicate-detection ids to forwarded messages
     */
    @Override // org.jboss.messaging.core.server.cluster.Bridge
    public boolean isUseDuplicateDetection() {
        return this.useDuplicateDetection;
    }

    /**
     * Returns the remoting connection used by the bridge's current session.
     *
     * @return the connection of the current session, or {@code null} when there is no session
     */
    public RemotingConnection getForwardingConnection() {
        // Read the volatile field once: a reconnect can replace it (possibly with null)
        // between a null check and a second read.
        final ClientSessionInternal current = this.session;
        if (current == null) {
            return null;
        }
        return ((ClientSessionImpl) current).getConnection();
    }

    /**
     * Called when a previously sent message has been acknowledged by the target. The oldest
     * outstanding reference is taken from the pending queue and acknowledged on its owning
     * queue: directly when no replicating channel is set, otherwise only after the
     * acknowledgement has been replicated through that channel. Failures are logged, not rethrown.
     *
     * @param message the acknowledged message; not inspected, the pending queue order decides
     *                which reference is acknowledged
     */
    @Override // org.jboss.messaging.core.client.SendAcknowledgementHandler
    public void sendAcknowledged(Message message) {
        try {
            final MessageReference acked = this.refs.poll();
            if (acked == null) {
                return;
            }
            // Snapshot the field: activate() may null it between the check and the call, and the
            // resulting NullPointerException would be swallowed below, leaving the reference unacknowledged.
            final Channel channel = this.replicatingChannel;
            if (channel == null) {
                acked.getQueue().acknowledge(acked);
                return;
            }
            channel.replicatePacket(new ReplicateAcknowledgeMessage(this.name, acked.getMessage().getMessageID()), 1L, new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        acked.getQueue().acknowledge(acked);
                    } catch (Exception e) {
                        BridgeImpl.log.error("Failed to ack", e);
                    }
                }
            });
        } catch (Exception e) {
            log.error("Failed to ack", e);
        }
    }

    /**
     * Forwards one message reference through the bridge's producer.
     *
     * @param messageReference the reference offered by the source queue
     * @return {@link HandleStatus#NO_MATCH} when a filter is configured and rejects the message,
     *         {@link HandleStatus#BUSY} while the bridge is not active, otherwise
     *         {@link HandleStatus#HANDLED} once the message has been handed to the producer
     * @throws Exception if transforming or sending the message fails
     */
    @Override // org.jboss.messaging.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        if (this.filter != null && !this.filter.match(messageReference.getMessage())) {
            return HandleStatus.NO_MATCH;
        }
        // Cheap pre-check that avoids contending for the monitor while the bridge is down.
        if (!this.active) {
            return HandleStatus.BUSY;
        }
        synchronized (this) {
            // Re-check under the monitor: a fail/stop task may have deactivated the bridge and cancelled
            // the pending references while this thread was waiting for the lock. Without this check the
            // reference would be queued after that cancellation and sent on a torn-down producer.
            if (!this.active) {
                return HandleStatus.BUSY;
            }
            messageReference.getQueue().referenceHandled();
            ServerMessage message = messageReference.getMessage();
            this.refs.add(messageReference);
            if (this.flowRecord != null) {
                // Work on a copy so the header rewriting below does not touch the original message.
                message = message.copy();
                // Iterate over a snapshot of the names because properties are removed inside the loop.
                HashSet<SimpleString> propertyNames = new HashSet<SimpleString>(message.getPropertyNames());
                byte[] routeToIds = (byte[]) message.getProperty(this.idsHeaderName);
                for (SimpleString propertyName : propertyNames) {
                    if (propertyName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) {
                        message.removeProperty(propertyName);
                    }
                }
                // Re-publish this bridge's ids under the plain header name.
                message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, routeToIds);
                message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, true);
            }
            if (this.useDuplicateDetection && !message.containsProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID)) {
                // Duplicate id = node UUID bytes followed by the 8-byte message id
                // (24 bytes total, so the UUID is presumably 16 bytes - confirm).
                byte[] duplicateId = new byte[24];
                ByteBuffer buffer = ByteBuffer.wrap(duplicateId);
                buffer.put(this.nodeUUID.asBytes());
                buffer.putLong(message.getMessageID());
                message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplicateId);
            }
            if (this.transformer != null) {
                message = this.transformer.transform(message);
            }
            this.producer.send(this.forwardingAddress != null ? this.forwardingAddress : message.getDestination(), message);
            return HandleStatus.HANDLED;
        }
    }

    /**
     * Failure callback registered on the bridge's session. Triggers the bridge's
     * fail handling and always reports {@code true}.
     *
     * @param messagingException the failure reported by the connection; not inspected here
     * @return always {@code true}
     */
    @Override // org.jboss.messaging.core.remoting.FailureListener
    public boolean connectionFailed(MessagingException messagingException) {
        fail();
        return true;
    }

    /**
     * Queues a marker task on the executor and waits up to ten seconds for it to run.
     * A warning is logged when the wait times out.
     */
    private void waitForRunnablesToComplete() {
        final Future marker = new Future();
        this.executor.execute(marker);
        final boolean completed = marker.await(10000L);
        if (!completed) {
            log.warn("Timed out waiting to stop");
        }
    }

    /**
     * Queues a failure-recovery task on the executor, but only while the bridge is started.
     */
    private void fail() {
        if (this.started) {
            this.executor.execute(new FailRunnable());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Connects the bridge: registers it as a consumer on the source queue, creates the session
     * factory, session and producer, optionally subscribes the flow record to management
     * notifications, then marks the bridge active and asks the queue to start delivering.
     * <p>
     * If the attempt fails at any point, everything this attempt set up is released again:
     * the consumer registration is removed and the new session factory is closed. Otherwise a
     * disabled bridge would stay registered on the queue and the factory would never be closed.
     *
     * @return {@code true} when the bridge is connected and active; {@code false} when the
     *         bridge is not started, no session could be obtained, or any step threw
     */
    public synchronized boolean createObjects() {
        if (!this.started) {
            return false;
        }
        // Track what this attempt set up so a failure undoes exactly that and nothing more.
        boolean consumerAdded = false;
        ClientSessionFactory factory = null;
        boolean connected = false;
        try {
            this.queue.addConsumer(this);
            consumerAdded = true;

            factory = new ClientSessionFactoryImpl(this.connectorPair.a, this.connectorPair.b);
            // Publish the factory before creating the session: stop() closes this.csf without
            // taking the bridge monitor.
            this.csf = factory;
            factory.setFailoverOnServerShutdown(this.failoverOnServerShutdown);
            factory.setRetryInterval(this.retryInterval);
            factory.setRetryIntervalMultiplier(this.retryIntervalMultiplier);
            factory.setReconnectAttempts(this.reconnectAttempts);

            final ClientSessionInternal newSession = (ClientSessionInternal) factory.createSession(SecurityStoreImpl.CLUSTER_ADMIN_USER, this.clusterPassword, false, true, true, true, 1);
            this.session = newSession;
            if (newSession == null) {
                return false;
            }
            this.producer = newSession.createProducer();
            newSession.addFailureListener(this);
            newSession.setSendAcknowledgementHandler(this);
            if (this.flowRecord != null) {
                subscribeToClusterNotifications(newSession);
            }
            this.active = true;
            this.queue.deliverAsync(this.executor);
            connected = true;
            return true;
        } catch (Exception e2) {
            log.warn("Unable to connect. Bridge is now disabled.", e2);
            return false;
        } finally {
            if (!connected) {
                releaseAfterFailedConnect(consumerAdded, factory);
            }
        }
    }

    /**
     * Creates (or reuses) the notification queue for this bridge, attaches the flow record as
     * its message handler, starts the session and sends the "sendQueueInfoToQueue" management request.
     */
    private void subscribeToClusterNotifications(ClientSessionInternal clusterSession) throws Exception {
        SimpleString notifQueueName = new SimpleString("notif." + this.nodeUUID.toString() + "." + this.name.toString());
        SimpleString notifFilter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" + BindingType.DIVERT.toInt()
                + " AND " + ManagementHelper.HDR_NOTIFICATION_TYPE
                + " IN ('" + NotificationType.BINDING_ADDED + "','" + NotificationType.BINDING_REMOVED
                + "','" + NotificationType.CONSUMER_CREATED + "','" + NotificationType.CONSUMER_CLOSED
                + "') AND " + ManagementHelper.HDR_DISTANCE + "<" + this.flowRecord.getMaxHops()
                + " AND (" + ManagementHelper.HDR_ADDRESS + " LIKE '" + this.flowRecord.getAddress() + "%')");
        try {
            clusterSession.createQueue(this.managementNotificationAddress, notifQueueName, notifFilter, false);
        } catch (MessagingException e) {
            // Code 101 is tolerated (presumably "queue already exists" - confirm against MessagingException).
            if (e.getCode() != 101) {
                throw e;
            }
        }
        clusterSession.createConsumer(notifQueueName).setMessageHandler(this.flowRecord);
        clusterSession.start();
        ClientMessage request = clusterSession.createClientMessage(false);
        ManagementHelper.putOperationInvocation(request, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", notifQueueName.toString(), this.flowRecord.getAddress());
        clusterSession.createProducer(this.managementAddress).send(request);
    }

    /**
     * Best-effort undo of a failed connection attempt: deactivates the bridge, removes the
     * consumer registration if this attempt added it, and closes the factory if this attempt created it.
     */
    private void releaseAfterFailedConnect(boolean consumerAdded, ClientSessionFactory factory) {
        this.active = false;
        if (consumerAdded) {
            try {
                this.queue.removeConsumer(this);
            } catch (Exception e) {
                log.warn("Failed to remove bridge consumer after failed connect", e);
            }
        }
        if (factory != null) {
            try {
                // Closing the factory is expected to release any session it created - confirm.
                factory.close();
            } catch (Exception e) {
                log.warn("Failed to close session factory after failed connect", e);
            }
        }
    }
}
