package org.infinispan.client.hotrod;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

@Test(testName = "client.hotrod.StreamingOpsTest", groups = {"functional"})
/* loaded from: input_file:org/infinispan/client/hotrod/StreamingOpsTest.class */
public class StreamingOpsTest extends SingleCacheManagerTest {
    private static final Log log = LogFactory.getLog(StreamingOpsTest.class);
    private static final String CACHE_NAME = "theCache";
    private static final int V1_SIZE = 1000000;
    private static final int V2_SIZE = 500000;
    RemoteCache<String, byte[]> remoteCache;
    StreamingRemoteCache<String> streamingRemoteCache;
    private RemoteCacheManager remoteCacheManager;
    protected HotRodServer hotrodServer;

    protected EmbeddedCacheManager createCacheManager() throws Exception {
        ConfigurationBuilder hotRodCacheConfiguration = HotRodTestingUtil.hotRodCacheConfiguration(getDefaultStandaloneCacheConfig(false));
        EmbeddedCacheManager createCacheManager = TestCacheManagerFactory.createCacheManager(HotRodTestingUtil.hotRodCacheConfiguration());
        createCacheManager.defineConfiguration(CACHE_NAME, hotRodCacheConfiguration.build());
        createCacheManager.getCache(CACHE_NAME);
        return createCacheManager;
    }

    protected void setup() throws Exception {
        super.setup();
        this.hotrodServer = HotRodClientTestingUtil.startHotRodServer(this.cacheManager);
        log.info("Started server on port: " + this.hotrodServer.getPort());
        this.remoteCacheManager = getRemoteCacheManager();
        this.remoteCache = this.remoteCacheManager.getCache(CACHE_NAME);
        this.streamingRemoteCache = this.remoteCache.streaming();
    }

    protected RemoteCacheManager getRemoteCacheManager() {
        org.infinispan.client.hotrod.configuration.ConfigurationBuilder configurationBuilder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
        configurationBuilder.addServer().host("localhost").port(this.hotrodServer.getPort());
        return new RemoteCacheManager(configurationBuilder.build());
    }

    @AfterClass
    public void testDestroyRemoteCacheFactory() {
        HotRodClientTestingUtil.killRemoteCacheManager(this.remoteCacheManager);
        HotRodClientTestingUtil.killServers(this.hotrodServer);
    }

    private void consumeAndCloseStream(InputStream inputStream) throws Exception {
        if (inputStream != null) {
            do {
                try {
                } finally {
                    inputStream.close();
                }
            } while (inputStream.read() >= 0);
        }
    }

    private void writeDataToStream(OutputStream outputStream, int i) throws Exception {
        for (int i2 = 0; i2 < i; i2++) {
            outputStream.write(i2 % 256);
        }
    }

    public void testPutGetStream() throws Exception {
        OutputStream put = this.streamingRemoteCache.put("k1");
        writeDataToStream(put, V1_SIZE);
        put.close();
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(this.streamingRemoteCache.get("k1")));
    }

    private int readAndCheckDataFromStream(InputStream inputStream) throws IOException {
        int i = 0;
        try {
            int read = inputStream.read();
            while (read >= 0) {
                AssertJUnit.assertEquals(i % 256, read);
                read = inputStream.read();
                i++;
            }
            return i;
        } finally {
            inputStream.close();
        }
    }

    public void testGetStreamWithMetadata() throws Exception {
        InputStream inputStream = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertNull("expected null but received a stream", inputStream);
        consumeAndCloseStream(inputStream);
        OutputStream put = this.streamingRemoteCache.put("k1");
        writeDataToStream(put, V1_SIZE);
        put.close();
        VersionedMetadata versionedMetadata = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertNotNull("expected a stream but received null", versionedMetadata);
        VersionedMetadata versionedMetadata2 = versionedMetadata;
        AssertJUnit.assertEquals(-1, versionedMetadata2.getLifespan());
        AssertJUnit.assertEquals(-1, versionedMetadata2.getMaxIdle());
        consumeAndCloseStream(versionedMetadata);
        OutputStream put2 = this.streamingRemoteCache.put("k1", 5L, TimeUnit.MINUTES);
        writeDataToStream(put2, V1_SIZE);
        put2.close();
        VersionedMetadata versionedMetadata3 = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertNotNull("expected a stream but received null", versionedMetadata3);
        AssertJUnit.assertEquals(TimeUnit.MINUTES.toSeconds(5L), r0.getLifespan());
        AssertJUnit.assertEquals(-1, versionedMetadata3.getMaxIdle());
        consumeAndCloseStream(versionedMetadata3);
        OutputStream put3 = this.streamingRemoteCache.put("k1", 5L, TimeUnit.MINUTES, 3L, TimeUnit.MINUTES);
        writeDataToStream(put3, V1_SIZE);
        put3.close();
        VersionedMetadata versionedMetadata4 = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertNotNull("expected a stream but received null", versionedMetadata4);
        VersionedMetadata versionedMetadata5 = versionedMetadata4;
        AssertJUnit.assertEquals(TimeUnit.MINUTES.toSeconds(5L), versionedMetadata5.getLifespan());
        AssertJUnit.assertEquals(TimeUnit.MINUTES.toSeconds(3L), versionedMetadata5.getMaxIdle());
        consumeAndCloseStream(versionedMetadata4);
    }

    public void testPutIfAbsentStream() throws Exception {
        InputStream inputStream = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertNull("expected null but received a stream", inputStream);
        consumeAndCloseStream(inputStream);
        OutputStream putIfAbsent = this.streamingRemoteCache.putIfAbsent("k1");
        writeDataToStream(putIfAbsent, V1_SIZE);
        putIfAbsent.close();
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(this.streamingRemoteCache.get("k1")));
        OutputStream putIfAbsent2 = this.streamingRemoteCache.putIfAbsent("k1");
        writeDataToStream(putIfAbsent2, V2_SIZE);
        putIfAbsent2.close();
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(this.streamingRemoteCache.get("k1")));
    }

    public void testReplaceStream() throws Exception {
        OutputStream putIfAbsent = this.streamingRemoteCache.putIfAbsent("k1");
        writeDataToStream(putIfAbsent, V1_SIZE);
        putIfAbsent.close();
        VersionedMetadata versionedMetadata = this.streamingRemoteCache.get("k1");
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(versionedMetadata));
        long version = versionedMetadata.getVersion();
        AssertJUnit.assertTrue("Expected a non-zero version: " + version, version > 0);
        OutputStream replaceWithVersion = this.streamingRemoteCache.replaceWithVersion("k1", version + 1);
        writeDataToStream(replaceWithVersion, V2_SIZE);
        replaceWithVersion.close();
        AssertJUnit.assertEquals(V1_SIZE, readAndCheckDataFromStream(this.streamingRemoteCache.get("k1")));
        OutputStream replaceWithVersion2 = this.streamingRemoteCache.replaceWithVersion("k1", version);
        writeDataToStream(replaceWithVersion2, V2_SIZE);
        replaceWithVersion2.close();
        AssertJUnit.assertEquals(V2_SIZE, readAndCheckDataFromStream(this.streamingRemoteCache.get("k1")));
    }
}
