/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.metrics.component.publish;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.Topic;
import org.hawkular.bus.Bus;
import org.hawkular.bus.common.BasicMessage;
import org.hawkular.metrics.api.jaxrs.ServiceReady;
import org.hawkular.metrics.api.jaxrs.ServiceReadyEvent;
import org.hawkular.metrics.api.jaxrs.util.Eager;
import org.hawkular.metrics.component.publish.AvailDataMessage;
import org.hawkular.metrics.component.publish.MetricDataMessage;
import org.hawkular.metrics.model.AvailabilityType;
import org.hawkular.metrics.model.Metric;
import org.hawkular.metrics.model.MetricId;
import org.hawkular.metrics.model.MetricType;
import org.jboss.logging.Logger;
import rx.Observable;
import rx.Subscription;
import rx.schedulers.Schedulers;

@ApplicationScoped
@Eager
public class InsertedDataSubscriber {
    private static final Logger log = Logger.getLogger(InsertedDataSubscriber.class);
    @Resource(mappedName="java:/topic/HawkularMetricData")
    private Topic numericTopic;
    @Resource(mappedName="java:/topic/HawkularAvailData")
    private Topic availabilityTopic;
    @Inject
    private Bus bus;
    private Subscription subscription;

    public void onMetricsServiceReady(@Observes @ServiceReady ServiceReadyEvent event) {
        Observable events = event.getInsertedData().buffer(50L, TimeUnit.MILLISECONDS, 100).filter(list -> !list.isEmpty()).onBackpressureBuffer().observeOn(Schedulers.io());
        this.subscription = events.subscribe(list -> list.forEach(this::onInsertedData));
    }

    private void onInsertedData(Metric<?> metric) {
        log.tracef("Inserted metric: %s", metric);
        if (metric.getMetricId().getType() == MetricType.AVAILABILITY) {
            Metric<?> avail = metric;
            this.publishAvailablility(avail);
        } else {
            Metric<?> numeric = metric;
            this.publishNumeric(numeric);
        }
    }

    private void publishNumeric(Metric<? extends Number> metric) {
        BasicMessage message = this.createNumericMessage(metric);
        this.bus.send((Destination)this.numericTopic, message).subscribe(msg -> log.tracef("Sent message %s", msg), t -> log.warnf(t, "Failed to send message %s", (Object)message));
    }

    private BasicMessage createNumericMessage(Metric<? extends Number> numeric) {
        MetricId numericId = numeric.getMetricId();
        List<MetricDataMessage.SingleMetric> numericList = numeric.getDataPoints().stream().map(dataPoint -> new MetricDataMessage.SingleMetric(numericId.getName(), dataPoint.getTimestamp(), ((Number)dataPoint.getValue()).doubleValue())).collect(Collectors.toList());
        MetricDataMessage.MetricData metricData = new MetricDataMessage.MetricData();
        metricData.setTenantId(numericId.getTenantId());
        metricData.setData(numericList);
        return new MetricDataMessage(metricData);
    }

    private void publishAvailablility(Metric<AvailabilityType> metric) {
        BasicMessage message = this.createAvailMessage(metric);
        this.bus.send((Destination)this.availabilityTopic, message).subscribe(msg -> log.tracef("Sent message %s", msg), t -> log.warnf(t, "Failed to send message %s", (Object)message));
    }

    private BasicMessage createAvailMessage(Metric<AvailabilityType> avail) {
        MetricId availId = avail.getMetricId();
        List<AvailDataMessage.SingleAvail> availList = avail.getDataPoints().stream().map(dataPoint -> new AvailDataMessage.SingleAvail(availId.getTenantId(), availId.getName(), dataPoint.getTimestamp(), ((AvailabilityType)dataPoint.getValue()).getText().toUpperCase())).collect(Collectors.toList());
        AvailDataMessage.AvailData metricData = new AvailDataMessage.AvailData();
        metricData.setData(availList);
        return new AvailDataMessage(metricData);
    }

    @PreDestroy
    void shutdown() {
        if (this.subscription != null) {
            this.subscription.unsubscribe();
        }
    }
}

