/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.replication;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQExceptionType;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.ChannelImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.utils.ExecutorFactory;

public final class ReplicationManager
implements HornetQComponent {
    private final ResponseHandler responseHandler = new ResponseHandler();
    private final Channel replicatingChannel;
    private boolean started;
    private volatile boolean enabled;
    private final Object replicationLock = new Object();
    private final Object largeMessageSyncGuard = new Object();
    private final HashMap<Long, Pair<String, Long>> largeMessagesToSync = new HashMap();
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
    private final ExecutorFactory executorFactory;
    private SessionFailureListener failureListener;
    private CoreRemotingConnection remotingConnection;
    private volatile boolean inSync = true;

    public ReplicationManager(CoreRemotingConnection remotingConnection, ExecutorFactory executorFactory) {
        this.executorFactory = executorFactory;
        this.replicatingChannel = remotingConnection.getChannel(ChannelImpl.CHANNEL_ID.REPLICATION.id, -1);
        this.remotingConnection = remotingConnection;
    }

    public void appendUpdateRecord(byte journalID, ADD_OPERATION_TYPE operation, long id, byte recordType, EncodingSupport record) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, record));
        }
    }

    public void appendDeleteRecord(byte journalID, long id) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
        }
    }

    public void appendAddRecordTransactional(byte journalID, ADD_OPERATION_TYPE operation, long txID, long id, byte recordType, EncodingSupport record) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, record));
        }
    }

    public void appendCommitRecord(byte journalID, long txID, boolean sync, boolean lineUp) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
        }
    }

    public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
        }
    }

    public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
        }
    }

    public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
        }
    }

    public void appendRollbackRecord(byte journalID, long txID) throws Exception {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID));
        }
    }

    public void pageClosed(SimpleString storeName, int pageNumber) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
        }
    }

    public void pageDeleted(SimpleString storeName, int pageNumber) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
        }
    }

    public void pageWrite(PagedMessage message, int pageNumber) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
        }
    }

    public void largeMessageBegin(long messageId) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationLargeMessageBeginMessage(messageId));
        }
    }

    public void largeMessageDelete(Long messageId) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
        }
    }

    public void largeMessageWrite(long messageId, byte[] body) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
        }
    }

    @Override
    public synchronized boolean isStarted() {
        return this.started;
    }

    @Override
    public synchronized void start() throws HornetQException {
        if (this.started) {
            throw new IllegalStateException("ReplicationManager is already started");
        }
        this.replicatingChannel.setHandler(this.responseHandler);
        this.failureListener = new ReplicatedSessionFailureListener();
        this.remotingConnection.addFailureListener(this.failureListener);
        this.started = true;
        this.enabled = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void stop() throws Exception {
        if (!this.started) {
            return;
        }
        Object object = this.replicationLock;
        synchronized (object) {
            this.enabled = false;
            if (this.replicatingChannel != null) {
                this.replicatingChannel.close();
            }
            this.clearReplicationTokens();
        }
        CoreRemotingConnection toStop = this.remotingConnection;
        if (toStop != null) {
            toStop.removeFailureListener(this.failureListener);
        }
        this.remotingConnection = null;
        this.started = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void clearReplicationTokens() {
        Object object = this.replicationLock;
        synchronized (object) {
            while (!this.pendingTokens.isEmpty()) {
                OperationContext ctx = this.pendingTokens.poll();
                try {
                    ctx.replicationDone();
                }
                catch (Throwable e) {
                    HornetQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
                }
            }
            return;
        }
    }

    public Set<OperationContext> getActiveTokens() {
        LinkedHashSet<OperationContext> activeContexts = new LinkedHashSet<OperationContext>();
        for (OperationContext ctx : this.pendingTokens) {
            activeContexts.add(ctx);
        }
        return activeContexts;
    }

    private OperationContext sendReplicatePacket(Packet packet) {
        return this.sendReplicatePacket(packet, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private OperationContext sendReplicatePacket(Packet packet, boolean lineUp) {
        if (!this.enabled) {
            return null;
        }
        boolean runItNow = false;
        OperationContext repliToken = OperationContextImpl.getContext(this.executorFactory);
        if (lineUp) {
            repliToken.replicationLineUp();
        }
        Object object = this.replicationLock;
        synchronized (object) {
            if (this.enabled) {
                this.pendingTokens.add(repliToken);
                this.replicatingChannel.send(packet);
            } else {
                runItNow = true;
            }
        }
        if (runItNow) {
            repliToken.replicationDone();
        }
        return repliToken;
    }

    private void replicated() {
        OperationContext ctx = this.pendingTokens.poll();
        if (ctx == null) {
            throw new IllegalStateException("Missing replication token on the queue.");
        }
        ctx.replicationDone();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void syncJournalFile(JournalFile jf, JournalStorageManager.JournalContent content) throws Exception {
        if (!this.enabled) {
            return;
        }
        SequentialFile file = jf.getFile().cloneFile();
        try {
            HornetQServerLogger.LOGGER.journalSynch(jf, file.size(), file);
            this.sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
        }
        finally {
            if (file.isOpen()) {
                file.close();
            }
        }
    }

    public Map.Entry<Long, Pair<String, Long>> getNextLargeMessageToSync() {
        Iterator<Map.Entry<Long, Pair<String, Long>>> iter = this.largeMessagesToSync.entrySet().iterator();
        if (!iter.hasNext()) {
            return null;
        }
        Map.Entry<Long, Pair<String, Long>> entry = iter.next();
        iter.remove();
        return entry;
    }

    public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception {
        if (this.enabled) {
            this.sendLargeFile(null, null, id, file, size);
        }
    }

    public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception {
        if (this.enabled) {
            this.sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void sendLargeFile(JournalStorageManager.JournalContent content, SimpleString pageStore, long id, SequentialFile file, long maxBytesToSend) throws Exception {
        if (!this.enabled) {
            return;
        }
        if (!file.isOpen()) {
            file.open();
        }
        try (FileInputStream fis = new FileInputStream(file.getJavaFile());
             FileChannel channel = fis.getChannel();){
            ByteBuffer buffer = ByteBuffer.allocate(131072);
            do {
                int bytesRead;
                buffer.clear();
                int toSend = bytesRead = channel.read(buffer);
                if (bytesRead > 0) {
                    if ((long)bytesRead >= maxBytesToSend) {
                        toSend = (int)maxBytesToSend;
                        maxBytesToSend = 0L;
                    } else {
                        maxBytesToSend -= (long)bytesRead;
                    }
                    buffer.limit(toSend);
                }
                buffer.rewind();
                this.sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
                if (bytesRead == -1 || bytesRead == 0) return;
            } while (maxBytesToSend != 0L);
            return;
        }
        finally {
            if (file.isOpen()) {
                file.close();
            }
        }
    }

    public void sendStartSyncMessage(JournalFile[] datafiles, JournalStorageManager.JournalContent contentType, String nodeID, boolean allowsAutoFailBack) throws HornetQException {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack));
        }
    }

    public void sendSynchronizationDone(String nodeID) {
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
            this.inSync = false;
        }
    }

    public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> largeMessages) {
        this.largeMessagesToSync.putAll(largeMessages);
        ArrayList<Long> idsToSend = new ArrayList<Long>(this.largeMessagesToSync.keySet());
        if (this.enabled) {
            this.sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend));
        }
    }

    public OperationContext sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
        HornetQServerLogger.LOGGER.warn("LIVE IS STOPPING?!? message=" + (Object)((Object)finalMessage) + " enabled=" + this.enabled);
        if (this.enabled) {
            HornetQServerLogger.LOGGER.warn("LIVE IS STOPPING?!? message=" + (Object)((Object)finalMessage) + " " + this.enabled);
            return this.sendReplicatePacket(new ReplicationLiveIsStoppingMessage(finalMessage));
        }
        return null;
    }

    public CoreRemotingConnection getBackupTransportConnection() {
        return this.remotingConnection;
    }

    public boolean isSynchronizing() {
        return this.inSync;
    }

    private static final class NullEncoding
    implements EncodingSupport {
        static final NullEncoding instance = new NullEncoding();

        private NullEncoding() {
        }

        @Override
        public void decode(HornetQBuffer buffer) {
        }

        @Override
        public void encode(HornetQBuffer buffer) {
        }

        @Override
        public int getEncodeSize() {
            return 0;
        }
    }

    private final class ResponseHandler
    implements ChannelHandler {
        private ResponseHandler() {
        }

        @Override
        public void handlePacket(Packet packet) {
            if (packet.getType() == 90) {
                ReplicationManager.this.replicated();
            }
        }
    }

    private final class ReplicatedSessionFailureListener
    implements SessionFailureListener {
        private ReplicatedSessionFailureListener() {
        }

        @Override
        public void connectionFailed(HornetQException me, boolean failedOver) {
            if (me.getType() == HornetQExceptionType.DISCONNECTED) {
                HornetQServerLogger.LOGGER.replicationStopOnBackupShutdown();
            } else {
                HornetQServerLogger.LOGGER.replicationStopOnBackupFail(me);
            }
            try {
                ReplicationManager.this.stop();
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.errorStoppingReplication(e);
            }
        }

        @Override
        public void beforeReconnect(HornetQException me) {
        }
    }

    public static enum ADD_OPERATION_TYPE {
        UPDATE{

            @Override
            public boolean toBoolean() {
                return true;
            }
        }
        ,
        ADD{

            @Override
            public boolean toBoolean() {
                return false;
            }
        };


        public abstract boolean toBoolean();

        public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) {
            return isUpdate ? UPDATE : ADD;
        }
    }
}

