package io.vertx.pgclient;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.pgclient.ProxyServer;
import io.vertx.pgclient.pubsub.PgChannel;
import io.vertx.pgclient.pubsub.PgSubscriber;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/pgclient/PubSubTest.class */
public class PubSubTest extends PgTestBase {
    Vertx vertx;
    PgSubscriber subscriber;

    /**
     * Runs the base-class setup first, then creates the Vert.x instance shared by the tests
     * in this class.
     *
     * @throws Exception if the base-class setup fails
     */
    @Override // io.vertx.pgclient.PgTestBase
    @Before
    public void setup() throws Exception {
        super.setup();
        vertx = Vertx.vertx();
    }

    /**
     * Closes the subscriber stored in the {@code subscriber} field (when a test assigned one)
     * and then closes the Vert.x instance, asserting that the close succeeds.
     *
     * @param testContext the vertx-unit context used to assert the asynchronous close result
     */
    @After
    public void teardown(TestContext testContext) {
        PgSubscriber active = subscriber;
        if (active != null) {
            active.close();
        }
        vertx.close(testContext.asyncAssertSuccess());
    }

    /** Runs the LISTEN/NOTIFY round trip on a plain lower-case channel name. */
    @Test
    public void testNotify(TestContext testContext) {
        final String channelName = "the_channel";
        testNotify(testContext, channelName);
    }

    /** Runs the LISTEN/NOTIFY round trip on a channel name containing upper-case letters and a dot. */
    @Test
    public void testNotifyChannelRequiresQuotedID(TestContext testContext) {
        final String channelName = "The.Channel";
        testNotify(testContext, channelName);
    }

    /**
     * Opens a single connection, issues {@code LISTEN} and then {@code NOTIFY} on the given
     * channel, and checks that the notification delivered to the connection carries the
     * original channel name and the payload {@code "the message"}.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param str the raw channel name; it is double-quoted (with embedded quotes doubled)
     *     before being placed into the SQL statements
     */
    public void testNotify(TestContext testContext, String str) {
        // Quote the name as an SQL identifier, doubling any embedded double quotes.
        String quotedChannel = "\"" + str.replace("\"", "\"\"") + "\"";
        // Two completions are expected: the notification arriving and the NOTIFY query succeeding.
        Async async = testContext.async(2);
        PgConnection.connect(this.vertx, this.options, testContext.asyncAssertSuccess(pgConnection -> {
            pgConnection.query("LISTEN " + quotedChannel).execute(testContext.asyncAssertSuccess(listenResult -> {
                pgConnection.notificationHandler(pgNotification -> {
                    testContext.assertEquals(str, pgNotification.getChannel());
                    testContext.assertEquals("the message", pgNotification.getPayload());
                    async.countDown();
                });
                // The inner lambda parameter needs its own name: Java does not allow a lambda
                // parameter to redeclare a parameter of an enclosing lambda.
                pgConnection.query("NOTIFY " + quotedChannel + ", 'the message'").execute(testContext.asyncAssertSuccess(notifyResult -> {
                    async.countDown();
                }));
            }));
        }));
    }

    /** Runs the two-channel connect scenario with plain lower-case channel names. */
    @Test
    public void testConnect(TestContext testContext) {
        final String firstChannel = "channel1";
        final String secondChannel = "channel2";
        testConnect(testContext, firstChannel, secondChannel);
    }

    /** Runs the two-channel connect scenario with names containing upper-case letters and dots. */
    @Test
    public void testConnectChannelRequiresQuotedID(TestContext testContext) {
        final String firstChannel = "Channel.Test.1";
        final String secondChannel = "Channel.Test.2";
        testConnect(testContext, firstChannel, secondChannel);
    }

    /**
     * Registers message handlers on two channels before the subscriber connects, then sends
     * one {@code NOTIFY} to each channel over the subscriber's own connection and waits until
     * both messages have been delivered.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param str the raw name of the first channel (expects payload {@code "msg1"})
     * @param str2 the raw name of the second channel (expects payload {@code "msg2"})
     */
    private void testConnect(TestContext testContext, String str, String str2) {
        // Quote both names as SQL identifiers, doubling any embedded double quotes.
        String quotedFirst = "\"" + str.replace("\"", "\"\"") + "\"";
        String quotedSecond = "\"" + str2.replace("\"", "\"\"") + "\"";
        this.subscriber = PgSubscriber.subscriber(this.vertx, this.options);
        // Count of 2, one per channel, so the test only completes once BOTH handlers have run.
        // With a count of 1 the first delivered message would complete the test on its own.
        Async bothDelivered = testContext.async(2);
        PgChannel firstChannel = this.subscriber.channel(str);
        PgChannel secondChannel = this.subscriber.channel(str2);
        firstChannel.handler(payload -> {
            testContext.assertEquals("msg1", payload);
            bothDelivered.countDown();
        });
        secondChannel.handler(payload -> {
            testContext.assertEquals("msg2", payload);
            bothDelivered.countDown();
        });
        Async connected = testContext.async();
        this.subscriber.connect(testContext.asyncAssertSuccess(ignored -> {
            connected.complete();
        }));
        // Block until the subscriber is connected before using its underlying connection.
        connected.awaitSuccess(10000L);
        this.subscriber.actualConnection().query("NOTIFY " + quotedFirst + ", 'msg1'").execute(testContext.asyncAssertSuccess());
        this.subscriber.actualConnection().query("NOTIFY " + quotedSecond + ", 'msg2'").execute(testContext.asyncAssertSuccess());
        bothDelivered.awaitSuccess(10000L);
    }

    /** Runs the subscribe scenario on a plain lower-case channel name. */
    @Test
    public void testSubscribe(TestContext testContext) {
        final String channelName = "the_channel";
        testSubscribe(testContext, channelName);
    }

    /** Runs the subscribe scenario on a channel name containing upper-case letters and a dot. */
    @Test
    public void testSubscribeChannelRequiresQuotedID(TestContext testContext) {
        final String channelName = "The.Channel";
        testSubscribe(testContext, channelName);
    }

    /** Runs the subscribe scenario on a channel name that itself contains double quotes. */
    @Test
    public void testSubscribeChannelContainsQuotes(TestContext testContext) {
        final String channelName = "\"The\".\"Channel\"";
        testSubscribe(testContext, channelName);
    }

    /**
     * Runs the subscribe scenario on a 68-character channel name made of 63 {@code 'a'}
     * characters followed by 5 {@code 'b'} characters.
     */
    @Test
    public void testSubscribeChannelExceedsLengthLimit(TestContext testContext) {
        // NOTE(review): 63 presumably matches PostgreSQL's maximum identifier length — confirm.
        final int prefixLength = 63;
        final int totalLength = 68;
        char[] nameChars = new char[totalLength];
        // Fill everything with the suffix character, then overwrite the leading prefix.
        Arrays.fill(nameChars, 'b');
        Arrays.fill(nameChars, 0, prefixLength, 'a');
        String channelName = new String(nameChars);
        testSubscribe(testContext, channelName);
    }

    /**
     * Connects a subscriber, obtains the channel with the given name, waits for its subscribe
     * handler to fire, then sends {@code NOTIFY <quoted name>, 'msg'} over the subscriber's
     * connection and waits for the channel handler to receive {@code "msg"}.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param str the raw channel name; it is double-quoted (with embedded quotes doubled)
     *     only for the NOTIFY statement
     */
    public void testSubscribe(TestContext testContext, String str) {
        String quotedChannel = "\"" + str.replace("\"", "\"\"") + "\"";
        subscriber = PgSubscriber.subscriber(vertx, options);

        // Step 1: connect and block until the connection is established.
        Async connected = testContext.async();
        subscriber.connect(testContext.asyncAssertSuccess(ignored -> connected.complete()));
        connected.awaitSuccess(10000L);

        // Step 2: register the subscribe handler; it must return the channel itself.
        PgChannel channel = subscriber.channel(str);
        Async subscribed = testContext.async();
        testContext.assertEquals(channel, channel.subscribeHandler(ignored -> subscribed.complete()));

        // Step 3: register the message handler, then wait for the subscription to be reported.
        Async received = testContext.async();
        channel.handler(payload -> {
            testContext.assertEquals("msg", payload);
            received.countDown();
        });
        subscribed.awaitSuccess(10000L);

        // Step 4: publish and wait for delivery.
        String notifySql = "NOTIFY " + quotedChannel + ", 'msg'";
        subscriber.actualConnection().query(notifySql).execute(testContext.asyncAssertSuccess());
        received.awaitSuccess(10000L);
    }

    /**
     * Subscribes to the channel {@code "the_channel"} and then sends a NOTIFY that names the
     * channel as the unquoted, mixed-case identifier {@code The_Channel}, expecting the
     * message {@code "msg"} to reach the subscribed channel.
     */
    @Test
    public void testSubscribeNotifyWithUnquotedId(TestContext testContext) {
        subscriber = PgSubscriber.subscriber(vertx, options);

        // Connect and block until the connection is established.
        Async connected = testContext.async();
        subscriber.connect(testContext.asyncAssertSuccess(ignored -> connected.complete()));
        connected.awaitSuccess(10000L);

        // Register the subscribe handler; it must return the channel itself.
        PgChannel channel = subscriber.channel("the_channel");
        Async subscribed = testContext.async();
        testContext.assertEquals(channel, channel.subscribeHandler(ignored -> subscribed.complete()));

        // Register the message handler, then wait for the subscription to be reported.
        Async received = testContext.async();
        channel.handler(payload -> {
            testContext.assertEquals("msg", payload);
            received.countDown();
        });
        subscribed.awaitSuccess(10000L);

        // The channel identifier is deliberately left unquoted in this statement.
        String notifySql = "NOTIFY The_Channel, 'msg'";
        subscriber.actualConnection().query(notifySql).execute(testContext.asyncAssertSuccess());
        received.awaitSuccess(10000L);
    }

    /** Runs the unsubscribe scenario on a plain lower-case channel name. */
    @Test
    public void testUnsubscribe(TestContext testContext) {
        final String channelName = "the_channel";
        testUnsubscribe(testContext, channelName);
    }

    /** Runs the unsubscribe scenario on a channel name containing upper-case letters and a dot. */
    @Test
    public void testUnsubscribeChannelRequiresQuotedID(TestContext testContext) {
        final String channelName = "The.Channel";
        testUnsubscribe(testContext, channelName);
    }

    /**
     * Connects a subscriber, subscribes to the given channel by installing a message handler,
     * then removes the handler and waits for the channel's end handler to fire.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param str the name of the channel to subscribe to and unsubscribe from
     */
    public void testUnsubscribe(TestContext testContext, String str) {
        this.subscriber = PgSubscriber.subscriber(this.vertx, this.options);
        Async connected = testContext.async();
        this.subscriber.connect(testContext.asyncAssertSuccess(ignored -> {
            connected.complete();
        }));
        connected.awaitSuccess(10000L);
        // Use the caller-supplied name so that each @Test variant exercises its own channel.
        PgChannel channel = this.subscriber.channel(str);
        Async ended = testContext.async();
        channel.endHandler(ignored -> {
            ended.complete();
        });
        Async subscribed = testContext.async();
        channel.subscribeHandler(ignored -> {
            subscribed.complete();
        });
        // Installing a (no-op) message handler is what triggers the subscription.
        channel.handler(payload -> {
        });
        subscribed.awaitSuccess(10000L);
        // Clearing the handler is expected to unsubscribe and fire the end handler.
        channel.handler((Handler<String>) null);
        ended.awaitSuccess(10000L);
    }

    /** Runs the reconnect scenario with a reconnect delay of 0 on a plain lower-case channel name. */
    @Test
    public void testReconnectImmediately(TestContext testContext) {
        final long reconnectDelay = 0L;
        testReconnect(testContext, reconnectDelay, "the_channel");
    }

    /**
     * Runs the reconnect scenario with a reconnect delay of 0 on a channel name containing
     * upper-case letters and a dot.
     */
    @Test
    public void testReconnectImmediatelyChannelRequiresQuotedID(TestContext testContext) {
        final long reconnectDelay = 0L;
        testReconnect(testContext, reconnectDelay, "The.Channel");
    }

    /** Runs the reconnect scenario with a reconnect delay of 100 on a plain lower-case channel name. */
    @Test
    public void testReconnectWithDelay(TestContext testContext) {
        final long reconnectDelay = 100L;
        testReconnect(testContext, reconnectDelay, "the_channel");
    }

    /**
     * Runs the reconnect scenario with a reconnect delay of 100 on a channel name containing
     * upper-case letters and a dot.
     */
    @Test
    public void testReconnectWithDelayChannelRequiresQuotedID(TestContext testContext) {
        final long reconnectDelay = 100L;
        testReconnect(testContext, reconnectDelay, "The.Channel");
    }

    /**
     * Places a {@link ProxyServer} in front of the database, connects a subscriber through it,
     * and closes the proxied connection three times. The reconnect policy returns {@code j}
     * for its first two invocations and {@code -1} for the third; the test expects the channel
     * to be re-subscribed after each of the first two drops and the subscriber to be closed
     * after the third.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param j the value the reconnect policy returns for its first two invocations
     * @param str the name of the channel to subscribe to
     */
    public void testReconnect(TestContext testContext, long j, String str) {
        final int proxyPort = 8080;
        final String proxyHost = "localhost";

        ProxyServer proxy = ProxyServer.create(this.vertx, this.options.getPort(), this.options.getHost());
        // Holds the most recent proxied connection so the test can drop it on demand.
        AtomicReference<ProxyServer.Connection> proxiedConnection = new AtomicReference<>();
        proxy.proxyHandler(connection -> {
            proxiedConnection.set(connection);
            connection.connect();
        });
        Async proxyListening = testContext.async();
        proxy.listen(proxyPort, proxyHost, testContext.asyncAssertSuccess(ignored -> {
            // From here on the client connects to the proxy rather than to the database directly.
            this.options.setPort(proxyPort).setHost(proxyHost);
            proxyListening.complete();
        }));
        proxyListening.awaitSuccess(10000L);

        this.subscriber = PgSubscriber.subscriber(this.vertx, this.options);
        PgChannel channel = this.subscriber.channel(str);
        Async firstSubscribe = testContext.async();
        Async secondSubscribe = testContext.async();
        Async thirdSubscribe = testContext.async();
        AtomicInteger subscribeCount = new AtomicInteger();
        // Each (re)subscription completes the next latch in sequence; further ones are ignored.
        channel.subscribeHandler(ignored -> {
            switch (subscribeCount.getAndIncrement()) {
                case 0:
                    firstSubscribe.complete();
                    break;
                case 1:
                    secondSubscribe.complete();
                    break;
                case 2:
                    thirdSubscribe.complete();
                    break;
                default:
                    break;
            }
        });
        this.subscriber.connect(asyncResult -> {
        });
        channel.handler(payload -> {
        });
        firstSubscribe.awaitSuccess(10000L);

        AtomicInteger policyCalls = new AtomicInteger();
        this.subscriber.reconnectPolicy(num -> {
            testContext.assertEquals(0, num);
            testContext.assertFalse(this.subscriber.closed());
            if (policyCalls.getAndIncrement() < 2) {
                return Long.valueOf(j);
            }
            // Third invocation: the test expects the subscriber to close after this value.
            return -1L;
        });
        Async subscriberClosed = testContext.async();
        this.subscriber.closeHandler(ignored -> {
            subscriberClosed.complete();
        });

        // Drop the connection three times: two reconnects, then the final close.
        proxiedConnection.get().close();
        secondSubscribe.awaitSuccess(10000L);
        proxiedConnection.get().close();
        thirdSubscribe.awaitSuccess(10000L);
        proxiedConnection.get().close();
        subscriberClosed.awaitSuccess(10000L);

        testContext.assertEquals(3, Integer.valueOf(policyCalls.get()));
        testContext.assertTrue(this.subscriber.closed());
    }

    /** Runs the close scenario on a plain lower-case channel name. */
    @Test
    public void testClose(TestContext testContext) {
        final String channelName = "the_channel";
        testClose(testContext, channelName);
    }

    /** Runs the close scenario on a channel name containing upper-case letters and a dot. */
    @Test
    public void testCloseChannelRequiresQuotedID(TestContext testContext) {
        final String channelName = "The.Channel";
        testClose(testContext, channelName);
    }

    /**
     * Creates a subscriber local to this method, installs end and message handlers on the
     * given channel, connects, then closes the subscriber and waits for both the channel's end
     * handler and the subscriber's close handler to fire.
     *
     * @param testContext the vertx-unit context used for assertions and async completion
     * @param str the name of the channel to install the handlers on
     */
    public void testClose(TestContext testContext, String str) {
        // A local subscriber is used here; the `subscriber` field is not assigned.
        PgSubscriber localSubscriber = PgSubscriber.subscriber(vertx, options);
        PgChannel channel = localSubscriber.channel(str);

        Async channelEnded = testContext.async();
        channel.endHandler(ignored -> channelEnded.complete());
        channel.handler(payload -> {
        });

        Async connected = testContext.async();
        localSubscriber.connect(testContext.asyncAssertSuccess(ignored -> connected.complete()));
        connected.awaitSuccess(10000L);

        Async subscriberClosed = testContext.async();
        localSubscriber.closeHandler(ignored -> subscriberClosed.complete());
        localSubscriber.close();

        channelEnded.awaitSuccess(10000L);
        subscriberClosed.awaitSuccess(10000L);
    }
}
