pallet_message_queue

Branch/Release: release-polkadot-v1.10.0

Source Code

Purpose

Flexible FRAME pallet for implementing message queues. This pallet can also initiate message processing using the MessageProcessor (see Config).

Config

  • Pallet-specific configs:

    • MessageProcessor — Processor for messages

    • Size — Page/heap size type.

    • QueueChangeHandler — Code to be called when a message queue changes - either with items introduced or removed.

    • QueuePausedQuery — Queried by the pallet to check whether a queue can be serviced.

    • HeapSize — The size of the page; this also serves as the maximum message size which can be sent.

    • MaxStale — The maximum number of stale pages (i.e. of overweight messages) allowed before culling can happen. Once there are more stale pages than this, then historical pages may be dropped, even if they contain unprocessed overweight messages.

    • ServiceWeight — The amount of weight (if any) which should be provided to the message queue for servicing enqueued items on_initialize. This may be legitimately None in the case that you will call ServiceQueues::service_queues manually or set [Self::IdleMaxServiceWeight] to have it run in on_idle.

    • IdleMaxServiceWeight — The maximum amount of weight (if any) to be used from remaining weight on_idle which should be provided to the message queue for servicing enqueued items on_idle. Useful for parachains to process messages at the same block they are received. If None, it will not call ServiceQueues::service_queues in on_idle.

  • Common configs:

    • RuntimeEvent — The overarching event type.

    • WeightInfo — Weight information for extrinsics in this pallet.

Dispatchables

execute_overweight

pub fn execute_overweight(
    origin: OriginFor<T>,
    message_origin: MessageOriginOf<T>,
    page: PageIndex,
    index: T::Size,
    weight_limit: Weight,
) -> DispatchResultWithPostInfo

Execute an overweight message.

Temporary processing errors will be propagated whereas permanent errors are treated as success condition.
The weight_limit passed to this function does not affect the weight_limit set in other parts of the pallet.

Params:

  • param1: Type1 — description of the parameter

  • origin: OriginFor<T> — Must be Signed.

  • message_origin: MessageOriginOf<T> — indicates where the message to be executed arrived from (used for finding the respective queue that this message belongs to).

  • page: PageIndex — The page in the queue in which the message to be executed is sitting.

  • index: T::Size — The index into the queue of the message to be executed.

  • weight_limit: Weight — The maximum amount of weight allowed to be consumed in the execution of the message. This weight limit does not affect other parts of the pallet, and it is only used for this call of execute_overweight.

Errors:

  • QueuePaused — if the queue that overweight message to be executed belongs to is paused.

  • NoPage — if the page that overweight message to be executed belongs to does not exist.

  • NoMessage — if the overweight message could not be found.

  • Queued — if the overweight message is already scheduled for future execution. For a message to be labeled as overweight, the pallet must have previously attempted execution and encountered failure due to insufficient weight for processing. Once marked as overweight, the message is excluded from the queue for future execution.

  • AlreadyProcessed — if the overweight message is already processed.

  • InsufficientWeight — if the weight_limit is not enough to execute the overweight message.

  • TemporarilyUnprocessable — if the message processor `Yield`s execution of this message. This means processing should be reattempted later.

Events:

  • ProcessingFailed(id, origin, error)

  • Processed(id, origin, weight_used, success)

reap_page

pub fn reap_page(
    origin: OriginFor<T>,
    message_origin: MessageOriginOf<T>,
    page_index: PageIndex,
) -> DispatchResult

Remove a page which has no more messages remaining to be processed or is stale.

Params:

  • param1: Type1 — description of the parameter

  • origin: OriginFor<T> — Must be Signed.

  • message_origin: MessageOriginOf<T> — indicates where the messages arrived from (used for finding the respective queue that this page belongs to).

  • page_index: PageIndex — The page to be reaped

Errors:

  • NotReapable — if the page is not stale yet.

  • NoPage — if the page does not exist.

Events:

  • PageReaped(origin, index) — the queue (origin), and the index of the page

Important Mentions and FAQ’s

The pallet utilizes the [sp_weights::WeightMeter] to manually track its consumption to always stay within the required limit. This implies that the message processor hook can calculate the weight of a message without executing it.

How does this pallet work under the hood?

  • This pallet utilizes queues to store, enqueue, dequeue, and process messages.

  • Queues are stored in BookStateFor storage, with their origin serving as the key (so, we can identify queues by their origins).

  • Each message has an origin (message_origin), that defines into which queue the message will be stored.

  • Messages are stored by being appended to the last Page of the Queue’s Book. A Queue is a book along with the MessageOrigin for that book.

  • Each book keeps track of its pages, and the state (begin, end, count, etc.)

  • Each page also keeps track of its messages, and the state (remaining, first, last, etc.)

  • ReadyRing contains all ready queues as a double-linked list. A Queue is ready if it contains at least one Message which can be processed.

  • ServiceHead is a pointer to the ReadyRing, pointing at the next Queue to be serviced. Service means: attempting to process the messages.

Execution:

  • service_queues → returns the weight that is consumed by this function

    • we will process a queue, till either:

      • there is no more message left

        • if there is no more message left in the queue, we won’t stop, service_head will proceed with the next queue

      • or weight is insufficient

        • if weight is insufficient for the next message in the queue, service_head will try to switch to next queue, and try to process message from that queue. This will go on, until it visits every queue, and no message can be processed. Only then, it will stop.

    • each call to service_queues, we will bump the header, and start processing the next queue instead of the previous one to prevent starvation

      • Example:

        • service head is on queue 2

        • we called service_queues, which bumped the service head to queue 3

        • we processed messages from queue 3,

          • but weight was insufficient for the next message in queue 3,

          • so we switched to queue 4, (we don’t bump the service head for that)

          • weight was insufficient for queue 4 and other queues as well, and we made a round trip across queues, till we reach queue 3, and we stopped.

        • service_queues call finished

        • service head is on queue 3

        • we called service_queue again, which bumped the service head to queue 4 (although there are still messages left in queue 3)

        • we continue processing from queue 4.

      • but, to preserve priority, if we made a switch to a new queue due to weight, we don’t bump the service head. So, the next call, will be starting on the queue where we left off.

      • Example:

        • service head is on queue 2

        • we called service_queues, which bumped the service head to queue 3

        • we processed messages from queue 3,

          • but weight was insufficient for the next message in queue 3,

          • so we switched to queue 4, (we don’t bump the service head for that)

          • we processed a message from queue 4

          • weight was insufficient for queue 4 and other queues as well, and we made a round trip across queues, till we reach queue 3, and we stopped.

        • service_queues call finished

        • service head is on queue 3 (there are still messages in queue 3)

        • we called service_queue again, which bumped the service head to queue 4

        • we continue processing from queue 4, although we were processing queue 4 in the last call