/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.grpcio.common.impl.stub;

import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.concurrent.OutboundMessageQueue;
import io.vertx.core.streams.WriteStream;

public class GrpcWriteStream<T>
implements WriteStream<T> {
    private static final Object END_SENTINEL = new Object();
    private final OutboundMessageQueue<T> queue;
    private Handler<Void> drainHandler;
    private boolean ended;

    public GrpcWriteStream(ContextInternal context, StreamObserver<T> observer) {
        final CallStreamObserver streamObserver = (CallStreamObserver)observer;
        this.queue = new OutboundMessageQueue<T>(context.executor()){

            public boolean test(T msg) {
                if (msg == END_SENTINEL) {
                    streamObserver.onCompleted();
                    return true;
                }
                boolean ready = streamObserver.isReady();
                if (ready) {
                    streamObserver.onNext(msg);
                }
                return ready;
            }

            protected void handleDrained() {
                Handler<Void> handler = GrpcWriteStream.this.drainHandler();
                if (handler != null) {
                    handler.handle(null);
                }
            }
        };
        streamObserver.setOnReadyHandler(() -> this.queue.tryDrain());
    }

    public WriteStream<T> exceptionHandler(Handler<Throwable> hndlr) {
        return this;
    }

    public Future<Void> write(T data) {
        if (this.ended) {
            throw new IllegalStateException();
        }
        this.queue.write(data);
        return Future.succeededFuture();
    }

    public Future<Void> end() {
        if (this.ended) {
            throw new IllegalStateException();
        }
        this.ended = true;
        this.queue.write(END_SENTINEL);
        return Future.succeededFuture();
    }

    public WriteStream<T> setWriteQueueMaxSize(int i) {
        return this;
    }

    public boolean writeQueueFull() {
        return !this.queue.isWritable();
    }

    public synchronized WriteStream<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    private synchronized Handler<Void> drainHandler() {
        return this.drainHandler;
    }
}

