.@ Tony Finch – blog


(This isn't really part of my "how not to design an MTA" series since I don't have much to say about how other MTAs do it. That's mainly because most of them don't have schedulers to speak of. Postfix has one, but it isn't able to pipeline messages down a connection, and it doesn't have a centralized router. Its current queue manager is described in the attachment linked from a message to postfix-devel from Patrik Rak.)

There are two important parts of an MTA that need a concurrent scheduler: routing (where you want concurrent DNS/LDAP/etc. lookups) and delivery (where you want concurrent connections to remote hosts). (Aside: incoming messages are also concurrent but in this case the demand arises externally so can't be scheduled).

Scheduling is tricky because of the mix of work that we have to handle. Messages can have multiple recipients: the vast majority (around 90%) have only a single recipient, but those with more than one can have very large envelopes if they combine mailing list and alias expansion. Remote systems can respond with varying speed: as well as DNS and SMTP being slow because of network latency or brokenness, SMTP servers sometimes introduce delays deliberately to cause problems for clients they think are bad in some way.

(Note: a big message has lots of data whereas a big envelope belongs to a message with lots of recipients.)

In the following I'll talk about jobs and workers to abstract from the differences between routing and delivery. In routing a job is just a recipient that needs to be routed, and the workers are routing processes; the workers are all equivalent and we have not yet found any differences between the jobs. In delivery, a job is a subset of the recipients for a message which can all be delivered to the same place, and a worker is a connection to a remote SMTP server or to a local delivery agent; there is more complexity because it's more efficient to re-use a connection for jobs routed to its destination than to handle each delivery independently.

First I'll examine the basics that are shared between routing and delivery. It's worth looking at a couple of simple schedulers that don't work.

1. Do not allocate jobs to workers on a first-come-first-served basis. A big envelope may be able to use up your resources, holding back other messages. Once you have dealt with all its fast jobs you will be dealing with just slow jobs, so your throughput will be poor.

2. Do not allocate an equal number of workers to each message. When you are heavily loaded each message will only have one worker each, which means that large envelopes will be processed serially - but we want to make use of concurrency to handle large envelopes effectively.

A possible compromise is to use a FCFS scheduler up to some maximum concurrency per message. However a maximum that makes sense on a heavily-loaded system means that big envelopes won't be able to make use of the spare resources on a lightly-loaded system. So we need a way to scale up the maximum if there are free workers.

	def allowed_new_worker(job):
		if job.message.concurrency < max_message_concurrency
		then return true
		else if job.message.concurrency < scale_max_concurrency * free_workers
		then return true
		else return false
		end

A scale factor of 10 means that a message can use about 91% of an otherwise-idle system, or two messages can use 47% each, etc. If a second big envelope comes along when the first one already has 91% of the system, then message 2 will slurp up the remaining 9% (assuming that is less than the max_message_concurrency) and as jobs in message 1 complete message 2 will take over their workers until the new equilibrium is reached.

	def next_router_job(jobs):
		if allowed_new_worker(jobs.first)
		then return jobs.first
		else return next_router_job(jobs.rest)
		end

The allowed_new_worker() function is still useful in the delivery scheduler, but we want to spread the cost of connection set-up by delivering multiple messages down each connection. If we are going to make good use of pipelining we should allocate new deliveries to connections before the previous deliveries are complete, i.e. there can be a queue of multiple jobs per worker. Furthermore, if a worker is overloaded (its queue is backlogging) we need to open another connection to a suitable destination; we should do so in a way that spreads the load across the possible destinations. We also need to detect when the load has dropped and we can close a connection.

This will depend on a database of destinations, destdb. For broken destinations it records when we should next bother to try to connecting to them. It allows us to look up efficiently any connection(s) that we have currently open to a destination, whether they are active or idle, and how old they are. When we close the last connection to a destination, we keep a record of when it was made to help the decision of where to open new connections. The destdb has a garbage collector to remove stale retry and closed-connection records.

When deciding which worker to use for a delivery job, we first classify its possible destinations (initially just the primary destinations) based on destdb into active, idle, closed, unused (absent from destdb), and retry (in which case we omit destinations that have not passed their retry time).

First sort the active list by connection time and traverse it from most recent to least recent. If we find one that has less than the backlog threshold then the first we find is the worker for this job. We prefer recent connections so that when our load goes down older connections will become idle and eventually be closed.

If this fails then check if this job is allowed to use a new worker. If it is not, we still allocate it to an active connection so that big envelopes can get a free ride off smaller ones. Note that this means their concurrency can be higher than the configured maximum even under load. We choose the first connection in the active list that has fewer jobs than the newest active connection (to spread the load evenly). If they all have the same number of jobs, use the first one. If there are no active connections then this job must wait to be rescheduled when a worker becomes free. (We can fall back to this procedure a couple of other ways below.)

The alternative case is that we have a job which can and should use a new worker. If there are destinations on its idle list the job should use the most recent one (which presumably was active until not long ago). Otherwise we are going to open a new connection. If the system has no unused workers (or idle ones that we can close) to handle the new connection, then add the job to an active connection as above.

If there are destinations on its unused list the job should pick one at random and use that. This will become a new connection which will attract new jobs, thereby spreading our load onto a part of the destination cluster we haven't used yet. Otherwise, we have previously used all the possible destinations. If there are destinations on the closed list, pick the oldest. This means we will spread our load around the destination cluster round-robin style, though in a random order because of the way we pick previously-unused destinations. Finally, if there are destinations on the retry list, try one of them at random. Since this is the last option, we will normally not retry a destination until its record is garbage-collected from destdb (causing it to appear on the unused list), unless we are under load.

At this point we have failed to find a candidate destination. There are two possibilities: all of the working destinations have active connections, or all of them failed the retry time check. In the first case, we open another connection to a destination picked at random from those with the fewest concurrent connections. If they have all reached their per-destination concurrency limit, add the job to an active connection as above.

In the second case, none of the job's primary destinations are currently working. So we repeat the whole process from the classification stage using the secondary destination list. If necessary we can fall back to the tertiary etc. destinations. If we run out of fallbacks then the message must be deferred for later retry.

That wraps up the delivery job scheduling algorithm.

I should note that there are a number of mechanisms that cause connections to be closed, so that the MTA can recover unused resources. Firstly, idle connections can be closed if the worker is required for a new connection. Secondly, we should have an ageing mechanism that closes connections after some period of time; it may be worth using different times for active and idle connections. (We want to close active connections after a time to spread load around even when it is low.) Thirdly, remote MTAs can close connections whenever they wish. If the connection has a queue of pending jobs when it is closed then these must be re-scheduled.

If the MTA is part of a cluster, the destdb should ideally be shared. This allows hosts to pool their knowledge of which hosts are down and when to retry, and it means that the destination load balancing will work across the cluster. (It would not be too difficult to implement this feature for Exim's hints databases.)

There is a little scope for simplification, because it probably isn't worth keeping track of closed connections. (I described them because I wanted to note my thoughts fairly completely.) It's probably OK to remove the special case handling for retries too, and instead rely on the destdb garbage collector.

Note that it probably isn't worth worrying about a large envelope using all your resources, because SMTP only guarantees that envelopes of up to 100 recipients will work. To encounter problems you would probably have to configure your mailing list manager to use large envelopes. (I'm assuming that you are using modern concurrency techniques, i.e. hundreds of connections per process, not one per process.) On the other hand it probably is worth taking care to limit the load imposed from outside (attackers) by smtp-time verification. You can do so within the above framework by assigning all verifications to a pseudo-message, which would probably have specially tuned concurrency settings.