package org.apache.jena.riot.system;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.iterator.IteratorSlotted;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFParser;
import org.apache.jena.riot.RDFParserBuilder;
import org.apache.jena.riot.RiotException;
import org.apache.jena.riot.SysRIOT;
import org.apache.jena.sparql.core.Quad;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/jena-arq-4.6.0.jar:org/apache/jena/riot/system/AsyncParser.class */
public class AsyncParser {
    /** Logger used by the public entry points of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
    /** Number of elements collected into one batch before the batch is put on the queue. */
    private static final int chunkSize = 100000;
    /** Capacity, counted in batches, of the queue between the parser thread and the consumer. */
    private static final int queueSize = 10;
    /**
     * End-of-stream marker. It is recognised by reference comparison ({@code ==}),
     * never by content; real batches are always freshly allocated lists.
     */
    private static final List<EltStreamRDF> END = List.of();
    /**
     * Maps a stream element to a triple.
     * <ul>
     * <li>An element carrying an exception rethrows that exception.</li>
     * <li>An element without a quad yields its triple field (which is {@code null}
     *     for prefix and base elements).</li>
     * <li>A quad whose graph is {@code Quad.tripleInQuad} or a default graph is
     *     converted to a triple; any other quad yields {@code null}.</li>
     * </ul>
     */
    private static Function<EltStreamRDF, Triple> elt2Triple = elt -> {
        RuntimeException failure = elt.exception;
        if (failure != null) {
            throw failure;
        }
        Quad q = elt.quad;
        if (q == null) {
            return elt.triple;
        }
        Node g = q.getGraph();
        boolean inDefaultGraph = g == Quad.tripleInQuad || Quad.isDefaultGraph(g);
        return inDefaultGraph ? q.asTriple() : null;
    };
    /**
     * Maps a stream element to a quad. An element carrying an exception rethrows
     * it; a triple element is wrapped in a quad with graph
     * {@code Quad.defaultGraphIRI}; otherwise the element's quad field is returned
     * (which is {@code null} for prefix and base elements).
     */
    private static Function<EltStreamRDF, Quad> elt2Quad = elt -> {
        if (elt.exception != null) {
            throw elt.exception;
        }
        if (elt.triple == null) {
            return elt.quad;
        }
        return Quad.create(Quad.defaultGraphIRI, elt.triple);
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jena-arq-4.6.0.jar:org/apache/jena/riot/system/AsyncParser$EltStreamBatcher.class */
    /**
     * Collects individual {@link EltStreamRDF} elements into lists of up to
     * {@code batchSize} entries and hands each full list to a destination consumer.
     * {@link #finishBatching()} flushes any partial batch and then sends the
     * {@code AsyncParser.END} marker.
     */
    public static class EltStreamBatcher implements Consumer<EltStreamRDF> {
        private final int batchSize;
        private final Consumer<List<EltStreamRDF>> batchDestination;
        /** Batch currently being filled; {@code null} when no batch is open. */
        private List<EltStreamRDF> elements = null;
        /** Running total of dispatched elements; updated but not read within this class. */
        private int count = 0;

        /**
         * @param consumer receives each completed batch and finally the END marker
         * @param i        number of elements per batch
         */
        public EltStreamBatcher(Consumer<List<EltStreamRDF>> consumer, int i) {
            this.batchDestination = consumer;
            this.batchSize = i;
        }

        /** Called before the first element; currently does nothing. */
        public void startBatching() {
        }

        /** Flushes a pending partial batch, if any, then dispatches the END marker. */
        public void finishBatching() {
            if (this.elements != null) {
                dispatch(this.elements);
                this.elements = null;
            }
            dispatch(AsyncParser.END);
        }

        /** Null-safe emptiness check; not called anywhere in this class. */
        private <X> boolean isEmpty(List<X> list) {
            return list == null || list.isEmpty();
        }

        /** Adds one element to the open batch, opening a new batch first if needed. */
        @Override // java.util.function.Consumer
        public void accept(EltStreamRDF eltStreamRDF) {
            if (this.elements == null) {
                this.elements = allocChunk();
            }
            this.elements.add(eltStreamRDF);
            maybeDispatch();
        }

        /** Dispatches the open batch once it has reached {@code batchSize} elements. */
        private void maybeDispatch() {
            if (this.elements.size() < this.batchSize) {
                return;
            }
            dispatch(this.elements);
            this.elements = null;
        }

        /** Updates the element count and forwards the list to the destination. */
        private void dispatch(List<EltStreamRDF> list) {
            this.count += list.size();
            this.batchDestination.accept(list);
        }

        /** Allocates a new batch list pre-sized to {@code batchSize}. */
        private List<EltStreamRDF> allocChunk() {
            // Typed allocation: the raw ArrayList caused an unchecked conversion.
            return new ArrayList<>(this.batchSize);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jena-arq-4.6.0.jar:org/apache/jena/riot/system/AsyncParser$EltStreamRDF.class */
    /**
     * One item of a parsed RDF stream. In this file exactly one "kind" is populated
     * per instance: a triple, a quad, a prefix declaration ({@code prefix} plus
     * {@code iri}), a base declaration ({@code iri} only), or an exception raised
     * while parsing. All fields start out {@code null}.
     */
    public static class EltStreamRDF {
        /** Set for triple elements. */
        Triple triple;
        /** Set for quad elements. */
        Quad quad;
        /** Prefix label; set only for prefix declarations. */
        String prefix;
        /** IRI of a prefix declaration, or the base IRI when {@code prefix} is null. */
        String iri;
        /** Failure captured on the parser thread, rethrown on the consuming side. */
        RuntimeException exception;

        /** Instances are created only from within the enclosing class. */
        private EltStreamRDF() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/jena-arq-4.6.0.jar:org/apache/jena/riot/system/AsyncParser$StreamToElements.class */
    /**
     * A {@link StreamRDF} that turns every callback (triple, quad, base, prefix)
     * into an {@link EltStreamRDF} and passes it to a destination consumer.
     * {@code start()} and {@code finish()} are no-ops.
     */
    public static class StreamToElements implements StreamRDF {
        private final Consumer<EltStreamRDF> destination;

        /**
         * @param consumer receives one element per stream event
         */
        public StreamToElements(Consumer<EltStreamRDF> consumer) {
            this.destination = consumer;
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void start() {
            // Nothing to do.
        }

        @Override // org.apache.jena.riot.system.StreamRDF
        public void finish() {
            // Nothing to do.
        }

        /** Forwards a triple as an element with only {@code triple} set. */
        @Override // org.apache.jena.riot.system.StreamRDF
        public void triple(Triple triple) {
            EltStreamRDF elt = new EltStreamRDF();
            elt.triple = triple;
            this.destination.accept(elt);
        }

        /** Forwards a quad as an element with only {@code quad} set. */
        @Override // org.apache.jena.riot.system.StreamRDF
        public void quad(Quad quad) {
            EltStreamRDF elt = new EltStreamRDF();
            elt.quad = quad;
            this.destination.accept(elt);
        }

        /** Forwards a base declaration as an element with only {@code iri} set. */
        @Override // org.apache.jena.riot.system.StreamRDF
        public void base(String str) {
            EltStreamRDF elt = new EltStreamRDF();
            elt.iri = str;
            this.destination.accept(elt);
        }

        /** Forwards a prefix declaration as an element with {@code prefix} and {@code iri} set. */
        @Override // org.apache.jena.riot.system.StreamRDF
        public void prefix(String str, String str2) {
            EltStreamRDF elt = new EltStreamRDF();
            elt.prefix = str;
            elt.iri = str2;
            this.destination.accept(elt);
        }
    }

    /** Not instantiable: this class only exposes static methods. */
    private AsyncParser() {
        // No state to initialise.
    }

    /**
     * Parses a single source asynchronously and sends the result to a stream.
     * Delegates to the list-based overload with a one-element list.
     *
     * @param str       the source to parse (must not be null)
     * @param streamRDF the stream that receives the parsed output
     */
    public static void asyncParse(String str, StreamRDF streamRDF) {
        List<String> single = List.of(str);
        asyncParse(single, streamRDF);
    }

    /**
     * Parses the given sources on a separate thread and sends the output to
     * {@code streamRDF} on the calling thread.
     *
     * @param list      the sources to parse, in order (must not be null)
     * @param streamRDF the stream that receives the parsed output (must not be null)
     * @throws NullPointerException if either argument is null
     */
    public static void asyncParse(List<String> list, StreamRDF streamRDF) {
        Objects.requireNonNull(list);
        Objects.requireNonNull(streamRDF);
        // Parameterized logging: the list is only rendered when debug is enabled.
        LOG.debug("Parse: {}", list);
        asyncParseSources(urlsToSource(list), streamRDF);
    }

    /**
     * Parses an input stream on a separate thread and sends the output to
     * {@code streamRDF} on the calling thread.
     *
     * @param inputStream the data to parse (must not be null)
     * @param lang        the syntax of the data (must not be null)
     * @param str         passed through to {@code inputStreamToSource} (not null-checked here)
     * @param streamRDF   the stream that receives the parsed output (must not be null)
     * @throws NullPointerException if {@code inputStream}, {@code lang} or {@code streamRDF} is null
     */
    public static void asyncParse(InputStream inputStream, Lang lang, String str, StreamRDF streamRDF) {
        Objects.requireNonNull(inputStream);
        Objects.requireNonNull(lang);
        Objects.requireNonNull(streamRDF);
        List<RDFParserBuilder> sources = inputStreamToSource(inputStream, lang, str);
        asyncParseSources(sources, streamRDF);
    }

    /**
     * Runs the given parser builders on a background thread and replays their
     * output into {@code streamRDF} on the calling thread. The call returns once
     * the end-of-stream marker has been received from the parser thread.
     *
     * @param list      the parser builders to run, in order
     * @param streamRDF the stream that receives the parsed output
     */
    public static void asyncParseSources(List<RDFParserBuilder> list, StreamRDF streamRDF) {
        // Typed queue (was a raw ArrayBlockingQueue); bounded to queueSize batches.
        BlockingQueue<List<EltStreamRDF>> queue = new ArrayBlockingQueue<>(queueSize);
        startParserThread(LOG, list, queue);
        receiver(LOG, queue, streamRDF);
    }

    /**
     * Creates one parser builder per source string, preserving order.
     *
     * @param list the source strings
     * @return a new list of builders, one per entry of {@code list}
     */
    private static List<RDFParserBuilder> urlsToSource(List<String> list) {
        return list.stream()
                .map(url -> RDFParser.source(url))
                .collect(Collectors.toList());
    }

    /**
     * Wraps an input stream in a single-element list of parser builders.
     *
     * @param inputStream the data to parse
     * @param lang        the syntax of the data
     * @param str         the base URI for the parse; may be null
     * @return an immutable one-element list holding the configured builder
     */
    private static List<RDFParserBuilder> inputStreamToSource(InputStream inputStream, Lang lang, String str) {
        // The base URI was previously accepted but never applied to the builder.
        return List.of(RDFParser.source(inputStream).lang(lang).base(str));
    }

    /**
     * Parses a single source asynchronously and returns its triples.
     * Delegates to the list-based overload with a one-element list.
     *
     * @param str the source to parse (must not be null)
     * @return an iterator over the parsed triples
     */
    public static Iterator<Triple> asyncParseTriples(String str) {
        List<String> single = List.of(str);
        return asyncParseTriples(single);
    }

    /**
     * Parses the given sources on a background thread and returns an iterator
     * over the triples. Elements that do not map to a triple are dropped.
     *
     * @param list the sources to parse, in order
     * @return an iterator over the parsed triples
     */
    public static Iterator<Triple> asyncParseTriples(List<String> list) {
        List<RDFParserBuilder> sources = urlsToSource(list);
        Iterator<EltStreamRDF> elements = asyncParseIterator(sources);
        return Iter.iter(elements).map(elt2Triple).removeNulls();
    }

    /**
     * Parses an input stream on a background thread and returns an iterator
     * over the triples. Elements that do not map to a triple are dropped.
     *
     * @param inputStream the data to parse
     * @param lang        the syntax of the data
     * @param str         passed through to {@code inputStreamToSource}
     * @return an iterator over the parsed triples
     */
    public static Iterator<Triple> asyncParseTriples(InputStream inputStream, Lang lang, String str) {
        List<RDFParserBuilder> sources = inputStreamToSource(inputStream, lang, str);
        Iterator<EltStreamRDF> elements = asyncParseIterator(sources);
        return Iter.iter(elements).map(elt2Triple).removeNulls();
    }

    /**
     * Parses a single source asynchronously and returns its quads.
     * Delegates to the list-based overload with a one-element list.
     *
     * @param str the source to parse (must not be null)
     * @return an iterator over the parsed quads
     */
    public static Iterator<Quad> asyncParseQuads(String str) {
        List<String> single = List.of(str);
        return asyncParseQuads(single);
    }

    /**
     * Parses the given sources on a background thread and returns an iterator
     * over the quads. Elements that do not map to a quad are dropped.
     *
     * @param list the sources to parse, in order
     * @return an iterator over the parsed quads
     */
    public static Iterator<Quad> asyncParseQuads(List<String> list) {
        List<RDFParserBuilder> sources = urlsToSource(list);
        Iterator<EltStreamRDF> elements = asyncParseIterator(sources);
        return Iter.iter(elements).map(elt2Quad).removeNulls();
    }

    /**
     * Parses an input stream on a background thread and returns an iterator
     * over the quads. Elements that do not map to a quad are dropped.
     *
     * @param inputStream the data to parse
     * @param lang        the syntax of the data
     * @param str         passed through to {@code inputStreamToSource}
     * @return an iterator over the parsed quads
     */
    public static Iterator<Quad> asyncParseQuads(InputStream inputStream, Lang lang, String str) {
        List<RDFParserBuilder> sources = inputStreamToSource(inputStream, lang, str);
        Iterator<EltStreamRDF> elements = asyncParseIterator(sources);
        return Iter.iter(elements).map(elt2Quad).removeNulls();
    }

    /**
     * Starts the parser thread for the given builders and returns an iterator
     * over the individual elements it produces. Batches are taken from the queue
     * until the {@code END} marker is seen, and each batch is flattened into its
     * elements.
     *
     * @param list the parser builders to run, in order
     * @return an iterator over all elements (triples, quads, prefixes, bases, exceptions)
     */
    private static Iterator<EltStreamRDF> asyncParseIterator(List<RDFParserBuilder> list) {
        // Typed queue: with a raw ArrayBlockingQueue the element type is erased
        // and the lambdas below lose their static typing.
        BlockingQueue<List<EltStreamRDF>> queue = new ArrayBlockingQueue<>(queueSize);
        startParserThread(LOG, list, queue);
        // END is detected by identity, not equality.
        Iterator<List<EltStreamRDF>> batches = blockingIterator(queue, batch -> batch == END);
        return Iter.flatMap(batches, batch -> batch.iterator());
    }

    /**
     * Wraps a blocking queue as an iterator. Each step blocks on
     * {@code queue.take()}; iteration ends when the taken item satisfies
     * {@code predicate} (that item is not returned) or when the waiting thread
     * is interrupted.
     *
     * @param blockingQueue the queue to read from
     * @param predicate     tests whether an item is the end marker
     * @return an iterator over the queue's items up to, but excluding, the end marker
     */
    private static <X> Iterator<X> blockingIterator(final BlockingQueue<X> blockingQueue, final Predicate<X> predicate) {
        return new IteratorSlotted<X>() { // from class: org.apache.jena.riot.system.AsyncParser.1
            boolean ended = false;

            @Override // org.apache.jena.atlas.iterator.IteratorSlotted
            protected X moveToNext() {
                try {
                    X item = blockingQueue.take();
                    if (!predicate.test(item)) {
                        return item;
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the caller can observe that
                    // iteration stopped because of an interrupt, not end of data.
                    Thread.currentThread().interrupt();
                }
                this.ended = true;
                return null;
            }

            @Override // org.apache.jena.atlas.iterator.IteratorSlotted
            protected boolean hasMore() {
                return !this.ended;
            }
        };
    }

    /**
     * Starts a daemon thread named "AsyncParser" that runs each parser builder in
     * turn and puts the resulting elements, in batches of {@code chunkSize}, onto
     * {@code blockingQueue}. Any exception thrown while parsing is captured as an
     * element (non-runtime throwables are wrapped in a {@link RuntimeException})
     * so that the consumer can rethrow it. The end marker is queued after
     * parsing finishes or fails.
     *
     * @param logger        receives parser warnings and debug/error messages
     * @param list          the parser builders to run, in order
     * @param blockingQueue the queue that batches are put on
     */
    private static void startParserThread(final Logger logger, List<RDFParserBuilder> list, BlockingQueue<List<EltStreamRDF>> blockingQueue) {
        EltStreamBatcher eltStreamBatcher = new EltStreamBatcher(batch -> {
            try {
                blockingQueue.put(batch);
            } catch (InterruptedException e) {
                // Logged only: the interrupt flag is deliberately not re-set here,
                // otherwise the later put of the end marker would fail immediately.
                FmtLog.error(logger, e, "Error: %s", e.getMessage());
            }
        }, chunkSize);
        StreamToElements streamToElements = new StreamToElements(eltStreamBatcher);
        ErrorHandler errorHandler = new ErrorHandler() { // from class: org.apache.jena.riot.system.AsyncParser.2
            @Override // org.apache.jena.riot.system.ErrorHandler
            public void warning(String str, long j, long j2) {
                // Was "Logger.this.warn(...)", which is not valid Java; the captured
                // logger parameter is what is meant.
                logger.warn(SysRIOT.fmtMessage(str, j, j2));
            }

            @Override // org.apache.jena.riot.system.ErrorHandler
            public void error(String str, long j, long j2) {
                throw new RiotException(SysRIOT.fmtMessage(str, j, j2));
            }

            @Override // org.apache.jena.riot.system.ErrorHandler
            public void fatal(String str, long j, long j2) {
                throw new RiotException(SysRIOT.fmtMessage(str, j, j2));
            }
        };
        Thread thread = new Thread(() -> {
            eltStreamBatcher.startBatching();
            if (logger.isDebugEnabled()) {
                logger.debug("Start parsing");
            }
            try {
                for (RDFParserBuilder builder : list) {
                    builder.errorHandler(errorHandler).parse(streamToElements);
                }
            } catch (RuntimeException e) {
                // Pass the failure to the consumer as an element of the stream.
                EltStreamRDF failure = new EltStreamRDF();
                failure.exception = e;
                eltStreamBatcher.accept(failure);
            } catch (Throwable th) {
                EltStreamRDF failure = new EltStreamRDF();
                failure.exception = new RuntimeException(th);
                eltStreamBatcher.accept(failure);
            }
            eltStreamBatcher.finishBatching();
            if (logger.isDebugEnabled()) {
                logger.debug("Finish parsing");
            }
        }, "AsyncParser");
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Replays one element into a stream. The fields are checked in the order
     * triple, quad, prefix, base IRI, exception; the first non-null one decides
     * the action.
     *
     * @param eltStreamRDF the element to replay
     * @param streamRDF    the stream that receives the event
     * @throws RuntimeException       the element's exception, if it carries one
     * @throws InternalErrorException if no field of the element is set
     */
    private static void dispatch(EltStreamRDF eltStreamRDF, StreamRDF streamRDF) {
        if (eltStreamRDF.triple != null) {
            streamRDF.triple(eltStreamRDF.triple);
        } else if (eltStreamRDF.quad != null) {
            streamRDF.quad(eltStreamRDF.quad);
        } else if (eltStreamRDF.prefix != null) {
            streamRDF.prefix(eltStreamRDF.prefix, eltStreamRDF.iri);
        } else if (eltStreamRDF.iri != null) {
            // An IRI without a prefix label is a base declaration.
            streamRDF.base(eltStreamRDF.iri);
        } else if (eltStreamRDF.exception != null) {
            throw eltStreamRDF.exception;
        } else {
            throw new InternalErrorException("Bad EltStreamRDF");
        }
    }

    /**
     * Takes batches from the queue and replays them into {@code streamRDF} until
     * the {@code END} marker arrives. An exception carried by an element is
     * rethrown by the per-element dispatch and propagates to the caller.
     *
     * @param logger        receives debug and error messages
     * @param blockingQueue the queue filled by the parser thread
     * @param streamRDF     the stream that receives the replayed events
     */
    private static void receiver(Logger logger, BlockingQueue<List<EltStreamRDF>> blockingQueue, StreamRDF streamRDF) {
        int received = 0;
        while (true) {
            List<EltStreamRDF> batch;
            try {
                batch = blockingQueue.take();
            } catch (InterruptedException e) {
                FmtLog.error(logger, e, "Interrupted");
                // No batch was obtained; go back to waiting rather than falling
                // through with an unassigned batch variable.
                continue;
            }
            if (batch == END) {
                FmtLog.debug(logger, "Receive: END (%,d)", received);
                return;
            }
            received += batch.size();
            if (logger.isDebugEnabled()) {
                FmtLog.debug(logger, "Receive: Batch : %,d (%,d)", batch.size(), received);
            }
            dispatch(batch, streamRDF);
        }
    }

    /**
     * Replays every element of a batch, in order, into the given stream.
     *
     * @param list      the batch of elements
     * @param streamRDF the stream that receives the events
     */
    private static void dispatch(List<EltStreamRDF> list, StreamRDF streamRDF) {
        for (EltStreamRDF elt : list) {
            dispatch(elt, streamRDF);
        }
    }
}
