This paper was written as a contribution to the discussion in the SP working group on July 5th, 2011.
Unlike other (centralised) messaging systems, which are built on a well-understood theoretical foundation, there are almost no resources on distributed messaging in general, or ØMQ in particular, that an interested reader can be pointed to.
The goal of this paper is to explain the elementary concepts of the ØMQ architecture, how they fit together, and the rationale for designing them the way they are.
Topology is the primary concept in ØMQ. Unless you understand what "topology" means, the other concepts will appear confusing and hard to understand, even misdesigned.
As a casual definition, we can say that a "topology" is a set of applications participating in the same aspect of the business logic.
Example: Imagine an image transformation service that resizes images to a desired size and resolution. All the applications providing the transformation service, all the applications using the service and all the intermediary nodes, such as load balancers, form a topology.
Technically, a topology has the following properties:
- A topology is a graph where nodes are applications and edges are data channels between applications.
- All the applications agree on the same wire protocol for their business logic.
- The graph is connected, i.e. any two nodes are linked either directly or via one or more intermediaries.
Point 1 is pretty obvious. One point to make is that the word "channel" is used deliberately instead of "connection" to express the fact that the model works even for connection-less underlying transports such as IP multicast or UDP.
Point 2 says that all the applications in the topology should agree on what messages are being passed (say "here's an image to resize" or "here's the resized image"), message sequences (implemented as state machines in the applications), the actual encoding of the data (how are the images serialised? RGB? CMYK?) etc.
Point 3 expresses the fact that even if there are two deployments of exactly the same business logic (e.g. in two companies), they form two topologies, unless they are mutually connected by one or more data channels.
To intuitively grasp the concept of topology, it's important to understand that the concept is fuzzy.
It's fuzzy in the same way the classes in object-oriented programming are fuzzy. There is a formal definition explaining that classes are collections of data members and methods, however, no definition explains which portion of business logic should form a class and which should not. It's entirely up to the programmer to decide which business concepts are encapsulated into classes and which are not. The programmer may err both by putting all the business logic into a single class — and thus virtually avoiding object-oriented design — and by splitting the logic into zillions of little classes — thus turning the program into an incomprehensible mess of mutual dependencies.
In the same way, there's no single right way to split business logic into topologies. The only rule of thumb is that a topology is the atomic unit of scaling. You can scale a topology as a whole, but you cannot scale just one aspect of it. Thus, if you expect that there will be a need to scale functionality A independently of functionality B in the future, you should create a separate topology for A and a separate topology for B.
Let's illustrate the above with a concrete example:
In our image transformation application, there are two basic functions: resizing an image and adjusting its brightness. We can opt to either create a single topology for both functionalities or split it into a "resizing" topology and a "brightness adjustment" topology.
In the former case, we'll define the wire protocol in such a way as to convey which functionality we are interested in. Say, the first byte of the message can be either "1" for resizing or "2" for brightness adjustment. We should also be aware of the fact that this design tightly couples the two functionalities. If we are going to add more processing nodes to the topology in the future, each of them has to be capable of doing both resizing and brightness adjustment.
In the latter case, the two functionalities are disjoint. There's no need for a special "type" field in the wire format, as all requests in the "resizing" topology are requests to resize images and all requests passed through the "brightness adjustment" topology are requests to adjust the brightness of images. In this design we can scale each topology independently of the other. If we, say, devise special single-purpose FPGAs to resize images, we can simply connect them to the "resizing" topology, with no impact on the "brightness adjustment" topology. The arrangement can be seen in the following picture:
Note that the client application can ask for both image resizing (via topology A) and brightness adjustment (via topology B). Worker 1 can do only image resizing, worker 3 can do only brightness adjustment, while worker 2 is able to provide both services.
As a final note, it should be said that a topology, thanks to the fact that it's clearly separated from any other topologies, can be mapped to a property of the underlying transport, such as a TCP port. This allows the underlying network to base its behaviour on business criteria. It can, for example, measure the network bandwidth consumed by a particular topology (and thus by particular business logic, e.g. bandwidth consumed by the resizing service as opposed to bandwidth consumed by the brightness adjustment service), or it can do traffic shaping based on topology, e.g. throttle image resizing at the expense of brightness adjustment, etc.
Often, there is a requirement to run the messaging layer on top of different transport mechanisms in addition to TCP, be it InfiniBand (performance reasons), IP multicast (minimising bandwidth usage) or SCTP (multihoming, heartbeats etc.).
The naive approach is to start with the TCP transport, maybe adding a couple of features that TCP lacks, such as heartbeating, and try to provide exactly the same behaviour on top of the other underlying transports.
There are a couple of problems with such an approach:
- First, building a TCP-like wrapper on top of a particular transport virtually makes the transport redundant. If it behaves like TCP, why not use TCP in the first place? (Performance characteristics being the exception to this rule.)
- Second, some transports can't even be shoehorned into the TCP model. For example, IP broadcast by definition sends data to all the boxes on the network, rather than to a specific destination as TCP does.
Given the above problems, ØMQ takes a different approach. Underlying transports retain their native characteristics, without a common, all-encompassing interface provided on top. Instead, a minimal interface is provided (in particular: message delimitation, message segmentation and message atomicity) and the upper layers are required to be generic enough to deal with the peculiarities of the different underlying transports.
In practical terms, it means that the wrapper on top of a transport is an extremely thin one, such as a message delimitation protocol (when wrapping TCP), a message segmentation protocol (splitting long messages into several packets for packet-based transports) or a late-joiner protocol (to drop the trailing parts of a message you can get when joining a PGM multicast stream):
Topology Establishment vs. Message Routing
Every layer in the network stack abstracts away some portion of the complexity of networking. The IP layer abstracts away the need to find the route to the destination host. TCP abstracts away the fact that the network is inherently lossy and provides reliability guarantees.
ØMQ abstracts away the need to specify a particular network location to send the data to. Messages are sent to a topology, not to a specific endpoint. Recall that a topology is tied to particular business logic, meaning that when you send a message to a topology, you are basically asking for a specific service to be provided, such as image resizing or brightness adjustment. The actual endpoint(s) to receive the message are selected in a transparent manner by ØMQ.
To enforce this principle, ØMQ strictly separates the establishment of the topology (zmq_bind, zmq_connect) from the actual message passing (zmq_send, zmq_recv).
The former works with underlying transport addresses, such as IP addresses, while the latter uses just a handle (file descriptor) to address a particular topology:
/* Topology establishment */
int s = zmq_socket (...);
zmq_connect (s, "tcp://192.168.0.111:5555");

/* Message routing */
const char data [] = "ABC";
zmq_send (s, data, sizeof (data), 0);
Separating topology establishment from message routing isn't strictly speaking indispensable. After all, it would be easy to combine the two into a single function:
zmq_send (s, "tcp://192.168.0.111:5555", data, sizeof (data), 0);
The rationale for the separation is both technical and educational. Technical arguments include:
- When we want to receive messages from the topology in an asynchronous manner, we have to connect to it anyway. There's no reason not to re-use this channel for sending messages as well.
- Separation of topology establishment and message routing maps well to BSD socket API (bind/connect vs. send/recv).
Now, the educational argument is even more important. It has to do with what ØMQ is and what it is not.
Underlying protocols, such as TCP, allow you to send data to a specific endpoint. ØMQ builds on top of the underlying protocol and allows you to send data to a topology rather than to a particular endpoint. Thus, if you want to send data to a specific endpoint, you should use TCP or a similar protocol. If you want to send it to a topology and let the topology decide on the destination, you should use ØMQ.
Unfortunately, this concept seems extremely hard to grasp. It turns out that it's almost impossible to convince people that ØMQ can't be used to address specific endpoints and that this fact is not a bug but a feature.
Separating topology establishment from message routing doesn't solve the problem, but it makes the actual functionality more obvious. Adding name resolution (see the eponymous chapter below) to the mix in the future will hopefully make the fact completely obvious:
zmq_connect (s, "Brightness-Adjustment-Service");
zmq_send (s, data, sizeof (data), 0);
When thinking about topologies as a means of routing messages, it becomes clear that routing algorithms differ between topologies. While the "NASDAQ stock quote" topology distributes the quotes to all consumers in the topology, the "brightness adjustment" topology transports an image from the client to one of the workers and passes the adjusted image back to the original client.
ØMQ reflects this fact by defining several so-called "messaging patterns". The former, stock quote, topology would be an example of the publish/subscribe pattern, while the latter, brightness adjustment, topology would be an example of the request/reply pattern.
A messaging pattern defines both the protocol used for communication between the nodes and the functionality of an individual node, e.g. the algorithm it uses to route messages. Consequently, different patterns behave like different protocols. You cannot connect a publish/subscribe node to a request/reply node, the same way you cannot connect a TCP endpoint to an SCTP endpoint. Each topology thus implements only a single messaging pattern — there is no way to join two different messaging patterns into a single topology.
This strict separation is necessary to provide guarantees about the behaviour of the topology as a whole. As long as you know that each node in the topology adheres to publish/subscribe semantics, you can provide guarantees such as "the message will be delivered to all the endpoints in the topology". If part of the topology were allowed to do, say, load-balancing instead of broadcasting, you wouldn't be able to make such a statement. Even worse, as the set of messaging patterns is open-ended, you would have to expect nodes to behave in a completely arbitrary manner and thus you wouldn't be able to provide any guarantees at all.
Here's a diagram of the network stack. Note that individual messaging patterns reside on the same layer of the stack and there are no dependencies between them:
Given that some traditional messaging systems choose to offer a generic routing infrastructure that allows the user to build basically any routing algorithm on top of it (for example, the AMQP model of exchanges, bindings and consumers) rather than delivering pre-packaged messaging patterns (e.g. topics and queues in JMS), it's important to explain the rationale for ØMQ choosing the latter option.
First, designing a fully-functional and bug-free messaging pattern is a hard task. By shifting the responsibility for creating the pattern to the user, we are guaranteed that most applications built on top of the messaging system will be faulty in some way. Even where the pattern is implemented correctly, the learning and development cost will exceed the cost of using a pre-packaged pattern many times over. After all, as one early paper on DNS design puts it: "[Users] want to use, not understand, the systems they are provided."
Second, formally defined patterns make it possible to enforce the requirement that two patterns cannot co-exist in the same topology. The messaging system can check whether the peer implements the same messaging pattern and reject the connection if it does not. If implementing the pattern is the user's task, no such check can be done.
Third, a generic routing infrastructure cannot be distributed (a.k.a. federated) automatically. It is meant to work only with a simple hub-and-spoke architecture, and once you want to move beyond that model you have to supply additional information, namely answer the "what's the messaging pattern?" question. Have a look at the federation mechanisms built by various products on top of AMQP. The "pattern" bit is always there, whether explicitly or implicitly (by supporting just one pattern).
And finally, our experience with AMQP is that although it provides a rich variety of possible messaging patterns, people build the same couple of patterns on top of it over and over again, while completely ignoring the rest of the spectrum.
Hop-by-Hop vs. End-to-End
One of the most ingenious features of the Internet stack is the clean separation of hop-by-hop functionality (IP) and end-to-end functionality (TCP, UDP, SCTP et al.). It was this split that allowed the Internet ecosystem to evolve. If there were no such split, every minor change to end-to-end protocols would be as painful as the IPv4 to IPv6 transition.
The idea is that every node in the network has to implement IP; however, only the endpoints using a specific end-to-end protocol, such as TCP, have to be aware of it. In other words, intermediary nodes, such as routers, need no knowledge of the end-to-end protocols layered on top of IP to work as expected:
The experience with separating the IP and TCP layers was later generalised in the form of the "end-to-end argument". The end-to-end argument states that if functionality cannot be properly provided at the lower layer (the hop-by-hop layer in our case), i.e. when it needs help from the upper layer (the end-to-end layer) to work as expected, there's little value in implementing it in the lower layer in the first place.
ØMQ adheres to the end-to-end principle and separates its stack into hop-by-hop layer (clumsily denoted by socket types beginning with "X") and end-to-end layer (socket types not beginning with "X"). Note the similarity with the TCP/IP diagram above:
Similarly to TCP/IP, the hop-by-hop layer is responsible for routing, while the end-to-end layer can provide additional services, such as reliability, encryption etc.
However, we shouldn't pursue the TCP/IP metaphor too far. Unlike the Internet stack, with its single hop-by-hop protocol (IP) and multiple end-to-end protocols (TCP, UDP, SCTP etc.), in ØMQ each end-to-end protocol has its own hop-by-hop protocol. The stack thus looks something like this:
The reason for this arrangement is that the routing functionality (which is provided by the hop-by-hop layer) is specific to the messaging pattern and thus cannot be shared between patterns. Still, if in the future we encounter two messaging patterns that share the same routing algorithm and differ only in end-to-end functionality, we will be able to mimic the Internet stack's model of several end-to-end protocols layered on top of a single hop-by-hop protocol.
Finally, let's have a look at a concrete example of the hop-by-hop vs. end-to-end split.
The request/reply pattern is meant to pass a request from a client application to one of the worker applications (doing load-balancing along the way), which then processes the request and generates a reply. The reply is then passed back to the original client:
What the hop-by-hop layer has to do is send each request to one of the upstream nodes (load-balancing) and later send the reply to the downstream node that the associated request was received from.
Everything works well until a worker processing a request fails or, possibly, a whole portion of the topology goes offline because of a network failure. In such a case the client would be stuck forever, waiting for a reply that will never show up.
To solve the problem, the client can wait for a specified amount of time and re-send the request if the reply doesn't arrive by then. It also has to filter out delayed duplicate replies.
Now recall the end-to-end argument. The re-send functionality cannot be implemented without help from the endpoint; thus, there's not much point in implementing it in the hop-by-hop layer (save for various possible optimisations, such as storing the request on disk and re-sending it when the failed node is restarted).
What we get as a result is routing implemented in the hop-by-hop layer and reliability implemented on top of it in the end-to-end layer.
At the moment, ØMQ doesn't provide names of its own. To join a topology, you have to use an address defined by the underlying transport, such as an IP address and TCP port.
In the future though, it would be valuable to provide a name resolution service that would turn the name of a topology (a "ØMQ address") into an underlying transport address. For example, the string "Brightness-Adjustment-Service" could be resolved to "tcp://192.168.0.111:5555".
There hasn't been much thinking done about this problem yet, but the main issue seems to be that a topology consists of multiple nodes and the name resolution service has to pick one of them. The decision should probably be made based on administrative criteria. For example, when connecting to the "NASDAQ stock quotes" topology, you want the name service to connect you to your local stock quote hub rather than to NASDAQ itself, or even worse, to your competitor's stock quote hub.
Technically, the name resolution service should be implemented using DNS, as DNS is the only universally available distributed database out there. Moreover, the requirements for the name resolution service seem to align nicely with the features DNS provides.
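A name resolution step on top of plain DNS might look like the following sketch. The helper function, the use of address records and the fixed port are all illustrative assumptions; a real design would also have to encode the port and transport, e.g. in SRV or TXT records:

```c
#include <arpa/inet.h>
#include <netdb.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Resolve host via DNS and format a ØMQ endpoint string
   "tcp://a.b.c.d:port" into out. Returns 0 on success. */
int resolve_endpoint (const char *host, int port, char *out, size_t outlen)
{
    struct addrinfo hints, *res;
    memset (&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo (host, NULL, &hints, &res) != 0)
        return -1;

    char ip [INET_ADDRSTRLEN];
    struct sockaddr_in *sin = (struct sockaddr_in *) res->ai_addr;
    inet_ntop (AF_INET, &sin->sin_addr, ip, sizeof ip);
    freeaddrinfo (res);

    snprintf (out, outlen, "tcp://%s:%d", ip, port);
    return 0;
}

int main (void)
{
    char endpoint [64];
    /* "localhost" stands in for a topology name registered in DNS. */
    if (resolve_endpoint ("localhost", 5555, endpoint, sizeof endpoint) == 0)
        printf ("%s\n", endpoint);
    return 0;
}
```

The resulting string could then be passed straight to zmq_connect, keeping the name-to-address mapping entirely outside the application.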
Still, the design consequences of storing names in a loosely consistent distributed database such as DNS should be taken into account; e.g. topologies should be long-lived and rarely changed entities, to ensure that the DNS caching mechanism doesn't spoil the name resolution.
Appendix: Design Principles
This appendix summarises the principles used to assess whether a particular messaging pattern is well-designed.
The uniformity principle states that it should not matter which node in the topology you connect your application to. The service provided should be the same.
While the uniformity principle sounds obvious, it's pretty easy to break. Consider the PUB/SUB pattern as currently implemented in ØMQ. It allows for multiple publishers in the topology, which introduces non-uniformity:
In the above diagram, client C will observe a different data feed depending on whether it connects to intermediary 1 or intermediary 2. Intermediary 1 provides messages from both publisher A and publisher B, while intermediary 2 provides messages only from publisher B.
Note how the uniformity principle is one of the crucial principles in the design of the Internet: it doesn't matter which socket you plug your notebook into, which Wi-Fi network you use or which ISP provides you with access to the Internet, the view of the world is always the same.
The scalability principle states that when a topology cannot handle the load, either because the nodes are overloaded or because the links are congested, it should be possible to solve the problem by adding new nodes to the topology. Furthermore, the number of nodes to be added should grow at most linearly with the load.
Let's have a look at patterns violating the scalability principle. The simplest non-scalable pattern is breaking an application into a fixed number of functional blocks. Imagine a monolithic application combining both accounting and human resources functionality. At the point where a single box cannot handle the load, the programmers may decide to split the accounting functionality and the human resources functionality into separate executables so that they can be run on two boxes:
The design fails to meet the scalability test. When two boxes can't handle the load, there's no way to add a third box without re-writing the applications.
Note: This pattern is represented in ØMQ by PAIR sockets.
A more complex case of a non-scalable pattern is distributed logging:
As the number of applications to log grows, the load on the logger increases, until it is no longer capable of handling the load. Adding intermediary nodes between the applications and the logger doesn't really solve the problem. All the messages have to get to the logger whether there are intermediary nodes or not, so it's going to break at some point anyway.
To make this kind of "data collection" pattern scalable, the intermediary nodes have to aggregate the messages, i.e. send a fixed number of messages downstream regardless of the number of upstream applications. The aggregation can take the form of computing sums, passing on statistics rather than the messages as such, etc. This kind of aggregation is used in the PUB/SUB pattern for forwarding subscriptions. To avoid overloading the publisher with the sheer number of subscription requests, these are aggregated at the intermediary nodes and only deltas are sent further upstream (look here for a detailed discussion of the algorithm).
Again, note how the Internet conforms to the scalability principle. New nodes can be added at any time, whether end-user boxes or intermediary infrastructure, without compromising the functionality or the performance of the Internet as a whole.
The interjection principle states that inserting an intermediary node into the topology should not change the behaviour at the endpoints. (Note that interjection of an intermediary node can be used to scale up a topology. For concrete examples, have a look here.)
Let's take as an example an often-requested feature: determining the number of peers connected to a particular endpoint. And let's consider the following transformation, where intermediary node I is inserted into the topology:
As can be seen, the behaviour at endpoint A changes when the intermediary is inserted into the topology. Instead of reporting 3 peers, it now reports 2 peers. Thus, exposing the number of connected peers breaks the interjection principle.
Once again, the interjection principle is crucial to the way the Internet works. If changing the topology in the middle — say, when a backbone operator adds new routers — broke the applications at the endpoints, the Internet would very quickly fall into a state of disrepair.
The architecture outlined in this paper reflects the current design of ØMQ, with a couple of exceptions where it refers to features under development. Hopefully, it will provide a short introduction to distributed messaging and will form a basis for future work in the area.