pallet_message_queue
Purpose
Flexible FRAME pallet for implementing message queues. This pallet can also initiate message processing using the MessageProcessor
(see Config
).
Config
-
Pallet-specific configs:
-
MessageProcessor
— Processor for messages -
Size
— Page/heap size type. -
QueueChangeHandler
— Code to be called when a message queue changes - either with items introduced or removed. -
QueuePausedQuery
— Queried by the pallet to check whether a queue can be serviced. -
HeapSize
— The size of the page; this also serves as the maximum message size which can be sent. -
MaxStale
— The maximum number of stale pages (i.e. of overweight messages) allowed before culling can happen. Once there are more stale pages than this, then historical pages may be dropped, even if they contain unprocessed overweight messages. -
ServiceWeight
— The amount of weight (if any) which should be provided to the message queue for servicing enqueued itemson_initialize
. This may be legitimatelyNone
in the case that you will callServiceQueues::service_queues
manually or set [Self::IdleMaxServiceWeight
] to have it run inon_idle
. -
IdleMaxServiceWeight
— The maximum amount of weight (if any) to be used from remaining weighton_idle
which should be provided to the message queue for servicing enqueued itemson_idle
. Useful for parachains to process messages at the same block they are received. IfNone
, it will not callServiceQueues::service_queues
inon_idle
.
-
-
Common configs:
-
RuntimeEvent
— The overarching event type. -
WeightInfo
— Weight information for extrinsics in this pallet.
-
Dispatchables
execute_overweight
pub fn execute_overweight(
origin: OriginFor<T>,
message_origin: MessageOriginOf<T>,
page: PageIndex,
index: T::Size,
weight_limit: Weight,
) -> DispatchResultWithPostInfo
Execute an overweight message.
Temporary processing errors will be propagated whereas permanent errors are treated as success condition. |
The weight_limit passed to this function does not affect the weight_limit set in other parts of the pallet.
|
Params:
-
param1: Type1
— description of the parameter -
origin: OriginFor<T>
— Must beSigned
. -
message_origin: MessageOriginOf<T>
— indicates where the message to be executed arrived from (used for finding the respective queue that this message belongs to). -
page: PageIndex
— The page in the queue in which the message to be executed is sitting. -
index: T::Size
— The index into the queue of the message to be executed. -
weight_limit: Weight
— The maximum amount of weight allowed to be consumed in the execution of the message. This weight limit does not affect other parts of the pallet, and it is only used for this call ofexecute_overweight
.
Errors:
-
QueuePaused
— if the queue that overweight message to be executed belongs to is paused. -
NoPage
— if the page that overweight message to be executed belongs to does not exist. -
NoMessage
— if the overweight message could not be found. -
Queued
— if the overweight message is already scheduled for future execution. For a message to be labeled as overweight, the pallet must have previously attempted execution and encountered failure due to insufficient weight for processing. Once marked as overweight, the message is excluded from the queue for future execution. -
AlreadyProcessed
— if the overweight message is already processed. -
InsufficientWeight
— if theweight_limit
is not enough to execute the overweight message. -
TemporarilyUnprocessable
— if the message processor `Yield`s execution of this message. This means processing should be reattempted later.
Events:
-
ProcessingFailed(id, origin, error)
-
Processed(id, origin, weight_used, success)
reap_page
pub fn reap_page(
origin: OriginFor<T>,
message_origin: MessageOriginOf<T>,
page_index: PageIndex,
) -> DispatchResult
Remove a page which has no more messages remaining to be processed or is stale.
Params:
-
param1: Type1
— description of the parameter -
origin: OriginFor<T>
— Must beSigned
. -
message_origin: MessageOriginOf<T>
— indicates where the messages arrived from (used for finding the respective queue that this page belongs to). -
page_index: PageIndex
— The page to be reaped
Errors:
-
NotReapable
— if the page is not stale yet. -
NoPage
— if the page does not exist.
Events:
-
PageReaped(origin, index)
— the queue (origin), and the index of the page
Important Mentions and FAQ’s
The pallet utilizes the [sp_weights::WeightMeter ] to manually track its consumption to always stay within
the required limit. This implies that the message processor hook can calculate the weight of a message without executing it.
|
How does this pallet work under the hood?
-
This pallet utilizes queues to store, enqueue, dequeue, and process messages.
-
Queues are stored in
BookStateFor
storage, with their origin serving as the key (so, we can identify queues by their origins). -
Each message has an origin (message_origin), that defines into which queue the message will be stored.
-
Messages are stored by being appended to the last
Page
of the Queue’s Book. A Queue is a book along with the MessageOrigin for that book. -
Each book keeps track of its pages, and the state (begin, end, count, etc.)
-
Each page also keeps track of its messages, and the state (remaining, first, last, etc.)
-
ReadyRing
contains all ready queues as a double-linked list. A Queue is ready if it contains at least one Message which can be processed. -
ServiceHead
is a pointer to theReadyRing
, pointing at the nextQueue
to be serviced. Service means: attempting to process the messages.
Execution:
-
service_queues
→ returns the weight that is consumed by this function-
we will process a queue, till either:
-
there is no more message left
-
if there is no more message left in the queue, we won’t stop, service_head will proceed with the next queue
-
-
or weight is insufficient
-
if weight is insufficient for the next message in the queue, service_head will try to switch to next queue, and try to process message from that queue. This will go on, until it visits every queue, and no message can be processed. Only then, it will stop.
-
-
-
each call to
service_queues
, we will bump the header, and start processing the next queue instead of the previous one to prevent starvation-
Example:
-
service head is on queue 2
-
we called
service_queues
, which bumped the service head to queue 3 -
we processed messages from queue 3,
-
but weight was insufficient for the next message in queue 3,
-
so we switched to queue 4, (we don’t bump the service head for that)
-
weight was insufficient for queue 4 and other queues as well, and we made a round trip across queues, till we reach queue 3, and we stopped.
-
-
service_queues
call finished -
service head is on queue 3
-
we called
service_queue
again, which bumped the service head to queue 4 (although there are still messages left in queue 3) -
we continue processing from queue 4.
-
-
but, to preserve priority, if we made a switch to a new queue due to weight, we don’t bump the service head. So, the next call, will be starting on the queue where we left off.
-
Example:
-
service head is on queue 2
-
we called
service_queues
, which bumped the service head to queue 3 -
we processed messages from queue 3,
-
but weight was insufficient for the next message in queue 3,
-
so we switched to queue 4, (we don’t bump the service head for that)
-
we processed a message from queue 4
-
weight was insufficient for queue 4 and other queues as well, and we made a round trip across queues, till we reach queue 3, and we stopped.
-
-
service_queues
call finished -
service head is on queue 3 (there are still messages in queue 3)
-
we called
service_queue
again, which bumped the service head to queue 4 -
we continue processing from queue 4, although we were processing queue 4 in the last call
-
-
-