Class ProtonAbstractReceiver

java.lang.Object
org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
All Implemented Interfaces:
ProtonDeliveryHandler
Direct Known Subclasses:
AMQPFederationCommandProcessor, AMQPFederationEventProcessor, AMQPMirrorControllerTarget, ProtonServerReceiverContext

public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler
  • Field Details

    • connection

      protected final AMQPConnectionContext connection
    • protonSession

      protected final AMQPSessionContext protonSession
    • receiver

      protected final org.apache.qpid.proton.engine.Receiver receiver
    • minLargeMessageSize

      protected final int minLargeMessageSize
    • routingContext

      protected final org.apache.activemq.artemis.core.server.RoutingContext routingContext
    • sessionSPI

      protected final AMQPSessionCallback sessionSPI
    • standardMessageReader

      protected final MessageReader standardMessageReader
    • largeMessageReader

      protected final MessageReader largeMessageReader
    • creditRunnable

      protected final Runnable creditRunnable
    • useModified

      protected final boolean useModified
    • drainCreditOnNoSpace

      protected final boolean drainCreditOnNoSpace
    • drainTimeout

      protected final long drainTimeout
    • creditTopUpRunner

      protected final Runnable creditTopUpRunner
    • messageReader

      protected volatile MessageReader messageReader
    • pendingSettles

      protected int pendingSettles
    • state

      protected volatile ProtonAbstractReceiver.ReceiverState state
    • pendingStop

      protected BiConsumer<ProtonAbstractReceiver, Boolean> pendingStop
    • pendingQuiesceTimeout

      protected ScheduledFuture<?> pendingQuiesceTimeout
    • coreMessageReader

      protected MessageReader coreMessageReader
    • coreLargeMessageReader

      protected MessageReader coreLargeMessageReader
    • coreTunnelingEnabled

      protected boolean coreTunnelingEnabled
  • Constructor Details

  • Method Details

    • getSessionContext

      public AMQPSessionContext getSessionContext()
    • start

      public void start()
      Starts the receiver if it is not already started, which triggers a flow of credit to the remote peer so that processing of incoming messages can begin. This must be called on the connection thread and will throw an exception if it is not.
      Throws:
      IllegalStateException - if not called from the connection thread, or if the receiver is closed or stopping.
    • stop

      public void stop(int stopTimeout, BiConsumer<ProtonAbstractReceiver, Boolean> onStopped)
      Stops the receiver from granting additional credit and drains any credit already granted on the link. If any pending settles or queued messages remain in the work queue, the stop occurs asynchronously and the stop callback is signaled later; otherwise the callback is triggered on the current thread, so that intervening state changes cannot invalidate an asynchronous call. A timeout can be specified; if it elapses before the stop completes, the stopped consumer is signaled and the receiver is left in the stopping state, which does not allow a restart.
      Parameters:
      stopTimeout - A time in milliseconds to wait for the stop to complete before considering it as having failed.
      onStopped - A consumer that is signaled once the receiver has stopped or the timeout elapsed.
      Throws:
      IllegalStateException - if the receiver is currently in the stopping state.
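      The stop contract described above can be sketched in isolation. The following is a minimal model only, not the class's implementation: StopSketch, its State enum, and onStopTimeout() are hypothetical names, the stopTimeout parameter is omitted, and the timer is replaced by a manual call so the example stays deterministic.

```java
import java.util.function.BiConsumer;

// Hypothetical stand-in that models only the documented stop() contract.
final class StopSketch {
    enum State { STARTED, STOPPING, STOPPED }

    private State state = State.STARTED;
    private int pendingSettles;
    private BiConsumer<StopSketch, Boolean> pendingStop;

    State state() { return state; }

    // Records one delivery that still has to be settled.
    void incrementSettle() { pendingSettles++; }

    // Settles one delivery; completes a pending stop once nothing is outstanding.
    void settle() {
        pendingSettles--;
        if (state == State.STOPPING && pendingStop != null && pendingSettles == 0) {
            signalStopped();
        }
    }

    // Assumption: the real method also takes a timeout and schedules a timer.
    void stop(BiConsumer<StopSketch, Boolean> onStopped) {
        if (state == State.STOPPING) {
            throw new IllegalStateException("receiver is already stopping");
        }
        pendingStop = onStopped;
        state = State.STOPPING;
        if (pendingSettles == 0) {
            signalStopped(); // nothing outstanding: complete on the calling thread
        }
    }

    // Stands in for the timeout elapsing: signal failure, remain in STOPPING.
    void onStopTimeout() {
        if (state == State.STOPPING && pendingStop != null) {
            BiConsumer<StopSketch, Boolean> callback = pendingStop;
            pendingStop = null;
            callback.accept(this, false);
        }
    }

    private void signalStopped() {
        state = State.STOPPED;
        BiConsumer<StopSketch, Boolean> callback = pendingStop;
        pendingStop = null;
        callback.accept(this, true);
    }

    public static void main(String[] args) {
        StopSketch busy = new StopSketch();
        busy.incrementSettle();
        busy.stop((receiver, stopped) -> System.out.println("stopped cleanly: " + stopped));
        busy.settle(); // the last outstanding settle completes the stop
    }
}
```

      In this model the Boolean passed to the callback is true when the stop completed and false when the timeout fired first; that reading of the flag is inferred from the parameter description above rather than confirmed by it.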
    • close

      public void close(boolean remoteLinkClose) throws ActiveMQAMQPException
      Specified by:
      close in interface ProtonDeliveryHandler
      Throws:
      ActiveMQAMQPException
    • close

      public void close(org.apache.qpid.proton.amqp.transport.ErrorCondition condition) throws ActiveMQAMQPException
      Specified by:
      close in interface ProtonDeliveryHandler
      Throws:
      ActiveMQAMQPException
    • getConnection

      public AMQPConnectionContext getConnection()
    • isStarted

      public boolean isStarted()
    • isDraining

      public boolean isDraining()
    • isStopping

      public boolean isStopping()
    • isStopped

      public boolean isStopped()
    • isClosed

      public boolean isClosed()
    • isBusy

      public boolean isBusy()
    • recoverContext

      protected org.apache.activemq.artemis.core.persistence.OperationContext recoverContext()
      Sets the proper operation context in the thread local and returns the old context.
    • closeCurrentReader

      protected void closeCurrentReader()
    • isUseModifiedForTransientDeliveryErrors

      protected boolean isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection)
      Whether the receiver should send an AMQP Modified disposition with delivery-failed set to true for address full errors, instead of the Rejected disposition it would send by default.
      Parameters:
      connection - The connection context that this receiver instance is bound to
      Returns:
      true if the receiver sends a modified outcome and false for rejected outcomes.
    • isDrainOnTransientDeliveryErrors

      protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext connection)
      Whether the receiver should drain link credit when a transient delivery error occurs. This allows subclass implementations to override the defaults set at the connection level.
      Parameters:
      connection - The connection context that this receiver instance is bound to
      Returns:
      true if transient delivery errors should be handled by draining link credit from the remote sender.
    • getLinkQuiesceTimeout

      protected int getLinkQuiesceTimeout(AMQPConnectionContext connection)
      Gets the time in milliseconds that the receiver should wait before it considers a link quiesce attempt to have failed and acts to close the link with an error. This allows subclass implementations to override the defaults set at the connection level.
      Parameters:
      connection - The connection context that this receiver instance is bound to
      Returns:
      the time in milliseconds to wait for the remote sender once a link quiesce is initiated by this peer.
    • createCreditRunnable

      protected Runnable createCreditRunnable(AMQPConnectionContext connection)
      Subclasses can override this to provide a custom credit runnable that performs other checks or applies credit in a manner better suited to that implementation.
      Parameters:
      connection - The AMQPConnectionContext that this resource falls under.
      Returns:
      a Runnable that will perform the actual credit granting operation
    • getConfiguredMinLargeMessageSize

      protected int getConfiguredMinLargeMessageSize(AMQPConnectionContext connection)
      Subclasses can override this to provide the minimum large message size that should be used when creating receiver instances.
      Parameters:
      connection - The AMQPConnectionContext that this resource falls under.
      Returns:
      the minimum large message size configuration value for this receiver
    • createCreditRunnable

      public static Runnable createCreditRunnable(int refill, int threshold, org.apache.qpid.proton.engine.Receiver receiver, AMQPConnectionContext connection, ProtonAbstractReceiver context)
      This Credit Runnable can be used to manage the credit replenishment of a target AMQP receiver.
      Parameters:
      refill - The number of credits to top the receiver up to
      threshold - The low water mark for credit before refill is done
      receiver - The proton receiver that will have its credit refilled
      connection - The connection that owns the receiver
      context - The context that will be associated with the receiver
      Returns:
      A new Runnable that can be used to keep receiver credit replenished
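      The refill and threshold parameters describe a low-water-mark scheme, which the sketch below illustrates with plain arithmetic. The class CreditTopUpSketch and both of its methods are hypothetical, and the two formulas are assumptions chosen to match the parameter descriptions above; they are not taken from the implementation of this class.

```java
// Hypothetical illustration of threshold-based credit replenishment.
final class CreditTopUpSketch {

    // Assumed rule: top up once outstanding credit plus unsettled
    // deliveries has fallen to the low water mark or below.
    static boolean needsTopUp(int credit, int pending, int threshold) {
        return credit + pending <= threshold;
    }

    // Assumed rule: grant only enough to bring credit plus unsettled
    // deliveries back up to the refill level, never a negative amount.
    static int topUpAmount(int refill, int credit, int pending) {
        return Math.max(0, refill - credit - pending);
    }

    public static void main(String[] args) {
        int refill = 100;
        int threshold = 30;
        int credit = 10;  // credit still outstanding on the link
        int pending = 5;  // deliveries received but not yet settled

        if (needsTopUp(credit, pending, threshold)) {
            System.out.println("grant " + topUpAmount(refill, credit, pending) + " credits");
        }
    }
}
```

      With the values shown, the link holds 15 units of outstanding work against a threshold of 30, so the sketch grants 85 credits to return to the refill level of 100.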
    • incrementSettle

      public void incrementSettle()
    • settle

      public void settle(org.apache.qpid.proton.engine.Delivery settlement)
    • onFlow

      public void onFlow(int credits, boolean drain)
      Specified by:
      onFlow in interface ProtonDeliveryHandler
    • trySelectMessageReader

      protected MessageReader trySelectMessageReader(org.apache.qpid.proton.engine.Receiver receiver, org.apache.qpid.proton.engine.Delivery delivery)
    • onMessage

      public final void onMessage(org.apache.qpid.proton.engine.Delivery delivery) throws ActiveMQAMQPException
      Called when Proton receives a message to be delivered via a Delivery. This may be called more than once per delivery, so the buffer must be cached until the whole message has been received.
      Specified by:
      onMessage in interface ProtonDeliveryHandler
      Throws:
      ActiveMQAMQPException
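      The need to cache bytes across repeated onMessage calls can be shown with a small standalone model. PartialDeliverySketch and onChunk are hypothetical names, and the boolean partial flag is an assumed stand-in for however the engine reports that more data is still to come; the actual class delegates this work to its MessageReader fields.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical model of buffering a delivery that arrives in several pieces.
final class PartialDeliverySketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // Appends one chunk; returns the whole payload once the final chunk
    // has arrived, or null while the delivery is still partial.
    byte[] onChunk(byte[] chunk, boolean partial) {
        buffer.write(chunk, 0, chunk.length);
        if (partial) {
            return null;
        }
        byte[] complete = buffer.toByteArray();
        buffer.reset(); // ready for the next delivery
        return complete;
    }

    public static void main(String[] args) {
        PartialDeliverySketch reader = new PartialDeliverySketch();
        reader.onChunk("hel".getBytes(StandardCharsets.UTF_8), true);
        byte[] payload = reader.onChunk("lo".getBytes(StandardCharsets.UTF_8), false);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```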
    • onMessageComplete

      public void onMessageComplete(org.apache.qpid.proton.engine.Delivery delivery, Message message, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations)
    • onExceptionWhileReading

      public void onExceptionWhileReading(Throwable e)
    • deliveryFailed

      public void deliveryFailed(org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.engine.Receiver receiver, Exception e)
    • getAddressInUse

      protected abstract SimpleString getAddressInUse()
      Returns either the fixed address assigned to this sender, or the last address used by an anonymous relay sender; if this is an anonymous relay and no send has occurred then this method returns null.
      Returns:
      either the fixed address assigned to this sender, or the last address used by an anonymous relay sender; if this is an anonymous relay and no send has occurred then this method returns null
    • actualDelivery

      protected abstract void actualDelivery(Message message, org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations, org.apache.qpid.proton.engine.Receiver receiver, org.apache.activemq.artemis.core.transaction.Transaction tx)
      Perform the actual message processing for an inbound message. The subclass either consumes and settles the message in place or hands it off to another intermediary who is responsible for eventually settling the newly read message.
      Parameters:
      message - The message as provided from the remote or after local transformation by subclass.
      delivery - The proton delivery where the message bytes were read from
      deliveryAnnotations - The delivery annotations if present that accompanied the incoming message.
      receiver - The proton receiver that represents the link over which the message was sent.
      tx - The transaction under which the incoming message was sent.
    • topUpCreditIfNeeded

      protected final void topUpCreditIfNeeded()
      Final credit top up request API that will trigger a credit top up if the receiver is in a state where a grant of additional receiver credit is allowable.
    • doCreditTopUpRun

      protected void doCreditTopUpRun()
      Performs the actual credit top up logic for the receiver.

      This can be overridden in the subclass to run its own logic for credit top up instead of using the default logic used in this abstract base.

    • enableCoreTunneling

      protected void enableCoreTunneling()
    • failIfCoreTunnelNotEnabled

      protected void failIfCoreTunnelNotEnabled()
    • isAddressFull

      protected static boolean isAddressFull(Exception e)
    • outcomeSupported

      protected static boolean outcomeSupported(org.apache.qpid.proton.amqp.messaging.Source source, org.apache.qpid.proton.amqp.Symbol outcome)
    • getEffectiveDefaultOutcome

      protected static org.apache.qpid.proton.amqp.messaging.Outcome getEffectiveDefaultOutcome(org.apache.qpid.proton.amqp.messaging.Source source)
    • isBellowThreshold

      public static boolean isBellowThreshold(int credit, int pending, int threshold)
    • calculatedUpdateRefill

      public static int calculatedUpdateRefill(int refill, int credits, int pending)