package org.apache.activemq.artemis.core.replication;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.impl.FileWrapperJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.jboss.logging.Logger;

/* loaded from: input_file:artemis-server-2.6.1.amq-720004-redhat-1.jar:org/apache/activemq/artemis/core/replication/ReplicationEndpoint.class */
public final class ReplicationEndpoint implements ChannelHandler, ActiveMQComponent {
    private static final Logger logger = Logger.getLogger(ReplicationEndpoint.class);
    private final IOCriticalErrorListener criticalErrorListener;
    private final ActiveMQServerImpl server;
    private final boolean wantedFailBack;
    private final SharedNothingBackupActivation activation;
    private Channel channel;
    private Journal[] journals;
    private StorageManager storageManager;
    private PagingManager pageManager;
    private volatile boolean started;
    private SharedNothingBackupQuorum backupQuorum;
    private Executor executor;
    private final boolean noSync = false;
    private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
    private final Map<AbstractJournalStorageManager.JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync = new HashMap();
    private Map<AbstractJournalStorageManager.JournalContent, Journal> journalsHolder = new HashMap();
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap();
    private final ConcurrentMap<Long, ReplicatedLargeMessage> largeMessages = new ConcurrentHashMap();
    private boolean deletePages = true;

    /* loaded from: input_file:artemis-server-2.6.1.amq-720004-redhat-1.jar:org/apache/activemq/artemis/core/replication/ReplicationEndpoint$JournalSyncFile.class */
    public static final class JournalSyncFile {
        private FileChannel channel;
        private final File file;
        private FileOutputStream fos;

        public JournalSyncFile(JournalFile journalFile) throws Exception {
            SequentialFile file = journalFile.getFile();
            this.file = file.getJavaFile();
            file.close();
        }

        synchronized FileChannel getChannel() throws Exception {
            if (this.channel == null) {
                this.fos = new FileOutputStream(this.file);
                this.channel = this.fos.getChannel();
            }
            return this.channel;
        }

        synchronized void close() throws IOException {
            if (this.fos != null) {
                this.fos.close();
            }
            if (this.channel != null) {
                this.channel.close();
            }
        }

        public String toString() {
            return "JournalSyncFile(file=" + this.file.getAbsolutePath() + PasswordMaskingUtil.END_ENC;
        }
    }

    public ReplicationEndpoint(ActiveMQServerImpl activeMQServerImpl, IOCriticalErrorListener iOCriticalErrorListener, boolean z, SharedNothingBackupActivation sharedNothingBackupActivation) {
        this.server = activeMQServerImpl;
        this.criticalErrorListener = iOCriticalErrorListener;
        this.wantedFailBack = z;
        this.activation = sharedNothingBackupActivation;
    }

    public synchronized void registerJournal(byte b, Journal journal) {
        if (this.journals == null || b >= this.journals.length) {
            Journal[] journalArr = this.journals;
            this.journals = new Journal[b + 1];
            if (journalArr != null) {
                System.arraycopy(journalArr, 0, this.journals, 0, journalArr.length);
            }
        }
        this.journals[b] = journal;
    }

    @Override // org.apache.activemq.artemis.core.protocol.core.ChannelHandler
    public void handlePacket(Packet packet) {
        if (logger.isTraceEnabled()) {
            logger.trace("handlePacket::handling " + packet);
        }
        PacketImpl replicationResponseMessage = new ReplicationResponseMessage();
        byte type = packet.getType();
        try {
        } catch (ActiveMQException e) {
            logger.warn(e.getMessage(), e);
            ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
            replicationResponseMessage = new ActiveMQExceptionMessage(e);
        } catch (Exception e2) {
            logger.warn(e2.getMessage(), e2);
            ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e2, packet);
            replicationResponseMessage = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e2));
        }
        if (!this.started) {
            if (logger.isTraceEnabled()) {
                logger.trace("handlePacket::ignoring " + packet);
                return;
            }
            return;
        }
        if (type == 91) {
            handleAppendAddRecord((ReplicationAddMessage) packet);
        } else if (type == 92) {
            handleAppendAddTXRecord((ReplicationAddTXMessage) packet);
        } else if (type == 93) {
            handleAppendDelete((ReplicationDeleteMessage) packet);
        } else if (type == 94) {
            handleAppendDeleteTX((ReplicationDeleteTXMessage) packet);
        } else if (type == 95) {
            handlePrepare((ReplicationPrepareMessage) packet);
        } else if (type == 96) {
            handleCommitRollback((ReplicationCommitMessage) packet);
        } else if (type == 97) {
            handlePageWrite((ReplicationPageWriteMessage) packet);
        } else if (type == 98) {
            handlePageEvent((ReplicationPageEventMessage) packet);
        } else if (type == 99) {
            handleLargeMessageBegin((ReplicationLargeMessageBeginMessage) packet);
        } else if (type == 101) {
            handleLargeMessageWrite((ReplicationLargeMessageWriteMessage) packet);
        } else if (type == 100) {
            handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
        } else if (type == 120) {
            replicationResponseMessage = handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
        } else if (type == 103) {
            handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
        } else if (type == 121) {
            handleLiveStopping((ReplicationLiveIsStoppingMessage) packet);
        } else if (type == 116) {
            handleFatalError((BackupReplicationStartFailedMessage) packet);
        } else {
            ActiveMQServerLogger.LOGGER.invalidPacketForReplication(packet);
        }
        if (replicationResponseMessage == null) {
            logger.trace("Response is null, ignoring response");
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Returning " + replicationResponseMessage);
        }
        this.channel.send(replicationResponseMessage);
    }

    private void handleFatalError(BackupReplicationStartFailedMessage backupReplicationStartFailedMessage) {
        ActiveMQServerLogger.LOGGER.errorStartingReplication(backupReplicationStartFailedMessage.getRegistrationProblem());
        this.server.stopTheServer(false);
    }

    private void handleLiveStopping(ReplicationLiveIsStoppingMessage replicationLiveIsStoppingMessage) throws ActiveMQException {
        this.activation.remoteFailOver(replicationLiveIsStoppingMessage.isFinalMessage());
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public boolean isStarted() {
        return this.started;
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public synchronized void start() throws Exception {
        Configuration configuration = this.server.getConfiguration();
        try {
            this.storageManager = this.server.getStorageManager();
            this.storageManager.start();
            this.server.getManagementService().setStorageManager(this.storageManager);
            this.journalsHolder.put(AbstractJournalStorageManager.JournalContent.BINDINGS, this.storageManager.getBindingsJournal());
            this.journalsHolder.put(AbstractJournalStorageManager.JournalContent.MESSAGES, this.storageManager.getMessageJournal());
            Iterator it = EnumSet.allOf(AbstractJournalStorageManager.JournalContent.class).iterator();
            while (it.hasNext()) {
                AbstractJournalStorageManager.JournalContent journalContent = (AbstractJournalStorageManager.JournalContent) it.next();
                this.filesReservedForSync.put(journalContent, new HashMap());
                this.journalLoadInformation[journalContent.typeByte] = this.journalsHolder.get(journalContent).loadSyncOnly(Journal.JournalState.SYNCING);
            }
            this.pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(this.storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), this.server.getScheduledPool(), this.server.getIOExecutorFactory(), configuration.isJournalSyncNonTransactional(), this.criticalErrorListener), this.server.getAddressSettingsRepository());
            this.pageManager.start();
            this.started = true;
        } catch (Exception e) {
            if (this.server.isStarted()) {
                throw e;
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.server.ActiveMQComponent
    public synchronized void stop() throws Exception {
        if (this.started) {
            logger.trace("Stopping endpoint");
            this.started = false;
            OrderedExecutorFactory.flushExecutor(this.executor);
            if (this.channel != null) {
                this.channel.close();
            }
            Iterator<ReplicatedLargeMessage> it = this.largeMessages.values().iterator();
            while (it.hasNext()) {
                it.next().releaseResources();
            }
            this.largeMessages.clear();
            Iterator<Map.Entry<AbstractJournalStorageManager.JournalContent, Map<Long, JournalSyncFile>>> it2 = this.filesReservedForSync.entrySet().iterator();
            while (it2.hasNext()) {
                Iterator<JournalSyncFile> it3 = it2.next().getValue().values().iterator();
                while (it3.hasNext()) {
                    it3.next().close();
                }
            }
            this.filesReservedForSync.clear();
            if (this.journals != null) {
                for (Journal journal : this.journals) {
                    if (journal instanceof FileWrapperJournal) {
                        journal.stop();
                    }
                }
            }
            Iterator<ConcurrentMap<Integer, Page>> it4 = this.pageIndex.values().iterator();
            while (it4.hasNext()) {
                for (Page page : it4.next().values()) {
                    try {
                        page.sync();
                        page.close(false);
                    } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e);
                    }
                }
            }
            this.pageManager.stop();
            this.pageIndex.clear();
            this.storageManager.stop();
            this.started = false;
        }
    }

    public Channel getChannel() {
        return this.channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /* JADX WARN: Finally extract failed */
    private synchronized void finishSynchronization(String str) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("BACKUP-SYNC-START: finishSynchronization::" + str);
        }
        Iterator it = EnumSet.allOf(AbstractJournalStorageManager.JournalContent.class).iterator();
        while (it.hasNext()) {
            AbstractJournalStorageManager.JournalContent journalContent = (AbstractJournalStorageManager.JournalContent) it.next();
            Journal remove = this.journalsHolder.remove(journalContent);
            if (logger.isTraceEnabled()) {
                logger.trace("getting lock on " + journalContent + ", journal = " + remove);
            }
            registerJournal(journalContent.typeByte, remove);
            remove.synchronizationLock();
            try {
                if (logger.isTraceEnabled()) {
                    logger.trace("lock acquired on " + journalContent);
                }
                this.filesReservedForSync.remove(journalContent);
                if (logger.isTraceEnabled()) {
                    logger.trace("stopping journal for " + journalContent);
                }
                remove.stop();
                if (logger.isTraceEnabled()) {
                    logger.trace("starting journal for " + journalContent);
                }
                remove.start();
                if (logger.isTraceEnabled()) {
                    logger.trace("loadAndSync " + journalContent);
                }
                remove.loadSyncOnly(Journal.JournalState.SYNCING_UP_TO_DATE);
                if (logger.isTraceEnabled()) {
                    logger.trace("unlocking " + journalContent);
                }
                remove.synchronizationUnlock();
            } catch (Throwable th) {
                if (logger.isTraceEnabled()) {
                    logger.trace("unlocking " + journalContent);
                }
                remove.synchronizationUnlock();
                throw th;
            }
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Sync on large messages...");
        }
        ByteBuffer allocate = ByteBuffer.allocate(4096);
        Iterator<Map.Entry<Long, ReplicatedLargeMessage>> it2 = this.largeMessages.entrySet().iterator();
        while (it2.hasNext()) {
            ReplicatedLargeMessage value = it2.next().getValue();
            if (value instanceof LargeServerMessageInSync) {
                LargeServerMessageInSync largeServerMessageInSync = (LargeServerMessageInSync) value;
                if (logger.isTraceEnabled()) {
                    logger.trace("lmSync on " + largeServerMessageInSync.toString());
                }
                largeServerMessageInSync.joinSyncedData(allocate);
            }
        }
        if (logger.isTraceEnabled()) {
            logger.trace("setRemoteBackupUpToDate and liveIDSet for " + str);
        }
        this.journalsHolder = null;
        this.backupQuorum.liveIDSet(str);
        this.activation.setRemoteBackupUpToDate();
        if (logger.isTraceEnabled()) {
            logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
        }
        ActiveMQServerLogger.LOGGER.backupServerSynched(this.server);
    }

    private void handleReplicationSynchronization(ReplicationSyncFileMessage replicationSyncFileMessage) throws Exception {
        SequentialFile file;
        long id = replicationSyncFileMessage.getId();
        byte[] data = replicationSyncFileMessage.getData();
        switch (replicationSyncFileMessage.getFileType()) {
            case LARGE_MESSAGE:
                ReplicatedLargeMessage lookupLargeMessage = lookupLargeMessage(id, false, false);
                if (!(lookupLargeMessage instanceof LargeServerMessageInSync)) {
                    ActiveMQServerLogger.LOGGER.largeMessageIncompatible();
                    return;
                } else {
                    file = ((LargeServerMessageInSync) lookupLargeMessage).getSyncFile();
                    break;
                }
            case PAGE:
                file = getPage(replicationSyncFileMessage.getPageStore(), (int) replicationSyncFileMessage.getId()).getFile();
                break;
            case JOURNAL:
                FileChannel channel = this.filesReservedForSync.get(replicationSyncFileMessage.getJournalContent()).get(Long.valueOf(id)).getChannel();
                if (data == null) {
                    channel.close();
                    return;
                } else {
                    channel.write(ByteBuffer.wrap(data));
                    return;
                }
            default:
                throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledFileType(replicationSyncFileMessage.getFileType());
        }
        if (data == null) {
            return;
        }
        if (!file.isOpen()) {
            file.open();
        }
        file.writeDirect(ByteBuffer.wrap(data), false);
    }

    private ReplicationResponseMessageV2 handleStartReplicationSynchronization(ReplicationStartSyncMessage replicationStartSyncMessage) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("handleStartReplicationSynchronization:: nodeID = " + replicationStartSyncMessage);
        }
        ReplicationResponseMessageV2 replicationResponseMessageV2 = new ReplicationResponseMessageV2();
        if (!this.started) {
            return replicationResponseMessageV2;
        }
        if (replicationStartSyncMessage.isSynchronizationFinished()) {
            finishSynchronization(replicationStartSyncMessage.getNodeID());
            replicationResponseMessageV2.setSynchronizationIsFinishedAcknowledgement(true);
            return replicationResponseMessageV2;
        }
        switch (replicationStartSyncMessage.getDataType()) {
            case LargeMessages:
                for (long j : replicationStartSyncMessage.getFileIds()) {
                    createLargeMessage(j, true);
                }
                break;
            case JournalBindings:
            case JournalMessages:
                if (this.wantedFailBack && !replicationStartSyncMessage.isServerToFailBack()) {
                    ActiveMQServerLogger.LOGGER.autoFailBackDenied();
                }
                AbstractJournalStorageManager.JournalContent journalContentType = ReplicationStartSyncMessage.SyncDataType.getJournalContentType(replicationStartSyncMessage.getDataType());
                Journal journal = this.journalsHolder.get(journalContentType);
                if (replicationStartSyncMessage.getNodeID() != null) {
                    this.backupQuorum.liveIDSet(replicationStartSyncMessage.getNodeID());
                }
                Map<Long, JournalSyncFile> map = this.filesReservedForSync.get(journalContentType);
                for (Map.Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(replicationStartSyncMessage.getFileIds()).entrySet()) {
                    map.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
                }
                registerJournal(journalContentType.typeByte, new FileWrapperJournal(journal));
                break;
            default:
                throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
        }
        return replicationResponseMessageV2;
    }

    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage replicationLargeMessageEndMessage) {
        if (logger.isTraceEnabled()) {
            logger.trace("handleLargeMessageEnd on " + replicationLargeMessageEndMessage.getMessageId());
        }
        final ReplicatedLargeMessage lookupLargeMessage = lookupLargeMessage(replicationLargeMessageEndMessage.getMessageId(), true, false);
        if (lookupLargeMessage != null) {
            lookupLargeMessage.setPendingRecordID(replicationLargeMessageEndMessage.getPendingRecordId());
            this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.replication.ReplicationEndpoint.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (ReplicationEndpoint.logger.isTraceEnabled()) {
                            ReplicationEndpoint.logger.trace("Deleting LargeMessage " + replicationLargeMessageEndMessage.getMessageId() + " on the executor @ handleLargeMessageEnd");
                        }
                        lookupLargeMessage.deleteFile();
                    } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorDeletingLargeMessage(e, replicationLargeMessageEndMessage.getMessageId());
                    }
                }
            });
        }
    }

    private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage replicationLargeMessageWriteMessage) throws Exception {
        ReplicatedLargeMessage lookupLargeMessage = lookupLargeMessage(replicationLargeMessageWriteMessage.getMessageId(), false, true);
        if (lookupLargeMessage != null) {
            lookupLargeMessage.addBytes(replicationLargeMessageWriteMessage.getBody());
        }
    }

    private ReplicatedLargeMessage lookupLargeMessage(long j, boolean z, boolean z2) {
        ReplicatedLargeMessage replicatedLargeMessage;
        if (z) {
            replicatedLargeMessage = this.largeMessages.remove(Long.valueOf(j));
        } else {
            replicatedLargeMessage = this.largeMessages.get(Long.valueOf(j));
            if (replicatedLargeMessage == null) {
                if (z2) {
                    createLargeMessage(j, false);
                    replicatedLargeMessage = this.largeMessages.get(Long.valueOf(j));
                } else {
                    ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(j);
                }
            }
        }
        return replicatedLargeMessage;
    }

    private void handleLargeMessageBegin(ReplicationLargeMessageBeginMessage replicationLargeMessageBeginMessage) {
        long messageId = replicationLargeMessageBeginMessage.getMessageId();
        createLargeMessage(messageId, false);
        if (logger.isTraceEnabled()) {
            logger.trace("Receiving Large Message Begin " + messageId + " on backup");
        }
    }

    private void createLargeMessage(long j, boolean z) {
        ReplicatedLargeMessage largeServerMessageInSync = z ? new LargeServerMessageInSync(this.storageManager) : this.storageManager.createLargeMessage();
        largeServerMessageInSync.setDurable(true);
        largeServerMessageInSync.setMessageID(j);
        this.largeMessages.put(Long.valueOf(j), largeServerMessageInSync);
    }

    private void handleCommitRollback(ReplicationCommitMessage replicationCommitMessage) throws Exception {
        Journal journal = getJournal(replicationCommitMessage.getJournalID());
        if (replicationCommitMessage.isRollback()) {
            journal.appendRollbackRecord(replicationCommitMessage.getTxId(), false);
        } else {
            journal.appendCommitRecord(replicationCommitMessage.getTxId(), false);
        }
    }

    private void handlePrepare(ReplicationPrepareMessage replicationPrepareMessage) throws Exception {
        getJournal(replicationPrepareMessage.getJournalID()).appendPrepareRecord(replicationPrepareMessage.getTxId(), replicationPrepareMessage.getRecordData(), false);
    }

    private void handleAppendDeleteTX(ReplicationDeleteTXMessage replicationDeleteTXMessage) throws Exception {
        getJournal(replicationDeleteTXMessage.getJournalID()).appendDeleteRecordTransactional(replicationDeleteTXMessage.getTxId(), replicationDeleteTXMessage.getId(), replicationDeleteTXMessage.getRecordData());
    }

    private void handleAppendDelete(ReplicationDeleteMessage replicationDeleteMessage) throws Exception {
        getJournal(replicationDeleteMessage.getJournalID()).appendDeleteRecord(replicationDeleteMessage.getId(), false);
    }

    private void handleAppendAddTXRecord(ReplicationAddTXMessage replicationAddTXMessage) throws Exception {
        Journal journal = getJournal(replicationAddTXMessage.getJournalID());
        if (replicationAddTXMessage.getOperation() == ReplicationManager.ADD_OPERATION_TYPE.UPDATE) {
            journal.appendUpdateRecordTransactional(replicationAddTXMessage.getTxId(), replicationAddTXMessage.getId(), replicationAddTXMessage.getRecordType(), replicationAddTXMessage.getRecordData());
        } else {
            journal.appendAddRecordTransactional(replicationAddTXMessage.getTxId(), replicationAddTXMessage.getId(), replicationAddTXMessage.getRecordType(), replicationAddTXMessage.getRecordData());
        }
    }

    private void handleAppendAddRecord(ReplicationAddMessage replicationAddMessage) throws Exception {
        Journal journal = getJournal(replicationAddMessage.getJournalID());
        if (replicationAddMessage.getRecord() == ReplicationManager.ADD_OPERATION_TYPE.UPDATE) {
            if (logger.isTraceEnabled()) {
                logger.trace("Endpoint appendUpdate id = " + replicationAddMessage.getId());
            }
            journal.appendUpdateRecord(replicationAddMessage.getId(), replicationAddMessage.getJournalRecordType(), replicationAddMessage.getRecordData(), false);
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace("Endpoint append id = " + replicationAddMessage.getId());
            }
            journal.appendAddRecord(replicationAddMessage.getId(), replicationAddMessage.getJournalRecordType(), replicationAddMessage.getRecordData(), false);
        }
    }

    private void handlePageEvent(ReplicationPageEventMessage replicationPageEventMessage) throws Exception {
        Page remove = getPageMap(replicationPageEventMessage.getStoreName()).remove(Integer.valueOf(replicationPageEventMessage.getPageNumber()));
        if (remove == null) {
            remove = getPage(replicationPageEventMessage.getStoreName(), replicationPageEventMessage.getPageNumber());
        }
        if (remove != null) {
            if (!replicationPageEventMessage.isDelete()) {
                remove.close(false);
            } else if (this.deletePages) {
                remove.delete(null);
            }
        }
    }

    private void handlePageWrite(ReplicationPageWriteMessage replicationPageWriteMessage) throws Exception {
        PagedMessage pagedMessage = replicationPageWriteMessage.getPagedMessage();
        pagedMessage.initMessage(this.storageManager);
        getPage(pagedMessage.getMessage().getAddressSimpleString(), replicationPageWriteMessage.getPageNumber()).write(pagedMessage);
    }

    private ConcurrentMap<Integer, Page> getPageMap(SimpleString simpleString) {
        ConcurrentMap<Integer, Page> concurrentMap = this.pageIndex.get(simpleString);
        if (concurrentMap == null) {
            concurrentMap = new ConcurrentHashMap();
            ConcurrentMap<Integer, Page> putIfAbsent = this.pageIndex.putIfAbsent(simpleString, concurrentMap);
            if (putIfAbsent != null) {
                concurrentMap = putIfAbsent;
            }
        }
        return concurrentMap;
    }

    private Page getPage(SimpleString simpleString, int i) throws Exception {
        ConcurrentMap<Integer, Page> pageMap = getPageMap(simpleString);
        Page page = pageMap.get(Integer.valueOf(i));
        if (page == null) {
            page = newPage(i, simpleString, pageMap);
        }
        return page;
    }

    private synchronized Page newPage(int i, SimpleString simpleString, ConcurrentMap<Integer, Page> concurrentMap) throws Exception {
        Page page = concurrentMap.get(Integer.valueOf(i));
        if (page == null) {
            page = this.pageManager.getPageStore(simpleString).createPage(i);
            page.open();
            concurrentMap.put(Integer.valueOf(i), page);
        }
        return page;
    }

    private Journal getJournal(byte b) {
        return this.journals[b];
    }

    public void setBackupQuorum(SharedNothingBackupQuorum sharedNothingBackupQuorum) {
        this.backupQuorum = sharedNothingBackupQuorum;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }
}
