Event Framework Changes
The old method of using PropertySets directly in sending and receiving events has been retained, but deprecated in favor of encapsulating PropertySets into Event objects, which are described below. The intention is to remove direct publishing and receiving of PropertySets using the old method in an upcoming release. They are being retained now (instead of being removed completely) to make the transition to the new API easier, rather than breaking everything that uses it. Note that the new features being added to the API use the new Event objects, and are not available to code using the directly sent and received PropertySets.
Two features are being removed: Local sockets and the "matchingReceive" methods.
Local sockets are being removed in favor of using the ActiveMQ directly. The code implementing this used Unix domain sockets which could not emulate what you might see by using the ActiveMQ daemon in production. Furthermore, the new message property filtering mechanism would not have been able to be implemented properly, since there is no interaction with the ActiveMQ event broker. The ActiveMQ broker has been installed as part of the software stack.
The matchingReceive methods, which did client side filtering of events have been removed in favor of implementing message selectors. Messages are not filtered on the server side. This is faster, removes the message cache which was kept internally, reduces the amount of code in the Event framework, and is much more efficient.
Event objects
In previous releases, the Event system operated on PropertySet objects which had no semantic meaning. The EventTransmitter was given a PropertySet to send to a topic, and each EventReceiver listening to that topic received the PropertySet.
In this release we introduce Event objects. Instead of having EventTransmitters and EventReceivers operating directly with PropertySet objects, they will operating on Event objects, which encapsulate those PropertySet objects. The Event objects themselves will contain additional information available through accessor methods. This will help the users of Events to determine the type of Event object which is received, so they can decide what to do with it.
entity Event
- property type - string:: A unique name indicating the type of an event.
- property eventTime - long long:: A timestamp (in ns since epoch) representing the nominal time of the occurance or state change being reported by this Event. In practice, this will usually be set to the time that the Event was created by the publisher; however, it can be set to any time past or present, depending on the semantics of the Event topic. While this precision of this value is ns, the value may represent only an estimate.
- property hostId - string:: A unique name for the host (usually the IP hostname) running the process that published the event.
- property runId - string:: A logical name representing the execution context of the publisher. Logically coupled processes may share the same runId.
- property status - string:: A semantic identifier that uniquely describes the state, change of state, or requested action being communicated by this event. A subscriber can match this identifier (via string compare) against a known of identifiers and assume the implied semantics. If the Event carries no additional semantics (beyond what is implied by the topic and the specific Event type), this property will be an empty string.
- property topic - string:: This is the name of the channel that this event was received on. This is set by
EventBroker?EventTransmitter. - property pubTime - long long :: The time (in ns since the epoch) that the event was first published. This is set by the EventTransmitter.
Added API methods:
- getCustomPropertyNames() - vector<std::string>:: Returns the names of the non-standard, application-specific properties attached to this Event. These are properties that are not set automatically when the Event is created or published.
- getEventTime() - long long:: Return the eventTime property.
- setEventTime(long long nsecs) - void:: Set the EventTime?, in nanoseconds.
- updateEventTime() - void:: Set the EventTime? to the current time.
- getEventDate() - string:: Return a formatted date string representing the eventTime property.
- getPropertySet():- PropertySet:: Return a PropertySet containing all of the properties attached to this Event. This includes both standard, type-specific properties and additional custom properties.
- getPubDate() - string:: Return a formatted date string representing the publication time.
- getPubTime() - long long:: Return the value of the pubTime property or zero if it has not been published yet.
- getTopic() - string:: Return the current value of the topic or an empty string if it has not been published yet.
- getHostId() - string:: return the name of the host which created this event.
- getRunId() - string:: return the LSST run id for this event.
- getType() - string:: return the name of the type of event this is.
- getStatus() - string:: return a status message.
C++:
Event(const std::string& runid, const PropertySet& ps);
Event(cms::TextMessage *msg, const PropertySet::Ptr ps);
~Event();
PropertySet::Ptr getPropertySet() const;
std::string getPubDate();
long long getPubTime();
void setPubTime(long long t);
long long getEventTime();
std::string getEventDate();
std::string getHostId();
std::string getRunId();
std::string getType();
std::string getStatus();
void setTopic(std::string topic);
std::string getTopic();
vector<std::string> getFilterablePropertyNames();
vector<std::string> getCustomPropertyNames();
PropertySet::Ptr getCustomPropertySet() const;
virtual void populateHeader(cms::TextMessage* msg) const;
Each of the Event objects listed below are subclasses of the Event object, unless otherwise noted.
ErrorEvent - An event reporting an error of some time.
Additional field:
- property message : string - a string that describe the error
C++:
ErrorEvent(const std::string& runid, PropertySet::Ptr psp);
~ErrorEvent();
std::string getMessage();
ExceptionEvent - An event reporting that an unhandled error has been detected by the producing process. Typically, that process or thread has stopped running as a result of this error. Subclassed from ErrorEvent.
Additional fields:
- property exceptionType : string
- property stackTrace : string
C++:
ExceptionEvent(const std::string& runid, const LSSTException ex);
~ExceptionEvent();
std::string getExceptionType();
std::string getStackTrace();
FaultEvent - An Event issued by the Fault System that signals an error condition based on analysis of various Monitor Events. Typically, this will require attention by an administrator or automated agent. Subclassed from ErrorEvent
C++:
FaultEvent(const std::string& runid, const PropertySet::Ptr& psp);
~FaultEvent();
LogEvent - An event representing a Log message. It is used from an EventLog (created from a LogRecord) in order to share that message with other remote components.
Recognized values for the status property include:
start - some logical block of processing (usually identified by the log
property) has just started.
end - some logical block of processing (usually identified by the log
property) has just ended.
Additional fields:
- comment - StringArray
- The human-readable log messages
- level - int
- LogRecord's importance property
- log - string
- LogRecords name property
C++:
LogEvent(const std::string& runid, const pexLogging::LogRecord& rec);
~LogEvent();
int getLevel();
std::string getLog();
vector<std::string> getComment();
StatusEvent - An Event indicating progress or normal change of state of a system. The producing process continues to run after issuing such an event. Receivers of these events wait for certain states to change, and react to these changes in status. An example would be a pipeline component that generated a message that states that a data product is complete. The receiver of this event would see that the data product is ready, and would act on this information.
This is different from a LogEvent in that it may not be persisted in the same way (if at all).
Additional fields:
- originatorId - unsigned long: An identifier which names which component generated this event. This is composed of an IP address, process ID, and a local id, which identifies this particular message. The originatorId is automatically generated when the StatusEvent? is created.
C++:
StatusEvent(const std::string& runid, int64_t originatorId, const PropertySet& ps);
StatusEvent(const std::string& runid, int64_t originatorId, const PropertySet::Ptr psp);
StatusEvent(cms::TextMessage *msg, const PropertySet::Ptr psp);
~StatusEvent();
virtual void populateHeader(cms::TextMessage *msg) const;
short getProcessId();
short getLocalId();
int getIPId();
int64_t getOriginatorId();
CommandEvent - An Event intended as a message to another component. The producing process continues to run after issuing such an event. Receivers of these events act on these commands. This is issued in response to a StatusEvent.
This is different from a StatusEvent in that it indicates to the receiving component that action should be taken, whereas the StatusEvent just indicates status for a component.
Additional fields:
- originatorId - unsigned long: An identifier which names which component generated this event. This is composed of an IP address, process ID, and a local id, which identifies this particular message. The originatorId is automatically generated when the StatusEvent? is created.
C++:
CommandEvent(const std::string& runid, int64_t destinationId, const PropertySet::Ptr psp);
CommandEvent(cms::TextMessage *msg, const PropertySet::Ptr psp);
~CommandEvent();
virtual void populateHeader(cms::TextMessage *msg) const;
int64_t getOriginatorId();
short getOriginatorLocalId();
short getOriginatorProcessId();
int getOriginatorIPId();
int64_t getDestinationId();
short getDestinationLocalId();
short getDestinationProcessId();
int getDestinationIPId();
PipelineLogEvent This object is subclassed from LogEvent, and is issued by the executing pipeline.
Additional fields:
- dataId - string:: The unique identifier for the data chunk being processed in the current pipeline iteration. If this identifier is not known, the value should be an empty string.
- loopnum - int:: A number indicating the current pipeline iteration. In the case of the nightly pipelines, this is the iteration for processing the n-th observational visit.
- pipeline - string:: The logical name (ie. shortName) of the pipeline that generated this message.
- sliceId - int:: The identifier for the parallel slice process that generated this message. A value of -1 represents the Master Pipeline process.
- stageId - int:: The identifier for the stage currently running when the message was generated. A value of -1 means that no stage was in context.
C++:
PipelineLogEvent(const std::string& runid, const pexLogging::LogRecord& rec, const PropertySet::Ptr psp)
~PipelineEvent();
std::string getDataId();
int getLoopnum();
std::string getPipeline();
int getSliceId();
int getStageId();
Note that the additional fields for this event are supplied in the constructor as part of the PropertySet. If any required field does not present in this PropertySet, an exception will be thrown.
Finer Control over Event Listening:
ActiveMQ allows messages to be filtered according to the contents of the message properties.
Basic fields are bool/unsigned char/double/float/int/long long/short/string.
On the Receiver side, messages are filtered through the selector syntax of ActiveMQ. This lets us specify SQL92-style expressions to do filtering. A receiver can specify: "runid = 'srp2009'" to only receive messages whose properties match that. Much more complex expressions are possible.
The following is the C++ description of the proposed Event Framework Objects. Receiving on multiple topics with one EventReceiver is currently being investigated, and may cause additional API changes to that object and to the EventSystem object to accommodate this feature.
EventReceiver
EventReceiver(const pexPolicy::Policy& policy);
EventReceiver(const std::string& hostName, const std::string& topicName);
EventReceiver(const std::string& hostName, const int hostPort, const std::string& topicName);
~EventReceiver();
PropertySet::Ptr receive();
PropertySet::Ptr receive(long timeout);
std::string getTopicName();
static const long infiniteTimeout = -1;
NEW METHODS:
EventReceiver(const std::string& hostName, const std::string& topicName, const std::string& selector);
EventReceiver(const std::string& hostName, const int hostPort, const std::string& topicName, const std::string& selector);
Event receive();
Event receive(long timeout);
EventSystem
EventSystem();
~EventSystem();
static EventSystem& getDefaultEventSystem();
void createTransmitter(const std::string& hostName, const std::string& topicName);
void createTransmitter(const pexPolicy::Policy& policy);
void createReceiver(const std::string& hostName, const std::string& topicName);
void createReceiver(const std::string& hostName, const int hostPort, const std::string& topicName);
void createReceiver(const std::string& hostName, const std::string& topicName, const std::string& selector);
void createReceiver(const std::string& hostName, const int hostPort, const std::string& topicName, const std::string& selector);
void createReceiver(const pexPolicy::Policy& policy);
void publish(const std::string& topicName, const PropertySet::Ptr psp);
void publish(const std::string& topicName, const pexLogging::LogRecord& rec);
PropertySet::Ptr receive(const std::string& topicName);
PropertySet::Ptr receive(const std::string& topicName, const long timeout);
NEW METHODS:
void publish(const std::string& topicName, const Event& event)
Event receive(const std::string& topicName);
Event receive(const std::string& topicName, const long timeout);
NEW STATIC CONSTANT:
static const int DEFAULTHOSTPORT = 61616;
EventTransmitter
EventTransmitter(const pexPolicy::Policy& policy);
EventTransmitter(const std::string& hostName, const std::string& topicName);
EventTransmitter(const std::string& hostName, const int hostPort, const std::string& topicName);
~EventTransmitter();
void publish(const PropertySet::Ptr& psp);
void publish(const PropertySet& ps);
void publish(const pexLogging::LogRecord& rec);
std::string getTopicName();
void publish(const std::string& type, const PropertySet& ps);
NEW METHODS:
void publish(Event& event);


