package org.hornetq.core.replication.impl;

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;

/* loaded from: input_file:org/hornetq/core/replication/impl/ReplicationManagerImpl.class */
public class ReplicationManagerImpl implements ReplicationManager {
    private static final Logger log = Logger.getLogger(ReplicationManagerImpl.class);
    private final ClientSessionFactoryInternal sessionFactory;
    private CoreRemotingConnection replicatingConnection;
    private Channel replicatingChannel;
    private boolean started;
    private volatile boolean enabled;
    private final ExecutorFactory executorFactory;
    private SessionFailureListener failureListener;
    private final ResponseHandler responseHandler = new ResponseHandler();
    private final Object replicationLock = new Object();
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue();

    /* loaded from: input_file:org/hornetq/core/replication/impl/ReplicationManagerImpl$NullEncoding.class */
    private static class NullEncoding implements EncodingSupport {
        static NullEncoding instance = new NullEncoding();

        private NullEncoding() {
        }

        @Override // org.hornetq.core.journal.EncodingSupport
        public void decode(HornetQBuffer hornetQBuffer) {
        }

        @Override // org.hornetq.core.journal.EncodingSupport
        public void encode(HornetQBuffer hornetQBuffer) {
        }

        @Override // org.hornetq.core.journal.EncodingSupport
        public int getEncodeSize() {
            return 0;
        }
    }

    /* loaded from: input_file:org/hornetq/core/replication/impl/ReplicationManagerImpl$ResponseHandler.class */
    protected class ResponseHandler implements ChannelHandler {
        protected ResponseHandler() {
        }

        @Override // org.hornetq.core.protocol.core.ChannelHandler
        public void handlePacket(Packet packet) {
            if (packet.getType() == 90) {
                ReplicationManagerImpl.this.replicated();
            }
        }
    }

    public ReplicationManagerImpl(ClientSessionFactoryInternal clientSessionFactoryInternal, ExecutorFactory executorFactory) {
        this.sessionFactory = clientSessionFactoryInternal;
        this.executorFactory = executorFactory;
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendAddRecord(byte b, long j, byte b2, EncodingSupport encodingSupport) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddMessage(b, false, j, b2, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendUpdateRecord(byte b, long j, byte b2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddMessage(b, true, j, b2, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendDeleteRecord(byte b, long j) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteMessage(b, j));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendAddRecordTransactional(byte b, long j, long j2, byte b2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddTXMessage(b, false, j, j2, b2, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendUpdateRecordTransactional(byte b, long j, long j2, byte b2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationAddTXMessage(b, true, j, j2, b2, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendCommitRecord(byte b, long j, boolean z) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationCommitMessage(b, false, j), z);
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendDeleteRecordTransactional(byte b, long j, long j2, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteTXMessage(b, j, j2, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendDeleteRecordTransactional(byte b, long j, long j2) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationDeleteTXMessage(b, j, j2, NullEncoding.instance));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendPrepareRecord(byte b, long j, EncodingSupport encodingSupport) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPrepareMessage(b, j, encodingSupport));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void appendRollbackRecord(byte b, long j) throws Exception {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationCommitMessage(b, false, j));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void pageClosed(SimpleString simpleString, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageEventMessage(simpleString, i, false));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void pageDeleted(SimpleString simpleString, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageEventMessage(simpleString, i, true));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void pageWrite(PagedMessage pagedMessage, int i) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationPageWriteMessage(pagedMessage, i));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void largeMessageBegin(long j) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargeMessageBeingMessage(j));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void largeMessageDelete(long j) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargemessageEndMessage(j));
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void largeMessageWrite(long j, byte[] bArr) {
        if (this.enabled) {
            sendReplicatePacket(new ReplicationLargeMessageWriteMessage(j, bArr));
        }
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public synchronized boolean isStarted() {
        return this.started;
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public synchronized void start() throws Exception {
        if (this.started) {
            throw new IllegalStateException("ReplicationManager is already started");
        }
        this.replicatingConnection = this.sessionFactory.getConnection();
        if (this.replicatingConnection == null) {
            log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
            throw new HornetQException(104, "Backup server MUST be started before live server. Initialisation will not proceed.");
        }
        long generateChannelID = this.replicatingConnection.generateChannelID();
        Channel channel = this.replicatingConnection.getChannel(1L, -1);
        this.replicatingChannel = this.replicatingConnection.getChannel(generateChannelID, -1);
        this.replicatingChannel.setHandler(this.responseHandler);
        channel.sendBlocking(new CreateReplicationSessionMessage(generateChannelID));
        this.failureListener = new SessionFailureListener() { // from class: org.hornetq.core.replication.impl.ReplicationManagerImpl.1
            @Override // org.hornetq.core.remoting.FailureListener
            public void connectionFailed(HornetQException hornetQException, boolean z) {
                if (hornetQException.getCode() == 4) {
                    ReplicationManagerImpl.log.warn("The backup node has been shut-down, replication will now stop");
                } else {
                    ReplicationManagerImpl.log.warn("Connection to the backup node failed, removing replication now", hornetQException);
                }
                try {
                    ReplicationManagerImpl.this.stop();
                } catch (Exception e) {
                    ReplicationManagerImpl.log.warn(e.getMessage(), e);
                }
            }

            @Override // org.hornetq.api.core.client.SessionFailureListener
            public void beforeReconnect(HornetQException hornetQException) {
            }
        };
        this.sessionFactory.addFailureListener(this.failureListener);
        this.started = true;
        this.enabled = true;
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public void stop() throws Exception {
        if (this.started) {
            this.enabled = false;
            while (!this.pendingTokens.isEmpty()) {
                try {
                    this.pendingTokens.poll().replicationDone();
                } catch (Throwable th) {
                    log.warn("Error completing callback on replication manager", th);
                }
            }
            if (this.replicatingChannel != null) {
                this.replicatingChannel.close();
            }
            this.sessionFactory.causeExit();
            this.sessionFactory.removeFailureListener(this.failureListener);
            if (this.replicatingConnection != null) {
                this.replicatingConnection.destroy();
            }
            this.replicatingConnection = null;
            this.started = false;
        }
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public Set<OperationContext> getActiveTokens() {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        Iterator<OperationContext> it = this.pendingTokens.iterator();
        while (it.hasNext()) {
            linkedHashSet.add(it.next());
        }
        return linkedHashSet;
    }

    @Override // org.hornetq.core.replication.ReplicationManager
    public void compareJournals(JournalLoadInformation[] journalLoadInformationArr) throws HornetQException {
        this.replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalLoadInformationArr));
    }

    private void sendReplicatePacket(Packet packet) {
        sendReplicatePacket(packet, true);
    }

    private void sendReplicatePacket(Packet packet, boolean z) {
        boolean z2 = false;
        OperationContext context = OperationContextImpl.getContext(this.executorFactory);
        if (z) {
            context.replicationLineUp();
        }
        synchronized (this.replicationLock) {
            if (this.enabled) {
                this.pendingTokens.add(context);
                this.replicatingChannel.send(packet);
            } else {
                z2 = true;
            }
        }
        if (z2) {
            context.replicationDone();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void replicated() {
        OperationContext poll = this.pendingTokens.poll();
        if (poll == null) {
            throw new IllegalStateException("Missing replication token on the queue.");
        }
        poll.replicationDone();
    }
}
