package org.infinispan.spark;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.infinispan.spark.config.ConnectorConfiguration;
import org.infinispan.spark.domain.Person;
import org.infinispan.spark.stream.InfinispanJavaDStream;
import org.infinispan.spark.test.StreamingUtils;
import org.infinispan.spark.test.TestingUtil;
import org.junit.Assert;
import scala.Tuple2;
import scala.Tuple3;

/* loaded from: input_file:org/infinispan/spark/JavaStreamApiTest.class */
public class JavaStreamApiTest {

    /* loaded from: input_file:org/infinispan/spark/JavaStreamApiTest$ReceiverStartListener.class */
    private class ReceiverStartListener implements StreamingListener {
        private ReceiverStartListener() {
        }

        public void onStreamingStarted(StreamingListenerStreamingStarted streamingListenerStreamingStarted) {
        }

        public void onReceiverStarted(StreamingListenerReceiverStarted streamingListenerReceiverStarted) {
        }

        public void onReceiverError(StreamingListenerReceiverError streamingListenerReceiverError) {
        }

        public void onReceiverStopped(StreamingListenerReceiverStopped streamingListenerReceiverStopped) {
        }

        public void onBatchSubmitted(StreamingListenerBatchSubmitted streamingListenerBatchSubmitted) {
        }

        public void onBatchStarted(StreamingListenerBatchStarted streamingListenerBatchStarted) {
        }

        public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
        }

        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted streamingListenerOutputOperationStarted) {
        }

        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted streamingListenerOutputOperationCompleted) {
        }
    }

    /**
     * Verifies that a DStream of (key, person) tuples is written to the remote cache.
     *
     * <p>Builds a receiver-backed input stream from three tuples (keys 1..3), hands it to
     * {@code InfinispanJavaDStream.writeToInfinispan}, starts the streaming context, waits
     * up to two seconds, and then asserts the cache holds exactly those three entries.
     *
     * @param javaStreamingContext   streaming context used to create and run the stream
     * @param connectorConfiguration connector configuration passed to the writer
     * @param remoteCache            cache that is expected to receive the entries
     * @throws InterruptedException if interrupted while waiting for the streaming context
     */
    public void testStreamConsumer(JavaStreamingContext javaStreamingContext, ConnectorConfiguration connectorConfiguration, RemoteCache<Integer, Person> remoteCache) throws InterruptedException {
        // Fully typed tuples: avoids the raw Tuple2 / unchecked conversion of the previous version.
        List<Tuple2<Integer, Person>> entries = Arrays.asList(
                new Tuple2<>(1, createPerson(1)),
                new Tuple2<>(2, createPerson(2)),
                new Tuple2<>(3, createPerson(3)));

        InfinispanJavaDStream.writeToInfinispan(
                StreamingUtils.createJavaReceiverDInputStream(javaStreamingContext, entries, Duration.ofMillis(100L)),
                connectorConfiguration);

        javaStreamingContext.start();
        javaStreamingContext.awaitTerminationOrTimeout(2000L);

        // The cache is already typed as RemoteCache<Integer, Person>, so no casts are needed.
        Assert.assertEquals(3L, remoteCache.size());
        Assert.assertEquals("name1", remoteCache.get(1).getName());
        Assert.assertEquals("name2", remoteCache.get(2).getName());
        Assert.assertEquals("name3", remoteCache.get(3).getName());
    }

    /**
     * Verifies that cache mutations are surfaced as events on the Infinispan input DStream.
     *
     * <p>Every RDD produced by the stream is collected into a local set. Once the receiver
     * has started, the cache is mutated with three inserts, one overwrite of key 1 and one
     * removal of key 2. The test then waits until five events have been collected and
     * checks the count per event type: three created, one removed, one modified.
     *
     * @param javaStreamingContext   streaming context used to create and run the stream
     * @param connectorConfiguration connector configuration passed to the stream factory
     * @param remoteCache            cache whose mutations should be observed
     */
    public void testStreamProducer(JavaStreamingContext javaStreamingContext, ConnectorConfiguration connectorConfiguration, RemoteCache<Integer, Person> remoteCache) {
        // Typed declarations: with raw types the lambda parameters below degrade to Object
        // and neither collect() nor _3() would resolve.
        JavaInputDStream<Tuple3<Integer, Person, ClientEvent.Type>> eventStream =
                InfinispanJavaDStream.createInfinispanInputDStream(javaStreamingContext, StorageLevel.MEMORY_ONLY(), connectorConfiguration);
        // NOTE(review): this set is filled from the foreachRDD callback and read by the
        // polling condition below; confirm whether those run on different threads.
        Set<Tuple3<Integer, Person, ClientEvent.Type>> receivedEvents = new HashSet<>();
        eventStream.foreachRDD((javaRDD, time) -> {
            receivedEvents.addAll(javaRDD.collect());
        });

        javaStreamingContext.start();
        // NOTE(review): the listener is registered after start(); confirm the receiver
        // cannot report "started" before the listener is attached.
        executeAfterReceiverStarted(javaStreamingContext, () -> {
            remoteCache.put(1, createPerson(1));
            remoteCache.put(2, createPerson(2));
            remoteCache.put(3, createPerson(3));
            remoteCache.put(1, createPerson(11));
            remoteCache.remove(2);
        });

        TestingUtil.waitForCondition(() -> receivedEvents.size() == 5);

        Assert.assertEquals(3L, countEventsOfType(receivedEvents, ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED));
        Assert.assertEquals(1L, countEventsOfType(receivedEvents, ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED));
        Assert.assertEquals(1L, countEventsOfType(receivedEvents, ClientEvent.Type.CLIENT_CACHE_ENTRY_MODIFIED));
    }

    /** Counts the collected events whose third tuple component is the given event type. */
    private static long countEventsOfType(Set<Tuple3<Integer, Person, ClientEvent.Type>> events, ClientEvent.Type type) {
        return events.stream().filter(event -> event._3() == type).count();
    }

    /**
     * Builds a test {@link Person} derived from {@code i}: the first constructor argument
     * is {@code "name" + i}, the second is {@code i} boxed as an {@link Integer}, and the
     * third is {@code null}.
     *
     * @param i number used to derive the person's values
     * @return a new {@link Person} instance
     */
    private Person createPerson(int i) {
        final String label = "name" + i;
        final Integer boxed = Integer.valueOf(i);
        return new Person(label, boxed, null);
    }

    /**
     * Registers a streaming listener that runs {@code runnable} after a receiver has
     * reported that it started.
     *
     * <p>The callback sleeps for one second before invoking {@code runnable}. If the sleep
     * is interrupted, the thread's interrupt status is restored and {@code runnable} is
     * still invoked, so the action remains best-effort.
     *
     * @param javaStreamingContext context on which the listener is registered
     * @param runnable             action to run once the receiver has started
     */
    private void executeAfterReceiverStarted(JavaStreamingContext javaStreamingContext, final Runnable runnable) {
        javaStreamingContext.addStreamingListener(new ReceiverStartListener() {
            @Override
            public void onReceiverStarted(StreamingListenerReceiverStarted streamingListenerReceiverStarted) {
                try {
                    // Grace period between the "receiver started" event and running the action.
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag for code further up the stack.
                    Thread.currentThread().interrupt();
                }
                runnable.run();
            }
        });
    }

    /**
     * Synthetic lambda-deserialization hook, shown here as emitted by the decompiler.
     *
     * <p>Matches the serialized form against the single lambda known to this class
     * ({@code lambda$testStreamProducer$ba2284f9$1}, a {@code VoidFunction2}) and, when
     * every recorded signature agrees, rebuilds it around the captured {@link Set}.
     *
     * <p>NOTE(review): this method is marked synthetic, so it is presumably generated by
     * the compiler rather than hand-written; confirm before keeping it in source form.
     *
     * @param serializedLambda serialized description of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // NOTE(review): `boolean z = -1` and the switch over it below are not valid Java;
        // this looks like a decompiler artifact for an int index (-1 = no match, 0 = first case).
        boolean z = -1;
        // First stage: dispatch on the hash of the implementation method name, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case -967539458:
                if (implMethodName.equals("lambda$testStreamProducer$ba2284f9$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full recorded shape of the lambda before recreating it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/infinispan/spark/JavaStreamApiTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;Lorg/apache/spark/api/java/JavaRDD;Lorg/apache/spark/streaming/Time;)V")) {
                    // The only captured argument is the set that collects the RDD contents.
                    Set set = (Set) serializedLambda.getCapturedArg(0);
                    return (javaRDD, time) -> {
                        set.addAll(javaRDD.collect());
                    };
                }
                break;
        }
        // No known lambda matched the serialized description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
