package org.infinispan.server.test.query;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.infinispan.arquillian.core.InfinispanResource;
import org.infinispan.arquillian.core.RemoteInfinispanServer;
import org.infinispan.client.hotrod.Search;
import org.infinispan.protostream.sampledomain.User;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.infinispan.server.test.category.Queries;
import org.jboss.arquillian.junit.Arquillian;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Integration test that registers a continuous query through the remote cache
 * and checks which listener callbacks (joining / updated / leaving) fire as
 * entries are written to, modified in, and cleared from the cache.
 */
@RunWith(Arquillian.class)
@Category({Queries.class})
/* loaded from: input_file:org/infinispan/server/test/query/RemoteContinuousQueryIT.class */
public class RemoteContinuousQueryIT extends RemoteQueryBaseIT {

    /** Upper bound, in seconds, on the wait for each expected listener event. */
    private static final long EXPECTED_EVENT_TIMEOUT_SECONDS = 5L;

    /** Grace period, in milliseconds, during which no further event may show up. */
    private static final long NO_EVENT_TIMEOUT_MILLIS = 500L;

    @InfinispanResource("remote-query-1")
    protected RemoteInfinispanServer server;

    public RemoteContinuousQueryIT() {
        super("clustered", "localtestcache");
    }

    /**
     * @return the injected server this test runs against
     */
    @Override
    protected RemoteInfinispanServer getServer() {
        return this.server;
    }

    /**
     * Registers a continuous query for users named {@code "user1"} with an age
     * greater than 20 and asserts, after every cache mutation, the exact number
     * of joining, updated and leaving notifications received by the listener.
     */
    @Test
    public void testContinuousQuery() {
        this.remoteCache.put(1, createUser(1, 25));
        this.remoteCache.put(2, createUser(2, 25));
        this.remoteCache.put(3, createUser(3, 20));
        Assert.assertEquals(3L, this.remoteCache.size());

        Query query = Search.getQueryFactory(this.remoteCache)
                .from(User.class)
                .having("name").eq("user1")
                .and()
                .having("age").gt(20)
                .build();

        // One queue per callback kind; each callback records the key it was invoked with.
        final BlockingQueue<Integer> joined = new LinkedBlockingQueue<>();
        final BlockingQueue<Integer> updated = new LinkedBlockingQueue<>();
        final BlockingQueue<Integer> left = new LinkedBlockingQueue<>();

        ContinuousQueryListener<Integer, User> listener = new ContinuousQueryListener<Integer, User>() {
            public void resultJoining(Integer key, User value) {
                joined.add(key);
            }

            public void resultUpdated(Integer key, User value) {
                updated.add(key);
            }

            public void resultLeaving(Integer key) {
                left.add(key);
            }
        };

        ContinuousQuery continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, listener);

        // Registration: exactly one pre-existing entry is reported as joining.
        expectElementsInQueue(joined, 1);
        expectElementsInQueue(updated, 0);
        expectElementsInQueue(left, 0);

        // A new entry named "user1" with age 30 is expected to join.
        User fourthUser = createUser(4, 30);
        fourthUser.setName("user1");
        this.remoteCache.put(4, fourthUser);
        expectElementsInQueue(joined, 1);
        expectElementsInQueue(updated, 0);
        expectElementsInQueue(left, 0);

        // Lowering the age of entry 1 to 19 is expected to make it leave.
        User firstUser = (User) this.remoteCache.get(1);
        firstUser.setAge(19);
        this.remoteCache.put(1, firstUser);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(updated, 0);
        expectElementsInQueue(left, 1);

        // Changing the age of entry 4 to 32 is expected to be reported as an update.
        User storedFourthUser = (User) this.remoteCache.get(4);
        storedFourthUser.setAge(32);
        this.remoteCache.put(4, storedFourthUser);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(updated, 1);
        expectElementsInQueue(left, 0);

        // Clearing the cache is expected to produce a single leaving event.
        this.remoteCache.clear();
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(updated, 0);
        expectElementsInQueue(left, 1);

        // Once the listener is removed, further writes must not produce any event.
        continuousQuery.removeContinuousQueryListener(listener);
        firstUser.setAge(25);
        this.remoteCache.put(1, firstUser);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(updated, 0);
        expectElementsInQueue(left, 0);
    }

    /**
     * Builds a male user named {@code "user" + id} with a fixed surname.
     *
     * @param i  the user id, also used as the name suffix
     * @param i2 the user's age
     * @return the populated user
     */
    private User createUser(int i, int i2) {
        User user = new User();
        user.setId(i);
        user.setName("user" + i);
        user.setAge(Integer.valueOf(i2));
        user.setSurname("Doesn't matter");
        user.setGender(User.Gender.MALE);
        return user;
    }

    /**
     * Asserts that the queue yields exactly {@code i} elements: each expected
     * element must arrive within {@link #EXPECTED_EVENT_TIMEOUT_SECONDS} seconds,
     * and no additional element may arrive within
     * {@link #NO_EVENT_TIMEOUT_MILLIS} milliseconds afterwards.
     *
     * @param blockingQueue the queue fed by a listener callback
     * @param i             the exact number of elements expected
     * @throws AssertionError if an element is missing, an extra element arrives,
     *                        or the waiting thread is interrupted
     */
    private void expectElementsInQueue(BlockingQueue<?> blockingQueue, int i) {
        for (int received = 0; received < i; received++) {
            Assert.assertNotNull("Queue was empty!",
                    pollOrFail(blockingQueue, EXPECTED_EVENT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        }
        Assert.assertNull("No more elements expected in queue!",
                pollOrFail(blockingQueue, NO_EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
    }

    /**
     * Polls the queue with a timeout, turning an interruption into a test failure.
     * The interrupt flag is restored before failing so that the code running the
     * test can still observe that the thread was interrupted.
     *
     * @return the polled element, or {@code null} if the timeout elapsed
     */
    private static Object pollOrFail(BlockingQueue<?> queue, long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for condition", e);
        }
    }
}
