Following on from http://fanf.livejournal.com/65203.html I feel the need to write down some more details of my log-structured queue idea.

My approach has a "crash-only software" architecture. According to the abstract of that paper: "Crash-only programs crash safely and recover quickly. There is only one way to stop such software - by crashing it - and only one way to bring it up - by initiating recovery." So, no shutdown code.

Essentials:

The queue consists of a sequence of records of various types.

The most important record type is a message envelope, which contains everything you need to route a message: at a minimum the recipient addresses, though practicality demands rather more than that. The envelope also contains a reference to the spool file which contains the message data etc.
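To make this concrete, here is one way the records might look on disk. This is only an illustration - none of these names or fields are settled:

    /* One possible on-disk layout for queue records; the names and
       fields here are illustrative, not a worked-out format. */
    #include <stdint.h>

    enum rec_type {
        REC_ENVELOPE,   /* full routing information for a message */
        REC_DELIVERY,   /* delta: the result for one set of recipients */
        REC_START,      /* now working on this envelope */
        REC_FINISH      /* no recipients left; message is done */
    };

    struct rec_header {
        uint32_t len;       /* total record length, so scans can skip */
        uint8_t  type;      /* one of enum rec_type */
        uint8_t  garbage;   /* flag, set in place when superseded */
        uint16_t pad;
        uint64_t msg_id;    /* which message this record belongs to */
    };

    struct envelope_rec {
        struct rec_header hdr;  /* hdr.type == REC_ENVELOPE */
        uint64_t spool_ref;     /* spool file holding the message data */
        uint32_t nrcpts;        /* recipient addresses follow, plus
                                   sender, retry and routing info */
    };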

When a message is delivered the MTA must immediately update its recipient list on stable storage. (It has to be immediate in order to avoid duplicate deliveries when the MTA restarts after being stopped at the wrong point.) We don't want to have to rewrite the whole envelope for every delivery: it is big and complicated, whereas we want delivery records to be small and simple, since there will often be more than one of them for each message (especially for bigger envelopes!) and they will make up most of the fsync()ed write traffic on the queue. So delivery records are just deltas relative to envelope records.
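Continuing the illustrative layout above, a delivery record need only say which envelope it amends and what happened to which recipients:

    /* Illustrative delivery record: a delta against an envelope,
       reusing struct rec_header from the sketch above. */
    struct delivery_rec {
        struct rec_header hdr;  /* hdr.type == REC_DELIVERY */
        uint64_t envelope_off;  /* queue offset of the envelope record */
        uint8_t  status;        /* delivered, deferred, bounced, ... */
        uint32_t nrcpts;        /* indexes into the envelope's
                                   recipient list follow */
    };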

Once the current set of delivery attempts for a message has been completed, we write a replacement envelope for the message, or, if there are no recipients left, a record that all work on the message is finished. At this point the old envelope and all its updates have become garbage.

Unless the MTA has only one message in the queue, the old envelope, the delivery records, and the replacement envelope will all be separated from each other in the queue by records related to other messages. Therefore we need some way of linking them together so that we can restart quickly, without scanning the whole queue. The point of this article is to explain how to solve this problem.

Details:

The procedure for delivering a message is as follows. A queue runner scans the queue until it encounters an envelope which is not marked as garbage, is past its retry time, and is not already being worked on; it then kicks off delivery of this message. (There is a queue runner for newly received messages as well as for older messages, so all deliveries start this way.) First, write a start record to the queue, both to record that this message is being worked on and to link back to its envelope record; these start records are used to recover state after a restart. Then mark the envelope as garbage. This is done early so that the garbage marks are added sequentially - it's common to finish working on messages in a different order than they were started. Then deliver the message, recording the results for each set of recipients (successful or otherwise) as a delivery record in the queue, with an fsync() for each set. When there's nothing left to do for the message, either write an updated envelope record (if it is to be retried) or a finish record. Finally, mark the start record as garbage, which implicitly marks its associated delivery records as garbage too.
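Here's the same procedure as a code sketch, with made-up helpers (queue_append(), mark_garbage(), and friends) standing in for the real machinery:

    /* Sketch of the per-message delivery procedure; every helper
       here is hypothetical. queue_append() returns the new record's
       offset in the queue. */
    void work_on_message(struct queue *q, off_t env_off)
    {
        /* Record that this message is being worked on, with a
           back-link to its envelope. */
        off_t start_off = queue_append(q, REC_START, env_off);

        /* Mark the old envelope as garbage straight away, so the
           garbage marks go to disk in sequential order even though
           messages finish out of order. */
        mark_garbage(q, env_off);

        /* Deliver: one fsync()ed delivery record per set of
           recipients that shares a result. */
        struct envelope *env = read_envelope(q, env_off);
        while (attempt_next_delivery(env)) {
            queue_append_delivery(q, env_off, env->last_result);
            queue_fsync(q);
        }

        /* Either a replacement envelope (some recipients remain,
           to be retried later) or a finish record (all done). */
        if (env->nrcpts_left > 0)
            queue_append_envelope(q, env);
        else
            queue_append(q, REC_FINISH, env_off);

        /* Finally, retire the start record, implicitly retiring
           its delivery records too. */
        mark_garbage(q, start_off);
    }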

When we restart, the part of the queue that we can't avoid scanning is the "working suffix", which contains delivery records of messages which have not yet had their replacement envelopes written. The original envelopes of these messages are marked as garbage, but they are still needed until their replacements are written, so they can't yet be discarded. These are the messages that the MTA was working on when it stopped, so we need to recover that working state, or at least the parts that still matter. The start of the working suffix is the earliest start record which has not been marked as garbage.

One thing we can avoid scanning for at startup is the appropriate positions of the queue runners in the queue. Every so often the MTA records the set of queue runner positions for use at a restart. Doing this more frequently means less wasted work at startup, but also more overhead in normal operation. Similarly, it also records the start of the working suffix. The procedure is to find the current restart positions, fsync() all the queue files, then write these restart positions to disk.

There are multiple queue files because - like log files - the MTA switches to a new file every so often, so that when old files become entirely garbage they can be deleted. The queue runners will be marking envelopes as garbage as they go, and since they will not all be scanning the current queue file, they can't benefit from that file's frequent fsync()s; hence the extra fsync() to ensure that the restart positions correctly reflect the on-disk state in case of a crash. This procedure lags slightly behind the activity of the queue runners, which is OK because the restart positions don't need to be accurate: if they are out of date the only harm is a little more garbage to scan after startup. The important feature is that the older portions of the queue, before the restart pointers, are consistent.
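A sketch of the checkpoint procedure - struct restart_info and all the helper functions are inventions for the example:

    /* Periodic checkpoint: snapshot the restart positions, make
       everything before them durable, then record them. Slightly
       stale positions only cost a little extra scanning at startup. */
    void checkpoint(struct queue *q)
    {
        struct restart_info info;
        info.working_suffix_start = working_suffix_start(q);
        for (int i = 0; i < q->nrunners; i++)
            info.runner_pos[i] = runner_position(q, i);

        /* fsync() every queue file, not just the current one, so
           the garbage marks behind each runner are on disk before
           the positions that depend on them. */
        for (int i = 0; i < q->nfiles; i++)
            fsync(q->file_fd[i]);

        write_restart_info(&info);
    }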

When the MTA is restarted it can immediately start receiving messages, since this just involves appending new envelope records to the queue. Deliveries, however, have to wait. Having read the restart positions, the MTA scans the working suffix. For each start record that isn't marked as garbage, ensure that the envelope it refers to is marked as garbage (it might not be if there was a crash), then add it to the list of working messages. Remember each delivery record that is associated with a working message. If we encounter a replacement envelope or finish record for a working message (again, possible because of a crash), mark its start record as garbage and remove it from the working list. When we run out of queue, write replacement envelopes for all the working messages and mark their start records as garbage. The on-disk data has now been made consistent, so we can start the queue runners at their recorded positions.
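As a code sketch (helpers invented as before), the recovery scan is a single pass:

    /* One pass over the working suffix to rebuild consistency.
       All the types and helpers here are hypothetical. */
    void recover(struct queue *q, struct restart_info *info)
    {
        struct working_list wl = working_list_new();

        for (struct rec *r = seek_to(q, info->working_suffix_start);
             r != NULL; r = next_record(q)) {
            switch (r->type) {
            case REC_START:
                if (r->garbage)
                    break;
                /* A crash may have lost the envelope's garbage
                   mark; reassert it. */
                mark_garbage(q, start_rec_envelope(r));
                working_list_add(&wl, r);
                break;
            case REC_DELIVERY:
                /* Remember deltas belonging to working messages. */
                working_list_note_delivery(&wl, r);
                break;
            case REC_ENVELOPE:
            case REC_FINISH: {
                struct working *w = working_list_find(&wl, r->msg_id);
                if (w != NULL) {
                    /* The replacement or finish record made it to
                       disk before the crash: not working after all. */
                    mark_garbage(q, w->start_off);
                    working_list_remove(&wl, w);
                }
                break;
            }
            }
        }

        /* Tie off whatever was in flight when we stopped. */
        for (struct working *w = working_list_first(&wl); w != NULL;
             w = working_list_next(&wl, w)) {
            queue_append_envelope(q, envelope_minus_deliveries(w));
            mark_garbage(q, w->start_off);
        }
        /* Now the queue runners can start at info->runner_pos[]. */
    }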

There's one remaining mechanism. The earliest queue runner (dealing with the messages with the longest retry interval) leaves behind it a queue that is entirely marked as garbage. However envelopes just behind the queue runner are not true garbage until their replacement envelope has been written. Thus there must be a process which tracks the point where true garbage starts, so that when it rolls past a file boundary we know the older file can safely be removed. This can be done entirely in memory by keeping note of when deliveries triggered by the earliest queue runner complete. No persistent state is needed because after the working suffix has been scanned the garbage markers are all correct.
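For illustration, that in-memory tracking could be a small ring buffer (arbitrary fixed size, invented names):

    #include <stdbool.h>
    #include <sys/types.h>

    /* Hypothetical tracker for the true-garbage prefix. Messages
       started by the earliest queue runner are pushed in queue
       order; everything before the oldest unfinished one is true
       garbage. */
    struct inflight { off_t env_off; bool done; };
    static struct inflight pending[1024];
    static size_t head, tail;   /* ring buffer indices */

    void earliest_runner_started(off_t env_off)
    {
        pending[tail] = (struct inflight){ env_off, false };
        tail = (tail + 1) % 1024;
    }

    void earliest_runner_finished(off_t env_off)
    {
        for (size_t i = head; i != tail; i = (i + 1) % 1024)
            if (pending[i].env_off == env_off)
                pending[i].done = true;
        while (head != tail && pending[head].done)
            head = (head + 1) % 1024;   /* pop the completed prefix */
    }

    off_t true_garbage_end(void)
    {
        /* Queue files lying wholly before this offset are safe
           to delete. */
        if (head == tail)
            return earliest_runner_position();   /* hypothetical */
        return pending[head].env_off;
    }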

Optimizations:

No fsync()s are needed when writing garbage flags, start or finish records, or replacement envelopes, because if they are lost in a crash they can be recovered, or other fsync() activity prevents such a loss. These writes are book-keeping - they aren't necessary in a conventional one-file-per-message spool - so it's good that they can be coalesced with other, unavoidable writes.

The envelope of a new message does need to be fsync()ed before we return a 250 to the client that sent it to us, but if we delay a little we can piggy-back it on a delivery fsync().
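One way to arrange the piggy-backing is a group commit: a receiving thread waits briefly for someone else's fsync() to cover its envelope, and does its own only if none arrives. A sketch using POSIX threads - my arrangement, not a settled design:

    #include <pthread.h>
    #include <time.h>
    #include <unistd.h>
    #include <sys/types.h>

    static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t synced = PTHREAD_COND_INITIALIZER;
    static off_t synced_to;   /* queue offset known to be on disk */

    /* Called by any code path that fsync()s the queue. */
    void queue_fsync(int fd, off_t written_to)
    {
        fsync(fd);
        pthread_mutex_lock(&lk);
        if (written_to > synced_to)
            synced_to = written_to;
        pthread_cond_broadcast(&synced);
        pthread_mutex_unlock(&lk);
    }

    /* Called after appending a new envelope, before sending 250. */
    void wait_for_sync(int fd, off_t env_end)
    {
        struct timespec deadline;
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_nsec += 10 * 1000 * 1000;   /* wait about 10ms */
        if (deadline.tv_nsec >= 1000000000) {
            deadline.tv_sec += 1;
            deadline.tv_nsec -= 1000000000;
        }

        pthread_mutex_lock(&lk);
        while (synced_to < env_end) {
            if (pthread_cond_timedwait(&synced, &lk, &deadline) != 0) {
                /* Nobody fsync()ed for us in time; do it ourselves. */
                pthread_mutex_unlock(&lk);
                queue_fsync(fd, env_end);
                pthread_mutex_lock(&lk);
            }
        }
        pthread_mutex_unlock(&lk);
    }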

Delivery records contain sets of recipients when a single delivery has multiple recipients. One delivery record is written when we get the destination server's final response to the message data, so a message with ten recipients on the same server needs only one delivery record and one fsync(). Thus we can get fewer than one fsync() per recipient.

I haven't said where to record the restart positions. This could be in the queue, which implies a bit of furtling around to find them, or in a fixed location, which implies extra seeks (similar to mtime seeks).

It might be more efficient to have multiple queues, the key difference being that replacement envelopes are written to a queue whose queue runners have an appropriate retry interval. With the single-queue setup the long-interval queue runners are mostly skipping garbage, which wastes IO bandwidth and pollutes the page cache. However, multiple queues may increase seeking too much. As chrislightfoot points out, the conventional wisdom isn't a reliable guide in this area. More benchmarks needed.

Edit:

Actually, the garbage bits on the start records are unnecessary: they are only for use by the restart process, but it can (and does!) infer their correct value. When we finish working on a message, instead of frobbing this bit, we just update our current in-memory note of the start-of-working-suffix and end-of-garbage-prefix positions, as appropriate.

A correspondence with Postfix terminology: The working suffix is like Postfix's active queue. The new envelopes being appended to the queue are the incoming queue. The other queue runners are scanning the deferred queue. Postfix has a fairness strategy of adding messages to the active queue alternately from the incoming and deferred queues, and we can apply something similar here by controlling when we ask each queue runner to scan for its next envelope.
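For instance, a deliberately crude sketch of that control, alternating between the incoming runner and the deferred runners (all names invented):

    /* Hypothetical fairness control: alternate between asking the
       incoming-queue runner and a deferred-queue runner for the
       next envelope, as Postfix alternates between its incoming
       and deferred queues. */
    struct runner *next_runner_to_ask(struct scheduler *s)
    {
        s->ask_incoming = !s->ask_incoming;
        if (s->ask_incoming && runner_has_work(s->incoming))
            return s->incoming;
        return next_deferred_runner(s);   /* round-robin the rest */
    }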