package org.hornetq.integration.twitter.impl;

import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.ConnectorService;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.twitter.HornetQTwitterLogger;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.GeoLocation;
import twitter4j.Paging;
import twitter4j.Place;
import twitter4j.ResponseList;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;

/* loaded from: input_file:org/hornetq/integration/twitter/impl/IncomingTweetsHandler.class */
public class IncomingTweetsHandler implements ConnectorService {
    private final String connectorName;
    private final String consumerKey;
    private final String consumerSecret;
    private final String accessToken;
    private final String accessTokenSecret;
    private final String queueName;
    private final int intervalSeconds;
    private final StorageManager storageManager;
    private final PostOffice postOffice;
    private Paging paging;
    private Twitter twitter;
    private boolean isStarted = false;
    private final ScheduledExecutorService scheduledPool;
    private ScheduledFuture<?> scheduledFuture;

    /* loaded from: input_file:org/hornetq/integration/twitter/impl/IncomingTweetsHandler$TweetsRunnable.class */
    private final class TweetsRunnable implements Runnable {
        private TweetsRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                IncomingTweetsHandler.this.poll();
            } catch (Throwable th) {
                HornetQTwitterLogger.LOGGER.errorPollingTwitter(th);
            }
        }
    }

    public IncomingTweetsHandler(String str, Map<String, Object> map, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledExecutorService) {
        this.connectorName = str;
        this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, (String) null, map);
        this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, (String) null, map);
        this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, (String) null, map);
        this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, (String) null, map);
        this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, (String) null, map);
        Integer valueOf = Integer.valueOf(ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, map));
        if (valueOf.intValue() > 0) {
            this.intervalSeconds = valueOf.intValue();
        } else {
            this.intervalSeconds = 10;
        }
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.scheduledPool = scheduledExecutorService;
    }

    public void start() throws Exception {
        if (this.postOffice.getBinding(new SimpleString(this.queueName)) == null) {
            throw new Exception(this.connectorName + ": queue " + this.queueName + " not found");
        }
        this.paging = new Paging();
        this.twitter = new TwitterFactory().getOAuthAuthorizedInstance(this.consumerKey, this.consumerSecret, new AccessToken(this.accessToken, this.accessTokenSecret));
        this.twitter.verifyCredentials();
        this.paging.setCount(1);
        this.paging.setSinceId(((Status) this.twitter.getHomeTimeline(this.paging).get(0)).getId());
        HornetQTwitterLogger.LOGGER.debug(this.connectorName + " initialise(): got latest ID: " + this.paging.getSinceId());
        this.paging.setCount(100);
        this.scheduledFuture = this.scheduledPool.scheduleWithFixedDelay(new TweetsRunnable(), this.intervalSeconds, this.intervalSeconds, TimeUnit.SECONDS);
        this.isStarted = true;
    }

    public void stop() throws Exception {
        if (this.isStarted) {
            this.scheduledFuture.cancel(true);
            this.paging = null;
            this.isStarted = false;
        }
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void poll() throws Exception {
        ResponseList homeTimeline = this.twitter.getHomeTimeline(this.paging);
        if (homeTimeline == null || homeTimeline.size() == 0) {
            return;
        }
        for (int size = homeTimeline.size() - 1; size >= 0; size--) {
            Status status = (Status) homeTimeline.get(size);
            ServerMessageImpl serverMessageImpl = new ServerMessageImpl(this.storageManager.generateID(), 50);
            serverMessageImpl.setAddress(new SimpleString(this.queueName));
            serverMessageImpl.setDurable(true);
            serverMessageImpl.encodeMessageIDToBuffer();
            putTweetIntoMessage(status, serverMessageImpl);
            this.postOffice.route(serverMessageImpl, false);
            HornetQTwitterLogger.LOGGER.debug(this.connectorName + ": routed: " + status.toString());
        }
        this.paging.setSinceId(((Status) homeTimeline.get(0)).getId());
        HornetQTwitterLogger.LOGGER.debug(this.connectorName + ": update latest ID: " + this.paging.getSinceId());
    }

    private void putTweetIntoMessage(Status status, ServerMessage serverMessage) {
        serverMessage.getBodyBuffer().writeString(status.getText());
        serverMessage.putLongProperty(TwitterConstants.KEY_ID, status.getId());
        serverMessage.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource());
        serverMessage.putLongProperty(TwitterConstants.KEY_CREATED_AT, status.getCreatedAt().getTime());
        serverMessage.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED, status.isTruncated());
        serverMessage.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, status.getInReplyToStatusId());
        serverMessage.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID, status.getInReplyToUserId());
        serverMessage.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED, status.isFavorited());
        serverMessage.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet());
        serverMessage.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS, status.getContributors());
        GeoLocation geoLocation = status.getGeoLocation();
        if (geoLocation != null) {
            serverMessage.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE, geoLocation.getLatitude());
            serverMessage.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE, geoLocation.getLongitude());
        }
        Place place = status.getPlace();
        if (place != null) {
            serverMessage.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId());
        }
    }

    public String getName() {
        return this.connectorName;
    }
}
