HTCondor Qpid

HTCondor Qpid Messaging System

The HTCondor Qpid Messaging System can be divided into three units:

  1. Message Publisher (client producer)
  2. Broker, where the message queues are initialized (server)
  3. Message Listener (client consumer)

At a high level, the interaction between these units is depicted in the following diagram.

qpid.jpg Figure 1

The Message Publisher sends messages to a queue in the broker. This is similar to an HTTP PUT request, where the client sends new content to the server. Each message received from the Message Publisher is appended to the target queue, and the messages in the queue are stored persistently. When a Message Listener connects to the queue and requests messages, they are forwarded to the listener. This interaction is similar to an HTTP GET request.
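The put/get analogy above can be illustrated with a minimal in-memory sketch. It is not Qpid code: the class name ToyBroker, the queue name and the message texts are made up for illustration, and persistence is left out.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker that holds named FIFO queues."""

    def __init__(self):
        self.queues = {}

    def put(self, queue_name, message):
        # Publisher side: append the message to the target queue.
        self.queues.setdefault(queue_name, deque()).append(message)

    def get(self, queue_name):
        # Listener side: hand out the oldest message, or None if there is none.
        pending = self.queues.get(queue_name)
        return pending.popleft() if pending else None


broker = ToyBroker()
broker.put("job_events", "job 1.0 submitted")
broker.put("job_events", "job 1.0 executing")
first = broker.get("job_events")
second = broker.get("job_events")
third = broker.get("job_events")   # the queue is empty by now
```

Messages come back in the order they were sent, and a request against an empty queue returns nothing.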

A detailed description of each of these three units follows.

Qpid Message Publisher:

Event logs for jobs submitted with HTCondor are written by the Schedd and the Shadow. These log files are rotated so that only the latest 'X' event log entries are kept. One purpose of the message publisher is to read event log entries and forward them persistently into the AMQP queue.

The message publisher uses HTCondor's log reader API. It works as a wrapper on top of that API and provides control to selectively include or exclude event types of the jobs whose logs are read and forwarded.
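As a rough sketch of the include/exclude control, the wrapper can be thought of as a filter over the stream of events returned by the log reader. The function name filter_events, the dictionary layout and the event type labels below are assumptions made for illustration; they are not taken from the log reader API.

```python
def filter_events(events, include=None, exclude=()):
    """Yield only events whose type passes the include/exclude rules.

    include=None means that every type is allowed unless it is excluded.
    """
    allowed = None if include is None else set(include)
    blocked = set(exclude)
    for event in events:
        event_type = event["type"]
        if allowed is not None and event_type not in allowed:
            continue
        if event_type in blocked:
            continue
        yield event


# Hypothetical events, in the shape a log reader wrapper might hand over.
sample = [
    {"type": "SUBMIT", "job": "1.0"},
    {"type": "EXECUTE", "job": "1.0"},
    {"type": "IMAGE_SIZE", "job": "1.0"},
    {"type": "JOB_TERMINATED", "job": "1.0"},
]

kept = [e["type"] for e in filter_events(sample, exclude=["IMAGE_SIZE"])]
only = [e["type"] for e in filter_events(sample, include=["SUBMIT", "JOB_TERMINATED"])]
```

Writing the filter as a generator keeps it lazy, so it can sit between a reader that blocks on new log entries and the forwarding step.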

The challenges faced by this system are:

  1. avoiding loss of log entries
  2. avoiding duplicate log messages
  3. transforming messages into the AMQP queue format

The message publisher solves these issues by

  • assigning each read entry a unique ID
  • maintaining a second message queue which is consumed by the publisher whenever the client queue is accessed. After each message consumption, the publisher makes sure that the consumed entry left the queue.
  • asserting that messages are read in order according to their unique ID (thus noticing missing messages)
  • persistently storing state, such that in case of a system crash the previous queue state can be restored
  • reformatting log events: currently a log message can be converted to the syslog-ng format or the ClassAd format, or simply emitted as a sequence of characters (with or without separators). The abstraction in the formatting API makes it easy to plug in other formats, so log events can be converted to messages of the required format before they are forwarded.
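The pluggable formatting mentioned in the last point could look roughly like the following sketch, in which formatters are registered by name and a new format is added by registering one more function. The registry, the function names and the attribute names (Cluster, EventType, MsgId) are hypothetical, and the output only resembles the ClassAd style; it is not the publisher's actual format.

```python
FORMATTERS = {}


def formatter(name):
    """Register a formatting function under a short name."""
    def register(func):
        FORMATTERS[name] = func
        return func
    return register


@formatter("plain")
def format_plain(event, separator=" "):
    # Values joined in a fixed key order, with a configurable separator.
    return separator.join(str(event[key]) for key in sorted(event))


@formatter("classad")
def format_classad(event, separator="\n"):
    # ClassAd-like "Attribute = value" lines; string values are quoted.
    def render(value):
        return f'"{value}"' if isinstance(value, str) else str(value)
    return separator.join(f"{key} = {render(event[key])}" for key in sorted(event))


def format_event(event, style, **options):
    # Look up the requested style; an unknown style raises KeyError.
    return FORMATTERS[style](event, **options)


event = {"Cluster": 12, "EventType": "SUBMIT", "MsgId": 1}
plain = format_event(event, "plain", separator="|")
classad = format_event(event, "classad")
```

Here plain is the string 12|SUBMIT|1, and classad holds one attribute per line.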

Forwarding could simply mean writing to another text file (local or remote), sending messages via syslog-ng, or sending them to a messaging queue system. This document discusses the interaction with a messaging system.

Figure 2 shows the components of the Event Log Publisher and its integration with Qpid as a client publisher.

condorQpidInt.jpg Figure 2

Figure 3 provides the flow chart explanation of the operational logic.

condorQpidFlow.jpg Figure 3

Figure 3 in words:

  1. Initialize the user/event log file
  2. Read an event
  3. Format the event data and create a unique ID
  4. If the message queue is empty, try to restore from the persistent state; otherwise consume an entry from the backup queue (the queue which is read by the publisher itself to ensure consistency)
  5. Compare the message ID against the previous message to assert order
  6. Update the persistence file with the new ID and send the message to both queues
  7. Continue with reading the next event from the user/event log file
  8. ...
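The steps above can be sketched as follows. This is a simplified model under stated assumptions, not the logic of topic_publisher.cpp: IDs are assumed to be consecutive integers, both queues are in-memory deques instead of broker queues, and the JSON state file layout is invented for the example.

```python
import json
import os
import tempfile
from collections import deque


class Publisher:
    def __init__(self, state_path):
        self.state_path = state_path
        self.client_queue = deque()   # read by the listeners
        self.backup_queue = deque()   # read back by the publisher itself
        self.next_id = self._restore() + 1

    def _restore(self):
        # Last ID recorded in the persistence file, or 0 if there is none.
        if os.path.exists(self.state_path):
            with open(self.state_path) as handle:
                return json.load(handle)["last_id"]
        return 0

    def _persist(self, message_id):
        # Write a temporary file first, then rename it over the old state,
        # so a crash does not leave a half-written state file behind.
        tmp_path = self.state_path + ".tmp"
        with open(tmp_path, "w") as handle:
            json.dump({"last_id": message_id}, handle)
        os.replace(tmp_path, self.state_path)

    def publish(self, event_text):
        # Step 3: format the event and attach a unique ID.
        message = {"id": self.next_id, "body": event_text}
        # Step 4: take the previous ID from the backup queue, or from the
        # persistent state when the backup queue is empty.
        if self.backup_queue:
            previous_id = self.backup_queue.popleft()["id"]
        else:
            previous_id = self._restore()
        # Step 5: a gap or a repeat shows up as a broken sequence.
        if message["id"] != previous_id + 1:
            raise RuntimeError(f"expected ID {previous_id + 1}, got {message['id']}")
        # Step 6: record the new ID, then send to both queues.
        self._persist(message["id"])
        self.client_queue.append(message)
        self.backup_queue.append(message)
        self.next_id += 1
        return message


with tempfile.TemporaryDirectory() as workdir:
    state_file = os.path.join(workdir, "persist.dat")
    first_run = Publisher(state_file)
    ids = [first_run.publish(text)["id"] for text in ("submit", "execute", "terminated")]
    # A fresh instance stands in for a restart after a crash.
    second_run = Publisher(state_file)
    resumed_id = second_run.publish("submit")["id"]
```

In this sketch the restarted publisher continues numbering where the first one stopped, because the last ID survives in the state file.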

SOURCE CODE:

  • Qpid Client Publisher: topic_publisher.cpp

Qpid Messaging System:

Apache's Qpid Messaging System plays the role of the middleman. Some notes about Qpid follow.

Enterprise Messaging systems let programs communicate by exchanging messages, much as people communicate by exchanging email. Unlike email, enterprise messaging systems provide guaranteed delivery, speed, security, and freedom from spam. Until recently, there was no open standard for Enterprise Messaging systems, so programmers either wrote their own, or used expensive proprietary systems.

AMQP (Advanced Message Queuing Protocol) is the first open standard for Enterprise Messaging. It is designed to support messaging for just about any distributed or business application. Routing can be configured flexibly, easily supporting common messaging paradigms like point-to-point, fanout, publish-subscribe, and request-response.
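One example of such flexible routing is the topic exchange of the AMQP 0-x model, where a binding pattern is compared word by word with a dot-separated routing key: "*" stands for exactly one word and "#" for zero or more words. The sketch below implements only that matching rule. Whether the setup described here relies on wildcard patterns is an assumption, and the routing keys are invented for illustration.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a binding pattern."""
    def match(p, k):
        if not p:
            # Pattern exhausted: a match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # "#" may swallow zero or more words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # "*" matches exactly one word; otherwise words must be equal.
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


one_word = topic_matches("condor.*.submit", "condor.schedd.submit")
too_long = topic_matches("condor.*", "condor.schedd.submit")
any_tail = topic_matches("condor.#", "condor.schedd.submit")
```

With these inputs one_word and any_tail are true, while too_long is false because "*" cannot cover two words.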

Apache Qpid implements the latest AMQP specification, providing transaction management, queuing, distribution, security, management, clustering, federation and heterogeneous multi-platform support and a lot more. And Apache Qpid is extremely fast.

More information about Qpid and its implementation, configuration, and setup can be found at [http://qpid.apache.org/]

The Qpid broker is configured to use the publish-subscribe routing model and to store messages persistently. The queue names and their corresponding routing keys are used to initialize the queues in the Qpid broker.
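The relation between queues, routing keys and the publish-subscribe model can be sketched with a toy exchange: queues are declared, bound to a routing key, and every message published with that key is copied to each bound queue. This is an in-memory illustration with exact-match keys, not the code in declareQueues.cpp; all names are hypothetical and persistence is left out.

```python
from collections import defaultdict, deque


class ToyExchange:
    def __init__(self):
        self.queues = {}
        self.bindings = defaultdict(list)   # routing key -> bound queue names

    def declare_queue(self, name):
        # Declaring an existing queue again is a no-op.
        self.queues.setdefault(name, deque())

    def bind(self, queue_name, routing_key):
        if queue_name not in self.queues:
            raise KeyError(f"queue {queue_name!r} has not been declared")
        if queue_name not in self.bindings[routing_key]:
            self.bindings[routing_key].append(queue_name)

    def publish(self, routing_key, message):
        # Every queue bound to the key gets its own copy of the message.
        targets = self.bindings.get(routing_key, [])
        for name in targets:
            self.queues[name].append(message)
        return len(targets)


exchange = ToyExchange()
for queue_name in ("archive", "alerts"):
    exchange.declare_queue(queue_name)
    exchange.bind(queue_name, "condor.events")

delivered_to = exchange.publish("condor.events", "job 1.0 submitted")
unrouted = exchange.publish("condor.unknown", "dropped")
```

A message whose routing key has no binding reaches no queue, which is why the queues and their keys have to be set up before the publisher starts.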

SOURCE CODE:

  • Qpid queue declaration and binding with exchanges: declareQueues.cpp

Qpid Message Listener:

This Qpid client requests messages from the broker queues. Multiple message listeners can run concurrently, each tied to a different queue or to the same queue. In our implementation, the listener can write the received log messages to a file (local or remote) after formatting, send them as emails, or send them as instant messages.
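A minimal sketch of two listeners sharing one queue is given below. Python's queue.Queue stands in for a broker queue, and the handlers only record what they receive; in a real listener they would write to a file, send an email or send an instant message. All names are assumptions for illustration.

```python
import queue
import threading


def run_listener(source, handler):
    """Consume messages from source until a None sentinel arrives."""
    while True:
        message = source.get()
        if message is None:
            break
        handler(message)


received = []
lock = threading.Lock()


def collect(label):
    # Stand-in for an output action such as "write to file" or "send mail".
    def handler(message):
        with lock:
            received.append((label, message))
    return handler


events = queue.Queue()
workers = [
    threading.Thread(target=run_listener, args=(events, collect("file"))),
    threading.Thread(target=run_listener, args=(events, collect("mail"))),
]
for worker in workers:
    worker.start()
for text in ("submit", "execute", "terminated"):
    events.put(text)
for _ in workers:
    events.put(None)   # one sentinel per listener
for worker in workers:
    worker.join()

delivered = sorted(message for _, message in received)
```

Listeners tied to the same queue compete for messages, so each message is handled once, but which listener handles it is not fixed. Listeners that should each see every message need a queue of their own.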

SOURCE CODE:

  • Qpid Client Listener that sends Skype IM: topic_listener.cpp
  • Skype IM script: messageAll.py

Figure 4 details the integration of the three components explained above:

condorQpidInt2.jpg Figure 4

HTCondor ADD-ON:

The integration of the event log forwarder with the messaging system is added to the HTCondor source code as a daemon and HTCondor job implementation.

condor_qpid daemon:

The Qpid broker is run as an HTCondor daemon, condor_qpid, that is controlled by the condor_master. It performs the following functions:

  1. Declare queues and bind them to the exchange
  2. Run the Qpid broker on a free port that is chosen dynamically
  3. Publish a ClassAd containing metadata about the Qpid broker, such as port number, host name, queue name, and routing key
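The second and third functions can be sketched as follows: one common way to find a free port is to bind a socket to port 0 and read back the port the operating system assigned, and the broker details can then be rendered as "Attribute = value" lines. The attribute names (QpidHost, QpidPort, QpidQueueName, QpidRoutingKey) and the sample values are hypothetical; the names used by the condor_qpid daemon are not given here.

```python
import socket


def pick_free_port(host="127.0.0.1"):
    """Ask the operating system for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind((host, 0))
        return probe.getsockname()[1]


def broker_ad(host, port, queue_name, routing_key):
    # Hypothetical attribute names, rendered in ClassAd-like text form.
    return "\n".join([
        f'QpidHost = "{host}"',
        f"QpidPort = {port}",
        f'QpidQueueName = "{queue_name}"',
        f'QpidRoutingKey = "{routing_key}"',
    ])


port = pick_free_port()
ad_text = broker_ad("localhost", port, "job_events", "condor.events")
```

The probe socket is closed before the broker would start, so another process could claim the port in between. A real daemon has to handle a failed bind, for example by picking another port.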

HTCondor jobs:

The Qpid clients that send messages to the broker queues, as well as those that listen for and retrieve messages, run as HTCondor jobs. They poll for the ClassAd published by the condor_qpid daemon to obtain the metadata needed to connect to the broker queues.
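The polling step can be sketched as a retry loop around a lookup function. How the ad is actually fetched is not described here, so the lookup is passed in as a callable; fetch_ad, the attribute names and the port value are placeholders for illustration.

```python
import time


def wait_for_broker_ad(fetch_ad, attempts=5, delay=1.0, sleep=time.sleep):
    """Call fetch_ad() until it returns an ad or the attempts run out."""
    for attempt in range(attempts):
        ad = fetch_ad()
        if ad is not None:
            return ad
        if attempt < attempts - 1:
            sleep(delay)   # wait before asking again
    raise TimeoutError("broker ad has not been published yet")


# Fake lookup: the ad becomes visible on the third poll.
answers = iter([None, None, {"QpidHost": "localhost", "QpidPort": 41234}])
ad = wait_for_broker_ad(lambda: next(answers), delay=0.0)
connect_to = (ad["QpidHost"], ad["QpidPort"])
```

Bounding the number of attempts lets a job fail with a clear error when the daemon never comes up, instead of polling forever.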

Figure 5 explains the control flow between the HTCondor qpid broker daemon and client HTCondor jobs.

condorQpidDaemon.jpg Figure 5


Example session:

Once you've built HTCondor Pigeon, this is what an example session could look like (make sure you have $qpid/lib and $condor/lib in your LD_LIBRARY_PATH, and $condor/bin and $condor/sbin in your PATH):

 # start the daemon
condor_pigeon

 # run the publisher, where myLog.log is an existing HTCondor user log file
condor_pigeon_topic_publisher myLog.log persist.dat

 # run the example listener
condor_pigeon_topic_listener
