package reactor.rx.stream.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptTailer;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscriber;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.queue.spec.PersistentQueueSpec;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.stream.MapStream;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/rx/stream/io/ChronicleReaderStream.class */
public class ChronicleReaderStream<K, V> extends MapStream<K, V> {
    /** Executor on which each subscription's read loop is scheduled; created as a single-thread executor in the constructor. */
    private final ExecutorService executor;
    /** Name supplied at construction; passed to the executor's thread factory and included in {@link #toString()}. */
    protected final String name;
    /** Codec used to decode keys; when {@code null}, keys are read with {@code readObject()} instead. */
    protected final Codec<Buffer, K, K> keyCodec;
    /** Codec used to decode values; when {@code null}, values are read with {@code readObject()} instead. */
    protected final Codec<Buffer, V, V> valueCodec;
    /** Chronicle from which subscriptions create their tailers. */
    protected final Chronicle chronicle;
    /** In-memory map updated by subscriptions as excerpts are read; backs all read-side {@link Map} methods. */
    protected final Map<K, V> localCache;
    /** Subscription counter: incremented in {@code subscribe}, decremented when a subscription is cancelled. */
    protected volatile int consumers;
    /** Atomic updater for the {@link #consumers} field. */
    protected static final AtomicIntegerFieldUpdater<ChronicleReaderStream> CONSUMER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChronicleReaderStream.class, "consumers");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: reactor.rx.stream.io.ChronicleReaderStream$1, reason: invalid class name */
    /* loaded from: input_file:reactor/rx/stream/io/ChronicleReaderStream$1.class */
    /**
     * Compiler-generated switch table for {@link MapStream.Operation}.
     *
     * <p>The array is indexed by the enum constant's ordinal and holds the case
     * label used by the {@code switch} statements in this file:
     * {@code put=1}, {@code putAll=2}, {@code remove=3}, {@code clear=4}.
     * Slots that are never assigned keep the default value {@code 0}, which
     * matches none of those labels.
     */
    public static /* synthetic */ class AnonymousClass1 {
        /** Ordinal-indexed table of case labels, sized to the number of enum constants. */
        static final /* synthetic */ int[] $SwitchMap$reactor$rx$stream$MapStream$Operation = new int[MapStream.Operation.values().length];

        static {
            // Each assignment is guarded separately so that one failing constant
            // does not prevent the remaining slots from being filled.
            try {
                $SwitchMap$reactor$rx$stream$MapStream$Operation[MapStream.Operation.put.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$reactor$rx$stream$MapStream$Operation[MapStream.Operation.putAll.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$reactor$rx$stream$MapStream$Operation[MapStream.Operation.remove.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$reactor$rx$stream$MapStream$Operation[MapStream.Operation.clear.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the slot simply stays 0.
            }
        }
    }

    /* loaded from: input_file:reactor/rx/stream/io/ChronicleReaderStream$ChronicleSubscription.class */
    class ChronicleSubscription extends PushSubscription<MapStream.Signal<K, V>> {
        final MapStream.MutableSignal<K, V> signalContainer;

        /**
         * Creates a subscription bound to the given stream and subscriber and
         * allocates the single mutable signal instance that is reused for every
         * emission of this subscription.
         *
         * @param stream     the publishing stream, forwarded to the superclass
         * @param subscriber the downstream subscriber, forwarded to the superclass
         */
        public ChronicleSubscription(Stream<MapStream.Signal<K, V>> stream, Subscriber<? super MapStream.Signal<K, V>> subscriber) {
            super(stream, subscriber);
            signalContainer = new MapStream.MutableSignal<K, V>();
        }

        /**
         * Cancels this subscription through the superclass, then decrements
         * the enclosing stream's consumer counter.
         */
        @Override
        public void cancel() {
            super.cancel();
            // Mirror of the increment performed when the subscription was created.
            CONSUMER_UPDATER.decrementAndGet(ChronicleReaderStream.this);
        }

        /**
         * Schedules a read loop on the stream's executor that consumes up to
         * {@code j} excerpts from a newly created tailer and forwards them via
         * {@link #readExcerpt(ExcerptTailer)}.
         *
         * <p>The loop ends when {@code j} excerpts have been read (unless
         * {@code j} is {@link Long#MAX_VALUE}, which means "unbounded") or when
         * this subscription's {@code terminated} flag becomes non-zero. Demand is
         * counted per excerpt, so a single {@code putAll} excerpt counts once even
         * though it produces several signals. Any {@link Throwable} raised inside
         * the loop is reported to the subscriber through {@code onError}.
         *
         * <p>NOTE(review): a fresh tailer is created on every call; whether that
         * restarts reading from the beginning of the chronicle depends on
         * {@code Chronicle.createTailer()} semantics — confirm.
         *
         * @param j number of excerpts requested, or {@link Long#MAX_VALUE} for no limit
         */
        @Override // reactor.rx.subscription.PushSubscription
        public void onRequest(final long j) {
            ChronicleReaderStream.this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        ExcerptTailer tailer = ChronicleReaderStream.this.chronicle.createTailer();
                        long consumed = 0;
                        // The demand/termination check is the loop guard itself, so the
                        // task returns (and frees the executor thread) once it fails.
                        while ((j == Long.MAX_VALUE || consumed < j) && ChronicleSubscription.this.terminated == 0) {
                            if (tailer.nextIndex()) {
                                consumed++;
                                ChronicleSubscription.this.readExcerpt(tailer);
                                // Reset the shared signal so no key/value from this
                                // excerpt lingers into the next one.
                                ChronicleSubscription.this.signalContainer.op(MapStream.Operation.put);
                                ChronicleSubscription.this.signalContainer.key(null);
                                ChronicleSubscription.this.signalContainer.value(null);
                            } else {
                                // Nothing to read yet: back off briefly before polling again.
                                LockSupport.parkNanos(1L);
                            }
                        }
                    } catch (Throwable th) {
                        ChronicleSubscription.this.subscriber.onError(th);
                    }
                }
            });
        }

        /**
         * Reads one excerpt from the tailer and emits the corresponding signal(s).
         *
         * <p>The excerpt starts with a {@link MapStream.Operation}. If none can be
         * read ({@code null}), the tailer is rewound to where it was on entry and
         * nothing is emitted. Otherwise:
         * <ul>
         *   <li>{@code put} — one key/value pair is read and emitted;</li>
         *   <li>{@code putAll} — an {@code int} count is read, followed by that
         *       many key/value pairs, each emitted as its own {@code put} signal;</li>
         *   <li>{@code remove} — a key is read, then the signal is applied to the
         *       local cache and emitted;</li>
         *   <li>{@code clear} — the signal is applied to the local cache and emitted.</li>
         * </ul>
         * Any {@link Exception} raised while decoding or emitting is routed to the
         * subscriber's {@code onError}.
         *
         * @param excerptTailer tailer positioned at the excerpt to read
         */
        public void readExcerpt(ExcerptTailer excerptTailer) {
            long startPosition = excerptTailer.position();
            MapStream.Operation operation = (MapStream.Operation) excerptTailer.readEnum(MapStream.Operation.class);
            if (operation == null) {
                // Undo the partial read so the caller sees an untouched tailer.
                excerptTailer.position(startPosition);
                return;
            }
            try {
                // Switch on the enum itself instead of an ordinal-indexed lookup table.
                switch (operation) {
                    case put:
                        onExcerptPut(excerptTailer);
                        return;
                    case putAll:
                        int entryCount = excerptTailer.readInt();
                        for (int i = 0; i < entryCount; i++) {
                            onExcerptPut(excerptTailer);
                        }
                        return;
                    case remove:
                        this.signalContainer.op(MapStream.Operation.remove);
                        this.signalContainer.key(readKey(excerptTailer));
                        break;
                    case clear:
                        this.signalContainer.op(MapStream.Operation.clear);
                        break;
                    default:
                        // Any other operation falls through to sync/emit with the
                        // signal container as it currently stands.
                        break;
                }
                sync(this.signalContainer);
                this.subscriber.onNext(this.signalContainer);
            } catch (Exception e) {
                this.subscriber.onError(e);
            }
        }

        /**
         * Reads one key followed by one value from the tailer, loads them into
         * the shared signal as a {@code put}, applies the signal to the local
         * cache and emits it to the subscriber.
         *
         * @param excerptTailer tailer positioned at the start of a key/value pair
         */
        private void onExcerptPut(ExcerptTailer excerptTailer) {
            // Typed as K/V (not Object) so the signal setters are called type-safely.
            K key = readKey(excerptTailer);
            V value = readValue(excerptTailer);
            this.signalContainer.op(MapStream.Operation.put);
            this.signalContainer.key(key);
            this.signalContainer.value(value);
            sync(this.signalContainer);
            this.subscriber.onNext(this.signalContainer);
        }

        /**
         * Reads the next value from the tailer.
         *
         * <p>With a value codec configured, an {@code int} length is read first,
         * that many bytes are copied into a heap buffer, and the codec's decoder
         * is applied to it. Without a codec the value is read via
         * {@code readObject()}.
         *
         * @param excerptTailer tailer positioned at the value
         * @return the decoded value
         */
        private V readValue(ExcerptTailer excerptTailer) {
            final Codec<Buffer, V, V> codec = ChronicleReaderStream.this.valueCodec;
            if (codec != null) {
                ByteBuffer payload = ByteBuffer.allocate(excerptTailer.readInt());
                excerptTailer.read(payload);
                // Switch the buffer from writing to reading before decoding.
                payload.flip();
                return (V) codec.decoder((Consumer) null).apply(new Buffer(payload));
            }
            return (V) excerptTailer.readObject();
        }

        /**
         * Reads the next key from the tailer.
         *
         * <p>With a key codec configured, an {@code int} length is read first,
         * that many bytes are copied into a heap buffer, and the codec's decoder
         * is applied to it. Without a codec the key is read via
         * {@code readObject()}.
         *
         * @param excerptTailer tailer positioned at the key
         * @return the decoded key
         */
        private K readKey(ExcerptTailer excerptTailer) {
            final Codec<Buffer, K, K> codec = ChronicleReaderStream.this.keyCodec;
            if (codec != null) {
                ByteBuffer payload = ByteBuffer.allocate(excerptTailer.readInt());
                excerptTailer.read(payload);
                // Switch the buffer from writing to reading before decoding.
                payload.flip();
                return (K) codec.decoder((Consumer) null).apply(new Buffer(payload));
            }
            return (K) excerptTailer.readObject();
        }

        /**
         * Applies a signal to the enclosing stream's local cache.
         *
         * <p>{@code put} and {@code putAll} store the signal's key/value,
         * {@code remove} deletes the signal's key, {@code clear} empties the
         * cache; any other operation leaves the cache untouched.
         *
         * @param signal the signal to mirror into the local cache
         */
        public void sync(MapStream.Signal<K, V> signal) {
            // Switch on the enum itself instead of an ordinal-indexed lookup table.
            switch (signal.op()) {
                case put:
                case putAll:
                    ChronicleReaderStream.this.localCache.put(signal.key(), signal.value());
                    return;
                case remove:
                    ChronicleReaderStream.this.localCache.remove(signal.key());
                    return;
                case clear:
                    ChronicleReaderStream.this.localCache.clear();
                    return;
                default:
                    return;
            }
        }
    }

    /**
     * Creates a reader stream over an indexed chronicle built by
     * {@link ChronicleQueueBuilder} from
     * {@code PersistentQueueSpec.DEFAULT_BASE_PATH} and the given name, then
     * delegates to {@link #ChronicleReaderStream(String, Chronicle)}.
     *
     * @param str the stream name, also passed to the chronicle builder
     * @throws IOException presumably when the chronicle cannot be built — confirm against the builder
     */
    public ChronicleReaderStream(String str) throws IOException {
        this(str, ChronicleQueueBuilder.indexed(PersistentQueueSpec.DEFAULT_BASE_PATH, str).build());
    }

    /**
     * Creates a reader stream over the given chronicle with no key or value
     * codec (both are passed as {@code null} to the four-argument constructor).
     *
     * @param str       the stream name
     * @param chronicle the chronicle to read from
     */
    public ChronicleReaderStream(String str, Chronicle chronicle) {
        this(str, chronicle, null, null);
    }

    /**
     * Creates a reader stream over the given chronicle.
     *
     * <p>Initialises an empty concurrent local cache, a zero consumer count and
     * a single-thread executor whose thread factory is a
     * {@link NamedDaemonThreadFactory} created with {@code str}.
     *
     * @param str       the stream name
     * @param chronicle the chronicle to read from
     * @param codec     key codec, or {@code null} to read keys with {@code readObject()}
     * @param codec2    value codec, or {@code null} to read values with {@code readObject()}
     */
    public ChronicleReaderStream(String str, Chronicle chronicle, Codec<Buffer, K, K> codec, Codec<Buffer, V, V> codec2) {
        // Diamond instead of a raw type keeps the assignment free of unchecked warnings.
        this.localCache = new ConcurrentHashMap<>();
        this.consumers = 0;
        this.executor = Executors.newSingleThreadExecutor(new NamedDaemonThreadFactory(str));
        this.keyCodec = codec;
        this.valueCodec = codec2;
        this.chronicle = chronicle;
        this.name = str;
    }

    /**
     * Registers a subscriber: increments the consumer counter and hands the
     * subscriber a new {@code ChronicleSubscription} via {@code onSubscribe}.
     *
     * @param subscriber the subscriber to register; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    public void subscribe(Subscriber<? super MapStream.Signal<K, V>> subscriber) {
        // Reject null before touching the counter; otherwise the NPE raised by
        // the dereference below would leave the consumer count permanently off by one.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        CONSUMER_UPDATER.incrementAndGet(this);
        subscriber.onSubscribe(new ChronicleSubscription(this, subscriber));
    }

    /**
     * Reports whether the local cache holds a mapping for the given key.
     *
     * @param obj the key to look up
     * @return the result of {@code containsKey} on the local cache
     */
    @Override
    public boolean containsKey(Object obj) {
        final Map<K, V> cache = localCache;
        return cache.containsKey(obj);
    }

    /**
     * Reports whether the local cache maps at least one key to the given value.
     *
     * @param obj the value to look for
     * @return the result of {@code containsValue} on the local cache
     */
    @Override
    public boolean containsValue(Object obj) {
        final Map<K, V> cache = localCache;
        return cache.containsValue(obj);
    }

    /**
     * Returns the entry set of the local cache.
     *
     * @return the local cache's own entry-set view (not a copy)
     */
    @Override
    @NotNull
    public Set<Map.Entry<K, V>> entrySet() {
        final Set<Map.Entry<K, V>> entries = localCache.entrySet();
        return entries;
    }

    /**
     * Compares the local cache with the given object.
     *
     * @param obj the object to compare against
     * @return the result of {@code equals} on the local cache
     */
    @Override
    public boolean equals(Object obj) {
        final Map<K, V> cache = localCache;
        return cache.equals(obj);
    }

    /**
     * Looks up a key in the local cache.
     *
     * @param obj the key to look up
     * @return whatever the local cache returns for that key
     */
    @Override
    public V get(Object obj) {
        final V cached = localCache.get(obj);
        return cached;
    }

    /**
     * Returns the hash code of the local cache.
     *
     * @return the result of {@code hashCode} on the local cache
     */
    @Override
    public int hashCode() {
        final Map<K, V> cache = localCache;
        return cache.hashCode();
    }

    /**
     * Reports whether the local cache currently holds no mappings.
     *
     * @return the result of {@code isEmpty} on the local cache
     */
    @Override
    public boolean isEmpty() {
        final Map<K, V> cache = localCache;
        return cache.isEmpty();
    }

    /**
     * Returns the key set of the local cache.
     *
     * @return the local cache's own key-set view (not a copy)
     */
    @Override
    @NotNull
    public Set<K> keySet() {
        final Set<K> keys = localCache.keySet();
        return keys;
    }

    /**
     * Not supported through this class's {@link Map} interface.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public void clear() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported through this class's {@link Map} interface.
     *
     * @param k ignored
     * @param v ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public V put(K k, V v) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported through this class's {@link Map} interface.
     *
     * @param map ignored
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public void putAll(Map<? extends K, ? extends V> map) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported through this class's {@link Map} interface.
     *
     * @param obj ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public V remove(Object obj) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the number of mappings in the local cache.
     *
     * @return the result of {@code size} on the local cache
     */
    @Override
    public int size() {
        final Map<K, V> cache = localCache;
        return cache.size();
    }

    /**
     * Exposes the local cache map itself.
     *
     * @return the same map instance held by this stream, not a copy
     */
    public Map<K, V> localCache() {
        return localCache;
    }

    /**
     * Returns the chronicle this stream reads from.
     *
     * @return the chronicle given at construction
     */
    public Chronicle chronicle() {
        return chronicle;
    }

    /**
     * Returns the codec used to decode keys.
     *
     * @return the key codec given at construction; may be {@code null}
     */
    public Codec<Buffer, K, K> keyCodec() {
        return keyCodec;
    }

    /**
     * Returns the codec used to decode values.
     *
     * @return the value codec given at construction; may be {@code null}
     */
    public Codec<Buffer, V, V> valueCodec() {
        return valueCodec;
    }

    /**
     * Returns the values collection of the local cache.
     *
     * @return the local cache's own values view (not a copy)
     */
    @Override
    @NotNull
    public Collection<V> values() {
        final Collection<V> cachedValues = localCache.values();
        return cachedValues;
    }

    /**
     * Renders the local cache's string form followed by {@code {name=<name>}}.
     *
     * @return the local cache contents plus the stream name
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(localCache.toString());
        text.append("{name=").append(name).append("}");
        return text.toString();
    }
}
