package org.hornetq.core.replication.impl;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;

/* loaded from: input_file:WEB-INF/lib/hornetq-core-2.2.10.Final.jar:org/hornetq/core/replication/impl/ReplicationEndpointImpl.class */
/**
 * Backup-side endpoint of the replication channel.
 *
 * <p>Each replication packet received through {@link #handlePacket(Packet)} is applied to the
 * local journals, page files or large-message files, and a response packet is then sent back
 * on the configured {@link Channel}.
 *
 * <p>Lifecycle: {@link #start()} creates and starts the local storage and paging managers;
 * {@link #stop()} closes the channel and releases every page and large message still tracked.
 */
public class ReplicationEndpointImpl implements ReplicationEndpoint {
    private static final Logger log = Logger.getLogger(ReplicationEndpointImpl.class);
    /** Trace flag, sampled once when the class is loaded. */
    private static final boolean trace = log.isTraceEnabled();
    /**
     * Error code used when a non-HornetQ exception has to be reported to the sender. It is the
     * same literal that {@link #compareJournalInformation(JournalLoadInformation[])} uses.
     */
    private static final int GENERIC_ERROR_CODE = 0;

    private final IOCriticalErrorListener criticalErrorListener;
    private final HornetQServer server;
    private Channel channel;
    /** Journals indexed by the id passed to {@link #registerJournal(byte, Journal)}. */
    private Journal[] journals;
    private JournalStorageManager storage;
    private PagingManager pageManager;
    /** Load information captured by {@link #start()}; compared against the sender's data. */
    private JournalLoadInformation[] journalLoadInformation;
    /** Open pages, keyed by store name and then by page number. */
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
            new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
    /** Large messages currently being received, keyed by message id. */
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
            new ConcurrentHashMap<Long, LargeServerMessage>();
    /** When false, page "delete" events do not delete the page file (see {@link #setDeletePages}). */
    private boolean deletePages = true;

    private static void trace(String str) {
        log.trace(str);
    }

    /**
     * @param hornetQServer server supplying configuration, executors and management service
     * @param iOCriticalErrorListener listener handed to the storage and paging components
     */
    public ReplicationEndpointImpl(HornetQServer hornetQServer, IOCriticalErrorListener iOCriticalErrorListener) {
        this.server = hornetQServer;
        this.criticalErrorListener = iOCriticalErrorListener;
    }

    /**
     * Registers {@code journal} under id {@code b}, growing the internal array when needed.
     *
     * @param b journal id, used as the array index
     * @param journal journal to register
     */
    @Override // org.hornetq.core.replication.ReplicationEndpoint
    public void registerJournal(byte b, Journal journal) {
        if (this.journals == null || b >= this.journals.length) {
            Journal[] previous = this.journals;
            this.journals = new Journal[b + 1];
            if (previous != null) {
                System.arraycopy(previous, 0, this.journals, 0, previous.length);
            }
        }
        this.journals[b] = journal;
    }

    /**
     * Dispatches a replication packet to its handler and always sends a response.
     *
     * <p>The response is a {@link ReplicationResponseMessage} by default, a
     * {@link NullResponseMessage} for compare-data packets, and a
     * {@link HornetQExceptionMessage} when the handler throws.
     *
     * @param packet packet received from the replication channel
     */
    @Override // org.hornetq.core.protocol.core.ChannelHandler
    public void handlePacket(Packet packet) {
        PacketImpl response = new ReplicationResponseMessage();
        try {
            switch (packet.getType()) {
                case 91:
                    handleAppendAddRecord((ReplicationAddMessage) packet);
                    break;
                case 92:
                    handleAppendAddTXRecord((ReplicationAddTXMessage) packet);
                    break;
                case 93:
                    handleAppendDelete((ReplicationDeleteMessage) packet);
                    break;
                case 94:
                    handleAppendDeleteTX((ReplicationDeleteTXMessage) packet);
                    break;
                case 95:
                    handlePrepare((ReplicationPrepareMessage) packet);
                    break;
                case 96:
                    handleCommitRollback((ReplicationCommitMessage) packet);
                    break;
                case 97:
                    handlePageWrite((ReplicationPageWriteMessage) packet);
                    break;
                case 98:
                    handlePageEvent((ReplicationPageEventMessage) packet);
                    break;
                case 99:
                    handleLargeMessageBegin((ReplicationLargeMessageBeingMessage) packet);
                    break;
                case 100:
                    handleLargeMessageEnd((ReplicationLargemessageEndMessage) packet);
                    break;
                case 101:
                    handleLargeMessageWrite((ReplicationLargeMessageWriteMessage) packet);
                    break;
                case 102:
                    handleCompareDataMessage((ReplicationCompareDataMessage) packet);
                    response = new NullResponseMessage();
                    break;
                default:
                    log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
                    break;
            }
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
            // Handlers can throw any Exception (I/O, runtime, bad cast); a blind cast to
            // HornetQException would itself throw and leave the sender without a response.
            response = new HornetQExceptionMessage(toHornetQException(e));
        }
        this.channel.send(response);
    }

    /** Returns {@code e} itself when it already is a HornetQException, otherwise wraps its description. */
    private static HornetQException toHornetQException(Exception e) {
        if (e instanceof HornetQException) {
            return (HornetQException) e;
        }
        return new HornetQException(GENERIC_ERROR_CODE, e.toString());
    }

    /** Always reports {@code true}; this class does not track its own started state. */
    @Override // org.hornetq.core.server.HornetQComponent
    public boolean isStarted() {
        return true;
    }

    /**
     * Creates and starts the local storage manager, registers its message journal (id 1) and
     * bindings journal (id 0), records the journal load information, and starts a NIO-backed
     * paging manager.
     *
     * @throws Exception if any of the underlying components fails to start
     */
    @Override // org.hornetq.core.server.HornetQComponent
    public void start() throws Exception {
        Configuration configuration = this.server.getConfiguration();
        this.storage = new JournalStorageManager(configuration, this.server.getExecutorFactory(), this.criticalErrorListener);
        this.storage.start();
        this.server.getManagementService().setStorageManager(this.storage);
        registerJournal((byte) 1, this.storage.getMessageJournal());
        registerJournal((byte) 0, this.storage.getBindingsJournal());
        this.journalLoadInformation = this.storage.loadInternalOnly();
        PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(
                configuration.getPagingDirectory(),
                configuration.getJournalBufferSize_NIO(),
                this.server.getScheduledPool(),
                this.server.getExecutorFactory(),
                configuration.isJournalSyncNonTransactional(),
                this.criticalErrorListener);
        this.pageManager = new PagingManagerImpl(storeFactory, this.storage, this.server.getAddressSettingsRepository());
        this.pageManager.start();
    }

    /**
     * Closes the channel, stops storage, closes every tracked page, releases every tracked
     * large message and stops the paging manager.
     *
     * <p>Components that were never created (because {@link #start()} was not called or failed
     * part-way) are skipped instead of causing a NullPointerException.
     *
     * @throws Exception if the channel, storage or paging manager fails to stop
     */
    @Override // org.hornetq.core.server.HornetQComponent
    public void stop() throws Exception {
        if (this.channel != null) {
            this.channel.close();
        }
        if (this.storage != null) {
            this.storage.stop();
        }
        for (ConcurrentMap<Integer, Page> storePages : this.pageIndex.values()) {
            for (Page page : storePages.values()) {
                try {
                    page.close();
                } catch (Exception e) {
                    // Best effort: one failing page must not prevent closing the others.
                    log.warn("Error while closing the page on backup", e);
                }
            }
        }
        this.pageIndex.clear();
        for (LargeServerMessage largeMessage : this.largeMessages.values()) {
            largeMessage.releaseResources();
        }
        this.largeMessages.clear();
        if (this.pageManager != null) {
            this.pageManager.stop();
        }
    }

    /** @return the channel responses are sent on, or {@code null} if none was set */
    @Override // org.hornetq.core.replication.ReplicationEndpoint
    public Channel getChannel() {
        return this.channel;
    }

    /** @param channel channel used to send responses */
    @Override // org.hornetq.core.replication.ReplicationEndpoint
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**
     * Verifies that the sender's journal load information matches what was loaded locally.
     *
     * @param journalLoadInformationArr load information reported by the sender
     * @throws HornetQException with code 0 when nothing was loaded locally or the array lengths
     *         differ, or with code 104 when any element differs
     */
    @Override // org.hornetq.core.replication.ReplicationEndpoint
    public void compareJournalInformation(JournalLoadInformation[] journalLoadInformationArr) throws HornetQException {
        if (this.journalLoadInformation == null || this.journalLoadInformation.length != journalLoadInformationArr.length) {
            throw new HornetQException(0, "Live Node contains more journals than the backup node. Probably a version match error");
        }
        for (int i = 0; i < journalLoadInformationArr.length; i++) {
            if (!journalLoadInformationArr[i].equals(this.journalLoadInformation[i])) {
                log.warn("Journal comparisson mismatch:\n" + journalParametersToString(journalLoadInformationArr));
                throw new HornetQException(104, "Backup node can't connect to the live node as the data differs");
            }
        }
    }

    /** @param z whether page "delete" events should actually delete the page */
    public void setDeletePages(boolean z) {
        this.deletePages = z;
    }

    /**
     * Formats the received and the locally loaded journal information for the mismatch log.
     * Reads elements 0 and 1 of both arrays, so both must hold at least two entries.
     */
    private String journalParametersToString(JournalLoadInformation[] received) {
        return "**********************************************************\nparameters:\nBindings = " + received[0] + "\nMessaging = " + received[1] + "\n**********************************************************\nExpected:\nBindings = " + this.journalLoadInformation[0] + "\nMessaging = " + this.journalLoadInformation[1] + "\n**********************************************************";
    }

    /** Stops tracking the large message and deletes its file; a failure to delete is only logged. */
    private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet) {
        LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
        if (message == null) {
            return;
        }
        try {
            message.deleteFile();
        } catch (Exception e) {
            log.warn("Error deleting large message ID = " + packet.getMessageId(), e);
        }
    }

    /** Appends the packet body to the tracked large message; unknown ids are ignored. */
    private void handleLargeMessageWrite(ReplicationLargeMessageWriteMessage packet) throws Exception {
        LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
        if (message != null) {
            message.addBytes(packet.getBody());
        }
    }

    /** Delegates to {@link #compareJournalInformation(JournalLoadInformation[])}. */
    private void handleCompareDataMessage(ReplicationCompareDataMessage packet) throws HornetQException {
        compareJournalInformation(packet.getJournalInformation());
    }

    /**
     * Finds a tracked large message, logging a warning when it is unknown.
     *
     * @param messageId id of the large message
     * @param remove when true the message is also removed from the tracking map
     * @return the message, or {@code null} if it is not tracked
     */
    private LargeServerMessage lookupLargeMessage(long messageId, boolean remove) {
        Long key = Long.valueOf(messageId);
        LargeServerMessage message = remove ? this.largeMessages.remove(key) : this.largeMessages.get(key);
        if (message == null) {
            log.warn("Large MessageID " + messageId + "  is not available on backup server. Ignoring replication message");
        }
        return message;
    }

    /** Creates a durable large message with the packet's id and starts tracking it. */
    private void handleLargeMessageBegin(ReplicationLargeMessageBeingMessage packet) {
        LargeServerMessage message = this.storage.createLargeMessage();
        message.setDurable(true);
        message.setMessageID(packet.getMessageId());
        // Guarded like the trace calls in handleAppendAddRecord, so the string is only built
        // when tracing is on.
        if (trace) {
            trace("Receiving Large Message " + message.getMessageID() + " on backup");
        }
        this.largeMessages.put(Long.valueOf(message.getMessageID()), message);
    }

    /** Appends a rollback or a commit record for the packet's transaction. */
    private void handleCommitRollback(ReplicationCommitMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        // NOTE(review): the trailing 'false' is presumably the journal sync flag; confirm against Journal.
        if (packet.isRollback()) {
            journal.appendRollbackRecord(packet.getTxId(), false);
        } else {
            journal.appendCommitRecord(packet.getTxId(), false);
        }
    }

    /** Appends a prepare record for the packet's transaction. */
    private void handlePrepare(ReplicationPrepareMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        journal.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
    }

    /** Appends a transactional delete record. */
    private void handleAppendDeleteTX(ReplicationDeleteTXMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        journal.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordData());
    }

    /** Appends a non-transactional delete record. */
    private void handleAppendDelete(ReplicationDeleteMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        journal.appendDeleteRecord(packet.getId(), false);
    }

    /** Appends a transactional add or update record, depending on {@code packet.isUpdate()}. */
    private void handleAppendAddTXRecord(ReplicationAddTXMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        if (packet.isUpdate()) {
            journal.appendUpdateRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        } else {
            journal.appendAddRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordType(), packet.getRecordData());
        }
    }

    /** Appends a non-transactional add or update record, depending on {@code packet.isUpdate()}. */
    private void handleAppendAddRecord(ReplicationAddMessage packet) throws Exception {
        Journal journal = getJournal(packet.getJournalID());
        if (packet.isUpdate()) {
            if (trace) {
                trace("Endpoint appendUpdate id = " + packet.getId());
            }
            journal.appendUpdateRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
        } else {
            if (trace) {
                trace("Endpoint append id = " + packet.getId());
            }
            journal.appendAddRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
        }
    }

    /**
     * Applies a page event: the page stops being tracked, and is then either closed or, for
     * delete events with {@code deletePages} enabled, deleted.
     */
    private void handlePageEvent(ReplicationPageEventMessage packet) throws Exception {
        SimpleString storeName = packet.getStoreName();
        Integer pageKey = Integer.valueOf(packet.getPageNumber());
        ConcurrentMap<Integer, Page> storePages = getPageMap(storeName);

        Page page = storePages.remove(pageKey);
        if (page == null) {
            page = getPage(storeName, packet.getPageNumber());
            // getPage() registers the page it creates; unregister it again so a closed or
            // deleted page is never left behind in the index.
            storePages.remove(pageKey);
        }

        if (!packet.isDelete()) {
            page.close();
        } else if (this.deletePages) {
            // NOTE(review): meaning of the null argument is defined by Page.delete; confirm there.
            page.delete(null);
        }
    }

    /** Writes the packet's paged message into the page for the message address and page number. */
    private void handlePageWrite(ReplicationPageWriteMessage packet) throws Exception {
        PagedMessage pagedMessage = packet.getPagedMessage();
        pagedMessage.initMessage(this.storage);
        SimpleString address = pagedMessage.getMessage().getAddress();
        getPage(address, packet.getPageNumber()).write(pagedMessage);
    }

    /** Returns the page map for {@code storeName}, creating it atomically on first use. */
    private ConcurrentMap<Integer, Page> getPageMap(SimpleString storeName) {
        ConcurrentMap<Integer, Page> storePages = this.pageIndex.get(storeName);
        if (storePages != null) {
            return storePages;
        }
        ConcurrentMap<Integer, Page> created = new ConcurrentHashMap<Integer, Page>();
        ConcurrentMap<Integer, Page> raced = this.pageIndex.putIfAbsent(storeName, created);
        // If another thread registered a map first, use theirs.
        return raced != null ? raced : created;
    }

    /** Returns the tracked page, creating, opening and registering it when missing. */
    private Page getPage(SimpleString storeName, int pageNumber) throws Exception {
        ConcurrentMap<Integer, Page> storePages = getPageMap(storeName);
        Page page = storePages.get(Integer.valueOf(pageNumber));
        return page != null ? page : newPage(pageNumber, storeName, storePages);
    }

    /**
     * Creates, opens and registers a page unless one was registered in the meantime.
     * Synchronized so that only one page is created per page number.
     */
    private synchronized Page newPage(int pageNumber, SimpleString storeName, ConcurrentMap<Integer, Page> storePages) throws Exception {
        Integer pageKey = Integer.valueOf(pageNumber);
        Page page = storePages.get(pageKey);
        if (page == null) {
            page = this.pageManager.getPageStore(storeName).createPage(pageNumber);
            page.open();
            storePages.put(pageKey, page);
        }
        return page;
    }

    /** Returns the journal registered under {@code journalId}; no bounds check is performed. */
    private Journal getJournal(byte journalId) {
        return this.journals[journalId];
    }
}
