package org.kie.kogito.serverless.workflow.rpc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.kie.kogito.internal.process.runtime.KogitoWorkItem;
import org.kie.kogito.jackson.utils.JsonObjectUtils;
import org.kie.kogito.serverless.workflow.WorkflowWorkItemHandler;

/* loaded from: input_file:org/kie/kogito/serverless/workflow/rpc/RPCWorkItemHandler.class */
public abstract class RPCWorkItemHandler extends WorkflowWorkItemHandler {
    public static final String NAME = "gRPC";
    public static final String SERVICE_PROP = "serviceName";
    public static final String FILE_PROP = "fileName";
    public static final String METHOD_PROP = "methodName";
    public static final String GRPC_ENUM_DEFAULT_PROPERTY = "kogito.grpc.enum.includeDefault";
    public static final String GRPC_STREAM_TIMEOUT_PROPERTY = "kogito.grpc.stream.timeout";
    public static final boolean GRPC_ENUM_DEFAULT_VALUE = false;
    public static final int GRPC_STREAM_TIMEOUT_VALUE = 20;
    private final Collection<RPCDecorator> decorators;
    private final int streamTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kie/kogito/serverless/workflow/rpc/RPCWorkItemHandler$WaitingStreamObserver.class */
    public static class WaitingStreamObserver implements StreamObserver<Message> {
        List<Message> responses = new ArrayList();
        CompletableFuture<List<Message>> responsesFuture = new CompletableFuture<>();
        private final int timeout;

        public WaitingStreamObserver(int i) {
            this.timeout = i;
        }

        public void onNext(Message message) {
            this.responses.add(message);
        }

        public void onError(Throwable th) {
            this.responsesFuture.completeExceptionally(th);
        }

        public void onCompleted() {
            this.responsesFuture.complete(this.responses);
        }

        public List<Message> get() {
            try {
                return this.responsesFuture.get(this.timeout, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e2) {
                throw new IllegalStateException(getServerStreamErrorMessage(e2.getCause()), e2.getCause());
            } catch (TimeoutException e3) {
                throw new IllegalStateException(String.format("gRPC call timed out after %d seconds", Integer.valueOf(this.timeout)), e3);
            }
        }

        public void checkForServerStreamErrors() {
            if (this.responsesFuture.isCompletedExceptionally()) {
                try {
                    this.responsesFuture.join();
                } catch (CompletionException e) {
                    throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause());
                }
            }
        }

        private String getServerStreamErrorMessage(Throwable th) {
            return String.format("Received an error through gRPC server stream with status: %s", Status.fromThrowable(th));
        }
    }

    public RPCWorkItemHandler() {
        this(false, 20);
    }

    public RPCWorkItemHandler(boolean z, int i) {
        this.decorators = new ArrayList();
        this.streamTimeout = i;
        if (z) {
            this.decorators.add(new DefaultEnumRpcDecorator());
        }
    }

    @Override // org.kie.kogito.serverless.workflow.WorkflowWorkItemHandler
    protected Object internalExecute(KogitoWorkItem kogitoWorkItem, Map<String, Object> map) {
        Map metaData = kogitoWorkItem.getNodeInstance().getNode().getMetaData();
        String str = (String) metaData.get(FILE_PROP);
        String str2 = (String) metaData.get(SERVICE_PROP);
        return doCall(FileDescriptorHolder.get().descriptor().orElseThrow(() -> {
            return new IllegalStateException("Descriptor protobuf/descriptor-sets/output.protobin is not present");
        }), map, getChannel(str, str2), str, str2, (String) metaData.get(METHOD_PROP));
    }

    protected abstract Channel getChannel(String str, String str2);

    private JsonNode doCall(DescriptorProtos.FileDescriptorSet fileDescriptorSet, Map<String, Object> map, Channel channel, String str, String str2, String str3) {
        try {
            Descriptors.ServiceDescriptor serviceDescriptor = (Descriptors.ServiceDescriptor) Objects.requireNonNull(Descriptors.FileDescriptor.buildFrom((DescriptorProtos.FileDescriptorProto) fileDescriptorSet.getFileList().stream().filter(fileDescriptorProto -> {
                return fileDescriptorProto.getName().equals(str);
            }).findFirst().orElseThrow(() -> {
                return new IllegalArgumentException("Cannot find file name " + str);
            }), new Descriptors.FileDescriptor[0], true).findServiceByName(str2), "Cannot find service name " + str2);
            Descriptors.MethodDescriptor methodDescriptor = (Descriptors.MethodDescriptor) Objects.requireNonNull(serviceDescriptor.findMethodByName(str3), "Cannot find method name " + str3);
            MethodDescriptor.MethodType methodType = getMethodType(methodDescriptor);
            ClientCall newCall = channel.newCall(MethodDescriptor.newBuilder().setType(methodType).setFullMethodName(MethodDescriptor.generateFullMethodName(serviceDescriptor.getFullName(), methodDescriptor.getName())).setRequestMarshaller(ProtoUtils.marshaller(DynamicMessage.newBuilder(methodDescriptor.getInputType()).buildPartial())).setResponseMarshaller(ProtoUtils.marshaller(DynamicMessage.newBuilder(methodDescriptor.getOutputType()).buildPartial())).build(), CallOptions.DEFAULT.withWaitForReady());
            if (methodType == MethodDescriptor.MethodType.CLIENT_STREAMING) {
                return asyncStreamingCall(map, methodDescriptor, streamObserver -> {
                    return ClientCalls.asyncClientStreamingCall(newCall, streamObserver);
                }, list -> {
                    return list.isEmpty() ? JsonObjectUtils.fromValue((Object) null) : (JsonNode) list.get(0);
                });
            }
            if (methodType == MethodDescriptor.MethodType.BIDI_STREAMING) {
                return asyncStreamingCall(map, methodDescriptor, streamObserver2 -> {
                    return ClientCalls.asyncBidiStreamingCall(newCall, streamObserver2);
                }, (v0) -> {
                    return JsonObjectUtils.fromValue(v0);
                });
            }
            if (methodType != MethodDescriptor.MethodType.SERVER_STREAMING) {
                return convert((Message) ClientCalls.blockingUnaryCall(newCall, RPCConverterFactory.get().buildMessage(map, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build()), methodDescriptor);
            }
            ArrayList arrayList = new ArrayList();
            ClientCalls.blockingServerStreamingCall(newCall, RPCConverterFactory.get().buildMessage(map, DynamicMessage.newBuilder(methodDescriptor.getInputType())).build()).forEachRemaining(message -> {
                arrayList.add(convert(message, methodDescriptor));
            });
            return JsonObjectUtils.fromValue(arrayList);
        } catch (Descriptors.DescriptorValidationException e) {
            throw new IllegalStateException((Throwable) e);
        }
    }

    private JsonNode convert(Message message, Descriptors.MethodDescriptor methodDescriptor) {
        JsonNode jsonNode = RPCConverterFactory.get().getJsonNode(message);
        Iterator<RPCDecorator> it = this.decorators.iterator();
        while (it.hasNext()) {
            jsonNode = it.next().decorate(jsonNode, methodDescriptor.getOutputType());
        }
        return jsonNode;
    }

    private JsonNode asyncStreamingCall(Map<String, Object> map, Descriptors.MethodDescriptor methodDescriptor, UnaryOperator<StreamObserver<Message>> unaryOperator, Function<List<JsonNode>, JsonNode> function) {
        WaitingStreamObserver waitingStreamObserver = new WaitingStreamObserver(this.streamTimeout);
        StreamObserver streamObserver = (StreamObserver) unaryOperator.apply(waitingStreamObserver);
        Iterator it = ((List) Objects.requireNonNull((List) map.get("ContentData"), "Missing streaming call parameter")).iterator();
        while (it.hasNext()) {
            try {
                streamObserver.onNext(RPCConverterFactory.get().buildMessage(it.next(), DynamicMessage.newBuilder(methodDescriptor.getInputType())).build());
                waitingStreamObserver.checkForServerStreamErrors();
            } catch (Exception e) {
                streamObserver.onError(e);
                throw e;
            }
        }
        streamObserver.onCompleted();
        return function.apply((List) waitingStreamObserver.get().stream().map(message -> {
            return convert(message, methodDescriptor);
        }).collect(Collectors.toList()));
    }

    private static MethodDescriptor.MethodType getMethodType(Descriptors.MethodDescriptor methodDescriptor) {
        DescriptorProtos.MethodDescriptorProto proto = methodDescriptor.toProto();
        return proto.getClientStreaming() ? proto.getServerStreaming() ? MethodDescriptor.MethodType.BIDI_STREAMING : MethodDescriptor.MethodType.CLIENT_STREAMING : proto.getServerStreaming() ? MethodDescriptor.MethodType.SERVER_STREAMING : MethodDescriptor.MethodType.UNARY;
    }
}
