/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle;

import java.io.IOException;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.MappingFunction;
import net.openhft.chronicle.tcp.AppenderAdapter;
import net.openhft.chronicle.tcp.AppenderAdapters;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.chronicle.tcp.SinkTcp;
import net.openhft.chronicle.tools.ChronicleTools;
import net.openhft.chronicle.tools.ResizableDirectByteBufferBytes;
import net.openhft.chronicle.tools.WrappedChronicle;
import net.openhft.chronicle.tools.WrappedExcerpt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ChronicleQueueSink
extends WrappedChronicle {
    private final SinkTcp connection;
    private final ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder;
    private final boolean isLocal;
    private final int readSpinCount;
    private volatile boolean closed;
    private ExcerptCommon excerpt;

    ChronicleQueueSink(ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder, SinkTcp connection) {
        super(builder.chronicle());
        this.connection = connection;
        this.builder = builder.clone();
        this.closed = false;
        this.isLocal = builder.sharedChronicle() && connection.isLocalhost();
        this.excerpt = null;
        this.readSpinCount = builder.readSpinCount();
    }

    @Override
    public void close() throws IOException {
        if (!this.closed) {
            this.closed = true;
            if (this.connection != null) {
                this.connection.close();
            }
        }
        super.close();
    }

    @Override
    public Excerpt createExcerpt() throws IOException {
        return (Excerpt)this.createExcerpt0();
    }

    @Override
    public synchronized ExcerptTailer createTailer() throws IOException {
        return (ExcerptTailer)this.createExcerpt0();
    }

    @Override
    public ExcerptAppender createAppender() throws IOException {
        throw new UnsupportedOperationException();
    }

    private ExcerptCommon createExcerpt0() throws IOException {
        if (this.excerpt != null) {
            throw new IllegalStateException("An excerpt has already been created");
        }
        this.excerpt = this.isLocal ? new StatefulLocalExcerpt(this.wrappedChronicle.createTailer()) : new StatefulExcerpt(this.wrappedChronicle.createTailer());
        return this.excerpt;
    }

    private final class StatefulExcerpt
    extends AbstractStatefulExcerpt {
        private AppenderAdapter adapter;
        private long lastLocalIndex;

        public StatefulExcerpt(ExcerptCommon common) {
            super(common);
            this.adapter = null;
            this.lastLocalIndex = -1L;
            this.withMapping(ChronicleQueueSink.this.builder.withMapping());
        }

        @Override
        protected boolean doReadNext() throws IOException {
            if (this.openConnection()) {
                this.readBuffer.clear();
                this.readBuffer.limit(0);
                if (this.adapter == null) {
                    this.adapter = AppenderAdapters.createAdapter(ChronicleQueueSink.this.wrappedChronicle);
                }
                this.lastLocalIndex = ChronicleQueueSink.this.wrappedChronicle.lastIndex();
                this.subscribe(this.lastLocalIndex);
                return true;
            }
            return false;
        }

        @Override
        protected boolean doReadNextExcerpt() throws IOException {
            if (!ChronicleQueueSink.this.connection.read(this.readBuffer, 12, 20, ChronicleQueueSink.this.readSpinCount)) {
                return false;
            }
            int size = this.readBuffer.getInt();
            long scIndex = this.readBuffer.getLong();
            switch (size) {
                case -128: {
                    return false;
                }
                case -127: {
                    this.adapter.writePaddedEntry();
                    return this.readNextExcerpt();
                }
                case -126: {
                    return this.readNextExcerpt();
                }
            }
            if (size > 0x8000000 || size < 0) {
                throw new StreamCorruptedException("size was " + size);
            }
            if (this.lastLocalIndex != scIndex) {
                this.adapter.startExcerpt(size, scIndex);
                long remaining = size;
                int limit = this.readBuffer.limit();
                int size2 = (int)Math.min((long)this.readBuffer.remaining(), remaining);
                remaining -= (long)size2;
                this.readBuffer.limit(this.readBuffer.position() + size2);
                this.adapter.write(this.readBuffer);
                this.readBuffer.limit(limit);
                while (remaining > 0L) {
                    int size3 = (int)Math.min((long)this.readBuffer.capacity(), remaining);
                    ChronicleQueueSink.this.connection.readUpTo(this.readBuffer, size3, -1);
                    remaining -= (long)this.readBuffer.remaining();
                    this.adapter.write(this.readBuffer);
                }
            } else {
                this.readBuffer.position(this.readBuffer.position() + size);
                return this.readNextExcerpt();
            }
            this.adapter.finish();
            return true;
        }

        @Override
        public void close() {
            if (this.adapter != null) {
                this.adapter.close();
                this.adapter = null;
            }
            super.close();
        }
    }

    private class StatefulLocalExcerpt
    extends AbstractStatefulExcerpt {
        public StatefulLocalExcerpt(ExcerptCommon common) {
            super(common);
        }

        @Override
        protected boolean doReadNext() throws IOException {
            if (this.openConnection()) {
                this.readBuffer.clear();
                this.readBuffer.limit(0);
                return true;
            }
            return false;
        }

        @Override
        protected boolean doReadNextExcerpt() throws IOException {
            this.query(ChronicleQueueSink.this.wrappedChronicle.lastIndex());
            if (ChronicleQueueSink.this.connection.readUpTo(this.readBuffer, 12, ChronicleQueueSink.this.readSpinCount)) {
                int size = this.readBuffer.getInt();
                this.readBuffer.getLong();
                switch (size) {
                    case -128: {
                        return false;
                    }
                    case -127: {
                        return false;
                    }
                    case -126: {
                        return true;
                    }
                }
            }
            return false;
        }
    }

    private abstract class AbstractStatefulExcerpt
    extends WrappedExcerpt {
        protected final Logger logger;
        protected final ResizableDirectByteBufferBytes writeBuffer;
        protected final ByteBuffer readBuffer;
        private long lastReconnectionAttemptMS;
        private long reconnectionIntervalMS;
        private long lastReconnectionAttempt;

        protected AbstractStatefulExcerpt(ExcerptCommon excerpt) {
            super(excerpt);
            this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + "@" + ChronicleQueueSink.this.connection.toString()));
            this.writeBuffer = new ResizableDirectByteBufferBytes(ChronicleQueueSink.this.builder.minBufferSize());
            this.readBuffer = ChronicleTcp.createBuffer(ChronicleQueueSink.this.builder.minBufferSize());
            this.reconnectionIntervalMS = ChronicleQueueSink.this.builder.reconnectionIntervalMillis();
            this.lastReconnectionAttemptMS = 0L;
            this.lastReconnectionAttempt = 0L;
        }

        @Override
        public boolean nextIndex() {
            return super.nextIndex() || this.readNext() && super.nextIndex();
        }

        @Override
        public boolean index(long index) throws IndexOutOfBoundsException {
            return super.index(index) || index >= 0L && this.readNext() && super.index(index);
        }

        public synchronized void close() {
            try {
                ChronicleQueueSink.this.connection.close();
            }
            catch (IOException e) {
                this.logger.warn("Error closing socketChannel", (Throwable)e);
            }
            super.close();
            ChronicleQueueSink.this.excerpt = null;
        }

        protected boolean openConnection() {
            boolean connected;
            if (!ChronicleQueueSink.this.connection.isOpen()) {
                try {
                    ChronicleQueueSink.this.connection.open();
                }
                catch (IOException e) {
                    // empty catch block
                }
            }
            if (connected = ChronicleQueueSink.this.connection.isOpen()) {
                ChronicleQueueSink.this.builder.connectionListener().onConnect(ChronicleQueueSink.this.connection.socketChannel());
                this.lastReconnectionAttempt = 0L;
                this.lastReconnectionAttemptMS = 0L;
            } else {
                ++this.lastReconnectionAttempt;
                if (ChronicleQueueSink.this.builder.reconnectionWarningThreshold() > 0 && this.lastReconnectionAttempt > (long)ChronicleQueueSink.this.builder.reconnectionWarningThreshold()) {
                    this.logger.warn("Failed to establish a connection {}", (Object)ChronicleTcp.connectionName("", ChronicleQueueSink.this.builder));
                }
            }
            return connected;
        }

        protected boolean shouldConnect() {
            if (this.lastReconnectionAttempt >= (long)ChronicleQueueSink.this.builder.reconnectionAttempts()) {
                long now = System.currentTimeMillis();
                if (now < this.lastReconnectionAttemptMS + this.reconnectionIntervalMS) {
                    return false;
                }
                this.lastReconnectionAttemptMS = now;
            }
            return true;
        }

        protected void subscribe(long index) throws IOException {
            this.writeBuffer.clearAll();
            this.writeBuffer.writeLong(1L);
            this.writeBuffer.writeLong(index);
            MappingFunction mapping = this.withMapping();
            if (mapping != null) {
                this.writeBuffer.writeLong(30L);
                long pos = this.writeBuffer.position();
                this.writeBuffer.skip(4L);
                long start = this.writeBuffer.position();
                this.writeBuffer.writeObject(mapping);
                this.writeBuffer.writeInt(pos, (int)(this.writeBuffer.position() - start));
            }
            this.writeBuffer.setBufferPositionAndLimit(0, this.writeBuffer.position());
            ChronicleQueueSink.this.connection.writeAllOrEOF(this.writeBuffer);
        }

        protected void query(long index) throws IOException {
            this.writeBuffer.clearAll();
            this.writeBuffer.writeLong(10L);
            this.writeBuffer.writeLong(index);
            this.writeBuffer.setBufferPositionAndLimit(0, this.writeBuffer.position());
            ChronicleQueueSink.this.connection.writeAllOrEOF(this.writeBuffer);
        }

        protected boolean readNext() {
            block3: {
                if (!ChronicleQueueSink.this.closed && !ChronicleQueueSink.this.connection.isOpen() && this.shouldConnect()) {
                    try {
                        this.doReadNext();
                    }
                    catch (IOException e) {
                        ChronicleTools.logIOException(this.logger, "Exception reading from socket", e);
                        if (ChronicleQueueSink.this.closed) break block3;
                        ChronicleQueueSink.this.builder.connectionListener().onError(ChronicleQueueSink.this.connection.socketChannel(), e);
                    }
                }
            }
            return !ChronicleQueueSink.this.closed && ChronicleQueueSink.this.connection.isOpen() && this.readNextExcerpt();
        }

        protected boolean readNextExcerpt() {
            try {
                if (!ChronicleQueueSink.this.closed) {
                    return this.doReadNextExcerpt();
                }
            }
            catch (IOException e) {
                ChronicleTools.logIOException(this.logger, "Exception reading from socket", e);
                if (!ChronicleQueueSink.this.closed) {
                    ChronicleQueueSink.this.builder.connectionListener().onError(ChronicleQueueSink.this.connection.socketChannel(), e);
                }
                try {
                    ChronicleQueueSink.this.connection.close();
                    ChronicleQueueSink.this.builder.connectionListener().onDisconnect(ChronicleQueueSink.this.connection.socketChannel());
                }
                catch (IOException e2) {
                    this.logger.warn("Error closing socketChannel", (Throwable)e2);
                }
            }
            return false;
        }

        protected abstract boolean doReadNext() throws IOException;

        protected abstract boolean doReadNextExcerpt() throws IOException;
    }
}

