package org.hawkular.metrics.component.publish;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.Topic;
import org.hawkular.bus.Bus;
import org.hawkular.bus.common.BasicMessage;
import org.hawkular.metrics.api.jaxrs.ServiceReady;
import org.hawkular.metrics.api.jaxrs.ServiceReadyEvent;
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.component.publish.AvailDataMessage;
import org.hawkular.metrics.component.publish.MetricDataMessage;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.jboss.logging.Logger;
import rx.Subscription;
import rx.schedulers.Schedulers;

/**
 * Forwards inserted metric data to the Hawkular bus.
 *
 * <p>When the {@link ServiceReady} event is observed, this bean subscribes to the
 * event's stream of inserted metrics. Each metric is converted into a bus message
 * and sent to one of two JMS topics: availability metrics go to
 * {@code java:/topic/HawkularAvailData}, every other metric goes to
 * {@code java:/topic/HawkularMetricData}.
 *
 * <p>Publishing is best effort: failures are logged and do not stop the processing
 * of subsequent metrics.
 */
@Eager
@ApplicationScoped
public class InsertedDataSubscriber {
    private static final Logger log = Logger.getLogger(InsertedDataSubscriber.class);

    /** Time span, in milliseconds, passed to the buffer operator that batches inserted metrics. */
    private static final long BUFFER_TIMESPAN_MILLIS = 50L;

    /** Maximum number of metrics per batch, passed to the buffer operator. */
    private static final int BUFFER_MAX_SIZE = 100;

    @Resource(mappedName = "java:/topic/HawkularMetricData")
    private Topic numericTopic;

    @Resource(mappedName = "java:/topic/HawkularAvailData")
    private Topic availabilityTopic;

    @Inject
    private Bus bus;

    /** Subscription to the inserted-data stream; {@code null} until the service-ready event is observed. */
    private Subscription subscription;

    /**
     * Starts forwarding inserted metrics once the metrics service reports it is ready.
     *
     * <p>Inserted metrics are grouped into batches, empty batches are dropped, and the
     * remaining batches are handed over to the RxJava I/O scheduler where every metric
     * is published individually.
     *
     * @param serviceReadyEvent event that exposes the stream of inserted metrics
     */
    public void onMetricsServiceReady(@Observes @ServiceReady ServiceReadyEvent serviceReadyEvent) {
        this.subscription = serviceReadyEvent.getInsertedData()
                .buffer(BUFFER_TIMESPAN_MILLIS, TimeUnit.MILLISECONDS, BUFFER_MAX_SIZE)
                .filter(batch -> !batch.isEmpty())
                .onBackpressureBuffer()
                .observeOn(Schedulers.io())
                .subscribe(
                        batch -> batch.forEach(this::onInsertedData),
                        // Without an error callback RxJava raises OnErrorNotImplementedException
                        // on the scheduler thread; log the terminal error instead.
                        th -> log.warn("Inserted data stream terminated with an error; "
                                + "no further metrics will be published", th));
    }

    /**
     * Publishes a single inserted metric to the topic matching its type.
     *
     * <p>Runtime failures are caught and logged here so that an exception for one metric
     * does not escape into the subscriber callback and end the whole subscription.
     *
     * @param metric the inserted metric
     */
    private void onInsertedData(Metric<?> metric) {
        log.tracef("Inserted metric: %s", metric);
        try {
            if (metric.getMetricId().getType() == MetricType.AVAILABILITY) {
                // Safe: the metric type was just checked to be AVAILABILITY.
                @SuppressWarnings("unchecked")
                Metric<AvailabilityType> availability = (Metric<AvailabilityType>) metric;
                publishAvailability(availability);
            } else {
                // Every non-availability metric is treated as numeric; a non-numeric value
                // surfaces as a ClassCastException that is caught and logged below.
                @SuppressWarnings("unchecked")
                Metric<? extends Number> numeric = (Metric<? extends Number>) metric;
                publishNumeric(numeric);
            }
        } catch (RuntimeException e) {
            log.warnf(e, "Failed to publish inserted metric %s", metric);
        }
    }

    /**
     * Sends the data points of a numeric metric to the numeric topic.
     *
     * @param metric the numeric metric to publish
     */
    private void publishNumeric(Metric<? extends Number> metric) {
        BasicMessage message = createNumericMessage(metric);
        this.bus.send(this.numericTopic, message).subscribe(
                textMessage -> log.tracef("Sent message %s", textMessage),
                th -> log.warnf(th, "Failed to send message %s", message));
    }

    /**
     * Builds the bus message for a numeric metric: one entry per data point, each value
     * converted to {@code double}, plus the tenant id of the metric.
     *
     * @param metric the numeric metric
     * @return the message carrying the metric's data points
     */
    private BasicMessage createNumericMessage(Metric<? extends Number> metric) {
        MetricId<? extends Number> metricId = metric.getMetricId();
        List<MetricDataMessage.SingleMetric> points = metric.getDataPoints().stream()
                .map(dataPoint -> new MetricDataMessage.SingleMetric(
                        metricId.getName(),
                        dataPoint.getTimestamp(),
                        dataPoint.getValue().doubleValue()))
                .collect(Collectors.toList());
        MetricDataMessage.MetricData metricData = new MetricDataMessage.MetricData();
        metricData.setTenantId(metricId.getTenantId());
        metricData.setData(points);
        return new MetricDataMessage(metricData);
    }

    /**
     * Sends the data points of an availability metric to the availability topic.
     *
     * @param metric the availability metric to publish
     */
    private void publishAvailability(Metric<AvailabilityType> metric) {
        BasicMessage message = createAvailMessage(metric);
        this.bus.send(this.availabilityTopic, message).subscribe(
                textMessage -> log.tracef("Sent message %s", textMessage),
                th -> log.warnf(th, "Failed to send message %s", message));
    }

    /**
     * Builds the bus message for an availability metric: one entry per data point, each
     * carrying the tenant id, the metric name, the timestamp and the upper-cased
     * availability text.
     *
     * @param metric the availability metric
     * @return the message carrying the metric's data points
     */
    private BasicMessage createAvailMessage(Metric<AvailabilityType> metric) {
        MetricId<AvailabilityType> metricId = metric.getMetricId();
        List<AvailDataMessage.SingleAvail> points = metric.getDataPoints().stream()
                .map(dataPoint -> new AvailDataMessage.SingleAvail(
                        metricId.getTenantId(),
                        metricId.getName(),
                        dataPoint.getTimestamp(),
                        // Locale.ROOT keeps the casing independent of the JVM default locale
                        // (e.g. a Turkish locale would turn "i" into a dotted capital I).
                        dataPoint.getValue().getText().toUpperCase(Locale.ROOT)))
                .collect(Collectors.toList());
        AvailDataMessage.AvailData availData = new AvailDataMessage.AvailData();
        availData.setData(points);
        return new AvailDataMessage(availData);
    }

    /** Cancels the inserted-data subscription, if one was created, when the bean is destroyed. */
    @PreDestroy
    void shutdown() {
        if (this.subscription != null) {
            this.subscription.unsubscribe();
        }
    }
}
