package org.apache.camel.component.ignite.compute;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.ignite.IgniteConstants;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.MessageHelper;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteRunnable;

/* loaded from: input_file:org/apache/camel/component/ignite/compute/IgniteComputeProducer.class */
public class IgniteComputeProducer extends DefaultAsyncProducer {
    private IgniteComputeEndpoint endpoint;

    /* loaded from: input_file:org/apache/camel/component/ignite/compute/IgniteComputeProducer$IgniteInCamelClosure.class */
    private static class IgniteInCamelClosure implements IgniteInClosure<IgniteFuture<Object>> {
        private static final long serialVersionUID = 7486030906412223384L;
        private Exchange exchange;
        private AsyncCallback callback;

        private IgniteInCamelClosure() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static IgniteInCamelClosure create(Exchange exchange, AsyncCallback asyncCallback) {
            IgniteInCamelClosure igniteInCamelClosure = new IgniteInCamelClosure();
            igniteInCamelClosure.exchange = exchange;
            igniteInCamelClosure.callback = asyncCallback;
            return igniteInCamelClosure;
        }

        public void apply(IgniteFuture<Object> igniteFuture) {
            MessageHelper.copyHeaders(this.exchange.getIn(), this.exchange.getOut(), true);
            try {
                this.exchange.getOut().setBody(igniteFuture.get());
                this.callback.done(false);
            } catch (Exception e) {
                this.exchange.setException(e);
                this.callback.done(false);
            }
        }
    }

    public IgniteComputeProducer(IgniteComputeEndpoint igniteComputeEndpoint) {
        super(igniteComputeEndpoint);
        this.endpoint = igniteComputeEndpoint;
    }

    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        IgniteCompute withAsync = this.endpoint.createIgniteCompute().withAsync();
        try {
            switch (executionTypeFor(exchange)) {
                case CALL:
                    doCall(exchange, asyncCallback, withAsync);
                    break;
                case BROADCAST:
                    doBroadcast(exchange, asyncCallback, withAsync);
                    break;
                case EXECUTE:
                    doExecute(exchange, asyncCallback, withAsync);
                    break;
                case RUN:
                    doRun(exchange, asyncCallback, withAsync);
                    break;
                case APPLY:
                    doApply(exchange, asyncCallback, withAsync);
                    break;
                case AFFINITY_CALL:
                    doAffinityCall(exchange, asyncCallback, withAsync);
                    break;
                case AFFINITY_RUN:
                    doAffinityRun(exchange, asyncCallback, withAsync);
                    break;
                default:
                    exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Compute producer."));
                    return true;
            }
            withAsync.future().listen(IgniteInCamelClosure.create(exchange, asyncCallback));
            return false;
        } catch (Exception e) {
            exchange.setException(e);
            return true;
        }
    }

    private void doCall(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        Object body = exchange.getIn().getBody();
        IgniteReducer igniteReducer = (IgniteReducer) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, IgniteReducer.class);
        if (!Collection.class.isAssignableFrom(body.getClass())) {
            if (!IgniteCallable.class.isAssignableFrom(body.getClass())) {
                throw new RuntimeCamelException(String.format("Ignite Compute endpoint with CALL executionType is only supported for IgniteCallable payloads, or collections of them. The payload type was: %s.", body.getClass().getName()));
            }
            igniteCompute.call((IgniteCallable) body);
            return;
        }
        Collection collection = (Collection) body;
        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
        ArrayList arrayList = new ArrayList(collection.size());
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(typeConverter.mandatoryConvertTo(IgniteCallable.class, it.next()));
        }
        if (igniteReducer != null) {
            igniteCompute.call(arrayList, igniteReducer);
        } else {
            igniteCompute.call(arrayList);
        }
    }

    private void doBroadcast(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        Object body = exchange.getIn().getBody();
        if (IgniteCallable.class.isAssignableFrom(body.getClass())) {
            igniteCompute.broadcast((IgniteCallable) body);
        } else if (IgniteRunnable.class.isAssignableFrom(body.getClass())) {
            igniteCompute.broadcast((IgniteRunnable) body);
        } else {
            if (!IgniteClosure.class.isAssignableFrom(body.getClass())) {
                throw new RuntimeCamelException(String.format("Ignite Compute endpoint with BROADCAST executionType is only supported for IgniteCallable, IgniteRunnable or IgniteClosure payloads. The payload type was: %s.", body.getClass().getName()));
            }
            igniteCompute.broadcast((IgniteClosure) body, exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS));
        }
    }

    private void doExecute(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        Object body = exchange.getIn().getBody();
        Object header = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS);
        if ((body instanceof Class) && ComputeTask.class.isAssignableFrom((Class) body)) {
            igniteCompute.execute((Class) body, header);
            return;
        }
        if (ComputeTask.class.isAssignableFrom(body.getClass())) {
            igniteCompute.execute((ComputeTask) body, header);
        } else {
            if (this.endpoint.getTaskName() == null) {
                throw new RuntimeCamelException(String.format("Ignite Compute endpoint with EXECUTE executionType is only supported for ComputeTask payloads, Class<ComputeTask> or any payload in conjunction with the task name option. The payload type was: %s.", body.getClass().getName()));
            }
            if (exchange.getIn().getBody() != null) {
                header = exchange.getIn().getBody();
            }
            igniteCompute.execute(this.endpoint.getTaskName(), header);
        }
    }

    private void doRun(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        Object body = exchange.getIn().getBody();
        if (!Collection.class.isAssignableFrom(body.getClass())) {
            if (!IgniteRunnable.class.isAssignableFrom(body.getClass())) {
                throw new RuntimeCamelException(String.format("Ignite Compute endpoint with RUN executionType is only supported for IgniteRunnable payloads, or collections of them. The payload type was: %s.", body.getClass().getName()));
            }
            igniteCompute.run((IgniteRunnable) body);
            return;
        }
        Collection collection = (Collection) body;
        TypeConverter typeConverter = exchange.getContext().getTypeConverter();
        ArrayList arrayList = new ArrayList(collection.size());
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(typeConverter.mandatoryConvertTo(IgniteRunnable.class, it.next()));
        }
        igniteCompute.run(arrayList);
    }

    private <T, R1, R2> void doApply(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        IgniteClosure igniteClosure = (IgniteClosure) exchange.getIn().getBody(IgniteClosure.class);
        Object header = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS);
        if (igniteClosure == null || header == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with APPLY executionType is only supported for IgniteClosure payloads with parameters. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        IgniteReducer igniteReducer = (IgniteReducer) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, IgniteReducer.class);
        if (!Collection.class.isAssignableFrom(header.getClass())) {
            igniteCompute.apply(igniteClosure, header);
            return;
        }
        Collection collection = (Collection) header;
        if (igniteReducer == null) {
            igniteCompute.apply(igniteClosure, collection);
        } else {
            igniteCompute.apply(igniteClosure, collection, igniteReducer);
        }
    }

    private void doAffinityCall(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        IgniteCallable igniteCallable = (IgniteCallable) exchange.getIn().getBody(IgniteCallable.class);
        String str = (String) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, String.class);
        Object header = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, Object.class);
        if (igniteCallable == null || str == null || header == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with AFFINITY_CALL executionType is only supported for IgniteCallable payloads, along with an affinity cache and key. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        igniteCompute.affinityCall(str, header, igniteCallable);
    }

    private void doAffinityRun(Exchange exchange, AsyncCallback asyncCallback, IgniteCompute igniteCompute) throws Exception {
        IgniteRunnable igniteRunnable = (IgniteRunnable) exchange.getIn().getBody(IgniteRunnable.class);
        String str = (String) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, String.class);
        Object header = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, Object.class);
        if (igniteRunnable == null || str == null || header == null) {
            throw new RuntimeCamelException(String.format("Ignite Compute endpoint with AFFINITY_RUN executionType is only supported for IgniteRunnable payloads, along with an affinity cache and key. The payload type was: %s.", exchange.getIn().getBody().getClass().getName()));
        }
        igniteCompute.affinityRun(str, header, igniteRunnable);
    }

    private IgniteComputeExecutionType executionTypeFor(Exchange exchange) {
        return (IgniteComputeExecutionType) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_EXECUTION_TYPE, this.endpoint.getExecutionType(), IgniteComputeExecutionType.class);
    }
}
