Class ProtonAbstractReceiver
java.lang.Object
org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
- All Implemented Interfaces:
ProtonDeliveryHandler
- Direct Known Subclasses:
AMQPFederationCommandProcessor, AMQPFederationEventProcessor, AMQPMirrorControllerTarget, ProtonServerReceiverContext
public abstract class ProtonAbstractReceiver
extends ProtonInitializable
implements ProtonDeliveryHandler
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static classThis servers as the default credit runnable which grants credit in batches based on a low water mark and a configured credit size to top the credit up to once the low water mark has been reached.protected static enum -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final AMQPConnectionContextprotected MessageReaderprotected MessageReaderprotected booleanprotected final Runnableprotected final Runnableprotected final booleanprotected final longprotected final MessageReaderprotected MessageReaderprotected final intprotected ScheduledFuture<?> protected intprotected BiConsumer<ProtonAbstractReceiver, Boolean> protected final AMQPSessionContextprotected final org.apache.qpid.proton.engine.Receiverprotected final org.apache.activemq.artemis.core.server.RoutingContextprotected final AMQPSessionCallbackprotected final MessageReaderprotected ProtonAbstractReceiver.ReceiverStateprotected final booleanFields inherited from class ProtonInitializable
initialized -
Constructor Summary
ConstructorsConstructorDescriptionProtonAbstractReceiver(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, AMQPSessionContext protonSession, org.apache.qpid.proton.engine.Receiver receiver) -
Method Summary
Modifier and TypeMethodDescriptionprotected abstract voidactualDelivery(Message message, org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations, org.apache.qpid.proton.engine.Receiver receiver, org.apache.activemq.artemis.core.transaction.Transaction tx) Perform the actual message processing for an inbound message.static intcalculatedUpdateRefill(int refill, int credits, int pending) voidclose(boolean remoteLinkClose) voidclose(org.apache.qpid.proton.amqp.transport.ErrorCondition condition) protected voidstatic RunnablecreateCreditRunnable(int refill, int threshold, org.apache.qpid.proton.engine.Receiver receiver, AMQPConnectionContext connection, ProtonAbstractReceiver context) This Credit Runnable can be used to manage the credit replenishment of a target AMQP receiver.protected RunnablecreateCreditRunnable(AMQPConnectionContext connection) Subclass can override this to provide a custom credit runnable that performs other checks or applies credit in a manner more fitting that implementation.voiddeliveryFailed(org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.engine.Receiver receiver, Exception e) protected voidPerforms the actual credit top up logic for the receiver.protected voidprotected voidprotected abstract SimpleStringReturns either the fixed address assigned to this sender, or the last address used by an anonymous relay sender; if this is an anonymous relay and no send has occurred then this method returnsnull.protected intSubclass can override this to provide the minimum large message size that should be used when creating receiver instances.protected static org.apache.qpid.proton.amqp.messaging.OutcomegetEffectiveDefaultOutcome(org.apache.qpid.proton.amqp.messaging.Source source) protected intgetLinkQuiesceTimeout(AMQPConnectionContext connection) Gets the time in milliseconds that the receiver should wait before it considers a link quiesce attempt to have failed and act to close the link with an error, this allows subclass implementations to override the defaults set on the connection level.voidprotected static booleanstatic booleanisBellowThreshold(int credit, int pending, int threshold) booleanisBusy()booleanisClosed()booleanprotected booleanShould the receiver drain link credit when a transient delivery error occurs, this allows subclass implementations to override the defaults set on the connection level.booleanbooleanbooleanprotected booleanShould the receiver send an AMQP Modified disposition with delivery failed set to true for address full errors instead of the Rejected disposition it would by default.voidvoidonFlow(int credits, boolean drain) final voidonMessage(org.apache.qpid.proton.engine.Delivery delivery) called when Proton receives a message to be delivered via a Delivery.voidonMessageComplete(org.apache.qpid.proton.engine.Delivery delivery, Message message, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations) protected static booleanoutcomeSupported(org.apache.qpid.proton.amqp.messaging.Source source, org.apache.qpid.proton.amqp.Symbol outcome) protected org.apache.activemq.artemis.core.persistence.OperationContextSet the proper operation context in the Thread Local.voidsettle(org.apache.qpid.proton.engine.Delivery settlement) voidstart()Starts the receiver if not already started which triggers a flow of credit to the remote to begin the processing of incoming messages.voidstop(int stopTimeout, BiConsumer<ProtonAbstractReceiver, Boolean> onStopped) Stop the receiver from granting additional credit and drains any granted credit from the link already.protected final voidFinal credit top up request API that will trigger a credit top up if the receiver is in a state where a grant of additional receiver credit is allowable.protected MessageReadertrySelectMessageReader(org.apache.qpid.proton.engine.Receiver receiver, org.apache.qpid.proton.engine.Delivery delivery) Methods inherited from class ProtonInitializable
initialize, isInitialized
-
Field Details
-
connection
-
protonSession
-
receiver
protected final org.apache.qpid.proton.engine.Receiver receiver -
minLargeMessageSize
protected final int minLargeMessageSize -
routingContext
protected final org.apache.activemq.artemis.core.server.RoutingContext routingContext -
sessionSPI
-
standardMessageReader
-
largeMessageReader
-
creditRunnable
-
useModified
protected final boolean useModified -
drainCreditOnNoSpace
protected final boolean drainCreditOnNoSpace -
drainTimeout
protected final long drainTimeout -
creditTopUpRunner
-
messageReader
-
pendingSettles
protected int pendingSettles -
state
-
pendingStop
-
pendingQuiesceTimeout
-
coreMessageReader
-
coreLargeMessageReader
-
coreTunnelingEnabled
protected boolean coreTunnelingEnabled
-
-
Constructor Details
-
ProtonAbstractReceiver
public ProtonAbstractReceiver(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, AMQPSessionContext protonSession, org.apache.qpid.proton.engine.Receiver receiver)
-
-
Method Details
-
getSessionContext
-
start
public void start()Starts the receiver if not already started which triggers a flow of credit to the remote to begin the processing of incoming messages. This must be called on the connection thread and will throw and exception if not.- Throws:
IllegalStateException- if not called from the connection thread or is closed or stopping.
-
stop
Stop the receiver from granting additional credit and drains any granted credit from the link already. If any pending settles or queued message remain in the work queue then the stop occurs asynchronously and the stop callback is signaled later otherwise it will be triggered on the current thread to avoid state changes from making an asynchronous call invalid. The stop call allows a timeout to be specified which will signal the stopped consumer if the timeout elapses and leaves the receiver in the stopping state which does not allow for a restart.- Parameters:
stopTimeout- A time in milliseconds to wait for the stop to complete before considering it as having failed.onStopped- A consumer that is signaled once the receiver has stopped or the timeout elapsed.- Throws:
IllegalStateException- if the receiver is currently in the stopping state.
-
close
- Specified by:
closein interfaceProtonDeliveryHandler- Throws:
ActiveMQAMQPException
-
close
public void close(org.apache.qpid.proton.amqp.transport.ErrorCondition condition) throws ActiveMQAMQPException - Specified by:
closein interfaceProtonDeliveryHandler- Throws:
ActiveMQAMQPException
-
getConnection
-
isStarted
public boolean isStarted() -
isDraining
public boolean isDraining() -
isStopping
public boolean isStopping() -
isStopped
public boolean isStopped() -
isClosed
public boolean isClosed() -
isBusy
public boolean isBusy() -
recoverContext
protected org.apache.activemq.artemis.core.persistence.OperationContext recoverContext()Set the proper operation context in the Thread Local. Return the old context -
closeCurrentReader
protected void closeCurrentReader() -
isUseModifiedForTransientDeliveryErrors
Should the receiver send an AMQP Modified disposition with delivery failed set to true for address full errors instead of the Rejected disposition it would by default.- Parameters:
connection- The connection context that this receiver instance is bound to- Returns:
trueif the receiver send a modified outcome and false for rejected outcomes.
-
isDrainOnTransientDeliveryErrors
Should the receiver drain link credit when a transient delivery error occurs, this allows subclass implementations to override the defaults set on the connection level.- Parameters:
connection- The connection context that this receiver instance is bound to- Returns:
- true if transient delivery errors should be handled by draining link credit from the remote sender.
-
getLinkQuiesceTimeout
Gets the time in milliseconds that the receiver should wait before it considers a link quiesce attempt to have failed and act to close the link with an error, this allows subclass implementations to override the defaults set on the connection level.- Parameters:
connection- The connection context that this receiver instance is bound to- Returns:
- the time in milliseconds to wait for remote sender once a link quiesce is initiated by this peer.
-
createCreditRunnable
Subclass can override this to provide a custom credit runnable that performs other checks or applies credit in a manner more fitting that implementation.- Parameters:
connection- TheAMQPConnectionContextthat this resource falls under.- Returns:
- a
Runnablethat will perform the actual credit granting operation
-
getConfiguredMinLargeMessageSize
Subclass can override this to provide the minimum large message size that should be used when creating receiver instances.- Parameters:
connection- TheAMQPConnectionContextthat this resource falls under.- Returns:
- the minimum large message size configuration value for this receiver
-
createCreditRunnable
public static Runnable createCreditRunnable(int refill, int threshold, org.apache.qpid.proton.engine.Receiver receiver, AMQPConnectionContext connection, ProtonAbstractReceiver context) This Credit Runnable can be used to manage the credit replenishment of a target AMQP receiver.- Parameters:
refill- The number of credit to top off the receiver tothreshold- The low water mark for credit before refill is donereceiver- The proton receiver that will have its credit refilledconnection- The connection that own the receivercontext- The context that will be associated with the receiver- Returns:
- A new Runnable that can be used to keep receiver credit replenished
-
incrementSettle
public void incrementSettle() -
settle
public void settle(org.apache.qpid.proton.engine.Delivery settlement) -
onFlow
public void onFlow(int credits, boolean drain) - Specified by:
onFlowin interfaceProtonDeliveryHandler
-
trySelectMessageReader
protected MessageReader trySelectMessageReader(org.apache.qpid.proton.engine.Receiver receiver, org.apache.qpid.proton.engine.Delivery delivery) -
onMessage
public final void onMessage(org.apache.qpid.proton.engine.Delivery delivery) throws ActiveMQAMQPException called when Proton receives a message to be delivered via a Delivery. This may be called more than once per deliver so we have to cache the buffer until we have received it all.- Specified by:
onMessagein interfaceProtonDeliveryHandler- Throws:
ActiveMQAMQPException
-
onMessageComplete
public void onMessageComplete(org.apache.qpid.proton.engine.Delivery delivery, Message message, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations) -
onExceptionWhileReading
-
deliveryFailed
public void deliveryFailed(org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.engine.Receiver receiver, Exception e) -
getAddressInUse
Returns either the fixed address assigned to this sender, or the last address used by an anonymous relay sender; if this is an anonymous relay and no send has occurred then this method returnsnull.- Returns:
- either the fixed address assigned to this sender, or the last address used by an anonymous relay sender;
if this is an anonymous relay and no send has occurred then this method returns
null
-
actualDelivery
protected abstract void actualDelivery(Message message, org.apache.qpid.proton.engine.Delivery delivery, org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations deliveryAnnotations, org.apache.qpid.proton.engine.Receiver receiver, org.apache.activemq.artemis.core.transaction.Transaction tx) Perform the actual message processing for an inbound message. The subclass either consumes and settles the message in place or hands it off to another intermediary who is responsible for eventually settling the newly read message.- Parameters:
message- The message as provided from the remote or after local transformation by subclass.delivery- The proton delivery where the message bytes where read fromdeliveryAnnotations- The delivery annotations if present that accompanied the incoming message.receiver- The proton receiver that represents the link over which the message was sent.tx- The transaction under which the incoming message was sent.
-
topUpCreditIfNeeded
protected final void topUpCreditIfNeeded()Final credit top up request API that will trigger a credit top up if the receiver is in a state where a grant of additional receiver credit is allowable. -
doCreditTopUpRun
protected void doCreditTopUpRun()Performs the actual credit top up logic for the receiver.This can be overridden in the subclass to run its own logic for credit top up instead of using the default logic used in this abstract base.
-
enableCoreTunneling
protected void enableCoreTunneling() -
failIfCoreTunnelNotEnabled
protected void failIfCoreTunnelNotEnabled() -
isAddressFull
-
outcomeSupported
protected static boolean outcomeSupported(org.apache.qpid.proton.amqp.messaging.Source source, org.apache.qpid.proton.amqp.Symbol outcome) -
getEffectiveDefaultOutcome
protected static org.apache.qpid.proton.amqp.messaging.Outcome getEffectiveDefaultOutcome(org.apache.qpid.proton.amqp.messaging.Source source) -
isBellowThreshold
public static boolean isBellowThreshold(int credit, int pending, int threshold) -
calculatedUpdateRefill
public static int calculatedUpdateRefill(int refill, int credits, int pending)
-