original:http://mnb.ociweb.com/mnb/MiddlewareNewsBrief-201004.html
Introduction
High-performance messaging middleware is an increasingly important infrastructure component of any distributed application. A well-designed messaging infrastructure can minimize dependencies between application components, enabling evolution of the software as requirements change and as new components are integrated.
There are many such messaging middleware systems. One is OpenDDS (www.opendds.org), OCI’s open source implementation of the Object Management Group (OMG) Data Distribution Service (DDS) specification. DDS supports high-performance publish-subscribe messaging through type-safe interfaces over custom, high-speed transports with configurable Quality-of-Service guarantees. DDS-based solutions have been successfully applied to many thorny middleware problems.
The second messaging middleware product mentioned in the title of this article is ZeroMQ (www.zeromq.org). ZeroMQ is a middleware product with a different focus than OpenDDS. While OpenDDS is type-safe and emphasizes publish-subscribe behavior, ZeroMQ is a lightweight socket-like message queueing layer that sends raw message buffers from sender to receiver. OpenDDS can also send raw buffers from sender to receiver, but that is not its target usage.
The event that drove the research and testing in this article was a conversation with a prospective customer who had an existing .NET-based application and was planning to integrate either OpenDDS or ZeroMQ as messaging middleware underneath it. The customer planned to stream .NET objects into a raw buffer and send that buffer across the messaging middleware.
Before we ran the tests, we wondered what the tradeoffs in ease-of-use and performance would be between using OpenDDS for this purpose and using ZeroMQ. As we said above, sending a raw buffer is not OpenDDS's best use case, but the comparison is still worthwhile for characterizing relative performance. The raw buffer test will certainly favor ZeroMQ, since that is what ZeroMQ is designed for. In a subsequent test, we'll send typed C++ data through both OpenDDS and ZeroMQ to find out how much overhead strong typing adds in each case. That test is closer to OpenDDS's intended use. We'll test OpenDDS using a regular IDL struct, and we'll test ZeroMQ with a C++ struct using both Boost Serialization and Google Protocol Buffers to simulate a likely application developer use case. Strongly typed data will slow the ZeroMQ test cases down, but it also makes them more realistic for a wider range of applications. Until we test, we are not sure whether OpenDDS or ZeroMQ will be faster for the strongly typed tests.
Finally, as a baseline, we’ll test with Boost.Asio. Boost.Asio is a thin layer on top of sockets. We are interested in determining how ZeroMQ’s performance compares to Boost.Asio’s performance, and how ease-of-use compares as well. In other words, what does ZeroMQ provide in ease-of-use over Boost.Asio, and what do you pay in performance?
In the raw buffer test cases, we expect Boost.Asio to have the best performance, followed by ZeroMQ, and lastly OpenDDS. In the typed C++ data tests, we're not sure what kind of performance to expect until we run the tests, but we expect to find out how efficient OpenDDS is at sending strongly typed data across the wire.
One item worth noting is that we do not claim to be ZeroMQ experts. We have used references and documentation on the ZeroMQ web site to learn how to set up our ZeroMQ tests. We have built all relevant software with debugging turned off to enable apples-to-apples performance comparisons. Since the source code for all of the tests is available at the end of the article, feel free to download them yourself, run the tests, and make modifications as you see fit. See the README files in the root directory and the bin directory for more information.
To summarize, our performance tests are as follows:
- OpenDDS raw buffer test
- ZeroMQ raw buffer test
- Boost.Asio raw buffer test
- OpenDDS .NET objects streamed through a raw buffer test
- ZeroMQ .NET objects streamed through a raw buffer test
- Boost.Asio .NET objects streamed through a raw buffer test
- OpenDDS strongly typed data test
- ZeroMQ strongly typed data with Boost Serialization test
- Boost.Asio strongly typed data with Boost Serialization test
- ZeroMQ strongly typed data with Google Protocol Buffers test
Performance
These tests were executed on an old laptop with a 1.6 GHz Intel Pentium 4 and 2 GB RAM running Windows XP. We’re not interested in raw numbers so much as we’re interested in comparisons, so the speed of the test machine is not very important. We could have run most of these tests on a much faster Linux workstation, but we wouldn’t have been able to run the .NET tests with Microsoft’s compiler and runtime.
OpenDDS Raw Buffer Test
OpenDDS applications use a “topic” as a rendezvous point. In other words, OpenDDS publications and subscriptions find each other based on nothing more than a topic name. A federation-capable daemon, the OpenDDS InfoRepo, associates publications and subscriptions, enabling the publication and subscription applications to be coded without any knowledge of the other side’s endpoints. This architecture differs from that of the ZeroMQ and Boost.Asio examples, as we will see later.
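The rendezvous idea can be illustrated with a toy, in-process model. The sketch below is a hypothetical illustration and not the DCPSInfoRepo's interface, which is reached over CORBA, as the -ORBSvcConf and repo.ior options suggest. Applications name only a topic, and the registry works out which endpoints belong together. Real DDS matching also considers the data type and Quality-of-Service compatibility, which this sketch ignores.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy model of topic-based rendezvous (NOT the DCPSInfoRepo API).
// Applications supply only a topic name; the endpoint strings stand in for the
// transport addresses that the middleware itself would fill in.
class TopicRegistry {
public:
    void register_publication(const std::string& topic, const std::string& endpoint) {
        publications_[topic].push_back(endpoint);
    }

    void register_subscription(const std::string& topic, const std::string& endpoint) {
        subscriptions_[topic].push_back(endpoint);
    }

    // Computed on demand, so subscribers that join late are still matched.
    std::vector<std::string> subscribers_of(const std::string& topic) const {
        auto it = subscriptions_.find(topic);
        return it == subscriptions_.end() ? std::vector<std::string>{} : it->second;
    }

    std::vector<std::string> publishers_of(const std::string& topic) const {
        auto it = publications_.find(topic);
        return it == publications_.end() ? std::vector<std::string>{} : it->second;
    }

private:
    std::map<std::string, std::vector<std::string>> publications_;
    std::map<std::string, std::vector<std::string>> subscriptions_;
};
```

Unlike the socket-style examples later in the article, nothing here requires a publisher to be told its subscribers' addresses up front.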

The OpenDDS raw buffer test code simply publishes a raw data buffer on an OpenDDS topic. The code is in opendds/idl/Raw.idl, opendds/cpp/RawPublisher.cpp, and opendds/cpp/RawSubscriber.cpp. We define an IDL struct that simply contains a raw octet buffer, as shown below:
module MiddlewareNewsBrief
{
  const string RAW_BUFFER_TOPIC_NAME = "MiddlewareNewsBrief::RawBuffer";
  const string RAW_BUFFER_TOPIC_TYPE = "MiddlewareNewsBrief::RawBuffer";

#pragma DCPS_DATA_TYPE "MiddlewareNewsBrief::RawBuffer"

  typedef sequence<octet> BufferType;

  struct RawBuffer
  {
    BufferType buffer;
  };
};
An octet is a raw byte, and a sequence of octets represents a raw byte array that can grow and shrink. To use this, we'll write an OpenDDS publishing process and an OpenDDS subscribing process that create raw octet buffers and write them across the wire. The publisher publishes a data sample containing a raw buffer. The subscriber receives it and publishes an echo sample back to the publisher. We measure the total latency for each sample as the time it takes to make that round trip. Then we send the next sample.
We won't show the OpenDDS publishing and subscribing code here, simply because we have a lot of other code to show. You can see other examples of OpenDDS publishing or subscribing code by examining the attached code files or by browsing through this introductory OpenDDS article.
To execute the test, we first run the OpenDDS daemon process, the DCPSInfoRepo. This process is the rendezvous point for publications and subscriptions, enabling them to find each other based on the topic name. In this particular command line, the DCPSInfoRepo writes out its reference to a file called repo.ior. We could also tell the DCPSInfoRepo to listen on a particular endpoint, as documented in this article. We run this test (and all of the others) on a single host because we're not interested in measuring the latency of the network itself. Note that the MNB_ROOT environment variable must point to the directory where the Middleware News Brief code has been downloaded and unzipped. See the README file in that directory for more information on setting up the environment. We run the DCPSInfoRepo daemon as follows:
%DDS_ROOT%\bin\DCPSInfoRepo -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf -o repo.ior
We then run an OpenDDS Raw Buffer subscribing process:
%MNB_ROOT%\bin\OpenDdsRawSubscriber \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior
Finally, we run an OpenDDS Raw Buffer publishing process. It publishes one message to our interested subscriber, waits for that subscriber to echo the message back, and repeats that loop 999 more times.
%MNB_ROOT%\bin\OpenDdsRawPublisher \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior -num 1000
In the publisher's window, we should see output that looks something like this, reporting the average one-way latency across the 1000 messages:
Number of messages is 1000
Messages size is 1000
Writing...
Average latency in milliseconds: 0.185
So that's our approximate baseline for OpenDDS: 0.185 milliseconds, or 185 microseconds. Repeated runs of the test will show different values, so you will want to run the test several times to get a clearer picture of the average latency.
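Because individual runs vary, it helps to summarize the "Average latency" figure from several runs rather than quote just one. Here is a minimal sketch, assuming you copy each run's reported value into a vector by hand. summarize_runs and RunSummary are hypothetical helpers, not part of the article's code.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <vector>

// Summary of the "Average latency" values reported by several test runs.
struct RunSummary {
    double mean;
    double min;
    double max;
    double stddev;  // sample standard deviation (n - 1 denominator)
};

RunSummary summarize_runs(const std::vector<double>& run_averages) {
    assert(!run_averages.empty());
    const double n = static_cast<double>(run_averages.size());

    double sum = 0.0;
    for (double v : run_averages) sum += v;
    const double mean = sum / n;

    double sq = 0.0;
    for (double v : run_averages) sq += (v - mean) * (v - mean);
    // A single run has no spread to estimate, so report zero.
    const double stddev = run_averages.size() > 1 ? std::sqrt(sq / (n - 1.0)) : 0.0;

    auto [lo, hi] = std::minmax_element(run_averages.begin(), run_averages.end());
    return RunSummary{mean, *lo, *hi, stddev};
}
```

A large standard deviation relative to the mean is a hint that other activity on the test machine is disturbing the measurements.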
We can run more than one subscribing process at a time simply by launching extra subscribers. We also need to tell the publishing process how many subscribers to expect so that it knows how many echoed samples it should expect:
%MNB_ROOT%\bin\OpenDdsRawSubscriber \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior
%MNB_ROOT%\bin\OpenDdsRawSubscriber \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior
%MNB_ROOT%\bin\OpenDdsRawSubscriber \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior
%MNB_ROOT%\bin\OpenDdsRawPublisher \
  -ORBSvcConf %MNB_ROOT%\config\lib_tcp.conf \
  -DCPSConfigFile %MNB_ROOT%\config\tcp_conf.ini \
  -DCPSInfoRepo file://repo.ior -num 1000 \
  -ns 3
Note that the OpenDDS test also has a Perl script that can launch the DCPSInfoRepo, the publishing process, and the subscribing process for us:
perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000
That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:
perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000 -num-subs 3
We can also run the OpenDDS example over a multicast transport by passing the script's -tr mcast command-line argument. An OpenDDS application's transport can be configured via a file rather than in code:
perl %MNB_ROOT%\bin\opendds_test.pl OpenDdsRaw -num 1000 -tr mcast
ZeroMQ Raw Buffer Test
ZeroMQ (www.zeromq.org) is designed as a rather thin message queueing layer over sockets, so sending raw buffers is what it does best. Our ZeroMQ test measures latency the same way as the OpenDDS test does, by publishing a buffer, waiting for an echo, and then repeating that process.
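Stripped of any particular middleware, the measurement pattern shared by all of these tests looks roughly like the sketch below. The publish and wait_for_echo callables are hypothetical hooks that stand in for the middleware-specific send and receive calls. The normalization divides the elapsed time by twice the number of round trips and then by the number of subscribers, mirroring the latency computation in the article's publishers.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>

// Times `round_trips` publish/echo cycles and returns the average one-way
// latency in microseconds: elapsed / (2 * round_trips) / num_subscribers.
// `publish` and `wait_for_echo` are hypothetical hooks standing in for the
// middleware-specific send and receive calls.
double measure_one_way_latency_us(std::size_t round_trips,
                                  std::size_t num_subscribers,
                                  const std::function<void()>& publish,
                                  const std::function<void()>& wait_for_echo) {
    assert(round_trips > 0 && num_subscribers > 0);
    using clock = std::chrono::steady_clock;

    const auto start = clock::now();
    for (std::size_t i = 0; i < round_trips; ++i) {
        publish();  // one send is expected to reach every subscriber
        for (std::size_t s = 0; s < num_subscribers; ++s) {
            wait_for_echo();  // block until each subscriber has echoed
        }
    }
    const std::chrono::duration<double, std::micro> elapsed = clock::now() - start;

    return elapsed.count() / (2.0 * round_trips) / num_subscribers;
}
```

Halving the round trip assumes the outbound and return legs take roughly equal time. That is reasonable on a single host, but it is an assumption rather than something the test measures.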
ZeroMQ does not use a daemon process to associate publications and subscriptions. In fact, it doesn't use topics for association purposes at all. In ZeroMQ applications, we manually pass the endpoint of the subscription into the publication's process, as we would in a socket application.
The ZeroMQ raw buffer test code is in zeromq/cpp/ZeromqPublisher.cpp and zeromq/cpp/ZeromqSubscriber.cpp.
The core of the publishing process code is shown below. We'll use ZeroMQ's ZMQ_PUB and ZMQ_SUB capabilities to come as close as we can to a publish-subscribe API. You can see from the first for loop below that ZeroMQ allows us to connect the same ZeroMQ socket object to many endpoints, so we only have to publish the buffer once to send it to all connected subscribers.
#include "zmq.hpp"
#include "zmq_utils.h"   // zmq_stopwatch_start / zmq_stopwatch_stop
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>

int main (int argc, char *argv [])
{
  try {
    if (argc < 5) {
      std::cout << "usage: ZeromqPublisher <message-size> <roundtrip-count> <bind-to-sub> [<connect-to-pub>]+\n"
                << "e.g.: ZeromqPublisher 1000 10000 tcp://eth0:54321 tcp://spider:54322 tcp://spider:54323\n"
                << "      use a literal IP address and port for each endpoint\n"
                << std::endl;
      return 1;  // otherwise argv[1..3] below could be read out of range
    }
    size_t message_size = atoi (argv [1]);
    size_t roundtrip_count = atoi (argv [2]);
    const char* bind_to_sub = argv [3];
Here, we create our ZeroMQ sockets. We specify the outgoing socket as a publishing socket and the incoming socket as a subscribing socket.
    zmq::context_t ctx (1, 1);
    zmq::socket_t pub (ctx, ZMQ_PUB);
    zmq::socket_t sub (ctx, ZMQ_SUB);
    sub.setsockopt (ZMQ_SUBSCRIBE, "DataFeed\x00", 9);
    sub.bind (bind_to_sub);
We have to provide an endpoint for each subscribing process on the command line. Here, we iterate through those endpoints and connect to each.
    size_t num_subscribers = 0;
    for (int argi = 4; argi < argc; ++argi) {
      pub.connect (argv [argi]);
      ++num_subscribers;
    }
Note that we prepend a "topic" name, "DataFeed", onto each message buffer. ZeroMQ uses the topic names like message filters, enabling a subscriber to instruct ZeroMQ to filter out messages with "topic" names it does not wish to receive.
    // Create the message once; its first 9 bytes are the "DataFeed\0" filter string.
    zmq::message_t msg (message_size);
    memcpy (msg.data (), "DataFeed\x00", 9);

    printf ("Entering send loop -- %d messages, size = %d\n",
            (int) roundtrip_count, (int) message_size);
    void *watch = zmq_stopwatch_start ();
We loop, publishing a message and waiting for its echo.
Note that we only call send once to send the message to all connected subscribers.
    for (size_t i = 0; i != roundtrip_count; i++) {
      pub.send (msg);
      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        // Wait for echoed message from each subscriber
        sub.recv (&msg);
        if (msg.size () != message_size) {
          std::cout << "Message of incorrect size received: " << msg.size () << std::endl;
          return -1;
        }
      }
    }

    unsigned long elapsed = zmq_stopwatch_stop (watch);
    // A round trip is two one-way trips; also average over the subscribers' echoes.
    double latency = (double) elapsed / (roundtrip_count * 2.0) / (double) num_subscribers;

    printf ("message size: %d [B]\n", (int) message_size);
    printf ("roundtrip count: %d\n", (int) roundtrip_count);
    printf ("\n\naverage latency: %.3f [us]\n\n\n", (double) latency);
    return 0;
  }
  catch (std::exception &e) {
    std::cout << "An error occurred: " << e.what () << std::endl;
    return 1;
  }
}
As we mentioned, the line memcpy(msg.data(), "DataFeed\x00", 9); sets the "topic" that we're publishing on. ZeroMQ simulates topics with a message filtering mechanism: the subscribing process indicates that it only wants messages prefixed with the filter string "DataFeed". The publisher is responsible for encoding the filter string at the front of the message buffer, and the subscriber is responsible for skipping over the filter string in its received buffer.
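The filter-string convention can be sketched with plain byte buffers, independent of the ZeroMQ API. In this sketch, frame_with_topic, matches_filter, and strip_prefix are hypothetical helpers. The publisher writes the topic and its NUL terminator in front of the payload. A SUB-style filter accepts a message only if it begins with the subscribed bytes. The subscriber then skips the prefix itself. Including the NUL in the filter, which is why the examples pass a length of 9 for "DataFeed\x00", keeps "DataFeed" from also matching a longer topic such as "DataFeed2".

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Build a buffer laid out like the article's messages:
// topic bytes, a NUL terminator, then the payload.
std::vector<char> frame_with_topic(const std::string& topic,
                                   const std::vector<char>& payload) {
    std::vector<char> buf(topic.begin(), topic.end());
    buf.push_back('\0');
    buf.insert(buf.end(), payload.begin(), payload.end());
    return buf;
}

// Prefix test in the spirit of a ZMQ_SUBSCRIBE filter: a message is accepted
// if it starts with the filter bytes (an empty filter accepts everything).
bool matches_filter(const std::vector<char>& msg, const std::string& filter_bytes) {
    return msg.size() >= filter_bytes.size() &&
           std::equal(filter_bytes.begin(), filter_bytes.end(), msg.begin());
}

// The subscriber's job: skip the filter prefix to reach the payload.
std::vector<char> strip_prefix(const std::vector<char>& msg, std::size_t prefix_len) {
    assert(prefix_len <= msg.size());
    return std::vector<char>(msg.begin() + prefix_len, msg.end());
}
```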
The core of the subscribing process code is shown below. It echoes each buffer back to the publisher for round-trip latency measurement.
#include "zmq.hpp"
#include <cstdio>
#include <cstdlib>
#include <iostream>

int main (int argc, char *argv [])
{
  try {
    if (argc != 5) {
      std::cout << "usage: ZeromqSubscriber <message-size> <roundtrip-count> <bind-to-sub> <connect-to-pub>\n"
                << "e.g.: ZeromqSubscriber 1000 10000 tcp://eth0:54322 tcp://spider:54321\n"
                << "      on Windows, use a literal IP address and port for each endpoint\n"
                << std::endl;
      return 1;
    }
    size_t message_size = atoi (argv [1]);
    size_t roundtrip_count = atoi (argv [2]);
    const char* bind_to_sub = argv [3];
    const char* connect_to_pub = argv [4];

    zmq::context_t ctx (1, 1);
    zmq::socket_t pub (ctx, ZMQ_PUB);
    pub.connect (connect_to_pub);
Note the subscription to the "DataFeed" topic, which tells ZeroMQ to pass through only message buffers that start with the string "DataFeed". The subscribing process is responsible for skipping over this topic string to extract any message content, as we'll see in a later example.
    zmq::socket_t sub (ctx, ZMQ_SUB);
    sub.setsockopt (ZMQ_SUBSCRIBE, "DataFeed\x00", 9);
    sub.bind (bind_to_sub);

    printf ("Entering recv loop -- %d messages, size = %d\n",
            (int) roundtrip_count, (int) message_size);
    for (size_t i = 0; i != roundtrip_count; i++) {
      zmq::message_t msg;
      sub.recv (&msg);
      // Echo it back
      pub.send (msg, 0);
    }
    printf ("Finished receiving messages\n");
    return 0;
  }
  catch (std::exception &e) {
    std::cout << "An error occurred: " << e.what () << std::endl;
    return 1;
  }
}
To run this test case, we start a ZeroMQ subscribing process and a ZeroMQ publishing process, as follows:
%MNB_ROOT%\bin\ZeromqSubscriber 1000 1000 \
  tcp://10.201.200.72:54322 \
  tcp://10.201.200.72:54321
%MNB_ROOT%\bin\ZeromqPublisher 1000 1000 \
  tcp://10.201.200.72:54321 \
  tcp://10.201.200.72:54322
In the publisher's window, we should see output that looks something like this, reporting the average one-way latency across the 1000 messages:
Entering send loop -- 1000 messages, size = 1000
message size: 1000 [B]
roundtrip count: 1000

average latency: 170.336 [us]
The average latency of about 170 microseconds compares to about 185 microseconds for the OpenDDS raw buffer test. However, you really need to run each test several times to get a clear picture of the relative performance. It's also a good idea to shut down as many processes as possible on the test computer.
One thing that you can see while running the ZeroMQ test is that we need to supply ZeroMQ with endpoints (an IP address and a port) for its sockets. OpenDDS handles that internally, and only needs a topic name to associate a publication and a subscription. The difference becomes more apparent when we run several ZeroMQ subscribers:
%MNB_ROOT%\bin\ZeromqSubscriber 1000 1000 \
  tcp://10.201.200.72:54322 \
  tcp://10.201.200.72:54321
%MNB_ROOT%\bin\ZeromqSubscriber 1000 1000 \
  tcp://10.201.200.72:54323 \
  tcp://10.201.200.72:54321
%MNB_ROOT%\bin\ZeromqSubscriber 1000 1000 \
  tcp://10.201.200.72:54324 \
  tcp://10.201.200.72:54321
%MNB_ROOT%\bin\ZeromqPublisher 1000 1000 \
  tcp://10.201.200.72:54321 \
  tcp://10.201.200.72:54322 \
  tcp://10.201.200.72:54323 \
  tcp://10.201.200.72:54324
Notice how the publishing process needs each subscribing process's endpoint to associate correctly with each subscriber. Contrast that with the OpenDDS test, where we merely run as many subscribing processes as we want and let OpenDDS make the associations for us. So we're already seeing some ease-of-use differences between the two middleware products, without even touching on OpenDDS's extensive Quality-of-Service support.
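Because each ZeroMQ subscriber binds its own port, whatever launches the test has to hand out distinct endpoints and pass the full list to the publisher. The sketch below shows one way to generate and sanity-check such a list. parse_endpoint and subscriber_endpoints are hypothetical helpers, and we have not shown how the provided Perl scripts actually assign ports.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

struct Endpoint {
    std::string transport;  // e.g. "tcp"
    std::string host;       // a literal IP address, as the usage text advises
    int port;
};

// Split "transport://host:port"; returns std::nullopt if the shape is wrong.
std::optional<Endpoint> parse_endpoint(const std::string& s) {
    const std::size_t scheme_end = s.find("://");
    const std::size_t colon = s.rfind(':');
    // Require a non-empty transport, a non-empty host, and text after the last ':'.
    if (scheme_end == std::string::npos || scheme_end == 0 ||
        colon <= scheme_end + 3 || colon + 1 >= s.size()) {
        return std::nullopt;
    }
    const std::string port_text = s.substr(colon + 1);
    if (port_text.size() > 5) return std::nullopt;
    for (char c : port_text) {
        if (c < '0' || c > '9') return std::nullopt;
    }
    const int port = std::stoi(port_text);
    if (port < 1 || port > 65535) return std::nullopt;
    return Endpoint{s.substr(0, scheme_end),
                    s.substr(scheme_end + 3, colon - scheme_end - 3), port};
}

// One distinct port per subscriber, starting at first_port
// (54322 in the commands above).
std::vector<std::string> subscriber_endpoints(const std::string& host,
                                              int first_port, int count) {
    std::vector<std::string> out;
    for (int i = 0; i < count; ++i) {
        out.push_back("tcp://" + host + ":" + std::to_string(first_port + i));
    }
    return out;
}
```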
Again, each test also has a Perl script that can launch the publishing process and subscribing process for us:
perl %MNB_ROOT%\bin\zeromq_test.pl Zeromq 1000 1000
That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:
perl %MNB_ROOT%\bin\zeromq_test.pl Zeromq 1000 1000 -num-subs 3
Boost.Asio Raw Buffer Test
Boost.Asio (www.boost.org) is a C++ network programming framework that provides a thin, object-oriented layer over sockets. Boost.Asio's layer is, or at least should be, thinner than ZeroMQ's. The purpose of testing Boost.Asio is to give us a baseline and get an idea of how much processing ZeroMQ and OpenDDS add to basic socket communication.
Like ZeroMQ, our Boost.Asio example does not use a daemon process to associate publications and subscriptions. Again, we manually pass the endpoint of the subscription into the publication's process, as we would in any other socket application.
The Boost.Asio raw buffer test code is in boostasio/cpp/BoostPublisher.cpp and boostasio/cpp/BoostSubscriber.cpp.
The Boost.Asio publishing process code is shown below. Note that we have to manage the one-to-many relationship from the publisher to the subscribers ourselves. Also note that this code is limited to TCP, whereas we can change the transport for both the OpenDDS and ZeroMQ examples by passing in different command-line options.
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include "Profiler.h"

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try {
    if (argc < 5) {
      std::cerr << "Usage: BoostPublisher <message-size> <num-messages> [<host> <port>]+\n";
      return 1;
    }
    size_t message_size = std::atoi(argv[1]);
    size_t num_messages = std::atoi(argv[2]);
    const size_t used_args = 3;

    boost::asio::io_service io_service;
    size_t num_subscribers = (argc - used_args) / 2;
    std::vector<boost::shared_ptr<tcp::socket> > subscribers;
    subscribers.reserve(num_subscribers);
We resolve each subscriber's endpoint and connect to it.
    for (size_t i = 0; i < num_subscribers; ++i) {
      tcp::resolver resolver(io_service);
      tcp::resolver::query query(tcp::v4(), argv[i*2+used_args], argv[i*2+used_args+1]);
      tcp::resolver::iterator iterator = resolver.resolve(query);
      boost::shared_ptr<tcp::socket> s(new tcp::socket(io_service));
      subscribers.push_back(s);
      s->connect(*iterator);
    }

    printf("Sending %d messages of size %d to %d subscribers\n",
           (int) num_messages, (int) message_size, (int) num_subscribers);
    // shared_array (not shared_ptr) so that the buffers are released with delete[]
    boost::shared_array<char> request(new char[message_size]);
    boost::shared_array<char> reply(new char[message_size]);
    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE start = MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;

    for (size_t i = 0; i < num_messages; ++i) {
      memset(request.get(), 0, message_size);
      memset(reply.get(), 0, message_size);
      // Two loops here simulate pub/sub behavior,
      // where we publish to all subscribers before
      // looking for an echoed sample coming back
We must manually publish to each attached subscriber endpoint.
      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        boost::asio::write(*(subscribers[jj]),
                           boost::asio::buffer(request.get(), message_size));
      }
      for (size_t jj = 0; jj < num_subscribers; ++jj) {
        size_t reply_length = boost::asio::read(*(subscribers[jj]),
                                                boost::asio::buffer(reply.get(), message_size));
        if (reply_length != message_size) {
          std::cerr << "Message reply size mismatch; expected " << message_size
                    << ", received " << reply_length << std::endl;
          return -1;
        }
      }
    }

    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE finish = MIDDLEWARENEWSBRIEF_PROFILER_GET_TIME;
    MIDDLEWARENEWSBRIEF_PROFILER_TIME_TYPE elapsed = MIDDLEWARENEWSBRIEF_PROFILER_DIFF(finish, start);
    double latency = (double) elapsed / (num_messages * 2.0) / (double)(num_subscribers);
    printf("\n\nAverage latency in %s: %.3f\n\n\n", MIDDLEWARENEWSBRIEF_PROFILER_TIME_UNITS, latency);
    printf("Finished\n");
  }
  catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
    return 1;
  }
  return 0;
}
The subscribing process code is shown below. The bulk of the subscriber logic, including echoing each buffer back to the publisher for round-trip latency measurement, takes place in a Server class. That class is not shown, but it is available in the attached source code in boostasio/cpp/Server[.h|.cpp]. Server is a class that we've written to manage the socket sessions between the publisher and the subscribers.
#include "Server.h"
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

int main(int argc, char* argv[])
{
  try {
    if (argc != 4) {
      std::cerr << "Usage: BoostSubscriber <message-size> <num-messages> <port>\n";
      return 1;
    }
    boost::asio::io_service io_service;
    size_t message_size = std::atoi(argv[1]);
    size_t num_messages = std::atoi(argv[2]);
    Server s(io_service, std::atoi(argv[3]), num_messages);
    printf("Waiting; running for %d messages\n", (int) num_messages);
    io_service.run();
  }
  catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
    return 1;
  }
  return 0;
}
To run the test, start a subscribing process and then a publishing process:
%MNB_ROOT%\bin\BoostSubscriber 1000 1000 54421
%MNB_ROOT%\bin\BoostPublisher 1000 1000 10.201.200.72 54421
In the publisher's window, we should see output that looks something like this, reporting the average one-way latency across the 1000 messages:
Sending 1000 messages of size 1000 to 1 subscribers
Average latency in milliseconds: 0.075
Finished
The average latency of about 75 microseconds compares to about 185 microseconds for the OpenDDS raw buffer test and about 170 microseconds for the ZeroMQ raw buffer test. Again, though, you really need to run each test several times to get a clear picture of the relative performance.
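If we treat Boost.Asio as the socket-level baseline, the single-run figures above give a rough idea of how much each middleware layer adds to a one-way message. With only one run per test, these differences are indicative at best:

```latex
\begin{aligned}
\text{OpenDDS:} &\quad 185\,\mu\text{s} - 75\,\mu\text{s} = 110\,\mu\text{s}\ \text{added}, &\quad 185/75 &\approx 2.5\times \\
\text{ZeroMQ:}  &\quad 170\,\mu\text{s} - 75\,\mu\text{s} = 95\,\mu\text{s}\ \text{added}, &\quad 170/75 &\approx 2.3\times
\end{aligned}
```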
As with the ZeroMQ test, we need to supply the Boost.Asio test with endpoints for its sockets. OpenDDS, of course, handles that internally, and only needs a topic name to associate a publication and a subscription. As before, the difference is more obvious with several subscribers:
%MNB_ROOT%\bin\BoostSubscriber 1000 1000 54421
%MNB_ROOT%\bin\BoostSubscriber 1000 1000 54422
%MNB_ROOT%\bin\BoostSubscriber 1000 1000 54423
%MNB_ROOT%\bin\BoostPublisher 1000 1000 10.201.200.72 54421 \
  10.201.200.72 54422 \
  10.201.200.72 54423
Notice how the Boost.Asio publishing process also needs each subscribing process's endpoint to associate correctly with each subscriber.
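The manual one-to-many delivery in BoostPublisher comes down to a fan-out loop followed by a gather loop. The sketch below reproduces that shape without Boost. LoopbackConnection is a hypothetical in-memory stand-in for a connected tcp::socket whose "subscriber" echoes whatever it receives.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a connected socket: whatever is written is
// queued and can be read back, imitating a subscriber that echoes.
class LoopbackConnection {
public:
    void write(const std::vector<char>& buf) { pending_.push_back(buf); }

    std::vector<char> read() {
        if (pending_.empty()) throw std::runtime_error("no echo pending");
        std::vector<char> echo = pending_.front();
        pending_.pop_front();
        return echo;
    }

private:
    std::deque<std::vector<char>> pending_;
};

// One publish cycle with manual one-to-many delivery: write to every
// subscriber first, then collect one echo from each and check its size.
// Returns how many echoes had the expected size.
std::size_t publish_and_gather(std::vector<LoopbackConnection>& subscribers,
                               const std::vector<char>& sample) {
    for (auto& sub : subscribers) {
        sub.write(sample);  // fan out: the application loops itself
    }
    std::size_t good = 0;
    for (auto& sub : subscribers) {
        if (sub.read().size() == sample.size()) ++good;  // gather
    }
    return good;
}
```

Writing to every subscriber before reading any echo lets the subscribers work concurrently. Interleaving one write and one read per subscriber would instead serialize the round trips.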
As usual, each test also has a Perl script that can launch the publishing process and subscribing process for us:
perl %MNB_ROOT%\bin\boost_test.pl Boost 1000 1000
That Perl script can also launch more than one subscribing process, enabling performance testing of 1-to-N cases:
perl %MNB_ROOT%\bin\boost_test.pl Boost 1000 1000 -num-subs 3
.NET Streaming Tests
For our next round of tests, we use C# to stream a couple of .NET objects into a raw buffer and send them across the wire. These tests were driven by a customer who was exploring the use of ZeroMQ to stream .NET objects from a publisher to a subscriber. We wanted to determine whether we could do the same thing with OpenDDS, and to compare performance and usability. As a baseline, we examined streaming .NET objects across Boost.Asio as well.
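In the C# tests, .NET's BinaryFormatter does the streaming automatically. To show what "streaming an object into a raw buffer" amounts to at the byte level, here is a hand-rolled C++ sketch for a hypothetical, cut-down struct with a few QuoteRequest-style fields. The wire layout (little-endian 32-bit integers, a length-prefixed string, and one byte for the flag) is our own assumption and bears no relation to BinaryFormatter's format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, trimmed-down message with a few QuoteRequest-style fields.
struct MiniQuote {
    std::uint32_t securityID = 0;
    std::string quoteReqID;
    bool isEcho = false;
    std::uint32_t counter = 0;
};

// Append a 32-bit value in little-endian byte order.
void put_u32(std::vector<std::uint8_t>& out, std::uint32_t v) {
    for (int i = 0; i < 4; ++i) out.push_back(static_cast<std::uint8_t>(v >> (8 * i)));
}

// Read a little-endian 32-bit value at `pos` and advance `pos`.
std::uint32_t get_u32(const std::vector<std::uint8_t>& in, std::size_t& pos) {
    if (pos + 4 > in.size()) throw std::runtime_error("truncated buffer");
    std::uint32_t v = 0;
    for (int i = 0; i < 4; ++i) v |= static_cast<std::uint32_t>(in[pos + i]) << (8 * i);
    pos += 4;
    return v;
}

std::vector<std::uint8_t> to_bytes(const MiniQuote& q) {
    std::vector<std::uint8_t> out;
    put_u32(out, q.securityID);
    put_u32(out, static_cast<std::uint32_t>(q.quoteReqID.size()));  // length prefix
    out.insert(out.end(), q.quoteReqID.begin(), q.quoteReqID.end());
    out.push_back(static_cast<std::uint8_t>(q.isEcho ? 1 : 0));
    put_u32(out, q.counter);
    return out;
}

MiniQuote from_bytes(const std::vector<std::uint8_t>& in) {
    std::size_t pos = 0;
    MiniQuote q;
    q.securityID = get_u32(in, pos);
    const std::uint32_t len = get_u32(in, pos);
    // String bytes, the flag byte, and the 4-byte counter must all be present.
    if (pos + len + 1 + 4 > in.size()) throw std::runtime_error("truncated buffer");
    q.quoteReqID.assign(reinterpret_cast<const char*>(in.data() + pos), len);
    pos += len;
    q.isEcho = in[pos++] != 0;
    q.counter = get_u32(in, pos);
    return q;
}
```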
We will stream instances of the following .NET types across the wire in our tests. These files are in the common/csharp directory.
using System;

namespace MiddlewareNewsBrief
{
  [Serializable]
  public class MarketDataEntry
  {
    public uint mdUpdateAction = 0;
    public uint mdPriceLevel = 0;
    public String mdEntryType = "";
    public uint openCloseSettleFlag = 0;
    public uint securityIDSource = 0;
    public uint securityID = 0;
    public uint rptSeq = 0;
    public double mdEntryPx = 0.0;
    public uint mdEntryTime = 0;
    public uint mdEntrySize = 0;
    public uint numberOfOrders = 0;
    public String tradingSessionID = "";
    public double netChgPrevDay = 0.0;
    public uint tradeVolume = 0;
    public String tradeCondition = "";
    public String tickDirection = "";
    public String quoteCondition = "";
    public uint aggressorSide = 0;
    public String matchEventIndicator = "";

    public static MarketDataEntry createTestData()
    {
      // implementation omitted for brevity
    }
  }

  [Serializable]
  public class MarketData
  {
    public uint securityID = 0;
    public String applVersionID = "";
    public String messageType = "";
    public String senderCompID = "";
    public uint msgSeqNum = 0;
    public uint sendingTime = 0;
    public uint tradeDate = 0;
    public bool isEcho = false;
    public uint counter = 0;
    public MarketDataEntry[] mdEntries = new MarketDataEntry[0];

    public static MarketData createTestData()
    {
      // implementation omitted for brevity
    }
  }

  [Serializable]
  public class RelatedSym
  {
    public String symbol = "";
    public ulong orderQuantity = 0;
    public uint side = 0;
    public ulong transactTime = 0;
    public uint quoteType = 0;
    public uint securityID = 0;
    public uint securityIDSource = 0;

    public static RelatedSym createTestData()
    {
      // implementation omitted for brevity
    }
  }

  [Serializable]
  public class QuoteRequest
  {
    public uint securityID = 0;
    public String applVersionID = "";
    public String messageType = "";
    public String senderCompID = "";
    public uint msgSeqNum = 0;
    public uint sendingTime = 0;
    public String quoteReqID = "";
    public bool isEcho = false;
    public uint counter = 0;
    public RelatedSym[] related = new RelatedSym[0];

    public static QuoteRequest createTestData()
    {
      // implementation omitted for brevity
    }
  }
}
ZeroMQ .NET Tests
ZeroMQ has a set of .NET bindings, so streaming .NET objects is fairly straightforward. We do have one complicating factor, though. We publish our data on two topics, a "MarketData" topic and a "QuoteRequest" topic. In ZeroMQ, we indicate these by prepending message filter strings to the front of each buffer, so we have to be careful to stream the .NET objects into the buffer at a location after each message filter string. The publishing process creates its .NET objects and sets an "echo" flag to false and a counter on each object. It streams the objects into a buffer and sends them to the subscriber. The subscriber deserializes the objects, checks the counter and the "echo" flag, changes the "echo" flag to true, reserializes the objects, and echoes them back. The publisher takes the message reply, deserializes the objects again, and checks the echo flag and the counter. These steps ensure that the serialization was successful, i.e., that we can stream and resurrect the .NET objects successfully after sending them through ZeroMQ.
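The correctness check around each round trip is small enough to sketch on its own. In this hypothetical C++ version, Sample carries only the isEcho and counter fields, and the serialization step between the two sides is left out. The three functions correspond to the publisher stamping a sample, the subscriber validating and flipping it, and the publisher validating the echo.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sample carrying only the two fields the .NET tests check.
struct Sample {
    bool isEcho = false;
    std::uint32_t counter = 0;
};

// Publisher side: stamp the outgoing sample with its sequence number.
Sample make_outgoing(std::uint32_t i) {
    Sample s;
    s.isEcho = false;
    s.counter = i;
    return s;
}

// Subscriber side: verify the incoming sample, then flip the flag to echo it.
// Returns false, leaving the sample untouched, if either check fails.
bool subscriber_echo(Sample& s, std::uint32_t expected_counter) {
    if (s.isEcho || s.counter != expected_counter) return false;
    s.isEcho = true;
    return true;
}

// Publisher side: the echo must have the flag set and the original counter.
bool publisher_verify(const Sample& echo, std::uint32_t sent_counter) {
    return echo.isEcho && echo.counter == sent_counter;
}
```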
The ZeroMQ publisher code, located in zeromq/csharp/ZmqTypedPublisher.cs, is shown below:
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using MiddlewareNewsBrief;

class ZmqTypedPublisher
{
  static unsafe int Main(string[] args)
  {
    const String QUOTE_TOPIC = "QuoteRequest";
    const String MARKET_DATA_TOPIC = "MarketData";

    if (args.Length < 3) {
      Console.Out.WriteLine("usage: ZmqTypedPublisher <roundtrip-count> " +
                            "<bind-to-sub> [<connect-to-pub>]+\n");
      Console.Out.WriteLine("    e.g.: ZmqTypedPublisher 10000 tcp://10.201.200.72:54321 tcp://10.201.200.72:54322\n");
      return 1;
    }

    int roundtripCount = Convert.ToInt32(args[0]);
    String bind_to_sub = args[1];
We publish "MarketData" and "QuoteRequest" messages, and the subscriber uses the topic filter string to determine the message type. The publisher's own SUB socket subscribes to both filters because the echoes come back with the same topic prefixes.
    // Initialise 0MQ infrastructure
    ZMQ.Context ctx = new ZMQ.Context(1, 1, 0);
    ZMQ.Socket pub = ctx.Socket(ZMQ.PUB);
    ZMQ.Socket sub = ctx.Socket(ZMQ.SUB);
    sub.SetSockOpt(ZMQ.SUBSCRIBE, MARKET_DATA_TOPIC);
    sub.SetSockOpt(ZMQ.SUBSCRIBE, QUOTE_TOPIC);
    Console.Out.WriteLine("Binding to " + bind_to_sub);
    sub.Bind(bind_to_sub);
We connect to each subscriber endpoint.
    int num_subscribers = 0;
    for (int i = 2; i < args.Length; ++i) {
      Console.Out.WriteLine("Connecting to " + args[i]);
      pub.Connect(args[i]);
      ++num_subscribers;
    }

    // Create two messages to send, and stream each message
    // to a byte array
    IFormatter formatter = new BinaryFormatter();
    ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
    MarketData md = MarketData.createTestData();
    QuoteRequest qr = QuoteRequest.createTestData();

    Console.Out.WriteLine("Sending messages -- " + num_subscribers + " subscribers, "
                          + roundtripCount + " messages");

    // Start measuring the time.
    System.Diagnostics.Stopwatch watch;
    watch = new Stopwatch();
    watch.Start();

    // Start sending messages.
    for (uint i = 0; i < roundtripCount; i++) {
Every tenth message is a QuoteRequest; the others are MarketData messages.
      // Send 90% MarketData messages
      if (i % 10 == 5) {
        qr.isEcho = false;
        qr.counter = i;
        byte[] quoteMsg = serialize(qr, QUOTE_TOPIC, formatter, encoding);
        pub.Send(quoteMsg);
      } else {
        md.isEcho = false;
        md.counter = i;
        byte[] mdMsg = serialize(md, MARKET_DATA_TOPIC, formatter, encoding);
        pub.Send(mdMsg);
      }
We wait for an echoed message from each subscriber.
      byte[] echoMsg;
      for (int jj = 0; jj < num_subscribers; ++jj) {
        sub.Recv(out echoMsg);

        // Get the "Topic" from the front of the byte array
        int topicEndIndex = Array.IndexOf(echoMsg, (byte)'\x00');
        String topic = new String(encoding.GetChars(echoMsg, 0, topicEndIndex));

        // Deserialize the echo, which should be the same message
        object echo = deserialize(echoMsg, topic, formatter);

        // Note: Debug.Assert calls are compiled out unless DEBUG is defined,
        // so these checks do not run in a release build.
        if (topic.Equals(MARKET_DATA_TOPIC)) {
          MarketData mdEcho = (MarketData)echo;
          Debug.Assert(mdEcho.isEcho == true, "Subscriber forgot to set isEcho flag to true");
          Debug.Assert(mdEcho.counter == i, "Counter mismatch in subscriber's reply");
        } else if (topic.Equals(QUOTE_TOPIC)) {
          QuoteRequest qrEcho = (QuoteRequest)echo;
          Debug.Assert(qrEcho.isEcho == true, "Subscriber forgot to set isEcho flag to true");
          Debug.Assert(qrEcho.counter == i, "Counter mismatch in subscriber's reply");
        } else {
          Console.Out.WriteLine("ERROR: received topic " + topic);
          return -1;
        }
      }
    }

    // Stop measuring the time.
    watch.Stop();
    Int64 elapsedTime = watch.ElapsedTicks;

    // Print out the test parameters.
    Console.Out.WriteLine("roundtrip count: " + roundtripCount);

    // Compute and print out the latency.
    double latency = (double)(elapsedTime) / roundtripCount / 2 *
      1000000 / Stopwatch.Frequency / (double)num_subscribers;
    Console.Out.WriteLine("\n\nYour average latency is {0} [us]\n\n",
                          latency.ToString("f2"));

    return 0;
  }
The serialization functions are below.
Note that we first write the topic name, with a null terminator, to the stream. Then we write the streamed object itself. The .NET stream keeps track of its cursor internally, which helps significantly.
  static byte[] serialize(object obj, String topic, IFormatter formatter, ASCIIEncoding encoding)
  {
    MemoryStream stream = new MemoryStream();
    // "topic" for ZeroMQ
    stream.Write(encoding.GetBytes(topic + '\x00'), 0, topic.Length + 1);
    formatter.Serialize(stream, obj);
    stream.Close();
    return stream.ToArray();  // ToArray still works after Close
  }
To deserialize, note that we must seek past the topic name. We know what the echoed topic name should be because it should match what we published. On the subscriber side, it is a bit more complicated to extract the topic name from the message before deserializing the objects.
  static object deserialize(byte[] msg, String topic, IFormatter formatter)
  {
    MemoryStream stream = new MemoryStream(msg);
    // Seek past "topic" for ZeroMQ
    stream.Seek(topic.Length + 1, SeekOrigin.Begin);
    object obj = formatter.Deserialize(stream);
    stream.Close();
    return obj;
  }
}
The subscribing process subscribes to the "MarketData" and "QuoteRequest" filters and loops to receive messages. For each message, it checks the message's filter string, skips over it, deserializes the rest of the message into the appropriate .NET instance, and checks the "echo" flag and the counter. It then sets the "echo" flag to true and reserializes the .NET instance to echo it back across the wire to the publisher.
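The subscriber-side topic handling can be sketched in C++ as well. split_topic and classify are hypothetical helpers. The first splits a buffer at its first NUL, and the second maps the topic name to the type of object that follows. One difference from the C# code is deliberate. Array.IndexOf returns -1 when no NUL is present, which would make the subsequent GetChars call throw, so the sketch reports a missing terminator explicitly instead.

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <string>
#include <vector>

struct TopicAndBody {
    std::string topic;
    std::vector<char> body;  // the serialized object that follows the NUL
};

// Split "topic\0body" at the first NUL; std::nullopt if there is no terminator.
std::optional<TopicAndBody> split_topic(const std::vector<char>& msg) {
    const auto nul = std::find(msg.begin(), msg.end(), '\0');
    if (nul == msg.end()) return std::nullopt;
    return TopicAndBody{std::string(msg.begin(), nul),
                        std::vector<char>(nul + 1, msg.end())};
}

enum class Kind { MarketData, QuoteRequest, Unknown };

// The topic name doubles as the type tag for the object that follows it.
Kind classify(const std::string& topic) {
    if (topic == "MarketData") return Kind::MarketData;
    if (topic == "QuoteRequest") return Kind::QuoteRequest;
    return Kind::Unknown;
}
```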
The subscriber's code, located in zeromq/csharp/ZmqTypedSubscriber.cs, is shown below. The subscriber subscribes to "MarketData" and "QuoteRequest".

        ZMQ.Socket sub = ctx.Socket(ZMQ.SUB);
        sub.SetSockOpt(ZMQ.SUBSCRIBE, QUOTE_TOPIC);
        sub.SetSockOpt(ZMQ.SUBSCRIBE, MARKET_DATA_TOPIC);
        sub.Bind(bind_to_sub);

        ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
        IFormatter formatter = new BinaryFormatter();

        Console.Out.WriteLine("Entering recv loop -- " + roundtripCount + " messages");
        for (int i = 0; i < roundtripCount; i++)
        {
            byte[] msg;
            sub.Recv(out msg);

Here, we detect the full topic name by assuming that it is null-terminated. We save the topic name and seek past it to deserialize the QuoteRequest or MarketData object, whichever it might be. The topic name tells us the data type of the serialized object.

            // Get the "Topic" from the front of the byte array
            int topicEndIndex = Array.IndexOf(msg, (byte)'\x00');
            String topic = new String(encoding.GetChars(msg, 0, topicEndIndex));
            MemoryStream inStream = new MemoryStream(msg);
            inStream.Seek(topic.Length + 1, SeekOrigin.Begin);
            object obj = formatter.Deserialize(inStream);
            inStream.Close();

            if (topic.Equals(MARKET_DATA_TOPIC))
            {
                MarketData md = (MarketData)obj;
                if (md.isEcho == true) { Console.Out.WriteLine("Subscriber received echo sample"); }
                if (md.counter != i) { Console.Out.WriteLine("Counter mismatch"); }
                md.isEcho = true;
            }
            else if (topic.Equals(QUOTE_TOPIC))
            {
                QuoteRequest qr = (QuoteRequest)obj;
                if (qr.isEcho == true) { Console.Out.WriteLine("Subscriber received echo sample"); }
                if (qr.counter != i) { Console.Out.WriteLine("Counter mismatch"); }
                qr.isEcho = true;
            }
            else
            {
                Console.Out.WriteLine("ERROR: received topic " + topic);
                return -1;
            }
            MemoryStream outStream = new MemoryStream();

To echo the object back, we first stream the topic name, then the object.

            // "topic" for ZeroMQ (reuse the encoding created above)
            outStream.Write(encoding.GetBytes(topic + '\x00'), 0, topic.Length + 1);
            formatter.Serialize(outStream, obj);
            outStream.Close();
            msg = outStream.ToArray();
            // Echo it back
            pub.Send(msg);
        }
        Console.Out.WriteLine("Finished receiving messages");
        return 0;
    }
}

We'll make our lives easier by running the test with the provided Perl script, as follows:

perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTypedDotnet 1000

We should see output that looks like this:
Entering recv loop -- 1000 messages
Binding to tcp://10.201.200.72:54321
Connecting to tcp://10.201.200.72:54322
Sending messages -- 1 subscribers, 1000 messages
Finished receiving messages
roundtrip count: 1000

Your average latency is 537.33 [us]

So our average latency for 1000 messages is about 537 microseconds. You'll want to run the test several times, as the latency numbers can vary depending on what other processes are running on the machine.
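The reported number comes from the formula at the end of the publisher's Main: elapsed Stopwatch ticks converted to microseconds, halved to turn a round trip into a one-way estimate, and averaged over both the message count and the number of subscribers. A small C++ restatement makes the arithmetic easy to check; the function name and the sample inputs in the note below are illustrative, not measurements from this test.

```cpp
#include <cassert>
#include <cstdint>

// Mirrors the C# expression:
//   (double)elapsedTicks / roundtripCount / 2 * 1000000
//       / Stopwatch.Frequency / num_subscribers
// ticks_per_second plays the role of Stopwatch.Frequency.
double average_one_way_latency_us(std::int64_t elapsed_ticks,
                                  std::int64_t ticks_per_second,
                                  int roundtrip_count,
                                  int num_subscribers) {
    double seconds = static_cast<double>(elapsed_ticks) / ticks_per_second;
    double per_roundtrip_us = seconds * 1e6 / roundtrip_count;
    // Half a round trip approximates the one-way latency. The publisher
    // waits for one echo from every subscriber per message, so the total
    // is also divided by the subscriber count.
    return per_roundtrip_us / 2.0 / num_subscribers;
}
```

For example, one second of total elapsed time over 1000 round trips with one subscriber yields 500 us. Working backwards, the 537.33 us shown above corresponds to roughly 1.07 seconds of elapsed time for the 1000 round trips.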
As with all of our Perl scripts, we can tell the script to launch more than one subscriber:
perl %MNB_ROOT%\bin\zeromq_test.pl ZeromqTypedDotnet 1000 -num-subs 3

OpenDDS .NET Tests
Since neither OpenDDS nor Boost.Asio supplies .NET bindings, we'll have to roll our own. We'll use a small C++/CLI layer as an intermediary between the C# code and our C++ middleware. In the end, we'll combine C#, C++/CLI, and standard C++ in the same executable. For more information on combining .NET and OpenDDS, please see Charles Calkins' fine series of Middleware News Brief articles, especially this one.
The code for the C++/CLI layer between .NET and OpenDDS is in the opendds/managed_cpp directory. Since this article is not trying to describe how to integrate .NET and OpenDDS, I'll simply show the header file for the interface between the two, plus the write method. For more information on integrating .NET and OpenDDS, please see either the attached code or this article.

Our C++/CLI interface between C# and OpenDDS, located in opendds/managed_cpp/OpenDdsPubSubProxy.h and opendds/managed_cpp/OpenDdsPubSubProxy.cpp, is as follows:

// .NET
using namespace System;
#include <vcclr.h>

namespace MiddlewareNewsBrief {

    class OpenDdsPubSubUtil;

    public enum class ProcessType { PUBLISHER_PROCESS, SUBSCRIBER_PROCESS };

    // wrap OpenDDS interaction with a .NET class
    public ref class OpenDdsPubSubProxy
    {

We delegate to an unmanaged C++ class (not shown, but available in the files opendds/managed_cpp/OpenDdsPubSubUtil.h and opendds/managed_cpp/OpenDdsPubSubUtil.cpp) that forms the bridge into OpenDDS.

        // can't put an unmanaged thing in a ref class, but a
        // pointer to an unmanaged thing is okay
        OpenDdsPubSubUtil* impl_;

    public:
        OpenDdsPubSubProxy(ProcessType ptype);
        ~OpenDdsPubSubProxy();

        // Returns an integer handle for writing to a particular topic
        int createDataWriter(String^ topic_name);

Note the use of pin_ptr in the write method. We're passing a managed array into unmanaged C++, so we pin the array at its current memory location to keep the .NET garbage collector from moving it while the unmanaged code is using it.

This event handler is called when an OpenDDS sample is received. The OpenDDS on_data_available callback is mapped to this .NET EventHandler delegate, which the C# code subscribes to.

        delegate void EventHandler(Object^ sender, int data_writer_handle,
                                   array<unsigned char> ^managed_buffer);
        EventHandler^ handler_;

        event EventHandler^ ProcessNotification
        {
            void add(EventHandler^ p) { handler_ += p; }
            void remove(EventHandler^ p) { handler_ -= p; }
            void raise(Object^ obj, int data_writer_handle, array<unsigned char> ^managed_buffer)
            {
                if (handler_ != nullptr) {
                    handler_(obj, data_writer_handle, managed_buffer);
                }
            }
        };
    };

} // namespace

This simple C++/CLI API exposes the ability to create a DataWriter on a topic, to write data samples, and to attach an event handler that is called back when echoed data is received.
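The shape of that API can be sketched in standard C++ without any OpenDDS or .NET types: integer handles stand in for DataWriters, and a list of callbacks plays the role of the ProcessNotification event's add, remove, and raise accessors. Everything here (PubSubFacade, addHandler, removeHandler, and the loopback inside write()) is hypothetical and only illustrates the pattern. In the real proxy, write() forwards the buffer to OpenDDS, and the event is raised from the on_data_available callback when a sample arrives.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Hypothetical stand-in for OpenDdsPubSubProxy: writers are identified by
// integer handles, and registered callbacks receive the writer handle and
// the raw buffer, like ProcessNotification.
class PubSubFacade {
public:
    using Handler = std::function<void(int data_writer_handle, const Bytes& buffer)>;

    // Returns an integer handle for writing to a particular topic.
    int createDataWriter(const std::string& topic_name) {
        topics_.push_back(topic_name);
        return static_cast<int>(topics_.size()) - 1;
    }

    // Analogous to the event's add accessor; returns a token for removal.
    std::size_t addHandler(Handler h) {
        handlers_.push_back(std::move(h));
        return handlers_.size() - 1;
    }

    // Analogous to the event's remove accessor.
    void removeHandler(std::size_t token) {
        if (token < handlers_.size()) {
            handlers_[token] = nullptr;
        }
    }

    // Loopback for illustration only: instead of publishing through OpenDDS,
    // deliver the buffer straight to every registered handler ("raise").
    bool write(int handle, const Bytes& buffer) {
        if (handle < 0 || static_cast<std::size_t>(handle) >= topics_.size()) {
            return false;  // unknown writer handle
        }
        for (const Handler& h : handlers_) {
            if (h) {
                h(handle, buffer);
            }
        }
        return true;
    }

    const std::string& topicOf(int handle) const { return topics_.at(handle); }

private:
    std::vector<std::string> topics_;
    std::vector<Handler> handlers_;
};
```

Handing out plain integers keeps the managed/unmanaged boundary simple: only an int and a byte buffer ever cross it, while the unmanaged side keeps the actual DataWriter objects.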
The publishing code is in opendds/csharp/TypedDotnetPublisher.cs and is shown below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.InteropServices;
using System.Diagnostics;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Threading;
using MiddlewareNewsBrief;

public class TypedDotnetPublisher
{
    OpenDdsPubSubProxy proxy_;
    uint echo_count_ = 0;
    uint message_count_ = 0;
    int marketDataWriterHandle_ = -1;
    int quoteRequestDataWriterHandle_ = -1;
    IFormatter formatter_ = new BinaryFormatter();

    static unsafe int Main(string[] args)
    {
        TypedDotnetPublisher me = new TypedDotnetPublisher();
        me.Run(args);
        return 0;
    }

    public TypedDotnetPublisher()
    {
    }

    int Run(string[] args)
    {
        const String QUOTE_TOPIC = "QuoteRequest";
        const String MARKET_DATA_TOPIC = "MarketData";
        int roundtripCount = 10000;
        int num_subscribers = 1;
        for (int i = 0; i < args.Length; ++i)
        {
            // Each flag takes a value; guard against a flag given without one.
            if (args[i].Equals("-num") && i + 1 < args.Length)
            {
                roundtripCount = Convert.ToInt32(args[++i]);
            }
            else if (args[i].Equals("-ns") && i + 1 < args.Length)
            {
                num_subscribers = Convert.ToInt32(args[++i]);
            }
        }
        Console.WriteLine(num_subscribers + " Subscribers, " + roundtripCount + " messages");

We create our DataWriters and attach callbacks through the C++/CLI OpenDdsPubSubProxy.

        this.proxy_ = new OpenDdsPubSubProxy(MiddlewareNewsBrief.ProcessType.PUBLISHER_PROCESS);
        this.proxy_.ProcessNotification += new OpenDdsPubSubProxy.EventHandler(OnEchoReceived);
        this.marketDataWriterHandle_ = this.proxy_.createDataWriter(MARKET_DATA_TOPIC);
        this.quoteRequestDataWriterHandle_ = this.proxy_.createDataWriter(QUOTE_TOPIC);
        System.Threading.Thread.Sleep(1000);

        Console.Out.WriteLine("Sending messages -- " + num_subscribers + " subscribers, "
                              + roundtripCount + " messages");

        // Create two messages to send
        MarketData md = MarketData.createTestData();
        QuoteRequest qr = QuoteRequest.createTestData();

        // Start measuring the time.
        System.Diagnostics.Stopwatch watch;
        watch = new Stopwatch();
        watch.Start();

        // Start sending messages.
        for (uint i = 0; i < roundtripCount; i++)
        {
            this.message_count_ = i;

We take either a MarketData or a QuoteRequest object, serialize it into a byte buffer, and write the buffer across the wire.

            // Send 90% MarketData messages
            if (i % 10 == 5)
            {
                qr.isEcho = false;
                qr.counter = i;
                byte[] quoteMsg = serialize(qr, formatter_);
                proxy_.write(this.quoteRequestDataWriterHandle_, quoteMsg);
            }
            else
            {
                md.isEcho = false;
                md.counter = i;
                byte[] mdMsg = serialize(md, formatter_);
                proxy_.write(this.marketDataWriterHandle_, mdMsg);
            }

Here, we wait for an echoed sample from each subscriber.
