/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.functional;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.protostream.sampledomain.User;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.FilterConditionContext;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.server.functional.ClusteredIT;
import org.infinispan.server.test.InfinispanServerRule;
import org.infinispan.server.test.InfinispanServerTestMethodRule;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class HotRodCacheContinuousQueries {
    @ClassRule
    public static InfinispanServerRule SERVERS = ClusteredIT.SERVERS;
    @Rule
    public InfinispanServerTestMethodRule SERVER_TEST = new InfinispanServerTestMethodRule(SERVERS);
    private final boolean indexed;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        ArrayList<Object[]> data = new ArrayList<Object[]>();
        data.add(new Object[]{true});
        data.add(new Object[]{false});
        return data;
    }

    public HotRodCacheContinuousQueries(boolean indexed) {
        this.indexed = indexed;
    }

    @Test
    public void testQueries() {
        RemoteCache remoteCache = ClusteredIT.createQueryableCache(this.SERVER_TEST, this.indexed);
        remoteCache.put((Object)1, (Object)this.createUser(1, 25));
        remoteCache.put((Object)2, (Object)this.createUser(2, 25));
        remoteCache.put((Object)3, (Object)this.createUser(3, 20));
        Assert.assertEquals((long)3L, (long)remoteCache.size());
        QueryFactory qf = Search.getQueryFactory(remoteCache);
        Query query = ((FilterConditionContext)qf.from(User.class).having("name").eq((Object)"user1")).and().having("age").gt((Object)20).build();
        final LinkedBlockingQueue joined = new LinkedBlockingQueue();
        final LinkedBlockingQueue updated = new LinkedBlockingQueue();
        final LinkedBlockingQueue left = new LinkedBlockingQueue();
        ContinuousQueryListener<Integer, User> listener = new ContinuousQueryListener<Integer, User>(){

            public void resultJoining(Integer key, User value) {
                joined.add(key);
            }

            public void resultUpdated(Integer key, User value) {
                updated.add(key);
            }

            public void resultLeaving(Integer key) {
                left.add(key);
            }
        };
        ContinuousQuery continuousQuery = Search.getContinuousQuery(remoteCache);
        continuousQuery.addContinuousQueryListener(query, (ContinuousQueryListener)listener);
        this.expectElementsInQueue(joined, 1);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        User user4 = this.createUser(4, 30);
        user4.setName("user1");
        remoteCache.put((Object)4, (Object)user4);
        this.expectElementsInQueue(joined, 1);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
        User user1 = (User)remoteCache.get((Object)1);
        user1.setAge(Integer.valueOf(19));
        remoteCache.put((Object)1, (Object)user1);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 1);
        user4 = (User)remoteCache.get((Object)4);
        user4.setAge(Integer.valueOf(32));
        remoteCache.put((Object)4, (Object)user4);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 1);
        this.expectElementsInQueue(left, 0);
        remoteCache.clear();
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 1);
        continuousQuery.removeContinuousQueryListener((ContinuousQueryListener)listener);
        user1.setAge(Integer.valueOf(25));
        remoteCache.put((Object)1, (Object)user1);
        this.expectElementsInQueue(joined, 0);
        this.expectElementsInQueue(updated, 0);
        this.expectElementsInQueue(left, 0);
    }

    private User createUser(int id, int age) {
        User user = new User();
        user.setId(id);
        user.setName("user" + id);
        user.setAge(Integer.valueOf(age));
        user.setSurname("Doesn't matter");
        user.setGender(User.Gender.MALE);
        return user;
    }

    private void expectElementsInQueue(BlockingQueue<?> queue, int numElements) {
        for (int i = 0; i < numElements; ++i) {
            try {
                Object e = queue.poll(5L, TimeUnit.SECONDS);
                Assert.assertNotNull((String)"Queue was empty!", e);
                continue;
            }
            catch (InterruptedException e) {
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
        }
        try {
            Object e = queue.poll(500L, TimeUnit.MILLISECONDS);
            Assert.assertNull((String)"No more elements expected in queue!", e);
        }
        catch (InterruptedException e) {
            throw new AssertionError("Interrupted while waiting for condition", e);
        }
    }
}

