package org.jacorb.notification.servant;

import java.util.List;
import org.apache.avalon.framework.configuration.Configuration;
import org.jacorb.notification.ChannelContext;
import org.jacorb.notification.conf.Attributes;
import org.jacorb.notification.engine.PushOperation;
import org.jacorb.notification.engine.RetryException;
import org.jacorb.notification.engine.RetryStrategy;
import org.jacorb.notification.engine.TaskExecutor;
import org.jacorb.notification.engine.TaskProcessorRetryStrategy;
import org.jacorb.notification.interfaces.Disposable;
import org.jacorb.notification.interfaces.Message;
import org.jacorb.notification.interfaces.MessageConsumer;
import org.jacorb.notification.queue.EventQueue;
import org.jacorb.notification.queue.EventQueueFactory;
import org.jacorb.notification.queue.EventQueueFactoryDependency;
import org.jacorb.notification.util.PropertySet;
import org.jacorb.notification.util.PropertySetListener;
import org.jacorb.orb.ORB;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.NO_IMPLEMENT;
import org.omg.CORBA.Object;
import org.omg.CosNotification.DiscardPolicy;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.OrderPolicy;
import org.omg.CosNotification.Property;
import org.omg.CosNotification.UnsupportedQoS;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.ConsumerAdminHelper;
import org.omg.CosNotifyChannelAdmin.ObtainInfoMode;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.NotifyPublish;
import org.omg.CosNotifyComm.NotifyPublishHelper;
import org.omg.CosNotifyComm.NotifyPublishOperations;
import org.omg.CosNotifyComm.NotifySubscribeOperations;

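/**
 * Common base class of the notification service's proxy suppliers.
 *
 * <p>Messages waiting for delivery are buffered in an {@link EventQueue}. The
 * queue is created in {@link #preActivate()} and replaced (with its contents
 * carried over) whenever the OrderPolicy or DiscardPolicy QoS setting changes.
 *
 * <p>Decompiled from org/jacorb/notification/servant/AbstractProxySupplier.class.
 */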
public abstract class AbstractProxySupplier extends AbstractProxy implements MessageConsumer, NotifySubscribeOperations, EventQueueFactoryDependency {
    /** Initial result of {@link #obtain_offered_types}; returned when the mode requests no types. */
    private static final EventType[] EMPTY_EVENT_TYPE_ARRAY = new EventType[0];

    /**
     * Runnable that schedules a timed push task for this proxy. Only assigned by
     * the ChannelContext constructor when {@link #isPushSupplier()} is true;
     * otherwise it stays {@code null}.
     */
    protected Runnable scheduleDeliverPendingMessagesOperation_;

    /** Executor returned by {@link #getExecutor()}; may be assigned only once. */
    private TaskExecutor taskExecutor_;

    /** Optional hook disposed together with this proxy (see {@link #dispose()}). */
    private Disposable disposeTaskExecutor_;

    /** Queue of messages awaiting delivery; every access is guarded by {@link #pendingMessagesRefLock_}. */
    private EventQueue pendingMessages_;

    /** Error threshold read from the configuration in {@link #configure}. */
    private int errorThreshold_;

    /** The admin that created this proxy; set by the static factory methods. */
    private ConsumerAdmin consumerAdmin_;

    /** Factory used to create (and re-create) {@link #pendingMessages_}. */
    private EventQueueFactory eventQueueFactory_;

    /** Wrapper currently registered with the offer manager, or {@code null} if none. */
    private NotifyPublishOperations proxyOfferListener_;

    /** The connected client narrowed to NotifyPublish, or {@code null} if narrowing failed. */
    private NotifyPublish offerListener_;

    /*
     * Guards the pendingMessages_ reference. The type must be spelled out as
     * java.lang.Object: this file imports org.omg.CORBA.Object, which shadows the
     * simple name "Object" and is an interface that cannot be instantiated.
     */
    private final java.lang.Object pendingMessagesRefLock_ = new java.lang.Object();

    /** Delivery flag toggled by {@link #enableDelivery()} / {@link #disableDelivery()}. */
    private boolean enabled_ = true;
    /**
     * Callback registered on the QoS property set in {@link #preActivate()}.
     * Whenever one of the observed properties changes, the pending-message queue
     * is rebuilt via {@link #configureEventQueue()}.
     *
     * <p>Written as a regular anonymous class: the enclosing instance is captured
     * implicitly, so no synthetic constructor argument or {@code this$0} field is
     * needed (the decompiled form did not compile).
     */
    private final PropertySetListener eventQueueConfigurationChangedCB = new PropertySetListener() {
        /** No validation is performed for the observed properties. */
        @Override // org.jacorb.notification.util.PropertySetListener
        public void validateProperty(Property[] propertyArr, List list) {
            // intentionally empty
        }

        /** Rebuilds the event queue of the enclosing proxy supplier. */
        @Override // org.jacorb.notification.util.PropertySetListener
        public void actionPropertySetChanged(PropertySet propertySet) throws UnsupportedQoS {
            AbstractProxySupplier.this.configureEventQueue();
        }
    };

    /**
     * Creates a proxy supplier without any further initialisation. Unlike the
     * ChannelContext constructor, this one never assigns
     * {@link #scheduleDeliverPendingMessagesOperation_}.
     */
    public AbstractProxySupplier() {
    }

    /**
     * Creates a proxy supplier and, for push suppliers, prepares the runnable
     * that schedules a timed push task for this proxy.
     *
     * @param channelContext not used by this constructor body
     */
    protected AbstractProxySupplier(ChannelContext channelContext) {
        if (isPushSupplier()) {
            // Regular anonymous class: the enclosing instance is captured implicitly.
            this.scheduleDeliverPendingMessagesOperation_ = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        getTaskProcessor().scheduleTimedPushTask(AbstractProxySupplier.this);
                    } catch (InterruptedException e) {
                        logger_.fatalError("scheduleTimedPushTask failed", e);
                        // Restore the interrupt flag so the thread running this
                        // runnable can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                }
            };
        }
    }

    /**
     * Applies the configuration of the superclass and then reads the error
     * threshold for this proxy.
     *
     * @param configuration source of the EVENTCONSUMER_ERROR_THRESHOLD attribute
     */
    @Override // org.jacorb.notification.servant.AbstractProxy
    public void configure(Configuration configuration) {
        super.configure(configuration);
        // Fallback used when the attribute is not present in the configuration.
        final int fallbackThreshold = 3;
        errorThreshold_ = configuration.getAttributeAsInteger(
                Attributes.EVENTCONSUMER_ERROR_THRESHOLD, fallbackThreshold);
    }

    /**
     * Stores the factory that will be used to create the pending-message queue.
     *
     * @param eventQueueFactory factory to remember
     */
    @Override // org.jacorb.notification.queue.EventQueueFactoryDependency
    public final void setEventQueueFactory(EventQueueFactory eventQueueFactory) {
        eventQueueFactory_ = eventQueueFactory;
    }

    /**
     * Returns the factory previously stored by {@link #setEventQueueFactory}.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @return the stored factory; {@code null} if none has been set
     */
    public EventQueueFactory getEventQueueFactory() {
        return eventQueueFactory_;
    }

    /**
     * Creates the pending-message queue from the current QoS settings, logs the
     * error threshold and starts listening for OrderPolicy / DiscardPolicy
     * changes so the queue can be rebuilt when they occur.
     *
     * @throws UnsupportedQoS declared by the overridden method
     * @throws Exception declared by the overridden method
     */
    @Override // org.jacorb.notification.servant.AbstractProxy, org.jacorb.notification.servant.ManageableServant
    public void preActivate() throws UnsupportedQoS, Exception {
        synchronized (pendingMessagesRefLock_) {
            pendingMessages_ = getEventQueueFactory().newEventQueue(qosSettings_);
        }

        if (logger_.isInfoEnabled()) {
            logger_.info("set Error Threshold to : " + errorThreshold_);
        }

        final String[] observedProperties = {OrderPolicy.value, DiscardPolicy.value};
        qosSettings_.addPropertySetListener(observedProperties, eventQueueConfigurationChangedCB);
    }

    /**
     * Replaces the pending-message queue with a new one built from the current
     * QoS settings. Messages still held by the old queue are moved into the new
     * queue before the reference is swapped, all under the queue lock.
     * (Reported by the decompiler as originally {@code private}.)
     *
     * @throws UnsupportedQoS declared for the listener callback that invokes this method
     * @throws RuntimeException if the thread is interrupted while draining the
     *         old queue; the interrupt flag is restored and the
     *         {@link InterruptedException} is attached as the cause
     */
    public void configureEventQueue() throws UnsupportedQoS {
        EventQueue newEventQueue = getEventQueueFactory().newEventQueue(this.qosSettings_);
        try {
            synchronized (this.pendingMessagesRefLock_) {
                if (!this.pendingMessages_.isEmpty()) {
                    for (Message message : this.pendingMessages_.getAllEvents(true)) {
                        newEventQueue.put(message);
                    }
                }
                this.pendingMessages_ = newEventQueue;
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack and
            // preserve the original exception instead of only its message.
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while reconfiguring the event queue", e);
        }
    }

    /**
     * Returns the executor assigned through {@code setTaskExecutor}.
     *
     * @return the assigned executor; {@code null} if none has been set yet
     */
    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public TaskExecutor getExecutor() {
        return taskExecutor_;
    }

    /**
     * Assigns the executor for this proxy. The executor may only be assigned
     * once.
     *
     * @param taskExecutor executor to use
     * @throws IllegalArgumentException if an executor has already been assigned
     */
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        final boolean alreadyAssigned = taskExecutor_ != null;
        if (!alreadyAssigned) {
            taskExecutor_ = taskExecutor;
            return;
        }
        throw new IllegalArgumentException("TaskExecutor should be set only once!");
    }

    /**
     * Assigns the executor together with a hook that is disposed when this
     * proxy is disposed.
     *
     * @param taskExecutor executor to use
     * @param disposable hook invoked from {@link #dispose()}; may be {@code null}
     * @throws IllegalArgumentException if an executor has already been assigned
     */
    public void setTaskExecutor(TaskExecutor taskExecutor, Disposable disposable) {
        // Delegate first: if the executor was already set, the hook is left untouched.
        setTaskExecutor(taskExecutor);
        disposeTaskExecutor_ = disposable;
    }

    /**
     * Returns the size reported by the pending-message queue.
     *
     * @return current queue size
     */
    public int getPendingMessagesCount() {
        synchronized (pendingMessagesRefLock_) {
            return pendingMessages_.getSize();
        }
    }

    /**
     * Tells whether the pending-message queue currently holds any message.
     *
     * @return {@code true} if the queue is not empty
     */
    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public boolean hasPendingData() {
        final boolean queueIsEmpty;
        synchronized (pendingMessagesRefLock_) {
            queueIsEmpty = pendingMessages_.isEmpty();
        }
        return !queueIsEmpty;
    }

    /**
     * Appends a message to the pending-message queue and, if debug logging is
     * enabled, logs the addition.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @param message message to queue
     */
    public void enqueue(Message message) {
        synchronized (pendingMessagesRefLock_) {
            pendingMessages_.put(message);
        }

        if (!logger_.isDebugEnabled()) {
            return;
        }
        logger_.debug("added " + message + " to pending Messages.");
    }

    /**
     * Fetches one message from the pending-message queue using the queue's
     * blocking mode ({@code getEvent(true)}).
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * <p>NOTE(review): the queue call is made while {@code pendingMessagesRefLock_}
     * is held, and {@link #enqueue} needs the same lock. If {@code getEvent(true)}
     * really waits for a message, a waiting caller would keep producers out;
     * confirm against the EventQueue implementation.
     *
     * @return the fetched message
     * @throws InterruptedException propagated from the queue
     */
    public Message getMessageBlocking() throws InterruptedException {
        synchronized (pendingMessagesRefLock_) {
            return pendingMessages_.getEvent(true);
        }
    }

    /**
     * Fetches one message from the pending-message queue using the queue's
     * non-blocking mode ({@code getEvent(false)}).
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @return the queue's result, or {@code null} if the call was interrupted
     *         (the interrupt flag is restored in that case)
     */
    public Message getMessageNoBlock() {
        try {
            synchronized (pendingMessagesRefLock_) {
                return pendingMessages_.getEvent(false);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Fetches all messages from the pending-message queue using the queue's
     * non-blocking mode ({@code getAllEvents(false)}).
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @return the queue's result, or {@code null} if the call was interrupted
     *         (the interrupt flag is restored in that case)
     */
    public Message[] getAllMessages() {
        try {
            synchronized (pendingMessagesRefLock_) {
                return pendingMessages_.getAllEvents(false);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Fetches messages from the pending-message queue via
     * {@code getEvents(i, false)}.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @param i count passed to the queue; presumably the maximum number of
     *        messages to fetch (verify against EventQueue)
     * @return the queue's result, or {@code null} if the call was interrupted
     *         (the interrupt flag is restored in that case)
     */
    public Message[] getUpToMessages(int i) {
        Message[] fetched = null;
        try {
            synchronized (pendingMessagesRefLock_) {
                fetched = pendingMessages_.getEvents(i, false);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
        return fetched;
    }

    /**
     * Fetches all pending messages, but only if the queue holds at least
     * {@code i} of them.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @param i minimum queue size required before anything is fetched
     * @return all queued messages, or {@code null} if fewer than {@code i} are
     *         queued or the call was interrupted (the interrupt flag is
     *         restored in that case)
     */
    public Message[] getAtLeastMessages(int i) {
        try {
            synchronized (pendingMessagesRefLock_) {
                final boolean enoughQueued = pendingMessages_.getSize() >= i;
                return enoughQueued ? pendingMessages_.getAllEvents(true) : null;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Returns the error threshold read in {@link #configure}.
     *
     * @return the configured error threshold
     */
    public int getErrorThreshold() {
        return errorThreshold_;
    }

    /**
     * Disposes this proxy via the superclass and afterwards disposes the
     * optional hook registered alongside the task executor.
     */
    @Override // org.jacorb.notification.servant.AbstractProxy, org.jacorb.notification.interfaces.Disposable
    public final void dispose() {
        super.dispose();

        final Disposable executorHook = disposeTaskExecutor_;
        if (executorHook == null) {
            return;
        }
        executorHook.dispose();
    }

    /**
     * Returns the consumer admin stored by the static factory methods.
     *
     * @return the consumer admin; {@code null} if this proxy was not created
     *         through one of the factory methods
     */
    public final ConsumerAdmin MyAdmin() {
        return consumerAdmin_;
    }

    @Override // org.omg.CosNotifyComm.NotifySubscribeOperations
    public final void subscription_change(EventType[] eventTypeArr, EventType[] eventTypeArr2) throws InvalidEventType {
        this.subscriptionManager_.subscription_change(eventTypeArr, eventTypeArr2);
    }

    /**
     * Returns the currently offered event types and/or switches offer-change
     * notifications for the connected client on or off, depending on the mode.
     *
     * <ul>
     * <li>ALL_NOW_UPDATES_OFF: return the offered types, then remove the listener</li>
     * <li>ALL_NOW_UPDATES_ON: register the listener, then return the offered types</li>
     * <li>NONE_NOW_UPDATES_OFF: remove the listener, return an empty array</li>
     * <li>NONE_NOW_UPDATES_ON: register the listener, return an empty array</li>
     * </ul>
     *
     * @param obtainInfoMode requested mode
     * @return the offered types for the ALL_NOW modes, otherwise an empty array
     * @throws IllegalArgumentException if the mode has an unknown value
     */
    public final EventType[] obtain_offered_types(ObtainInfoMode obtainInfoMode) {
        EventType[] offeredTypes = EMPTY_EVENT_TYPE_ARRAY;
        // Named IDL constants instead of the raw ordinals 0..3.
        switch (obtainInfoMode.value()) {
            case ObtainInfoMode._ALL_NOW_UPDATES_OFF:
                offeredTypes = this.offerManager_.obtain_offered_types();
                removeListener();
                break;
            case ObtainInfoMode._ALL_NOW_UPDATES_ON:
                registerListener();
                offeredTypes = this.offerManager_.obtain_offered_types();
                break;
            case ObtainInfoMode._NONE_NOW_UPDATES_OFF:
                removeListener();
                break;
            case ObtainInfoMode._NONE_NOW_UPDATES_ON:
                registerListener();
                break;
            default:
                throw new IllegalArgumentException("Illegal ObtainInfoMode");
        }
        return offeredTypes;
    }

    /**
     * Registers a wrapper around the client's offer listener with the offer
     * manager. Does nothing if a wrapper is already registered or if the
     * connected client provides no offer listener.
     *
     * <p>The wrapper forwards {@code offer_change} to the client and never lets
     * an exception escape: on NO_IMPLEMENT it unregisters itself, any other
     * failure is only logged.
     */
    private void registerListener() {
        if (this.proxyOfferListener_ != null) {
            return;
        }
        // Captured by the anonymous class below, hence final.
        final NotifyPublishOperations clientListener = getOfferListener();
        if (clientListener == null) {
            return;
        }
        this.proxyOfferListener_ = new NotifyPublishOperations() {
            @Override // org.omg.CosNotifyComm.NotifyPublishOperations
            public void offer_change(EventType[] eventTypeArr, EventType[] eventTypeArr2) {
                try {
                    clientListener.offer_change(eventTypeArr, eventTypeArr2);
                } catch (NO_IMPLEMENT e) {
                    // The client does not implement offer_change: stop notifying it.
                    logger_.info("disable offer_change for connected Consumer.", e);
                    AbstractProxySupplier.this.removeListener();
                } catch (InvalidEventType e) {
                    logger_.error("invalid event type", e);
                } catch (Exception e) {
                    logger_.error("offer_change failed", e);
                }
            }
        };
        this.offerManager_.addListener(this.proxyOfferListener_);
    }

    /**
     * Unregisters the offer-listener wrapper from the offer manager and forgets
     * it. Does nothing if no wrapper is registered.
     */
    @Override // org.jacorb.notification.servant.AbstractProxy
    protected void removeListener() {
        if (proxyOfferListener_ == null) {
            return;
        }
        offerManager_.removeListener(proxyOfferListener_);
        proxyOfferListener_ = null;
    }

    /**
     * Returns the connected client as narrowed in {@link #connectClient}.
     *
     * @return the client's NotifyPublish reference; {@code null} if none was stored
     */
    final NotifyPublishOperations getOfferListener() {
        return offerListener_;
    }

    /**
     * Connects the client via the superclass and then tries to narrow it to
     * NotifyPublish so that it can receive {@code offer_change} notifications.
     * A failed narrow is not an error: offer notifications are simply not
     * available for this client.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @param object the connecting client
     */
    @Override // org.jacorb.notification.servant.AbstractProxy
    public void connectClient(Object object) {
        super.connectClient(object);
        try {
            this.offerListener_ = NotifyPublishHelper.narrow(object);
            this.logger_.debug("successfully narrowed connecting Client to IF NotifyPublish");
        } catch (Throwable th) {
            // Best effort: the breadth of the catch is kept as before, but the
            // cause is now logged instead of being silently dropped.
            this.logger_.info("disable offer_change for connecting Consumer", th);
        }
    }

    /** Sets the delivery flag of this proxy to enabled. */
    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public void enableDelivery() {
        synchronized (this) {
            enabled_ = true;
        }
    }

    /** Sets the delivery flag of this proxy to disabled. */
    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public void disableDelivery() {
        synchronized (this) {
            enabled_ = false;
        }
    }

    /**
     * Reports the delivery flag toggled by {@link #enableDelivery()} and
     * {@link #disableDelivery()}.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @return {@code true} if delivery is enabled
     */
    public boolean isEnabled() {
        synchronized (this) {
            return enabled_;
        }
    }

    /**
     * Creates, wires and configures the pull supplier implementation matching
     * the requested client type.
     * (Reported by the decompiler as originally package-private.)
     *
     * @param abstractAdmin admin creating the proxy; supplies the channel
     *        context, the ConsumerAdmin reference and the ORB configuration
     * @param clientType kind of events the client expects
     * @return the new, configured proxy supplier
     * @throws BAD_PARAM if the client type is unknown
     */
    public static AbstractProxySupplier newProxyPullSupplier(AbstractAdmin abstractAdmin, ClientType clientType) {
        final AbstractProxySupplier proxySupplier;
        // Named IDL constants instead of the raw ordinals 0..2.
        switch (clientType.value()) {
            case ClientType._ANY_EVENT:
                proxySupplier = new ProxyPullSupplierImpl();
                break;
            case ClientType._STRUCTURED_EVENT:
                proxySupplier = new StructuredProxyPullSupplierImpl();
                break;
            case ClientType._SEQUENCE_EVENT:
                proxySupplier = new SequenceProxyPullSupplierImpl();
                break;
            default:
                // Same diagnostic as newProxyPushSupplier.
                throw new BAD_PARAM("The ClientType: " + clientType.value() + " is unknown");
        }
        abstractAdmin.getChannelContext().resolveDependencies(proxySupplier);
        proxySupplier.consumerAdmin_ = ConsumerAdminHelper.narrow(abstractAdmin.activate());
        proxySupplier.configure(((ORB) abstractAdmin.getORB()).getConfiguration());
        return proxySupplier;
    }

    /**
     * Creates, wires and configures the push supplier implementation matching
     * the requested client type.
     * (Reported by the decompiler as originally package-private.)
     *
     * @param abstractAdmin admin creating the proxy; supplies the channel
     *        context, the ConsumerAdmin reference and the ORB configuration
     * @param clientType kind of events the client expects
     * @return the new, configured proxy supplier
     * @throws BAD_PARAM if the client type is unknown
     */
    public static AbstractProxySupplier newProxyPushSupplier(AbstractAdmin abstractAdmin, ClientType clientType) {
        final AbstractProxySupplier proxySupplier;
        // Named IDL constants instead of the raw ordinals 0..2.
        switch (clientType.value()) {
            case ClientType._ANY_EVENT:
                proxySupplier = new ProxyPushSupplierImpl();
                break;
            case ClientType._STRUCTURED_EVENT:
                proxySupplier = new StructuredProxyPushSupplierImpl();
                break;
            case ClientType._SEQUENCE_EVENT:
                proxySupplier = new SequenceProxyPushSupplierImpl();
                break;
            default:
                throw new BAD_PARAM("The ClientType: " + clientType.value() + " is unknown");
        }
        abstractAdmin.getChannelContext().resolveDependencies(proxySupplier);
        proxySupplier.consumerAdmin_ = ConsumerAdminHelper.narrow(abstractAdmin.activate());
        proxySupplier.configure(((ORB) abstractAdmin.getORB()).getConfiguration());
        return proxySupplier;
    }

    /**
     * Tells whether this proxy is one of the push variants, based on the numeric
     * value of {@code MyType()}.
     *
     * <p>NOTE(review): the values 0, 2, 4 and 6 presumably correspond to the
     * PUSH_ANY, PUSH_STRUCTURED, PUSH_SEQUENCE and PUSH_TYPED members of the
     * ProxyType IDL enum; confirm against the generated ProxyType class.
     *
     * @return {@code true} for type values 0, 2, 4 and 6, {@code false} otherwise
     */
    public boolean isPushSupplier() {
        final int typeValue = MyType().value();
        return typeValue == 0 || typeValue == 2 || typeValue == 4 || typeValue == 6;
    }

    /**
     * Reacts to a failed push. A fatal failure disposes the push operation and
     * this proxy; any other failure is retried, and if the retry itself fails
     * the retry strategy and this proxy are disposed.
     * (Reported by the decompiler as originally {@code protected}.)
     *
     * @param pushOperation the operation that failed
     * @param th the failure raised by the push
     */
    public void handleFailedPushOperation(PushOperation pushOperation, Throwable th) {
        if (!RetryStrategy.isFatalException(th)) {
            final RetryStrategy strategy = getRetryStrategy(this, pushOperation);
            try {
                strategy.retry();
            } catch (RetryException retryFailure) {
                logger_.error("retry failed", retryFailure);
                strategy.dispose();
                dispose();
            }
            return;
        }

        // Fatal: give up on the consumer entirely.
        if (logger_.isErrorEnabled()) {
            logger_.error("push raised " + th + ": will destroy ProxySupplier, " + "disconnect Consumer", th);
        }
        pushOperation.dispose();
        dispose();
    }

    /**
     * Builds the retry strategy used for a failed push operation.
     *
     * @param messageConsumer consumer the push was meant for
     * @param pushOperation the failed operation
     * @return a new strategy bound to this proxy's task processor
     */
    private RetryStrategy getRetryStrategy(MessageConsumer messageConsumer, PushOperation pushOperation) {
        final RetryStrategy strategy =
                new TaskProcessorRetryStrategy(messageConsumer, pushOperation, getTaskProcessor());
        return strategy;
    }

    /**
     * Tells whether another delivery attempt may be made.
     *
     * @return {@code true} if this proxy is not disposed and its error counter
     *         is still below the error threshold
     */
    @Override // org.jacorb.notification.interfaces.MessageConsumer
    public boolean isRetryAllowed() {
        if (isDisposed()) {
            return false;
        }
        return getErrorCounter() < getErrorThreshold();
    }
}
