Service Reports

When using the service agent framework, in addition to being able to subscribe to the service registry in order to receive announcements regarding the existence of services, it is also possible to subscribe to actual services. When subscribed to a service, if that service publishes a report with a subject matching the subscription, the subscriber will automatically receive it.

This style of interaction, referred to as publish/subscribe, is a common feature of packages implementing message oriented middleware. Note that in this implementation the design and interface are driven by simplicity, and as a result the implementation is not backed by persistent message queues. As long as a subscriber exists and is known to the publisher, it will receive any reports for which it has a valid subscription. If a subscriber is destroyed and later restarted, it will only receive reports published from that time onwards; it will not receive any reports published while it was offline.

The system design can therefore be likened to a system implementing instant messaging as opposed to a mailing list. In instant messaging you will only see those messages which are posted into a group while you are online, whereas with a mailing list any messages posted while you were away will still be there for you to see when you log back in.

As a basic system, this model of operation is suitable for many applications, but not all. If you are developing a system where it is imperative that you never miss a message, you would be advised to purchase a commercial message oriented middleware package. You will of course have to deal with the extra complexity and cost that entails.
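
These delivery semantics can be modelled in a few lines. The following is a hypothetical in-memory sketch. The `ToyPublisher` class and its methods are invented for illustration and are not part of netsvc. It shows that a subscriber which goes offline and restarts only sees reports published after it subscribed again:

```python
class ToyPublisher:
    """Hypothetical model of non-persistent publish/subscribe (not netsvc).

    Reports go only to subscribers known at the moment of publication;
    nothing is queued for subscribers that are offline or join later.
    """

    def __init__(self):
        self._subscribers = {}  # subscriber name -> list of (subject, content)

    def subscribe(self, name):
        # A (re)started subscriber begins with an empty inbox.
        self._subscribers[name] = []

    def unsubscribe(self, name):
        # A destroyed subscriber is simply forgotten.
        self._subscribers.pop(name, None)

    def publish(self, subject, content):
        # Deliver to whoever is known right now, then forget the report.
        for inbox in self._subscribers.values():
            inbox.append((subject, content))

    def inbox(self, name):
        return list(self._subscribers.get(name, []))


pub = ToyPublisher()
pub.subscribe("a")
pub.publish("system.time", 1)
pub.unsubscribe("a")           # "a" goes offline
pub.publish("system.time", 2)  # published while "a" is offline: lost to "a"
pub.subscribe("a")             # "a" restarts
pub.publish("system.time", 3)
print(pub.inbox("a"))          # [('system.time', 3)]
```

In the instant messaging analogy, a persistent system would instead keep a queue per subscriber and replay it when the subscriber returns.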

Publishing Reports

If a service agent needs to publish a report, it uses the member function "publishReport()". A service agent will generally publish a report without caring who may actually be subscribed to it. This is often referred to as anonymous publishing. It results in a more loosely coupled system which can adjust dynamically to changes. That is, it is not necessary to hardwire into a service to whom it should send a report. Instead, a service which is interested in the report subscribes to it, and the underlying system handles everything else.

    self.publishReport("subject.string", "value")
    self.publishReport("subject.integer", 12345)
    self.publishReport("subject.float", 1.2345)
    self.publishReport("subject.list", [1, 2, 3, 4, 5])
    self.publishReport("subject.dict", {"one": 1, "two": 2})

When publishing a report, a service agent needs to supply the content of the report and a subject which in some way identifies the purpose of the report. Subscribers indicate their interest in specific reports by subscribing to specific subjects. The subject name assigned to a report can have any value, but a hierarchical naming convention is suggested. That is, use one or more name components, with each component separated by a period.

By using a naming hierarchy, it becomes possible to aggregate reports into groupings which can then be easily subscribed to as a whole. Note that there is nothing special about a period as the separator for the name components. Other separators which are often used for performing the same task are a slash or a colon.

Monitoring Reports

A service agent indicates that it wants to subscribe to reports published by another service by calling the "monitorReports()" member function. When setting up such a subscription, the service agent must supply three things, in this order. The first is a callback function to be called when a report is received. The second is either the name of the service or the service binding object of the specific service agent to which it is subscribing. The third is a subject pattern indicating which reports it is interested in.

In the simplest case, a subscription can supply the exact same subject name under which a report is published. Alternatively, it can use special wildcard characters to allow it to pick up reports published against related subjects. The two special wildcard characters which can be used are "*" and "?". These can be incorporated anywhere in the subscription pattern.

The "?" matches a single character within the subject name, whereas a "*" matches any number of characters. Note that each will match any character, including a period or slash. As such, a subscription of "system.*" will match "system.time" and "system.statistics.users", but not "system". To subscribe to all reports from a particular publisher, use "*".
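
The default matching rules can be expressed as a translation to a regular expression. The sketch below is a hypothetical helper, `match_subject()`, which is not the netsvc implementation. It follows the rules exactly as described: "?" matches one character, "*" matches any run of characters, and both cross periods and slashes. Every other character is matched literally.

```python
import re


def match_subject(pattern, subject):
    """Return True if `subject` matches `pattern` under the default rules.

    Hypothetical helper, not the netsvc implementation: "?" matches exactly
    one character and "*" matches any run of characters (including none).
    Both match periods and slashes like any other character; everything
    else in the pattern must match literally.
    """
    regex = "".join(
        ".*" if ch == "*" else "." if ch == "?" else re.escape(ch)
        for ch in pattern
    )
    # fullmatch anchors the pattern at both ends of the subject.
    return re.fullmatch(regex, subject, re.DOTALL) is not None


print(match_subject("system.*", "system.time"))              # True
print(match_subject("system.*", "system.statistics.users"))  # True
print(match_subject("system.*", "system"))                   # False
print(match_subject("*", "any.subject/at:all"))              # True
print(match_subject("system.?ime", "system.time"))           # True
```

Python's `fnmatch.fnmatchcase()` behaves similarly, but it also treats "[...]" as a character class. The rules above do not mention character classes, so the explicit translation is used here instead.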

Note that the subscription matching scheme described here is only the default. Within the C++ implementation of a service agent it is possible to override the default and supply an alternate matching algorithm. For example, a service agent acting as a bridge to the TIBCO Rendezvous package would most likely redefine the matching algorithm to match the one used by that package. When subscribing to a service agent, therefore, always check first exactly which scheme it uses.
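
To show why the matching scheme matters, the sketch below compares a glob-style matcher with an approximation of Rendezvous-style element wildcards. Under the Rendezvous-style rules, "*" stands for exactly one period-separated element and ">" as the last element stands for one or more trailing elements. Both functions are hypothetical illustrations, and the Rendezvous rules are paraphrased; consult that package's documentation for the exact behaviour. The same pattern, "system.*", selects different subjects under each scheme:

```python
import fnmatch


def glob_match(pattern, subject):
    # Stand-in for the default scheme: "*" and "?" may cross periods.
    # (fnmatchcase also accepts "[...]" classes, which the default scheme
    # described above does not mention.)
    return fnmatch.fnmatchcase(subject, pattern)


def rv_style_match(pattern, subject):
    """Approximation of Rendezvous-style element wildcards: "*" stands for
    exactly one period-separated element, and ">" as the final element
    stands for one or more trailing elements."""
    p_parts = pattern.split(".")
    s_parts = subject.split(".")
    for i, part in enumerate(p_parts):
        if part == ">" and i == len(p_parts) - 1:
            # ">" must consume at least one remaining element.
            return len(s_parts) > i
        if i >= len(s_parts):
            return False
        if part != "*" and part != s_parts[i]:
            return False
    # Every pattern element matched; the subject must not be longer.
    return len(p_parts) == len(s_parts)


for subject in ["system.time", "system.statistics.users"]:
    print(subject, glob_match("system.*", subject), rv_style_match("system.*", subject))
# system.time True True
# system.statistics.users True False
```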

    import netsvc

    class Publisher(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self, "publisher")
            self.joinGroup("publishers")
            # creation time, cached indefinitely so late subscribers get it
            self.publishReport("system.ctime", netsvc.DateTime(), -1)
            self.startTimer(self.timeout, 10, "heartbeat")
        def timeout(self, tag):
            # publish a heartbeat, then restart the timer so it repeats
            self.publishReport("system.time", netsvc.DateTime())
            self.startTimer(self.timeout, 10, "heartbeat")

    class Subscriber(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self)
            # subscribe to any service agent with name "publisher"
            self.monitorReports(self.report, "publisher", "system.*")
        def report(self, service, subject, content):
            # identify exactly which agent published this report
            binding = self.currentReport().publisher()
            identity = binding.agentIdentity()
            publisher = "(%s/%s)" % (repr(service), identity)
            if subject == "system.ctime":
                now = str(netsvc.DateTime())
                print("%s became available at %s" % (publisher, now))
                print("%s originally started at %s" % (publisher, str(content)))
            elif subject == "system.time":
                print("%s was still alive at %s" % (publisher, str(content)))

When called, the callback supplied by the subscriber will be passed three arguments. These are the service binding object for the service agent which published the report, the subject under which the report was published and the content of the report.

The service binding object for the service agent which published the report is provided for two reasons. First, more than one service agent may use the same service name. A subscription based on a service name might therefore result in reports from more than one service agent, and the service binding object makes it possible to distinguish from whom a report originated. Second, the service binding object may be used to identify a particular service agent and send a request to it. Distinguishing publishers is less of an issue if the subscription is made using the service binding object of the specific service agent of interest rather than a service name. This eliminates the possibility of getting reports from unrelated service agents using the same service name.

    import netsvc

    class Subscriber(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self)
            # subscribe to the service group "publishers"
            self.subscribeServiceGroup(self.announce, "publishers")
        def announce(self, binding, group, status):
            if status == netsvc.SERVICE_AVAILABLE:
                # now subscribe to the specific service agent in the group
                self.monitorReports(self.report, binding, "system.*")
            else:
                # the agent has left the group; cancel the subscription
                self.ignoreReports(binding)
        def report(self, service, subject, content):
            binding = self.currentReport().publisher()
            identity = binding.agentIdentity()
            publisher = "(%s/%s)" % (repr(service), identity)
            if subject == "system.ctime":
                now = str(netsvc.DateTime())
                print("%s became available at %s" % (publisher, now))
                print("%s originally started at %s" % (publisher, str(content)))
            elif subject == "system.time":
                print("%s was still alive at %s" % (publisher, str(content)))

The subject is the one under which the report was published. The content of the report is not limited to being a string. It can be any of the basic Python scalar types, a list, tuple or dictionary, the "None" type, or one of a number of extended types. User defined scalar types can also be used, provided that appropriate encoders/decoders are available.

To cancel a subscription to a service, use the "ignoreReports()" member function. Pass it the name of the service and exactly the same subject pattern that was used when subscribing to the reports in the first place. If no subject pattern is supplied, all subscriptions against that service name will be removed.
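
The cancellation rules can be modelled as a table keyed by service and then by pattern. The `SubscriptionTable` class below is a hypothetical sketch, not netsvc code. It shows two behaviours: passing a pattern removes only that exact pattern, so an overlapping but different pattern has no effect, and omitting the pattern removes every subscription against the service.

```python
class SubscriptionTable:
    """Hypothetical model of a subscriber's bookkeeping for monitorReports()
    and ignoreReports(); it is not the netsvc implementation."""

    def __init__(self):
        self._subs = {}  # service name -> {subject pattern: callback}

    def monitor(self, callback, service, pattern):
        self._subs.setdefault(service, {})[pattern] = callback

    def ignore(self, service, pattern=None):
        if pattern is None:
            # No pattern: drop every subscription against the service.
            self._subs.pop(service, None)
            return
        patterns = self._subs.get(service, {})
        # Only the exact pattern used when subscribing is removed.
        patterns.pop(pattern, None)
        if not patterns:
            self._subs.pop(service, None)

    def patterns(self, service):
        return sorted(self._subs.get(service, {}))


table = SubscriptionTable()
table.monitor(print, "publisher", "system.*")
table.monitor(print, "publisher", "user.*")
table.ignore("publisher", "system.time")  # not the original pattern: no effect
print(table.patterns("publisher"))        # ['system.*', 'user.*']
table.ignore("publisher", "system.*")
print(table.patterns("publisher"))        # ['user.*']
table.ignore("publisher")
print(table.patterns("publisher"))        # []
```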

Lifetime of Reports

When a report is published, it is sent to any service agents which have a current subscription matching the subject associated with the report. By default, the publishing service then forgets all about the report. If a new subscription arrived immediately afterwards, it would only be sent reports published after its subscription was received. It would not receive a copy of the report published just before.

In some situations, however, it is desirable that a new subscriber be able to obtain the last report published against any subject it is interested in. This is useful where a report is used to reflect the status of a service. By obtaining the last published report, a subscriber knows the current state of the service immediately. It does not have to request it explicitly or wait for the status to change.

For such cases, it is possible to supply an optional lifetime for a report. That is, a time in seconds for which the report should be cached by the publishing service. When such a value is supplied, if a subscription arrives within that time, it will be sent a copy of that report. If a value of "-1" is supplied for the lifetime, it will effectively cache the report indefinitely.

    # publish and cache indefinitely
    self.publishReport("system.status", "idle", -1)

    # publish and cache for 60 seconds
    self.publishReport("system.action", "twiddle thumb", 60)

    # publish but don't cache
    self.publishReport("system.thought", "bored")

A cached report will only be discarded if a new report is published against the same subject, or the lifetime specified expires. If a new report published against the same subject has no lifetime associated with it, the cached report will be discarded, but the new report will not be cached. Note that with this mechanism, only the last report published on a specific subject will ever be cached when a lifetime value is provided.
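
The caching rules can be captured in a small per-subject table. The `ReportCache` class below is a hypothetical sketch and not netsvc code. For simplicity it looks reports up by exact subject rather than by subscription pattern, and it takes the current time as an argument. It covers four behaviours: a lifetime of -1 caches indefinitely, a positive lifetime caches for that many seconds, no lifetime means no caching, and any new report on a subject displaces the previously cached one.

```python
class ReportCache:
    """Hypothetical sketch of per-subject report caching (not netsvc code).

    lifetime None -> published but not cached
    lifetime -1   -> cached indefinitely
    lifetime N    -> cached for N seconds
    The current time is passed in explicitly to keep the example deterministic.
    """

    def __init__(self):
        self._entries = {}  # subject -> (content, expiry); expiry None = forever

    def publish(self, subject, content, lifetime=None, now=0.0):
        # Any new report on a subject displaces the previously cached one,
        # so at most one report per subject is ever cached.
        self._entries.pop(subject, None)
        if lifetime is None:
            return
        expiry = None if lifetime == -1 else now + lifetime
        self._entries[subject] = (content, expiry)

    def for_new_subscriber(self, subject, now=0.0):
        """Return the cached content a new subscriber would be sent, or None."""
        entry = self._entries.get(subject)
        if entry is None:
            return None
        content, expiry = entry
        if expiry is not None and now >= expiry:
            return None  # lifetime has run out
        return content


cache = ReportCache()
cache.publish("system.status", "idle", -1)
cache.publish("system.action", "twiddle thumb", 60, now=0)
cache.publish("system.thought", "bored")
print(cache.for_new_subscriber("system.status", now=3600))  # idle
print(cache.for_new_subscriber("system.action", now=30))    # twiddle thumb
print(cache.for_new_subscriber("system.action", now=90))    # None
print(cache.for_new_subscriber("system.thought"))           # None
cache.publish("system.status", "busy")  # no lifetime: cached "idle" is dropped
print(cache.for_new_subscriber("system.status"))            # None
```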

To keep the implementation as simple as possible, an expired report cached against a subject with a finite lifetime is not discarded straight away. It is only discarded when a new report with the same subject is published, or when a new subscription arrives which would have matched the subject. This avoids having to set up internal timers to trigger destruction of the report at the moment it expires.

A consequence of this approach however, is that a report may consume resources unnecessarily beyond the lifetime which it was supposed to exist. If this becomes an issue, it is possible for a service agent to periodically purge any expired reports itself. This can be done by calling the member function "purgeReports()".
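
Lazy expiry means an expired report can sit in memory until something touches it. The hypothetical sketch below shows this behaviour and what an explicit purge does about it. `LazyReportCache` and its `purge()` method are illustrations only; they are not the netsvc "purgeReports()" implementation.

```python
class LazyReportCache:
    """Hypothetical sketch: expired reports are not removed by timers, so they
    keep occupying memory until something touches them or purge() runs."""

    def __init__(self):
        self.entries = {}  # subject -> (content, expiry); expiry None = forever

    def publish(self, subject, content, lifetime, now):
        # lifetime is -1 (indefinite) or a number of seconds.
        expiry = None if lifetime == -1 else now + lifetime
        self.entries[subject] = (content, expiry)

    def purge(self, now):
        """Discard every expired report; return the subjects removed."""
        expired = sorted(
            subject
            for subject, (_, expiry) in self.entries.items()
            if expiry is not None and now >= expiry
        )
        for subject in expired:
            del self.entries[subject]
        return expired


cache = LazyReportCache()
cache.publish("system.action", "twiddle thumb", 10, now=0)
cache.publish("system.status", "idle", -1, now=0)
print(len(cache.entries))     # 2: "system.action" has expired but lingers
print(cache.purge(now=100))   # ['system.action']
print(sorted(cache.entries))  # ['system.status']
```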

    import netsvc

    class Publisher(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self, "publisher")
            # purge expired reports every 15 minutes (cron-style schedule)
            self.scheduleAction(self.purgeReports, "*/15 * * * *")

In addition to being able to explicitly purge expired reports for performance reasons, a service agent may also prematurely expire and purge reports which are older than a certain time. The member function for this is "expireReports()" and accepts a subject pattern and optional age in seconds. The age defaults to "0" which would result in any cached report matching the subject pattern being immediately expired and purged. If a non zero value for age is supplied, only reports which were older than that age would be expired and purged. To apply this to all cached reports, regardless of subject, the "expireAllReports()" member function can be used.

"purgeReports()" exists specifically to deal with potential performance issues in a very limited number of cases. The "expireReports()" and "expireAllReports()" member functions, by contrast, are useful where a service has reset itself and needs to discard its cached reports so that new subscribers don't receive them.
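
The age-based expiry rules can be sketched as a filter over the cache. The functions below are hypothetical models of the described semantics and are not the netsvc member functions. They use `fnmatch.fnmatchcase()` as a stand-in for the default subject matching. An age of 0 discards every matching cached report, while a non-zero age discards only matching reports older than that many seconds.

```python
import fnmatch


def expire_reports(cache, pattern, age=0, now=0.0):
    """Hypothetical sketch of the expireReports() rules described above.

    `cache` maps subject -> (content, published_at). A report whose subject
    matches `pattern` is discarded if `age` is 0, or if it is older than
    `age` seconds otherwise. Returns the discarded subjects, sorted.
    """
    removed = []
    for subject, (_, published_at) in list(cache.items()):
        if not fnmatch.fnmatchcase(subject, pattern):
            continue
        if age == 0 or now - published_at > age:
            del cache[subject]
            removed.append(subject)
    return sorted(removed)


def expire_all_reports(cache, age=0, now=0.0):
    # Same rule, applied regardless of subject.
    return expire_reports(cache, "*", age, now)


cache = {
    "system.status": ("idle", 0),
    "system.action": ("twiddle thumb", 50),
    "user.count": (3, 0),
}
print(expire_reports(cache, "system.*", age=30, now=60))  # ['system.status']
print(expire_all_reports(cache))  # ['system.action', 'user.count'], e.g. after a reset
print(cache)                      # {}
```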

Identity of Subscribers

In most circumstances the identity of a subscriber is not important. Such information can, however, be quite useful in a few circumstances. At present it is available by overriding the "handleSubscription()" member function of the service agent base class.

    import netsvc

    class Publisher(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self, "publisher")
        def handleSubscription(self, subscription):
            subscriber = subscription.subscriber()
            if subscription.status() == netsvc.SUBSCRIPTION_REQUESTED:
                # generate the report on demand, for this subscriber only
                if self.matchSubject(subscription.subject(), "system.time"):
                    self.sendReport(subscriber, "system.time", netsvc.DateTime())

This feature can be used instead of caching reports when they are published. That is, rather than caching a report when it is published so that a new subscriber automatically receives it, the report is generated only when the subscription arrives. In this approach, however, the report must be sent only to the new subscriber and not to all subscribers, as existing subscribers would otherwise receive duplicates.

To cater for this scenario, the member function "sendReport()" is supplied. In this variant of report publishing, the first argument is the service binding object of the subscriber, obtained from the subscription notification. The report is sent only to that subscriber and is not cached. Note that if a report was also cached against the subject in question, the subscriber would receive that as well. Anonymous publishing and targeted reports should therefore not be combined for a specific subject, as this may give undesired results.

The member function "matchSubject()" is supplied to assist in determining if the subject pattern contained in the subscription matches that of a particular subject. The first argument to "matchSubject()" should be the pattern and the second the actual subject. Although not used here, the opposite to the status value "SUBSCRIPTION_REQUESTED" is the value "SUBSCRIPTION_WITHDRAWN".

Suppose "sendReport()" is used to send a report to a recipient that has a subscription against the publishing service, but no subscription against that service matching the subject of the report. In that case the report will not be delivered via the callback the recipient supplied with its subscription. The situation is similar where a service receives an unsolicited report, or has since unsubscribed from the reports. In all these cases there is no callback in place for reception of the report, and the member function "unexpectedReport()" is called instead. A service agent may override this member function if it wants to deal with such unexpected reports.
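
The receiving side's routing decision can be sketched as follows. The `deliver()` function is hypothetical, and so is the choice of the first matching subscription when several match; the netsvc internals may differ. It uses `fnmatch.fnmatchcase()` as a stand-in for the default matching. A report reaches a subscription callback only if some subscription against the publisher matches its subject. Otherwise it falls through to the handler playing the role of "unexpectedReport()".

```python
import fnmatch


def deliver(subscriptions, publisher, subject, content, unexpected):
    """Hypothetical sketch of report dispatch on the receiving side.

    `subscriptions` maps a publisher to a list of (pattern, callback) pairs.
    The report goes to the first callback whose pattern matches the subject;
    if none matches (no subscription, a non-matching one, or one that has
    since been cancelled) it goes to `unexpected`, the role played by
    unexpectedReport().
    """
    for pattern, callback in subscriptions.get(publisher, []):
        if fnmatch.fnmatchcase(subject, pattern):
            return callback(publisher, subject, content)
    return unexpected(publisher, subject, content)


received, stray = [], []


def on_report(publisher, subject, content):
    received.append(subject)


def on_unexpected(publisher, subject, content):
    stray.append(subject)


subs = {"publisher": [("system.*", on_report)]}
deliver(subs, "publisher", "system.time", 1, on_unexpected)  # matching subscription
deliver(subs, "publisher", "user.login", 2, on_unexpected)   # subscribed, wrong subject
deliver(subs, "stranger", "system.time", 3, on_unexpected)   # unsolicited
print(received, stray)  # ['system.time'] ['user.login', 'system.time']
```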

A further use of subscriber identity is to track subscriptions, so that data is only processed or intercepted while there are subscribers interested in the results. This avoids publishing reports when it is known there would be no one to send them to.

    import netsvc

    class LogMonitor(netsvc.Service):
        def __init__(self):
            name = "logmon@%s" % netsvc.processIdentity()
            netsvc.Service.__init__(self, name)
            self._logger = netsvc.Logger()
            self._channels = {}  # log channel -> identities of subscribers
        def notify(self, channel, level, message):
            # channel is "(agent)"; strip the parentheses to get the subject
            agent = channel[1:-1]
            report = {"agent": agent, "level": level, "message": message}
            self.publishReport(agent, report)
        def handleSubscription(self, subscription):
            agent = subscription.subject()
            channel = "(%s)" % agent
            subscriber = subscription.subscriber().agentIdentity()
            if subscription.status() == netsvc.SUBSCRIPTION_REQUESTED:
                if agent:
                    if channel not in self._channels:
                        # first subscriber: start intercepting this channel
                        self._channels[channel] = []
                        self._logger.monitorChannel(channel, self.notify)
                    self._channels[channel].append(subscriber)
            else:
                if subscriber in self._channels.get(channel, []):
                    self._channels[channel].remove(subscriber)
                    if len(self._channels[channel]) == 0:
                        # last subscriber gone: stop intercepting
                        del self._channels[channel]
                        self._logger.monitorChannel(channel, None)

    logger = netsvc.Logger()

    class Publisher(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self, "publisher")
            self._channel = "(%s)" % self.agentIdentity()
        def debug(self, message):
            logger.notifyChannel(self._channel, netsvc.LOG_DEBUG, message)

In this use of subscription information, the subscription to a specific subject is used to trigger interception of messages logged via the logger interface. For the time that subscriptions exist for a particular subject corresponding to a log channel, the log messages on that log channel will be intercepted and published. This can be useful as a remote debugging mechanism and will not unnecessarily load the process as information is only being captured and published when it is actually required.

Existence of Publishers

When a subscription to a service is made, if the service holds any cached reports with a subject matching the subscription, the subscriber will receive them immediately. If however there were no such reports, the subscriber will not receive any reports until some are published having a subject which matched its subscription. Even when there are reports which can be sent back immediately, if there are reports against multiple subjects, there is no guarantee as to which order they will be received in.

As a consequence, using the reception of a report as an indicator that a service has become available is not a good approach to take. This is because a report may not be received until some time after the service became available and the subscription accepted. Further, there is no indication when the service is no longer available.

As described previously, one way of knowing when a service becomes available or is withdrawn is to subscribe to the service registry. This works, but if you have restricted the service audience of your service agent, the registry may also tell you about services outside the scope of what you can subscribe to.

To avoid this difficulty, the member function "handlePublisherNotification()" is provided. This member function can be overridden in your service agent and will be called only when a subscription has been matched up and accepted by the service being subscribed to. Note that this notification will only occur for the first subscription against a particular service agent.

This member function will also be called to acknowledge withdrawal of the last subscription against a particular service agent, or when a service agent to which you were subscribed has been withdrawn.

    import netsvc

    class Subscriber(netsvc.Service):
        def __init__(self):
            netsvc.Service.__init__(self)
            self.monitorReports(self.report, "publisher", "*")
        def report(self, service, subject, content):
            pass  # only the publisher notifications matter here
        def handlePublisherNotification(self, notification):
            name = notification.publisher().serviceName()
            identity = notification.publisher().agentIdentity()
            publisher = "(%s/%s)" % (repr(name), identity)
            if notification.status() == netsvc.SERVICE_AVAILABLE:
                print("%s AVAILABLE" % publisher)
            else:
                print("%s WITHDRAWN" % publisher)
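
The rule that the notification fires only for the first accepted subscription and for the last withdrawal amounts to per-agent reference counting. The `PublisherTracker` class and its "AVAILABLE"/"WITHDRAWN" strings below are a hypothetical model of that rule, not netsvc internals. The transition from zero to one subscription produces an "available" notification. The transition back to zero, or the disappearance of the agent, produces a "withdrawn" notification.

```python
class PublisherTracker:
    """Hypothetical model of when a publisher notification fires: once when
    the first subscription against an agent is accepted, and once when the
    last is withdrawn or the agent itself goes away."""

    def __init__(self, notify):
        self._counts = {}      # agent identity -> accepted subscriptions
        self._notify = notify  # called as notify(identity, status)

    def accepted(self, identity):
        self._counts[identity] = self._counts.get(identity, 0) + 1
        if self._counts[identity] == 1:
            self._notify(identity, "AVAILABLE")

    def withdrawn(self, identity):
        count = self._counts.get(identity, 0)
        if count == 1:
            del self._counts[identity]
            self._notify(identity, "WITHDRAWN")
        elif count > 1:
            self._counts[identity] = count - 1

    def agent_gone(self, identity):
        if self._counts.pop(identity, 0):
            self._notify(identity, "WITHDRAWN")


events = []
tracker = PublisherTracker(lambda identity, status: events.append((identity, status)))
tracker.accepted("agent-1")    # first subscription: AVAILABLE
tracker.accepted("agent-1")    # second subscription: no notification
tracker.withdrawn("agent-1")   # one subscription still active: nothing
tracker.withdrawn("agent-1")   # last one withdrawn: WITHDRAWN
tracker.accepted("agent-2")
tracker.agent_gone("agent-2")  # publisher disappeared: WITHDRAWN
print(events)
```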

Knowing when a subscription has been accepted, or when the service agent subscribed to has been withdrawn, is useful when there is more than one service agent with the same name and the lifetime of each must be tracked. It is also useful where a request must be sent immediately to each service agent to obtain information not available via published reports.



OSE/Python/ServiceReports (last edited 2006-09-04 12:14:28 by GrahamDumpleton)