Last modified on 01/10/2008 02:12:23 PM

The LSST Event System

Overview

The LSST Event System carries information between software components. It provides C++ and Python interfaces for sending short messages between processes. These messages are used for logging, for reporting exceptions, and for control, so that processes can adjust how they run.

LSST event messages are transmitted and received through the Apache ActiveMQ message broker on channels called "topics". Any number of processes can publish messages to a topic, and those messages are received by all processes that subscribe to that topic.

We use Mule to provide more complex routing than the one-to-one, one-to-many, many-to-one, and many-to-many routing that ActiveMQ provides. Messages can be filtered by message type, and/or on message contents, and then redirected to other topics or software services for consumption. Additionally, event messages can be transformed by Mule components and redirected to other services. We use Mule to subscribe to logging topics and in turn insert records into a database. The event portal interacts with a web service interface in Mule to extract records from that database.

ActiveMQ, JMS and MULE

The messaging system we use is called ActiveMQ. It provides loosely coupled, reliable, asynchronous communication between software components. The client library we use in LSST is ActiveMQ-CPP, the C++ client for ActiveMQ. The LSST Event System objects are written in C++ and wrapped with Swig to produce Python classes.

The message format that ActiveMQ-CPP uses for LSST Events conforms with Java Message Service (JMS) messages and is treated by Mule components as such.

MULE is an open source ESB (Enterprise Service Bus) platform which handles interactions between services and applications. It supports many different types of transports and protocols, and provides a way of brokering interactions between these protocols. The Mule server uses an XML configuration file to describe how the software components interact with each other, as well as how Mule services are exposed.

For LSST, we set up components that allow us to aggregate, filter and redistribute events. For example, incoming logging messages are automatically cataloged via a MULE component that stores incoming records into a MySQL database. We query this database through a MULE web service triggered by a request from the event web portal.

The diagram below shows an example of how Mule might be used to do complex routing. The top portion of the diagram shows software components that publish event messages. The middle box represents the Mule XML configuration file and illustrates how messages are routed from various publish topics to subscription topics. The bottom portion of the diagram shows the software components that subscribe to the redirected event channels.

To see how routing works, look at the Pipeline software component in the upper left-hand corner of the diagram. It publishes three different types of events: "Pipeline Status", "Pipeline Advisory", and "Pipeline Exception". All of them are sent to the "Pipeline" topic. Mule receives these events because it subscribes to the "Pipeline" topic. Within Mule, software components examine the type of each message and republish it to the appropriate subscription topics. In this example, the incoming messages are duplicated by Mule and routed to two destinations: all the messages are sent to the "Pipeline Status" subscription topic, and the "Pipeline Problem" subscription topic only gets the "Pipeline Advisory" and "Pipeline Exception" messages.
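
The routing in this example can be modelled in a few lines of Python. This is only a sketch of the idea, not Mule configuration: the ROUTES table and the route() helper are hypothetical names introduced here, and real routing is declared in the Mule XML configuration file.

```python
# Hypothetical model of the routing described above; this is not Mule code.
# Each subscription topic lists the event types it accepts from the
# "Pipeline" publish topic.
ROUTES = {
    "Pipeline Status": {"Pipeline Status", "Pipeline Advisory", "Pipeline Exception"},
    "Pipeline Problem": {"Pipeline Advisory", "Pipeline Exception"},
}


def route(event_type):
    """Return the subscription topics that should receive this event type."""
    return sorted(topic for topic, accepted in ROUTES.items() if event_type in accepted)


for event_type in ("Pipeline Status", "Pipeline Advisory", "Pipeline Exception"):
    print(event_type, "->", route(event_type))
```

An event type that no subscription topic accepts is routed nowhere in this model; how unmatched messages are handled in a real deployment depends on the Mule configuration.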

Note that Mule itself is a publisher and subscriber to the ActiveMQ message broker, which is how it receives and redirects events. Mule is only required if you want to do some type of filtering and redirection of event messages.

Event Routing

Publishing Messages

Events are sent and received on topics handled by the ActiveMQ server. A topic can be any name that you wish to use. There is no need to register the name used on an ActiveMQ server ahead of time. Just start using the name you'd like to use.

It's very simple to publish and receive messages. Say that you want to send messages between two processes using the topic "astro". One process creates an EventTransmitter object and sends messages to topic "astro" using the publish() method. The second process creates an EventReceiver object and calls the receive() method to receive messages on topic "astro".

Messages sent to a topic are said to be published to that topic. A process that creates an EventReceiver on a topic is said to subscribe to that topic. In the case above, the publish and subscribe topics are one and the same. It is possible, using the MULE routing mechanism, to publish a message to one topic and have a subscriber to a different topic receive that message.

Note that you're not limited to one transmitter or one receiver per topic. You can have as many transmitters or receivers for messages as you'd like on a topic. A message sent from any publisher to a topic will be received by ALL subscribers to that topic.
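
The fan-out behaviour can be illustrated with a toy in-memory broker. This is a sketch under stated assumptions: ToyBroker and its methods are hypothetical stand-ins written for this example, and they do not use ActiveMQ or the LSST event classes.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a topic broker (hypothetical, for illustration)."""

    def __init__(self):
        # topic name -> list of subscriber inboxes; topics need no registration
        self._subscribers = defaultdict(list)

    def subscribe(self, topic):
        """Register a new subscriber and return its inbox (a plain list)."""
        inbox = []
        self._subscribers[topic].append(inbox)
        return inbox

    def publish(self, topic, message):
        """Deliver the message to every subscriber of the topic."""
        for inbox in self._subscribers[topic]:
            inbox.append(message)


broker = ToyBroker()
first = broker.subscribe("astro")
second = broker.subscribe("astro")
other = broker.subscribe("logging")

# Two independent publishers send to the same topic.
broker.publish("astro", "from publisher A")
broker.publish("astro", "from publisher B")

print(first)   # both "astro" subscribers hold both messages
print(other)   # the "logging" subscriber holds none
```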

The message you send is a DataProperty node with one or more leaves. Currently, this is the only format that is supported. Support for DataProperty objects with more complex hierarchies will be added once the DataProperty serialization API is completed. The contents of the DataProperty are up to the developer.
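
As a rough illustration of the supported shape, the sketch below checks that a tree consists of a root with one or more leaves and nothing deeper. The Node class and is_supported_shape() are hypothetical stand-ins written for this example; they are not the DataProperty API.

```python
class Node:
    """Hypothetical stand-in for a DataProperty; not the LSST class."""

    def __init__(self, name, value=None):
        self.name = name
        self.value = value
        self.children = []

    def add(self, child):
        """Attach a child node and return self so calls can be chained."""
        self.children.append(child)
        return self


def is_supported_shape(root):
    """True if root has one or more leaves and no deeper hierarchy."""
    return bool(root.children) and all(not c.children for c in root.children)


flat = Node("root").add(Node("HOST", "search1.google.com")).add(Node("PID", 200))
nested = Node("root").add(Node("group").add(Node("PID", 200)))
empty = Node("root")

print(is_supported_shape(flat), is_supported_shape(nested), is_supported_shape(empty))
```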

Application Interface

The Python API for the event system is straightforward. To send an event, you must first create a connection to the ActiveMQ server and register to publish on a topic. To do this, you create an EventTransmitter. The constructor has the form:

EventTransmitter(machine,topic)

where:

machine::

is the name of the machine which is running the ActiveMQ server

topic::

is the topic to which events will be transmitted.

For example:

import lsst.events as events

if __name__ == "__main__":
    transmitter = events.EventTransmitter("www.google.com", "logging")

This creates a connection to the ActiveMQ server at "www.google.com" and registers to send to topic "logging".

You can also use a policy file object to initialize the constructor:

    policy = Policy.createPolicy("examples/policies/localsockets_policy.paf", 1)
    transmitter = events.EventTransmitter(policy)

The policy file attributes for events are described below.

Events are sent as a DataProperty. The current format for this is a DataProperty root node with key/value pairs in DataProperty objects as children of the root node. (Once DataProperty serialization is implemented, any type of DataProperty can be sent).

The call for the publish method is:

transmitter.publish(type,dp)

where:

type::

is a string naming what type of message this is.

dp::

is the DataProperty object you want to publish

To publish an event, first create a data property node. This is the "root" of the DataProperty. Create the key value pairs as other DataProperty objects, and add those as children of the root node.

Next, decide on the type name of the message. The type is used as additional routing information by Mule to sort through the different types of messages that might be sent on a particular topic. For example, for exceptions, a "FatalException" or an "OutOfBounds" type might be used to label an exception. Using types in this way allows Mule to filter messages sent on one topic and re-route them to another topic whose subscribers are only interested in messages of a particular type. For example, a controlling process might only be interested in "FatalException" messages, and not in any other message type. The type is used purely by Mule and is not delivered to the receiver's endpoint. If that information is needed at the endpoint, store it as a DataProperty in the message as well.

The reason this isn't delivered to the endpoint is that typing information is part of the underlying JMS message, and allows Mule to quickly filter messages. If all filtering were done on message contents, a custom Mule object that filters based on contents would need to be invoked by Mule. This might need to be done to filter on multiple DataProperty values, but in the majority of cases filtering on the "type" in the send call should be sufficient.
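
The difference between the two kinds of filter can be sketched as follows. The message dictionaries and the by_type() and by_contents() helpers are hypothetical and exist only for this illustration; in the real system the type lives in the JMS message and the filters are Mule components.

```python
# Hypothetical message model: "type" stands in for the JMS-level type,
# "body" for the key/value pairs carried in the DataProperty.
messages = [
    {"type": "FatalException", "body": {"HOST": "node1", "PID": 200}},
    {"type": "OutOfBounds", "body": {"HOST": "node2", "PID": 201}},
    {"type": "FatalException", "body": {"HOST": "node2", "PID": 202}},
]


def by_type(wanted):
    """Build a filter that only looks at the message type."""
    return lambda msg: msg["type"] == wanted


def by_contents(**required):
    """Build a filter that inspects the body; all given values must match."""
    return lambda msg: all(msg["body"].get(k) == v for k, v in required.items())


# Cheap first pass on the type, then a content filter only where needed.
fatal = [m for m in messages if by_type("FatalException")(m)]
fatal_on_node2 = [m for m in fatal if by_contents(HOST="node2")(m)]

print(len(fatal), [m["body"]["PID"] for m in fatal_on_node2])
```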

To continue, once you've created the DataProperty node and its leaves, you can send the message.

    # Create the root node of the DataProperty.
    root = datap.SupportFactory.createPropertyNode("root")

    # Add each key/value pair as a child of the root node.
    host = datap.DataProperty("HOST", "search1.google.com")
    root.addProperty(host)

    pid = datap.DataProperty("PID", 200)
    root.addProperty(pid)

    fid = datap.DataProperty("FID", 3.14)
    root.addProperty(fid)

    misc1 = datap.DataProperty("misc1", "data 1")
    root.addProperty(misc1)

    misc2 = datap.DataProperty("misc2", "data 2")
    root.addProperty(misc2)

    # Publish the event with message type "log".
    transmitter.publish("log", root)

To receive an event, create an EventReceiver. The constructor looks like this:

EventReceiver(machine,topic)

where:

machine::

is the name of the machine which is running the ActiveMQ server

topic::

is the topic to receive events from.

For example, this piece of code:

import lsst.events as events

if __name__ == "__main__":
    receiver = events.EventReceiver("www.google.com", "logging")

creates an EventReceiver to receive events from the ActiveMQ server running on www.google.com, from subscription topic "logging".

The EventReceiver constructor can also take a policy object as an argument:

    policy = Policy.createPolicy("examples/policies/activemq_policy.paf", 1)

    receiver = events.EventReceiver(policy)

Events are retrieved via the receive() method, which can take one of two forms:

Calling receive() with no arguments blocks until an event is received.

    val = receiver.receive()

Calling receive() with a numeric argument waits for up to that number of milliseconds. If an event is received in that time, it is returned as a DataProperty. If the specified time elapses before an event is received, the returned object holds a null value.

    val = receiver.receive(8000)

In Python this is tested as:

    if val.get() is not None:
        something_interesting_happens_here()
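
The blocking and timed forms of receive() can be mimicked with Python's standard queue module. This is a minimal sketch, assuming an in-process queue in place of the broker: ToyReceiver and its deliver() method are hypothetical, and unlike the real receiver it returns None directly on a timeout instead of an object wrapping a null value.

```python
import queue


class ToyReceiver:
    """Hypothetical stand-in for a receiver, backed by an in-process queue."""

    def __init__(self):
        self._inbox = queue.Queue()

    def deliver(self, event):
        """Simulate the broker handing an event to this receiver."""
        self._inbox.put(event)

    def receive(self, timeout_ms=None):
        """Block until an event arrives, or return None after timeout_ms."""
        try:
            if timeout_ms is None:
                return self._inbox.get()  # wait indefinitely
            return self._inbox.get(timeout=timeout_ms / 1000.0)
        except queue.Empty:
            return None


receiver = ToyReceiver()
receiver.deliver({"HOST": "search1.google.com"})

got = receiver.receive(50)      # an event is already waiting
missed = receiver.receive(50)   # nothing arrives within 50 ms

if missed is None:
    print("timed out without an event")
```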

EventSystem

The EventSystem object consolidates the functionality of EventTransmitter and EventReceiver into one common interface. The EventSystem object permits higher levels of the software runtime to configure the locations of the event brokers for transmitters and receivers. The lower levels of the software can then just publish on well-known topics, without having to set up and maintain the objects themselves; they can just publish and receive.

Here's an example of what higher level software would do. To set up an event transmitter and an event receiver that send and receive messages through the message broker on lsst8.ncsa.uiuc.edu for the "sliceTopic1" topic, you would do the following:

    eventSystem = EventSystem().getDefaultEventSystem()
    eventSystem.createTransmitter("lsst8.ncsa.uiuc.edu", "sliceTopic1")
    eventSystem.createReceiver("lsst8.ncsa.uiuc.edu", "sliceTopic1")

This sets up the information about the transmitters and receivers in a static object that can later be retrieved and used by lower levels of the software runtime.

The following code would be used by the lower level software runtime to publish a message:

    eventSystem = EventSystem().getDefaultEventSystem()
    eventSystem.publish("sliceTopic1", dpt)  # first parameter is the topic name, second parameter is the DataProperty

(Note that unlike EventTransmitter's publish() method, the publish() method in EventSystem doesn't require a "type" parameter. That parameter is currently only used by the event system through Mule for more sophisticated routing, and is under review for the public interface.)

The following code would be used by the lower level software runtime to receive a message:

    eventSystem = EventSystem().getDefaultEventSystem()
    val = eventSystem.receive("sliceTopic1")  # the parameter is the topic name; returns a DataProperty
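
The division of labour between higher and lower levels can be sketched as a shared registry keyed by topic name. This is a hypothetical model only: ToyEventSystem, get_default() and create_channel() are names invented for this example, and an in-memory list stands in for the message broker.

```python
class ToyEventSystem:
    """Hypothetical model of a shared registry of per-topic channels."""

    _default = None  # the single shared instance

    def __init__(self):
        self._hosts = {}   # topic -> broker host configured by higher levels
        self._queues = {}  # topic -> pending events (stands in for the broker)

    @classmethod
    def get_default(cls):
        """Return the shared instance, creating it on first use."""
        if cls._default is None:
            cls._default = cls()
        return cls._default

    def create_channel(self, host, topic):
        """Record which broker host serves a topic."""
        self._hosts[topic] = host
        self._queues.setdefault(topic, [])

    def publish(self, topic, event):
        if topic not in self._hosts:
            raise KeyError("no channel configured for topic %r" % topic)
        self._queues[topic].append(event)

    def receive(self, topic):
        """Return the oldest pending event on the topic, or None."""
        if topic not in self._hosts:
            raise KeyError("no channel configured for topic %r" % topic)
        pending = self._queues[topic]
        return pending.pop(0) if pending else None


# Higher-level code: configure the broker location once.
ToyEventSystem.get_default().create_channel("lsst8.ncsa.uiuc.edu", "sliceTopic1")

# Lower-level code: only needs to know the topic name.
ToyEventSystem.get_default().publish("sliceTopic1", {"PID": 200})
event = ToyEventSystem.get_default().receive("sliceTopic1")
print(event)
```

Keeping the broker location in one shared object means lower-level code never handles host names, which is the separation the EventSystem object is meant to provide.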

Policy Objects

The parameters that EventTransmitter and EventReceiver check for in policy objects are:

Parameter        Type     Default
turnEventsOff    boolean  false
useLocalSockets  boolean  false
hostName         String   empty
topicName        String   empty

Setting turnEventsOff to true disables event transmission completely. This is useful if you want to test some code that also contains event transmission objects, but don't want to send those events for some reason. An EventTransmitter object with this set won't send any events, and an EventReceiver object won't attempt to receive events.

Setting useLocalSockets to true makes EventTransmitter/EventReceiver use UNIX domain sockets instead of the ActiveMQ broker. This is useful in situations where you want to test events between two processes, but don't have access to a machine running the ActiveMQ broker. Both the process transmitting the event and the process receiving the event have to be on the same machine.

The values for hostName and topicName serve the same purpose as they do in the constructors, as outlined above. Note that hostName is ignored if useLocalSockets is set to true, since all data transmission using UNIX domain sockets is limited to the host you're running the processes on.
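
How the four parameters interact can be summarised in a short sketch. EventSettings and describe_transport() are hypothetical names written for this example and are not part of the Policy or event APIs; the sketch also assumes that turnEventsOff takes precedence over useLocalSockets, which the text above does not state.

```python
from dataclasses import dataclass


@dataclass
class EventSettings:
    """Hypothetical holder for the four parameters, with the documented defaults."""
    turnEventsOff: bool = False
    useLocalSockets: bool = False
    hostName: str = ""
    topicName: str = ""


def describe_transport(settings):
    """Summarise which transport the settings select."""
    if settings.turnEventsOff:
        # Assumption: disabling events overrides every other setting.
        return "disabled"
    if settings.useLocalSockets:
        # hostName is ignored: local sockets only reach the current machine.
        return "local sockets, topic %s" % settings.topicName
    return "broker at %s, topic %s" % (settings.hostName, settings.topicName)


from_policy = {
    "useLocalSockets": True,
    "hostName": "lsst8.ncsa.uiuc.edu",
    "topicName": "logging",
}
settings = EventSettings(**from_policy)
print(describe_transport(settings))
```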
