/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.integration.twitter.impl;

import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.ConnectorService;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.twitter.HornetQTwitterLogger;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.GeoLocation;
import twitter4j.Paging;
import twitter4j.Place;
import twitter4j.ResponseList;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;

/**
 * Connector service that periodically reads the authenticated user's Twitter
 * home timeline and routes every new status as a durable message to a
 * configured queue address.
 *
 * <p>Configuration keys read by the constructor: {@code consumerKey},
 * {@code consumerSecret}, {@code accessToken}, {@code accessTokenSecret},
 * {@code queue} and {@code interval} (seconds).
 */
public class IncomingTweetsHandler
implements ConnectorService {
    /** Polling interval used when the configured {@code interval} is not positive. */
    private static final int DEFAULT_INTERVAL_SECONDS = 10;
    /** Page size of the single request made in {@link #start()} to find the latest status ID. */
    private static final int INITIAL_PAGE_SIZE = 1;
    /** Page size used for every scheduled poll. */
    private static final int POLL_PAGE_SIZE = 100;

    private final String connectorName;
    private final String consumerKey;
    private final String consumerSecret;
    private final String accessToken;
    private final String accessTokenSecret;
    private final String queueName;
    private final int intervalSeconds;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private Paging paging;
    private Twitter twitter;
    private boolean isStarted = false;
    private final ScheduledExecutorService scheduledPool;
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Creates a handler; no network access happens until {@link #start()}.
     *
     * @param connectorName       name reported by {@link #getName()} and used as log prefix
     * @param configuration       connector properties (see class documentation for the keys)
     * @param storageManager      used to generate message IDs
     * @param postOffice          used to look up the queue binding and to route messages
     * @param scheduledThreadPool executor on which the polling task is scheduled
     */
    public IncomingTweetsHandler(String connectorName, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) {
        this.connectorName = connectorName;
        this.consumerKey = ConfigurationHelper.getStringProperty((String)"consumerKey", null, configuration);
        this.consumerSecret = ConfigurationHelper.getStringProperty((String)"consumerSecret", null, configuration);
        this.accessToken = ConfigurationHelper.getStringProperty((String)"accessToken", null, configuration);
        this.accessTokenSecret = ConfigurationHelper.getStringProperty((String)"accessTokenSecret", null, configuration);
        this.queueName = ConfigurationHelper.getStringProperty((String)"queue", null, configuration);
        Integer intervalSeconds = ConfigurationHelper.getIntProperty((String)"interval", (int)0, configuration);
        // A missing, zero or negative interval falls back to the default.
        this.intervalSeconds = intervalSeconds > 0 ? intervalSeconds : DEFAULT_INTERVAL_SECONDS;
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.scheduledPool = scheduledThreadPool;
    }

    /**
     * Verifies the target queue and the Twitter credentials, records the ID of
     * the most recent status on the home timeline (if any) and schedules the
     * polling task. Calling it while already started is a no-op.
     *
     * @throws Exception if the queue is not configured or has no binding, or if
     *                   any Twitter call fails
     */
    public void start() throws Exception {
        if (this.isStarted) {
            // A second start would schedule a second poller and lose the handle to the first.
            return;
        }
        // Check for a missing "queue" property before constructing the address.
        // NOTE(review): presumably SimpleString rejects null - confirm.
        if (this.queueName == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        Binding b = this.postOffice.getBinding(new SimpleString(this.queueName));
        if (b == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        Paging newPaging = new Paging();
        TwitterFactory tf = new TwitterFactory();
        this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey, this.consumerSecret, new AccessToken(this.accessToken, this.accessTokenSecret));
        this.twitter.verifyCredentials();
        // Fetch a single status only to learn the current latest ID.
        newPaging.setCount(INITIAL_PAGE_SIZE);
        ResponseList res = this.twitter.getHomeTimeline(newPaging);
        if (res != null && !res.isEmpty()) {
            newPaging.setSinceId(((Status)res.get(0)).getId());
            HornetQTwitterLogger.LOGGER.debug(this.connectorName + " initialise(): got latest ID: " + newPaging.getSinceId());
        } else {
            // Empty timeline: leave since-id unset so the first poll picks up whatever arrives.
            HornetQTwitterLogger.LOGGER.debug(this.connectorName + " initialise(): home timeline is empty, no latest ID");
        }
        newPaging.setCount(POLL_PAGE_SIZE);
        this.paging = newPaging;
        this.scheduledFuture = this.scheduledPool.scheduleWithFixedDelay(new TweetsRunnable(), this.intervalSeconds, this.intervalSeconds, TimeUnit.SECONDS);
        this.isStarted = true;
    }

    /**
     * Cancels the scheduled polling task (interrupting a poll in progress) and
     * discards the paging state. Does nothing if the handler is not started.
     */
    public void stop() throws Exception {
        if (!this.isStarted) {
            return;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(true);
            this.scheduledFuture = null;
        }
        this.paging = null;
        this.isStarted = false;
    }

    /** @return {@code true} between a successful {@link #start()} and the next {@link #stop()} */
    public boolean isStarted() {
        return this.isStarted;
    }

    /**
     * Fetches the home timeline page described by the current paging state,
     * routes each returned status as a message, then advances the since-id to
     * the ID of the first element of the response.
     */
    private void poll() throws Exception {
        // Snapshot the field: stop() may null it while this task is still running.
        Paging currentPaging = this.paging;
        if (currentPaging == null) {
            return;
        }
        ResponseList res = this.twitter.getHomeTimeline(currentPaging);
        if (res == null || res.size() == 0) {
            return;
        }
        // Walk from the last element to the first; the code treats index 0 as the
        // latest status (presumably the API returns newest-first - confirm).
        for (int i = res.size() - 1; i >= 0; --i) {
            Status status = (Status)res.get(i);
            // NOTE(review): 50 is presumably the initial buffer size - confirm against ServerMessageImpl.
            ServerMessageImpl msg = new ServerMessageImpl(this.storageManager.generateID(), 50);
            msg.setAddress(new SimpleString(this.queueName));
            msg.setDurable(true);
            msg.encodeMessageIDToBuffer();
            this.putTweetIntoMessage(status, (ServerMessage)msg);
            this.postOffice.route((ServerMessage)msg, false);
            HornetQTwitterLogger.LOGGER.debug(this.connectorName + ": routed: " + status.toString());
        }
        currentPaging.setSinceId(((Status)res.get(0)).getId());
        HornetQTwitterLogger.LOGGER.debug(this.connectorName + ": update latest ID: " + currentPaging.getSinceId());
    }

    /**
     * Copies a status into a message: the text becomes the body, the remaining
     * attributes become message properties. Geo-location and place properties
     * are only set when the status carries them.
     *
     * @param status the tweet to copy from
     * @param msg    the message to populate
     */
    private void putTweetIntoMessage(Status status, ServerMessage msg) {
        msg.getBodyBuffer().writeString(status.getText());
        msg.putLongProperty("id", status.getId());
        msg.putStringProperty("source", status.getSource());
        msg.putLongProperty("createdAt", status.getCreatedAt().getTime());
        msg.putBooleanProperty("isTruncated", status.isTruncated());
        msg.putLongProperty("inReplyToStatusId", status.getInReplyToStatusId());
        msg.putIntProperty("inReplyToUserId", status.getInReplyToUserId());
        msg.putBooleanProperty("isFavorited", status.isFavorited());
        msg.putBooleanProperty("isRetweet", status.isRetweet());
        msg.putObjectProperty("contributors", (Object)status.getContributors());
        GeoLocation gl = status.getGeoLocation();
        if (gl != null) {
            msg.putDoubleProperty("geoLocation.latitude", gl.getLatitude());
            msg.putDoubleProperty("geoLocation.longitude", gl.getLongitude());
        }
        Place place = status.getPlace();
        if (place != null) {
            msg.putStringProperty("place.id", place.getId());
        }
    }

    /** @return the connector name given at construction */
    public String getName() {
        return this.connectorName;
    }

    /**
     * Scheduled task that runs one poll. Every failure is caught and logged,
     * so no exception propagates out of {@link #run()} to the executor.
     */
    private final class TweetsRunnable
    implements Runnable {
        @Override
        public void run() {
            try {
                IncomingTweetsHandler.this.poll();
            }
            catch (Throwable t) {
                HornetQTwitterLogger.LOGGER.errorPollingTwitter(t);
            }
        }
    }
}

