/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.reactivestreams.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ReactiveWriteStreamImpl<T>
implements ReactiveWriteStream<T> {
    private Set<SubscriptionImpl> subscriptions = new HashSet<SubscriptionImpl>();
    private final Queue<Item<T>> pending = new ArrayDeque<Item<T>>();
    private Handler<Void> drainHandler;
    private int writeQueueMaxSize = 32;
    protected final Context ctx;
    private boolean closed;

    public ReactiveWriteStreamImpl(Vertx vertx) {
        this.ctx = vertx.getOrCreateContext();
    }

    private void checkClosed() {
        if (this.closed) {
            throw new IllegalStateException("Closed");
        }
    }

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        this.checkClosed();
        Objects.requireNonNull(subscriber);
        SubscriptionImpl sub = new SubscriptionImpl(subscriber);
        if (!this.subscriptions.add(sub)) {
            throw new IllegalStateException("1.10 Cannot subscribe multiple times with the same subscriber.");
        }
        this.ctx.runOnContext(v -> {
            try {
                subscriber.onSubscribe((Subscription)sub);
            }
            catch (Throwable t) {
                this.signalError(sub.subscriber, t);
            }
        });
    }

    @Override
    public synchronized ReactiveWriteStream<T> write(T data) {
        return this.write((Object)data, (Handler)null);
    }

    @Override
    public ReactiveWriteStream<T> write(T data, Handler<AsyncResult<Void>> handler) {
        this.checkClosed();
        this.pending.add(new Item<T>(data, handler));
        this.checkSend();
        return this;
    }

    @Override
    public synchronized ReactiveWriteStream<T> setWriteQueueMaxSize(int maxSize) {
        this.checkClosed();
        if (this.writeQueueMaxSize < 1) {
            throw new IllegalArgumentException("writeQueueMaxSize must be >=1");
        }
        this.writeQueueMaxSize = maxSize;
        return this;
    }

    public synchronized boolean writeQueueFull() {
        this.checkClosed();
        return this.pending.size() >= this.writeQueueMaxSize;
    }

    @Override
    public synchronized ReactiveWriteStream<T> drainHandler(Handler<Void> handler) {
        this.checkClosed();
        this.drainHandler = handler;
        return this;
    }

    @Override
    public synchronized ReactiveWriteStream<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override
    public void end() {
        this.close();
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        this.close();
        if (handler != null) {
            this.ctx.runOnContext(v -> handler.handle((Object)Future.succeededFuture()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ReactiveWriteStream<T> close() {
        ReactiveWriteStreamImpl reactiveWriteStreamImpl = this;
        synchronized (reactiveWriteStreamImpl) {
            if (this.closed) {
                return this;
            }
            this.closed = true;
            this.complete();
            this.subscriptions.clear();
            Future closedFut = Future.failedFuture((Throwable)ConnectionBase.CLOSED_EXCEPTION);
            for (Item item : this.pending) {
                Handler<AsyncResult<Void>> handler = item.handler;
                if (handler == null) continue;
                this.ctx.runOnContext(v -> handler.handle((Object)closedFut));
            }
            this.pending.clear();
        }
        return this;
    }

    private synchronized void checkSend() {
        if (!this.subscriptions.isEmpty()) {
            long availableTokens = this.getAvailable();
            long toSend = Math.min(availableTokens, (long)this.pending.size());
            this.takeTokens(toSend);
            for (long i = 0L; i < toSend; ++i) {
                this.sendToSubscribers(this.pending.poll());
            }
            if (this.drainHandler != null && this.pending.size() < this.writeQueueMaxSize) {
                this.callDrainHandler();
            }
        }
    }

    private void callDrainHandler() {
        Handler<Void> dh = this.drainHandler;
        this.ctx.runOnContext(v -> dh.handle(null));
    }

    private long getAvailable() {
        long min = Long.MAX_VALUE;
        for (SubscriptionImpl subscription : this.subscriptions) {
            min = Math.min(subscription.tokens(), min);
        }
        return min;
    }

    private void takeTokens(long toSend) {
        for (SubscriptionImpl subscription : this.subscriptions) {
            subscription.takeTokens(toSend);
        }
    }

    private void complete() {
        for (SubscriptionImpl sub : this.subscriptions) {
            this.ctx.runOnContext(v -> sub.subscriber.onComplete());
        }
    }

    private void sendToSubscribers(Item<T> item) {
        for (SubscriptionImpl sub : this.subscriptions) {
            this.onNext(this.ctx, sub.subscriber, item.value);
        }
        if (item.handler != null) {
            item.handler.handle((Object)Future.succeededFuture());
        }
    }

    protected void onNext(Context context, Subscriber<? super T> subscriber, T data) {
        context.runOnContext(v -> {
            try {
                subscriber.onNext(data);
            }
            catch (Throwable t) {
                this.signalError(subscriber, t);
            }
        });
    }

    private void signalError(Subscriber<? super T> subscriber, Throwable error) {
        this.subscriptions.removeIf(sub -> ((SubscriptionImpl)sub).subscriber == subscriber);
        subscriber.onError(error);
    }

    static class Item<T> {
        final T value;
        final Handler<AsyncResult<Void>> handler;

        Item(T value, Handler<AsyncResult<Void>> handler) {
            this.value = value;
            this.handler = handler;
        }
    }

    public class SubscriptionImpl
    implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final AtomicLong tokens = new AtomicLong(Long.MIN_VALUE);

        private SubscriptionImpl(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        public long tokens() {
            return -(Long.MIN_VALUE - this.tokens.get());
        }

        public void takeTokens(long amount) {
            this.tokens.addAndGet(-amount);
        }

        public void request(long n) {
            if (n > 0L) {
                if (this.tokens.addAndGet(n) > 0L) {
                    ReactiveWriteStreamImpl.this.signalError(this.subscriber, new IllegalStateException("3.17 Subscriber has more then Long.MAX_VALUE (2^63-1) currently pending."));
                } else {
                    ReactiveWriteStreamImpl.this.checkSend();
                }
            } else {
                ReactiveWriteStreamImpl.this.signalError(this.subscriber, new IllegalArgumentException("3.9 Subscriber cannot request less then 1 for the number of elements."));
            }
        }

        public void cancel() {
            ReactiveWriteStreamImpl.this.subscriptions.remove(this);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SubscriptionImpl that = (SubscriptionImpl)o;
            return this.subscriber == that.subscriber;
        }

        public int hashCode() {
            return this.subscriber.hashCode();
        }
    }
}

