package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.query.QueryComplete;
import io.axoniq.axonserver.grpc.query.QueryProviderInbound;
import io.axoniq.axonserver.grpc.query.QueryProviderOutbound;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryServiceGrpc;
import io.axoniq.axonserver.grpc.query.QuerySubscription;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.QueryUpdateComplete;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/query/impl/QueryChannelImpl.class */
public class QueryChannelImpl extends AbstractAxonServerChannel<QueryProviderOutbound> implements QueryChannel {
    private static final Logger logger = LoggerFactory.getLogger(QueryChannelImpl.class);
    private static final QueryResponse TERMINAL = QueryResponse.newBuilder().setErrorCode("__TERMINAL__").m2412build();
    private final AtomicReference<CallStreamObserver<QueryProviderOutbound>> outboundQueryStream;
    private final Map<QueryDefinition, AtomicInteger> supportedQueries;
    private final ConcurrentMap<String, Set<QueryHandler>> queryHandlers;
    private final ConcurrentMap<Enum<?>, InstructionHandler<QueryProviderInbound, QueryProviderOutbound>> instructionHandlers;
    private final ClientIdentification clientIdentification;
    private final String context;
    private final int permits;
    private final int permitsBatch;
    private final Object queryHandlerMonitor;
    private final Map<String, Set<Registration>> subscriptionQueries;
    private final QueryServiceGrpc.QueryServiceStub queryServiceStub;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axoniq/axonserver/connector/query/impl/QueryChannelImpl$IncomingQueryInstructionStream.class */
    public class IncomingQueryInstructionStream extends AbstractIncomingInstructionStream<QueryProviderInbound, QueryProviderOutbound> {
        public IncomingQueryInstructionStream(String str, int i, int i2, Consumer<Throwable> consumer, Consumer<CallStreamObserver<QueryProviderOutbound>> consumer2) {
            super(str, i, i2, consumer, consumer2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
        public QueryProviderOutbound buildFlowControlMessage(FlowControl flowControl) {
            return QueryProviderOutbound.newBuilder().setFlowControl(flowControl).m2316build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public QueryProviderOutbound buildAckMessage(InstructionAck instructionAck) {
            return QueryProviderOutbound.newBuilder().setAck(instructionAck).m2316build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public String getInstructionId(QueryProviderInbound queryProviderInbound) {
            return queryProviderInbound.getInstructionId();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public InstructionHandler<QueryProviderInbound, QueryProviderOutbound> getHandler(QueryProviderInbound queryProviderInbound) {
            return queryProviderInbound.getRequestCase() == QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST ? (InstructionHandler) QueryChannelImpl.this.instructionHandlers.get(queryProviderInbound.getSubscriptionQueryRequest().getRequestCase()) : (InstructionHandler) QueryChannelImpl.this.instructionHandlers.get(queryProviderInbound.getRequestCase());
        }

        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        protected boolean unregisterOutboundStream(CallStreamObserver<QueryProviderOutbound> callStreamObserver) {
            if (!QueryChannelImpl.this.outboundQueryStream.compareAndSet(callStreamObserver, null)) {
                return false;
            }
            QueryChannelImpl.this.cancelAllSubscriptionQueries();
            return true;
        }
    }

    public QueryChannelImpl(ClientIdentification clientIdentification, String str, int i, int i2, ScheduledExecutorService scheduledExecutorService, AxonServerManagedChannel axonServerManagedChannel) {
        super(clientIdentification, scheduledExecutorService, axonServerManagedChannel);
        this.outboundQueryStream = new AtomicReference<>();
        this.supportedQueries = new ConcurrentHashMap();
        this.queryHandlers = new ConcurrentHashMap();
        this.instructionHandlers = new ConcurrentHashMap();
        this.queryHandlerMonitor = new Object();
        this.subscriptionQueries = new ConcurrentHashMap();
        this.clientIdentification = clientIdentification;
        this.context = str;
        this.permits = i;
        this.permitsBatch = i2;
        this.instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY, this::handleQuery);
        this.instructionHandlers.put(QueryProviderInbound.RequestCase.ACK, this::handleAck);
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.GET_INITIAL_RESULT, this::getInitialResult);
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.SUBSCRIBE, this::subscribeToQueryUpdates);
        this.instructionHandlers.put(SubscriptionQueryRequest.RequestCase.UNSUBSCRIBE, this::unsubscribeToQueryUpdates);
        this.queryServiceStub = QueryServiceGrpc.newStub(axonServerManagedChannel);
    }

    private void handleAck(QueryProviderInbound queryProviderInbound, ReplyChannel<QueryProviderOutbound> replyChannel) {
        processAck(queryProviderInbound.getAck());
        replyChannel.complete();
    }

    private void unsubscribeToQueryUpdates(QueryProviderInbound queryProviderInbound, ReplyChannel<QueryProviderOutbound> replyChannel) {
        Set<Registration> remove = this.subscriptionQueries.remove(queryProviderInbound.getSubscriptionQueryRequest().getUnsubscribe().getSubscriptionIdentifier());
        if (remove != null) {
            remove.forEach((v0) -> {
                v0.cancel();
            });
        }
    }

    private void subscribeToQueryUpdates(QueryProviderInbound queryProviderInbound, ReplyChannel<QueryProviderOutbound> replyChannel) {
        SubscriptionQuery subscribe = queryProviderInbound.getSubscriptionQueryRequest().getSubscribe();
        String subscriptionIdentifier = subscribe.getSubscriptionIdentifier();
        this.queryHandlers.getOrDefault(subscribe.getQueryRequest().getQuery(), Collections.emptySet()).forEach(queryHandler -> {
            Registration registerSubscriptionQuery = queryHandler.registerSubscriptionQuery(subscribe, new QueryHandler.UpdateHandler() { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.1
                @Override // io.axoniq.axonserver.connector.query.QueryHandler.UpdateHandler
                public void sendUpdate(QueryUpdate queryUpdate) {
                    replyChannel.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(SubscriptionQueryResponse.newBuilder().setSubscriptionIdentifier(subscriptionIdentifier).setUpdate(queryUpdate).setMessageIdentifier(queryUpdate.getMessageIdentifier()).m2749build()).m2316build());
                    QueryChannelImpl.logger.debug("Subscription Query Update [id: {}] for subscription {}, sent to client {}.", new Object[]{queryUpdate.getMessageIdentifier(), subscribe.getSubscriptionIdentifier(), queryUpdate.getClientId()});
                }

                @Override // io.axoniq.axonserver.connector.query.QueryHandler.UpdateHandler
                public void complete() {
                    QueryUpdateComplete m2558build = QueryUpdateComplete.newBuilder().setClientId(QueryChannelImpl.this.clientIdentification.getClientId()).setComponentName(QueryChannelImpl.this.clientIdentification.getComponentName()).m2558build();
                    replyChannel.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(SubscriptionQueryResponse.newBuilder().setSubscriptionIdentifier(subscriptionIdentifier).setComplete(m2558build).m2749build()).m2316build());
                    QueryChannelImpl.logger.debug("Subscription Query Update completion sent to client {}.", m2558build.getClientId());
                }
            });
            if (registerSubscriptionQuery != null) {
                this.subscriptionQueries.compute(subscriptionIdentifier, (str, set) -> {
                    return set != null ? set : new CopyOnWriteArraySet();
                }).add(registerSubscriptionQuery);
            }
        });
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public void connect() {
        if (this.queryHandlers.isEmpty()) {
            return;
        }
        doConnectQueryStream();
    }

    private synchronized void doConnectQueryStream() {
        if (this.outboundQueryStream.get() != null) {
            logger.debug("QueryChannel for context '{}' is already connected", this.context);
            return;
        }
        StreamObserver<QueryProviderInbound> incomingQueryInstructionStream = new IncomingQueryInstructionStream(this.clientIdentification.getClientId(), this.permits, this.permitsBatch, this::onConnectionError, this::registerOutboundStream);
        this.queryServiceStub.openStream(incomingQueryInstructionStream);
        CallStreamObserver<QueryProviderOutbound> instructionsForPlatform = incomingQueryInstructionStream.getInstructionsForPlatform();
        this.supportedQueries.keySet().forEach(queryDefinition -> {
            instructionsForPlatform.onNext(buildSubscribeMessage(queryDefinition.getQueryName(), queryDefinition.getResultType(), UUID.randomUUID().toString()));
        });
        incomingQueryInstructionStream.enableFlowControl();
        if (this.outboundQueryStream.get() == instructionsForPlatform) {
            logger.info("QueryChannel for context '{}' connected, {} registrations resubscribed", this.context, Integer.valueOf(this.queryHandlers.size()));
        }
    }

    private void onConnectionError(Throwable th) {
        logger.info("Error on QueryChannel for context {}", this.context, th);
        scheduleReconnect(th);
    }

    private void registerOutboundStream(CallStreamObserver<QueryProviderOutbound> callStreamObserver) {
        ObjectUtils.silently(this.outboundQueryStream.getAndSet(callStreamObserver), (v0) -> {
            v0.onCompleted();
        });
    }

    private QueryProviderOutbound buildSubscribeMessage(String str, String str2, String str3) {
        return QueryProviderOutbound.newBuilder().setInstructionId(str3).setSubscribe(QuerySubscription.newBuilder().setMessageId(str3).setQuery(str).setResultName(str2).setClientId(this.clientIdentification.getClientId()).setComponentName(this.clientIdentification.getComponentName())).m2316build();
    }

    @Override // io.axoniq.axonserver.connector.query.QueryChannel
    public Registration registerQueryHandler(QueryHandler queryHandler, QueryDefinition... queryDefinitionArr) {
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        synchronized (this.queryHandlerMonitor) {
            if (this.queryHandlers.isEmpty()) {
                doConnectQueryStream();
            }
            for (QueryDefinition queryDefinition : queryDefinitionArr) {
                this.queryHandlers.computeIfAbsent(queryDefinition.getQueryName(), str -> {
                    return new CopyOnWriteArraySet();
                }).add(queryHandler);
                if (this.supportedQueries.computeIfAbsent(queryDefinition, queryDefinition2 -> {
                    return new AtomicInteger();
                }).getAndIncrement() == 0) {
                    completedFuture = CompletableFuture.allOf(completedFuture, sendInstruction(buildSubscribeMessage(queryDefinition.getQueryName(), queryDefinition.getResultType(), UUID.randomUUID().toString()), (v0) -> {
                        return v0.getInstructionId();
                    }, this.outboundQueryStream.get()));
                }
                logger.info("Registered handler for query '{}' in context '{}'", queryDefinition, this.context);
            }
        }
        return new AsyncRegistration(completedFuture, () -> {
            CompletableFuture<Void> completableFuture;
            synchronized (this.queryHandlerMonitor) {
                CompletableFuture<Void> completedFuture2 = CompletableFuture.completedFuture(null);
                for (QueryDefinition queryDefinition3 : queryDefinitionArr) {
                    Set<QueryHandler> set = this.queryHandlers.get(queryDefinition3.getQueryName());
                    if (set != null && set.remove(queryHandler) && set.isEmpty()) {
                        this.queryHandlers.remove(queryDefinition3.getQueryName());
                        completedFuture2 = CompletableFuture.allOf(completedFuture2, sendUnsubscribe(queryDefinition3));
                        logger.debug("Unregistered handlers for query '{}' in context '{}'", queryDefinition3, this.context);
                    }
                    this.supportedQueries.computeIfPresent(queryDefinition3, (queryDefinition4, atomicInteger) -> {
                        if (atomicInteger.decrementAndGet() == 0) {
                            return null;
                        }
                        return atomicInteger;
                    });
                }
                completableFuture = completedFuture2;
            }
            return completableFuture;
        });
    }

    private CompletableFuture<Void> sendUnsubscribe(QueryDefinition queryDefinition) {
        String uuid = UUID.randomUUID().toString();
        return sendInstruction(QueryProviderOutbound.newBuilder().setInstructionId(uuid).setUnsubscribe(QuerySubscription.newBuilder().setMessageId(uuid).setQuery(queryDefinition.getQueryName()).setResultName(queryDefinition.getResultType()).setClientId(this.clientIdentification.getClientId()).setComponentName(this.clientIdentification.getComponentName()).m2463build()).m2316build(), (v0) -> {
            return v0.getInstructionId();
        }, this.outboundQueryStream.get());
    }

    @Override // io.axoniq.axonserver.connector.query.QueryChannel
    public ResultStream<QueryResponse> query(QueryRequest queryRequest) {
        StreamObserver<QueryResponse> streamObserver = new AbstractBufferedStream<QueryResponse, QueryRequest>(this.clientIdentification.getClientId(), 32, 8) { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.2
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
            public QueryRequest buildFlowControlMessage(FlowControl flowControl) {
                return null;
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // io.axoniq.axonserver.connector.impl.FlowControlledBuffer
            public QueryResponse terminalMessage() {
                return QueryChannelImpl.TERMINAL;
            }

            @Override // io.axoniq.axonserver.connector.impl.AbstractBufferedStream, io.axoniq.axonserver.connector.impl.FlowControlledBuffer, io.axoniq.axonserver.connector.ResultStream, java.lang.AutoCloseable
            public void close() {
            }
        };
        this.queryServiceStub.query(queryRequest, streamObserver);
        return streamObserver;
    }

    @Override // io.axoniq.axonserver.connector.query.QueryChannel
    public SubscriptionQueryResult subscriptionQuery(final QueryRequest queryRequest, SerializedObject serializedObject, int i, int i2) {
        if (!ObjectUtils.hasLength(queryRequest.getMessageIdentifier())) {
            throw new IllegalArgumentException("QueryRequest must contain message identifier.");
        }
        final String messageIdentifier = queryRequest.getMessageIdentifier();
        final CompletableFuture completableFuture = new CompletableFuture();
        final StreamObserver<SubscriptionQueryResponse> subscriptionQueryStream = new SubscriptionQueryStream(messageIdentifier, completableFuture, this.clientIdentification.getClientId(), i, i2);
        final StreamObserver<SubscriptionQueryRequest> subscription = this.queryServiceStub.subscription(subscriptionQueryStream);
        subscriptionQueryStream.enableFlowControl();
        subscription.onNext(SubscriptionQueryRequest.newBuilder().setSubscribe(SubscriptionQuery.newBuilder().setQueryRequest(queryRequest).setSubscriptionIdentifier(messageIdentifier).setUpdateResponseType(serializedObject).m2652build()).m2700build());
        return new SubscriptionQueryResult() { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.3
            private final AtomicBoolean initialResultRequested = new AtomicBoolean();

            @Override // io.axoniq.axonserver.connector.query.SubscriptionQueryResult
            public CompletableFuture<QueryResponse> initialResult() {
                if (!completableFuture.isDone() && !this.initialResultRequested.getAndSet(true)) {
                    subscription.onNext(SubscriptionQueryRequest.newBuilder().setGetInitialResult(SubscriptionQuery.newBuilder().setQueryRequest(queryRequest).setSubscriptionIdentifier(messageIdentifier)).m2700build());
                }
                return completableFuture;
            }

            @Override // io.axoniq.axonserver.connector.query.SubscriptionQueryResult
            public ResultStream<QueryUpdate> updates() {
                return subscriptionQueryStream.buffer();
            }
        };
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public synchronized void disconnect() {
        ObjectUtils.doIfNotNull(this.outboundQueryStream.getAndSet(null), (v0) -> {
            v0.onCompleted();
        });
        cancelAllSubscriptionQueries();
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public void reconnect() {
        disconnect();
        scheduleImmediateReconnect();
    }

    @Override // io.axoniq.axonserver.connector.query.QueryChannel
    public CompletableFuture<Void> prepareDisconnect() {
        CompletableFuture<Void> completableFuture = (CompletableFuture) this.supportedQueries.keySet().stream().map(this::sendUnsubscribe).reduce((completableFuture2, completableFuture3) -> {
            return CompletableFuture.allOf(completableFuture2, completableFuture3);
        }).orElseGet(() -> {
            return CompletableFuture.completedFuture(null);
        });
        cancelAllSubscriptionQueries();
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelAllSubscriptionQueries() {
        this.subscriptionQueries.forEach((str, set) -> {
            this.subscriptionQueries.remove(str).forEach((v0) -> {
                v0.cancel();
            });
        });
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    public boolean isReady() {
        return this.outboundQueryStream.get() != null || this.queryHandlers.isEmpty();
    }

    private void doHandleQuery(QueryProviderInbound queryProviderInbound, ReplyChannel<QueryResponse> replyChannel) {
        doHandleQuery(queryProviderInbound.getQuery(), replyChannel);
    }

    private void doHandleQuery(QueryRequest queryRequest, ReplyChannel<QueryResponse> replyChannel) {
        Set<QueryHandler> orDefault = this.queryHandlers.getOrDefault(queryRequest.getQuery(), Collections.emptySet());
        if (orDefault.isEmpty()) {
            replyChannel.sendNack();
            replyChannel.sendLast(QueryResponse.newBuilder().setRequestIdentifier(queryRequest.getMessageIdentifier()).setErrorCode(ErrorCategory.NO_HANDLER_FOR_QUERY.errorCode()).setErrorMessage(ErrorMessage.newBuilder().setMessage("No handler for query").m64build()).m2412build());
        }
        replyChannel.sendAck();
        AtomicInteger atomicInteger = new AtomicInteger(orDefault.size());
        orDefault.forEach(queryHandler -> {
            queryHandler.handle(queryRequest, new ReplyChannel<QueryResponse>() { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.4
                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void send(QueryResponse queryResponse) {
                    if (queryRequest.getMessageIdentifier().equals(queryResponse.getRequestIdentifier())) {
                        replyChannel.send(queryResponse);
                        return;
                    }
                    QueryChannelImpl.logger.debug("RequestIdentifier not properly set, modifying message");
                    replyChannel.send(queryResponse.m2376toBuilder().setRequestIdentifier(queryRequest.getMessageIdentifier()).m2412build());
                }

                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void complete() {
                    if (atomicInteger.decrementAndGet() == 0) {
                        replyChannel.complete();
                    }
                }

                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void completeWithError(ErrorMessage errorMessage) {
                    replyChannel.completeWithError(errorMessage);
                }

                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void completeWithError(ErrorCategory errorCategory, String str) {
                    replyChannel.completeWithError(errorCategory, str);
                }

                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void sendNack(ErrorMessage errorMessage) {
                    replyChannel.sendNack(errorMessage);
                }

                @Override // io.axoniq.axonserver.connector.ReplyChannel
                public void sendAck() {
                    replyChannel.sendAck();
                }
            });
        });
    }

    private void handleQuery(final QueryProviderInbound queryProviderInbound, final ReplyChannel<QueryProviderOutbound> replyChannel) {
        doHandleQuery(queryProviderInbound, new ReplyChannel<QueryResponse>() { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.5
            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void send(QueryResponse queryResponse) {
                replyChannel.send(QueryProviderOutbound.newBuilder().setQueryResponse(queryResponse).m2316build());
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void complete() {
                replyChannel.send(QueryProviderOutbound.newBuilder().setQueryComplete(QueryComplete.newBuilder().setRequestId(queryProviderInbound.getQuery().getMessageIdentifier()).setMessageId(UUID.randomUUID().toString()).m2219build()).m2316build());
                replyChannel.complete();
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void completeWithError(ErrorMessage errorMessage) {
                replyChannel.completeWithError(errorMessage);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void completeWithError(ErrorCategory errorCategory, String str) {
                replyChannel.completeWithError(errorCategory, str);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void sendNack(ErrorMessage errorMessage) {
                replyChannel.sendNack(errorMessage);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void sendAck() {
                replyChannel.sendAck();
            }
        });
    }

    private void getInitialResult(QueryProviderInbound queryProviderInbound, final ReplyChannel<QueryProviderOutbound> replyChannel) {
        final String subscriptionIdentifier = queryProviderInbound.getSubscriptionQueryRequest().getGetInitialResult().getSubscriptionIdentifier();
        doHandleQuery(queryProviderInbound.getSubscriptionQueryRequest().getGetInitialResult().getQueryRequest(), new ReplyChannel<QueryResponse>() { // from class: io.axoniq.axonserver.connector.query.impl.QueryChannelImpl.6
            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void send(QueryResponse queryResponse) {
                replyChannel.send(QueryProviderOutbound.newBuilder().setSubscriptionQueryResponse(SubscriptionQueryResponse.newBuilder().setSubscriptionIdentifier(subscriptionIdentifier).setInitialResult(queryResponse).setMessageIdentifier(queryResponse.getMessageIdentifier()).m2749build()).m2316build());
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void complete() {
                replyChannel.sendAck();
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void completeWithError(ErrorMessage errorMessage) {
                replyChannel.completeWithError(errorMessage);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void completeWithError(ErrorCategory errorCategory, String str) {
                replyChannel.completeWithError(errorCategory, str);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void sendNack(ErrorMessage errorMessage) {
                replyChannel.sendNack(errorMessage);
            }

            @Override // io.axoniq.axonserver.connector.ReplyChannel
            public void sendAck() {
                replyChannel.sendAck();
            }
        });
    }
}
