package org.apache.kafka.streams.test;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/test/MockProcessorContextAPITest.class */
public class MockProcessorContextAPITest {
    @Test
    public void shouldCaptureOutputRecords() {
        Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.1
            private ProcessorContext<String, Long> context;

            public void init(ProcessorContext<String, Long> processorContext) {
                this.context = processorContext;
            }

            public void process(Record<String, Long> record) {
                String str = (String) record.key();
                Long l = (Long) record.value();
                this.context.forward(record.withKey(str + l).withValue(Long.valueOf(str.length() + l.longValue())));
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        processor.process(new Record("foo", 5L, 0L));
        processor.process(new Record("barbaz", 50L, 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("foo5", 8L, 0L)), new MockProcessorContext.CapturedForward(new Record("barbaz50", 56L, 0L)))));
        mockProcessorContext.resetForwards();
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), Matchers.empty());
    }

    @Test
    public void shouldCaptureRecordsOutputToChildByName() {
        Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.2
            private ProcessorContext<String, Long> context;
            private int count = 0;

            public void process(Record<String, Long> record) {
                String str = (String) record.key();
                Long l = (Long) record.value();
                if (this.count == 0) {
                    this.context.forward(new Record("start", -1L, 0L));
                }
                this.context.forward(new Record(str + l, Long.valueOf(str.length() + l.longValue()), 0L), this.count % 2 == 0 ? "george" : "pete");
                this.count++;
            }

            public void init(ProcessorContext<String, Long> processorContext) {
                this.context = processorContext;
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        processor.process(new Record("foo", 5L, 0L));
        processor.process(new Record("barbaz", 50L, 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("start", -1L, 0L), Optional.empty()), new MockProcessorContext.CapturedForward(new Record("foo5", 8L, 0L), Optional.of("george")), new MockProcessorContext.CapturedForward(new Record("barbaz50", 56L, 0L), Optional.of("pete")))));
        MatcherAssert.assertThat(mockProcessorContext.forwarded("george"), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("start", -1L, 0L), Optional.empty()), new MockProcessorContext.CapturedForward(new Record("foo5", 8L, 0L), Optional.of("george")))));
        MatcherAssert.assertThat(mockProcessorContext.forwarded("pete"), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("start", -1L, 0L), Optional.empty()), new MockProcessorContext.CapturedForward(new Record("barbaz50", 56L, 0L), Optional.of("pete")))));
        MatcherAssert.assertThat(mockProcessorContext.forwarded("steve"), Matchers.is(Collections.singletonList(new MockProcessorContext.CapturedForward(new Record("start", -1L, 0L)))));
    }

    @Test
    public void shouldCaptureCommitsAndAllowReset() {
        Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.3
            private ProcessorContext<Void, Void> context;
            private int count = 0;

            public void init(ProcessorContext<Void, Void> processorContext) {
                this.context = processorContext;
            }

            public void process(Record<String, Long> record) {
                int i = this.count + 1;
                this.count = i;
                if (i > 2) {
                    this.context.commit();
                }
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        processor.process(new Record("foo", 5L, 0L));
        processor.process(new Record("barbaz", 50L, 0L));
        MatcherAssert.assertThat(Boolean.valueOf(mockProcessorContext.committed()), Matchers.is(false));
        processor.process(new Record("foobar", 500L, 0L));
        MatcherAssert.assertThat(Boolean.valueOf(mockProcessorContext.committed()), Matchers.is(true));
        mockProcessorContext.resetCommit();
        MatcherAssert.assertThat(Boolean.valueOf(mockProcessorContext.committed()), Matchers.is(false));
    }

    @Test
    public void shouldStoreAndReturnStateStores() {
        Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.4
            private ProcessorContext<Void, Void> context;

            public void init(ProcessorContext<Void, Void> processorContext) {
                this.context = processorContext;
            }

            public void process(Record<String, Long> record) {
                String str = (String) record.key();
                Long l = (Long) record.value();
                KeyValueStore stateStore = this.context.getStateStore("my-state");
                stateStore.put(str, Long.valueOf((stateStore.get(str) == null ? 0L : ((Long) stateStore.get(str)).longValue()) + l.longValue()));
                stateStore.put("all", Long.valueOf((stateStore.get("all") == null ? 0L : ((Long) stateStore.get("all")).longValue()) + l.longValue()));
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        KeyValueStore build = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-state"), Serdes.String(), Serdes.Long()).withLoggingDisabled().build();
        build.init(mockProcessorContext.getStateStoreContext(), build);
        processor.init(mockProcessorContext);
        processor.process(new Record("foo", 5L, 0L));
        processor.process(new Record("bar", 50L, 0L));
        MatcherAssert.assertThat(build.get("foo"), Matchers.is(5L));
        MatcherAssert.assertThat(build.get("bar"), Matchers.is(50L));
        MatcherAssert.assertThat(build.get("all"), Matchers.is(55L));
    }

    @Test
    public void shouldCaptureApplicationAndRecordMetadata() {
        Properties mkProperties = Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "testMetadata"), Utils.mkEntry("bootstrap.servers", "")}));
        Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.5
            private ProcessorContext<String, Object> context;

            public void init(ProcessorContext<String, Object> processorContext) {
                this.context = processorContext;
            }

            public void process(Record<String, Object> record) {
                this.context.forward(new Record("appId", this.context.applicationId(), 0L));
                this.context.forward(new Record("taskId", this.context.taskId(), 0L));
                if (this.context.recordMetadata().isPresent()) {
                    RecordMetadata recordMetadata = (RecordMetadata) this.context.recordMetadata().get();
                    this.context.forward(new Record("topic", recordMetadata.topic(), 0L));
                    this.context.forward(new Record("partition", Integer.valueOf(recordMetadata.partition()), 0L));
                    this.context.forward(new Record("offset", Long.valueOf(recordMetadata.offset()), 0L));
                }
                this.context.forward(new Record("record", record, 0L));
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext(mkProperties);
        processor.init(mockProcessorContext);
        processor.process(new Record("foo", 5L, 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("appId", "testMetadata", 0L)), new MockProcessorContext.CapturedForward(new Record("taskId", new TaskId(0, 0), 0L)), new MockProcessorContext.CapturedForward(new Record("record", new Record("foo", 5L, 0L), 0L)))));
        mockProcessorContext.resetForwards();
        mockProcessorContext.setRecordMetadata("t1", 0, 0L);
        processor.process(new Record("foo", 5L, 0L));
        MatcherAssert.assertThat(mockProcessorContext.forwarded(), Matchers.is(Arrays.asList(new MockProcessorContext.CapturedForward(new Record("appId", "testMetadata", 0L)), new MockProcessorContext.CapturedForward(new Record("taskId", new TaskId(0, 0), 0L)), new MockProcessorContext.CapturedForward(new Record("topic", "t1", 0L)), new MockProcessorContext.CapturedForward(new Record("partition", 0, 0L)), new MockProcessorContext.CapturedForward(new Record("offset", 0L, 0L)), new MockProcessorContext.CapturedForward(new Record("record", new Record("foo", 5L, 0L), 0L)))));
    }

    @Test
    public void shouldCapturePunctuator() {
        Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { // from class: org.apache.kafka.streams.test.MockProcessorContextAPITest.6
            public void init(ProcessorContext<Void, Void> processorContext) {
                processorContext.schedule(Duration.ofSeconds(1L), PunctuationType.WALL_CLOCK_TIME, j -> {
                    processorContext.commit();
                });
            }

            public void process(Record<String, Long> record) {
            }
        };
        MockProcessorContext mockProcessorContext = new MockProcessorContext();
        processor.init(mockProcessorContext);
        MockProcessorContext.CapturedPunctuator capturedPunctuator = (MockProcessorContext.CapturedPunctuator) mockProcessorContext.scheduledPunctuators().get(0);
        MatcherAssert.assertThat(capturedPunctuator.getInterval(), Matchers.is(Duration.ofMillis(1000L)));
        MatcherAssert.assertThat(capturedPunctuator.getType(), Matchers.is(PunctuationType.WALL_CLOCK_TIME));
        MatcherAssert.assertThat(Boolean.valueOf(capturedPunctuator.cancelled()), Matchers.is(false));
        Punctuator punctuator = capturedPunctuator.getPunctuator();
        MatcherAssert.assertThat(Boolean.valueOf(mockProcessorContext.committed()), Matchers.is(false));
        punctuator.punctuate(1234L);
        MatcherAssert.assertThat(Boolean.valueOf(mockProcessorContext.committed()), Matchers.is(true));
    }

    @Test
    public void fullConstructorShouldSetAllExpectedAttributes() {
        Properties properties = new Properties();
        properties.put("application.id", "testFullConstructor");
        properties.put("bootstrap.servers", "");
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        File file = new File("");
        MockProcessorContext mockProcessorContext = new MockProcessorContext(properties, new TaskId(1, 1), file);
        MatcherAssert.assertThat(mockProcessorContext.applicationId(), Matchers.is("testFullConstructor"));
        MatcherAssert.assertThat(mockProcessorContext.taskId(), Matchers.is(new TaskId(1, 1)));
        MatcherAssert.assertThat(mockProcessorContext.appConfigs().get("application.id"), Matchers.is("testFullConstructor"));
        MatcherAssert.assertThat(mockProcessorContext.appConfigsWithPrefix("application.").get("id"), Matchers.is("testFullConstructor"));
        MatcherAssert.assertThat(mockProcessorContext.keySerde().getClass(), Matchers.is(Serdes.String().getClass()));
        MatcherAssert.assertThat(mockProcessorContext.valueSerde().getClass(), Matchers.is(Serdes.Long().getClass()));
        MatcherAssert.assertThat(mockProcessorContext.stateDir(), Matchers.is(file));
    }
}
