Event Framework

The main support for concurrency in the OSE C++ class libraries comes in the form of a mechanism for building event driven systems. This is based around a central job queue and a dispatcher, which takes successive jobs from the queue and executes them. To support real time systems, there also exist a number of event sources which will schedule jobs to trigger an agent to be notified when an event of interest occurs. The major event sources include timers, signals and the availability of data for reading on a socket.

The major classes in the OSE C++ class library involved in providing this functionality are the "OTC_Dispatcher", "OTC_EVAgent" and "OTC_Job" classes, plus the various event classes related to the event sources. In the C++ implementation, communication of events is mainly performed by passing around event objects and having a single event handler method in an agent to deal with them. In the Python implementation, separate callback functions can be registered by an agent against each event of interest.

Note that only the major features of the C++ implementation are reflected in the Python interface. Python does not provide a means of creating your own event types or event sources. A Python agent is also not able to process any events except those from the major event sources.

Scheduling a Job

Scheduling of jobs comes in the form of registering a callback function with the dispatcher for execution. A job may be scheduled as a priority job, a standard job, or an idle job. The type of job determines where in the order of existing jobs a new job will be placed. Any priority jobs are executed before a standard job is processed. When there are no priority jobs or standard jobs remaining, any pending idle jobs will be reclassified as standard jobs and subsequently executed. When scheduling a job, if jobs of the same type already exist, the new job will be placed at the end of the list of jobs of the same type.
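The ordering rules above can be modelled in a few lines of plain Python. This is only a sketch of the described behaviour, not the OSE implementation: the "ToyDispatcher" class and the three job type constants are hypothetical stand-ins so that the example runs without the "netsvc" module.

```python
from collections import deque

# Hypothetical stand-ins for the netsvc job type constants.
PRIORITY_JOB, STANDARD_JOB, IDLE_JOB = "priority", "standard", "idle"


class ToyDispatcher(object):
    """Models only the ordering rules described in the text."""

    def __init__(self):
        self._queues = {
            PRIORITY_JOB: deque(),
            STANDARD_JOB: deque(),
            IDLE_JOB: deque(),
        }

    def schedule(self, callback, job_type):
        # A new job goes to the end of the list for its own type.
        self._queues[job_type].append(callback)

    def run(self):
        priority = self._queues[PRIORITY_JOB]
        standard = self._queues[STANDARD_JOB]
        idle = self._queues[IDLE_JOB]
        while priority or standard or idle:
            if priority:
                priority.popleft()()
            elif standard:
                standard.popleft()()
            else:
                # Nothing else is left: idle jobs become standard jobs.
                standard.extend(idle)
                idle.clear()


order = []
toy = ToyDispatcher()
toy.schedule(lambda: order.append("idle"), IDLE_JOB)
toy.schedule(lambda: order.append("standard-1"), STANDARD_JOB)
toy.schedule(lambda: order.append("priority"), PRIORITY_JOB)
toy.schedule(lambda: order.append("standard-2"), STANDARD_JOB)
toy.run()
print(order)
```

The jobs run in the order priority, standard-1, standard-2, idle, regardless of the order in which they were scheduled.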

To schedule a job the dispatcher member function "schedule()" must be called, supplying the callback function and the type of job. To set the dispatcher running, the member function "run()" is called. If the only feature of the event system which is used is that of scheduling jobs, the "run()" function will return when there are no more jobs to execute. A job may prematurely stop the dispatcher by calling the "stop()" member function. If a callback raises an exception which is not caught and processed within the callback itself, the details of the exception will be logged, the dispatcher stopped and Python exited immediately.

    import netsvc

    def callback(message="hi"):
        print(message)

    dispatcher = netsvc.Dispatcher()
    # Jobs run in priority, standard, idle order, not in the order scheduled.
    dispatcher.schedule(callback, netsvc.IDLE_JOB)
    dispatcher.schedule(callback, netsvc.STANDARD_JOB)
    dispatcher.schedule(callback, netsvc.PRIORITY_JOB)
    dispatcher.run()

The callback supplied when scheduling a job can be any callable object such as a normal function or a member function associated with an instance of a class. If a callback function is scheduled directly with the dispatcher in this way, it will be called with no arguments and cannot be cancelled once scheduled.

If it is necessary to pass arguments to a callback function, an instance of the "Job" class must be used in place of the actual callback function. The "Job" class will hold a reference to the real callback function as well as the arguments. When the job is executed it will call the callback function with the supplied arguments.

    job = netsvc.Job(callback, ("bye",))
    dispatcher.schedule(job, netsvc.IDLE_JOB)

In addition to providing a means of supplying arguments to a callback function, the "Job" class provides a means of cancelling execution of a callback function. In order to do this, a reference to the instance of the "Job" class should be kept. If it is subsequently necessary to cancel execution of the callback before it has been called, the "cancel()" member function of the "Job" class should be called.

    job = None

    def callback1():
        print("hi")
        # Cancel the standard job before the dispatcher gets to it.
        job.cancel()

    def callback2():
        # Never runs, because callback1 cancels the job first.
        print("hi")

    dispatcher.schedule(callback1, netsvc.PRIORITY_JOB)
    job = netsvc.Job(callback2)
    dispatcher.schedule(job, netsvc.STANDARD_JOB)

All that is occurring here is that when the "cancel()" member function is called, a flag is set. When the job is executed it will note that the flag is set and will not execute the callback function. If the callback function is a member function of a class, it is important to ensure that any reference to the instance of the "Job" class is destroyed when no longer required. If this is not done and the reference is a member variable of the same class the callback function is a member of, a circular reference will exist and that instance of the class will not be able to be destroyed.

Any arguments to be passed to the callback function would by default be supplied when the instance of the "Job" class is created. If it is necessary to generate an instance of the "Job" class such that it can be passed to another part of the program, but the arguments to the callback function are not known at that time, it is instead possible to supply the arguments at the time the job is scheduled. This is done by using the "schedule()" member function of the "Job" class rather than that of the dispatcher. Any arguments supplied in this way will override those provided when the instance of the "Job" class is created.

    job = None

    def callback1(message):
        print(message)
        # By now "job" refers to the second Job, which was created
        # without arguments, so they are supplied at scheduling time.
        job.schedule(netsvc.STANDARD_JOB, ("override",))

    def callback2(message):
        print(message)

    job = netsvc.Job(callback1, ("default",))
    job.schedule(netsvc.STANDARD_JOB)
    job = netsvc.Job(callback2)

This would allow, for instance, a class which accepts callback registrations to return a reference to an instance of the "Job" class which will later be used to schedule the callback with an as yet undetermined set of arguments. The client who registered the callback could however cancel execution of the callback before it is called.

Once "cancel()" has been called on an instance of a "Job" class, whether or not it has already been scheduled, the callback function will never be executed. To reset the flag which makes the callback function runnable, the "reset()" member function should be called. To determine if an instance of the "Job" class is still in a runnable state, a truth test can be performed on it.

    if job:
        # job wasn't cancelled
        job.schedule(netsvc.STANDARD_JOB)
    else:
        # job was cancelled
        pass

If you wish to use the "Job" class separately from the dispatcher, you can trigger execution of the callback function by calling the "execute()" member function. If any arguments are supplied to the "execute()" member function, these will override any which may have been supplied when that instance of the "Job" class was created.
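The cancel flag, the truth test and the argument override can be pictured with a small stand-in class. This is a sketch of the behaviour described above, not the real "Job" class: the name "ToyJob" is hypothetical, and passing the override arguments to "execute()" as a tuple is an assumption made for the example.

```python
class ToyJob(object):
    """Hypothetical model of the behaviour described for the Job class."""

    def __init__(self, callback, args=()):
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        # Only sets a flag; the job itself may still be queued somewhere.
        self._cancelled = True

    def reset(self):
        self._cancelled = False

    def __bool__(self):
        # True while the job is still runnable.
        return not self._cancelled

    __nonzero__ = __bool__  # Python 2 spelling of the truth test

    def execute(self, args=None):
        if self._cancelled:
            return None
        if args is None:
            args = self._args
        return self._callback(*args)


seen = []
job = ToyJob(seen.append, ("default",))
job.execute()                 # uses the arguments given at creation
job.execute(("override",))    # arguments given here take precedence
job.cancel()
job.execute()                 # ignored, the job was cancelled
runnable_after_cancel = bool(job)
job.reset()
job.execute()                 # runnable again
print(seen)
```

Only three of the four "execute()" calls reach the callback, since the one made while the job was cancelled is skipped.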

Real Time Events

The Python interface provides the ability to register interest in a number of real time events. These are program shutdown, one off alarms or actions, recurring actions, timers, signals and data activity on sockets. That an event of interest has occurred is notified by execution of a callback supplied at the time that interest in an event is registered.

In the C++ implementation, the methods for expressing interest in a specific type of event were spread across numerous classes. In the Python interface, all functions for registration of interest in events are contained within the "Agent" base class. Any object interested in receiving notification of an event occurring is expected to derive from the "Agent" class.

The simplest type of notification isn't really a real time event at all, but a variation on the concept of scheduling a job with the dispatcher. Instead of calling the "schedule()" member function of the dispatcher, the "scheduleAction()" member function of the "Agent" base class is called.

The major difference between using "scheduleAction()" and "schedule()" is that when using "scheduleAction()" you can optionally supply an additional string argument to be used as an identifier for that job. This identifier can be used to cancel the job before it actually gets executed by calling "cancelAction()". If the callback function accepts a single argument, the identifier will also be passed to the callback function as that argument. The identifier can thus be used to distinguish between different jobs calling the same callback function. If an identifier is not explicitly provided, a unique internal identifier will be created. Whether the identifier is set explicitly or created internally, the identifier used is returned as the result of the "scheduleAction()" method.

    import netsvc

    class Object(netsvc.Agent):
        def __init__(self):
            netsvc.Agent.__init__(self)
            self.scheduleAction(self.callback1, netsvc.PRIORITY_JOB)

        def callback1(self):
            self.scheduleAction(self.callback2, netsvc.IDLE_JOB, "hi")
            self.scheduleAction(self.callback2, netsvc.IDLE_JOB, "bye")

        def callback2(self, name):
            print(name)
            if name == "hi":
                # The "bye" job is still pending, so it never runs.
                self.cancelAction("bye")

    dispatcher = netsvc.Dispatcher()
    agent = Object()
    dispatcher.run()

When using the "Agent" class, you still need to run the dispatcher. You do not need to schedule any jobs directly with the dispatcher, but any initial agents need to be created prior to the dispatcher being run. Note that in scheduling a job with a particular identifier, any job already scheduled with that agent using the same identifier will first be cancelled. If you want to cancel all jobs scheduled using the "scheduleAction()" member function you should call the "cancelAllActions()" member function.

Destroying Agents

Ensuring that any outstanding job is cancelled, or deregistering interest in any event source, is important if you are endeavouring to destroy an agent object. If registrations are not cancelled, a circular reference will exist between data held by the instance of the "Agent" base class and the derived object. Such circular references defeat the Python reference counting mechanism, meaning that the object may never be destroyed.

To combat this particular situation, the member function "destroyAgent()" is included in the "Agent" base class. This will cancel all outstanding jobs and cancel any interest in other event sources as well, destroying any circular references in the process. Provided there are no other references to the object elsewhere, Python should now be able to destroy it.

If you have circular references within your derived class, you may wish to extend this method in your own class so as to undo those circular references. Using the same member function name will make it less confusing to a user of your class as they will only have to call one function. If this is done, you should ensure however that the last thing the derived version of the method does is call the version of the method in the immediate base class.
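The pattern of extending "destroyAgent()" might look like the following sketch. So that it runs without OSE installed, "StubAgent" is a hypothetical stand-in for the "Agent" base class, and "Session" and its handler table are invented for illustration. The point shown is the ordering: the derived class undoes its own circular references first and calls the base class version last.

```python
import weakref


class StubAgent(object):
    """Hypothetical stand-in for netsvc.Agent."""

    def __init__(self):
        self._registrations = []

    def destroyAgent(self):
        # The real method cancels jobs and event subscriptions.
        self._registrations = []


class Session(StubAgent):
    def __init__(self):
        StubAgent.__init__(self)
        # A cycle owned by the derived class: the table holds a bound
        # method, which in turn refers back to this instance.
        self._handlers = {"data": self.handleData}

    def handleData(self):
        pass

    def destroyAgent(self):
        # Undo this class's own circular references first ...
        self._handlers = {}
        # ... then call the base class version as the last step.
        StubAgent.destroyAgent(self)


session = Session()
probe = weakref.ref(session)
session.destroyAgent()
del session
# With the cycle broken, reference counting alone can reclaim the object.
print(probe() is None)
```

A user of the class only has to remember the one method name, which is the reason the text suggests reusing "destroyAgent()" rather than adding a second cleanup function.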

Alarms and Timers

Alarms and timers are a means of having a callback function executed at some point of time in the future. The difference between an alarm and a timer is that an alarm is defined by an absolute value or point in time, whereas a timer is defined by a relative offset in time. For an alarm this means supplying the clock time in seconds at which the callback should be executed. For a timer this means supplying the number of seconds from now at which point the callback should be executed.

    import time

    import netsvc

    class Object(netsvc.Agent):
        def __init__(self):
            netsvc.Agent.__init__(self)
            offset = 60
            now = time.time()
            then = now + offset
            # Alarm: absolute clock time. Timer: offset from now.
            self.setAlarm(self.callback1, then)
            self.startTimer(self.callback2, offset, "timeout-1")
            self.startTimer(self.callback2, offset+10, "timeout-2")

        def callback1(self):
            print("alarm")

        def callback2(self, name):
            print(name)
            if name == "timeout-1":
                self.cancelTimer("timeout-2")

The member function for setting an alarm is "setAlarm()" and that for starting a timer is "startTimer()". The first argument is the callback function, the second argument is the absolute or relative time and the third argument is an optional identifier for that alarm or timer. Scheduling an alarm or timer with an identifier matching that of an alarm or timer which hasn't yet expired will cause that unexpired alarm or timer to be cancelled.

Both types of events are one off events, with the registration being cancelled once the callback has been executed. The identifier may also be used to cancel an alarm or timer before it expires. To cancel an alarm use "cancelAlarm()" and to cancel a timer use "cancelTimer()". To cancel all pending alarms use "cancelAllAlarms()" and to cancel all pending timers use "cancelAllTimers()". If an identifier is not explicitly provided, an internal identifier will be created automatically and returned as the result of the function called to schedule the callback.

Recurring Actions

A recurring action is where a job is run at regular intervals. Precisely when the callback function associated with a job is executed is determined by a specification of the form used by the UNIX cron utility. The specification consists of five fields each separated by white space. In order, the fields specify the minute, the hour, the day of the month, the month and the day of the week.

A field may be an asterisk "*", which always stands for "first-last". Ranges of numbers are allowed. Ranges are two numbers separated with a hyphen. The specified range is inclusive. For example, 8-11 for an "hours" entry specifies execution at hours 8, 9, 10 and 11.

Lists are allowed. A list is a set of numbers (or ranges) separated by commas. For example, "1,2,5,9" and "0-4,8-12". Step values can be used in conjunction with ranges. Following a range with "/number" specifies skips of the number's value through the range. For example, "0-23/2" can be used in the hours field to specify the callback function be executed every other hour. Steps are also permitted after an asterisk, so if you want to say "every two hours", just use "*/2".

Names can also be used for the "month" and "day of week" fields. Use the first three letters of the particular day or month (lower case, or first letter only uppercase).

The day that a callback function is to be executed can be specified by two fields, day of month and day of week. If both fields are restricted (i.e., aren't "*"), the callback function will be executed when either field matches the current time. For example, "30 4 1,15 * 5" would cause the callback function to be executed at 4:30 am on the 1st and 15th of each month, plus every Friday.
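To make the field rules concrete, here is a small matcher written in plain Python. It is a sketch of the rules as described above and is not the parser used by OSE. The function names are hypothetical, and numbering the days of the week from 0 for Sunday follows the usual crontab convention, which is an assumption here.

```python
import datetime

MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]
DAYS = ["sun", "mon", "tue", "wed", "thu", "fri", "sat"]


def to_number(token, low, names):
    # Names map onto numbers starting at the lowest value of the field.
    token = token.lower()
    if token in names:
        return names.index(token) + low
    return int(token)


def parse_field(text, low, high, names=()):
    """Expand one field into the set of integers it matches."""
    values = set()
    for part in text.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            first, last = low, high
        elif "-" in part:
            first, last = [to_number(p, low, names) for p in part.split("-")]
        else:
            first = last = to_number(part, low, names)
        values.update(range(first, last + 1, step))
    return values


def matches(spec, when):
    """Return True if the datetime 'when' satisfies the specification."""
    minute, hour, dom, month, dow = spec.split()
    if when.minute not in parse_field(minute, 0, 59):
        return False
    if when.hour not in parse_field(hour, 0, 23):
        return False
    if when.month not in parse_field(month, 1, 12, MONTHS):
        return False
    dom_ok = when.day in parse_field(dom, 1, 31)
    dow_ok = when.isoweekday() % 7 in parse_field(dow, 0, 6, DAYS)
    if dom != "*" and dow != "*":
        # Both day fields restricted: either one may match.
        return dom_ok or dow_ok
    return dom_ok and dow_ok


spec = "30 4 1,15 * 5"
friday = datetime.datetime(2006, 9, 8, 4, 30)    # a Friday, not the 1st or 15th
first = datetime.datetime(2006, 10, 1, 4, 30)    # the 1st, a Sunday
neither = datetime.datetime(2006, 10, 2, 4, 30)  # a Monday, the 2nd
print(matches(spec, friday), matches(spec, first), matches(spec, neither))
```

The first two dates match through the day of week and the day of month respectively, while the third matches neither field and is rejected.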

To schedule this type of job, the "scheduleAction()" function is used except that instead of specifying the job type as the second argument, the specification string should be used.

    import netsvc

    class Object(netsvc.Agent):
        def __init__(self):
            netsvc.Agent.__init__(self)
            self.scheduleAction(self.daily, "0 0 * * *", "daily")
            self.scheduleAction(self.weekly, "0 0 * * Sat", "weekly")
            self.scheduleAction(self.monthly, "0 0 1 * *", "monthly")
            self.scheduleAction(self.yearly, "0 0 1 Jan *", "yearly")
            self.scheduleAction(self.holiday, "0 0 25 Dec *", "christmas")

        def daily(self):
            print("daily")

        def weekly(self):
            print("weekly")

        def monthly(self):
            print("monthly")

        def yearly(self):
            print("yearly")

        def holiday(self, name):
            # Accepts one argument, so it receives the identifier.
            print(name)

As a recurring action by nature will always run again at some point in the future, you have to explicitly call "cancelAction()" to stop it from running, even if it has already run. If you make an error in the specification string such that it is invalid, no indication will be given and the job will simply never be executed. The "cancelAllActions()" member function, as well as cancelling actions associated with a once off call of a callback function, will also cancel all recurring actions.

Socket Events

In an event driven system, it is important that any callback not unnecessarily block waiting for something to happen. If a callback does block, it prevents any other part of the system from doing anything. The main reason a callback may block is an attempt to read data from a socket when there is no data waiting to be read. In an event driven system, an application should register interest in the availability of data on a socket and only attempt to read data from the socket when it is known that there is some available.

It is also advantageous in an event driven system for sockets to be placed into non blocking mode. When a socket is in non blocking mode, if data is written to a socket and the socket is full, an error is returned indicating that the call would have blocked. The code can now register interest in the possibility of being able to write data to a socket and subsequently be notified when such a call would be successful. In the meantime, other parts of the system can still do something.

To register interest in either of these events, the member function "subscribeSocket()" should be used. The first argument to the function should be the callback function, the second argument the socket descriptor and the third argument the type of events. If the third argument is not supplied, it will default to "SOCKET_POLLIN", indicating interest in the availability of data on a socket for reading.

Other possible values for the third argument are "SOCKET_POLLOUT" and "SOCKET_POLLPRI". The value "SOCKET_POLLPRI" is similar to "SOCKET_POLLIN" except that it relates to priority out of band data being available for reading. Out of band data is not a feature which is used much these days and isn't implemented the same on all systems. It is probably best to avoid using out of band data.

A final value of "SOCKET_POLLOUT" indicates interest in when data can be safely written to the socket without the call blocking. Note that this will nearly always be the case, so you should only subscribe to this event on a socket when you know that writing to the socket would cause it to block. Once you have been notified that it is safe to write to a socket and you have written your data, you should immediately unsubscribe from this event on the socket, otherwise your callback will continually be called.
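What "would have blocked" means in practice can be seen with the standard "socket" and "select" modules alone. The sketch below does not use "netsvc" at all: a local socket pair stands in for a network connection, and "select.select()" plays the role that a "SOCKET_POLLOUT" subscription would play in an agent.

```python
import errno
import select
import socket

left, right = socket.socketpair()
left.setblocking(False)

# Write until the socket buffers are full.
chunk = b"x" * 65536
sent = 0
try:
    while True:
        sent += left.send(chunk)
except socket.error as exc:
    if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
        raise
    # A blocking socket would have stalled here; instead an error is
    # raised and the program is free to do other work.

# The peer reads everything, which frees up buffer space.
received = 0
while received < sent:
    received += len(right.recv(65536))

# Ask to be told when writing is possible again.
_, writable, _ = select.select([], [left], [], 1.0)
print(sent > 0, received == sent, left in writable)

left.close()
right.close()
```

In an agent, the point at which the error is caught is where interest in "SOCKET_POLLOUT" would be registered, and the callback would then write the remaining data and unsubscribe again.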

    import socket
    import sys

    import netsvc

    class Agent(netsvc.Agent):
        def __init__(self, host, port):
            netsvc.Agent.__init__(self)
            self._host = host
            self._port = port
            self.scheduleAction(self.connect, netsvc.STANDARD_JOB)

        def connect(self):
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self._sock.connect((self._host, self._port))
            except socket.error:
                # Could not connect, so there is nothing left to do.
                netsvc.Dispatcher().stop()
            else:
                self.subscribeSocket(self.read, self._sock.fileno())

        def read(self, fileno, event):
            if fileno != self._sock.fileno():
                return
            if event != netsvc.SOCKET_POLLIN:
                return
            data = self._sock.recv(1024)
            if len(data) == 0:
                # The remote end closed the connection.
                self.unsubscribeSocket(self._sock.fileno())
                self._sock.close()
                netsvc.Dispatcher().stop()
            else:
                sys.stdout.write(data)

When you are no longer interested in a particular event on a socket, you can unsubscribe from that event using the "unsubscribeSocket()" member function. If called with only a single argument, all events currently of interest on that socket will be unsubscribed. To unsubscribe from only a specific event type, pass the type of event as the second argument.

Program Signals

The most common circumstance in which an application may receive a program signal is when it is being killed as a result of a user interrupting it by typing control-C, or if running UNIX, when the operating system is being shut down. Another use for program signals is to force an application to reread a configuration file.

These three cases are typically indicated by the program signals "SIGINT", "SIGTERM" and "SIGHUP". A robust application should at least catch the first two of these signals and cause the program to shut down gracefully. This may entail ensuring that any data is written out to files, removal of file locks, closing off of database connections etc.

To subscribe to a signal, the member function "subscribeSignal()" should be used. The first argument should be a callback function to be called when a particular signal occurs and the second argument the particular signal of interest. A particular agent may only supply one callback for any particular signal, but different agents may subscribe to the same signal with both being notified when it occurs. Although an interest in such a signal is usually persistent, it is possible to unsubscribe from a particular signal using the member function "unsubscribeSignal()" and unsubscribe from all signals using "unsubscribeAllSignals()".

    import signal

    import netsvc

    class Agent(netsvc.Agent):
        def __init__(self):
            netsvc.Agent.__init__(self)
            self.subscribeSignal(self.signal, signal.SIGINT)
            self.subscribeSignal(self.signal, signal.SIGTERM)

        def signal(self, signum):
            # Defer the stop so other agents still get their notification.
            self.scheduleAction(self.stop, netsvc.PRIORITY_JOB)

        def stop(self):
            netsvc.Dispatcher().stop()

In practice, only one of the agents subscribed to "SIGINT" and "SIGTERM" should actually shut down the dispatcher. This agent should not, however, shut down the dispatcher immediately, as other agents may not yet have received their notification that the signal occurred. The agent should instead schedule a priority job to actually stop the dispatcher. This priority job will only be executed after all outstanding signal notifications have been delivered.

Program Shutdown

Subscription to a program signal provides a means of immediately shutting down an application when caused to do so by an external signal. What program signals don't do however, is provide a means of initiating a graceful shutdown of an application from within the application itself. An application could send itself a signal, however, this isn't necessarily practical.

A further problem is that in an event driven system, it may not always be possible to perform everything that is required in a single callback function. What is instead needed is the ability to run the application for a further finite amount of time so that any outstanding operations can be finalised first. At the end of that time, the application can be stopped.

To support this slightly more orderly mechanism for program shutdown, the member function "scheduleShutdown()" is provided. When an agent wishes to force the program to shut down it should call this member function. This member function can also be called when an external signal intended to shut down the program is received. Doing this in the latter case means you don't need to have separate code for the two different cases.

If an agent is interested in the fact that the program is being shut down, it can call the "subscribeShutdown()" member function, supplying a callback function to be called when such an event does occur. Note that the call to "scheduleShutdown()" will result in the dispatcher being stopped automatically, so you do not need to do it explicitly. If necessary, an agent can unsubscribe from program shutdown notifications by calling the member function "unsubscribeShutdown()".

    import signal

    import netsvc

    class Agent(netsvc.Agent):
        def __init__(self):
            netsvc.Agent.__init__(self)
            self.subscribeShutdown(self.shutdown)
            self.subscribeSignal(self.signal, signal.SIGINT)
            self.subscribeSignal(self.signal, signal.SIGTERM)
            self.startTimer(self.timeout, 60)

        def timeout(self):
            self.scheduleShutdown()

        def signal(self, signum):
            self.scheduleShutdown()

        def shutdown(self, category):
            if category == netsvc.SHUTDOWN_PENDING:
                # shutdown is pending
                pass
            else:
                # shutdown has arrived
                pass

When shutdown is initiated, any callback function supplied by an agent will actually be called twice. The first time it is called, it will be called with the value "SHUTDOWN_PENDING". Once all subscribed agents have been notified that shutdown is pending, the callback function will then be subsequently called again, this time with the value "SHUTDOWN_ARRIVED". Upon all agents receiving the second notification, the dispatcher will be stopped and the process will exit.

Note that the second of these notifications will not occur immediately after the first. Exactly how much time may pass is dependent on a number of factors. The first determining factor is the argument supplied to the "scheduleShutdown()" member function. If no argument is supplied, or a value of "0" is supplied, there will be an inbuilt delay of 1 second between shutdown being scheduled and the program actually being shut down.

This implicit delay gives scope for activities which can't be factored into a single callback function time to be carried out. For example, it may be necessary to send data via a socket to some remote host and wait for the response. If the default value of 1 second is insufficient, or is too long a time, it can be overridden in a number of ways.

The first way of overriding the default value of 1 second is by setting the environment variable "OTCLIB_SHUTDOWNDELAY". If this is done, it should be set to a value representing the number of milliseconds to wait. An alternative is to modify each call of "scheduleShutdown()" and explicitly provide the time delay as an argument. If this is done, the argument should express the number of full or partial seconds as a floating point value.
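The difference in units between the two overrides is easy to get wrong, so the following sketch spells it out. The helper "effective_delay()" is hypothetical and is not part of "netsvc". The precedence it applies (an explicit non zero argument first, then the environment variable, then the inbuilt default) is one reading of the description above rather than confirmed behaviour.

```python
import os

DEFAULT_DELAY = 1.0  # seconds, the inbuilt default described in the text


def effective_delay(argument=0, environ=os.environ):
    """Return the shutdown delay in seconds (hypothetical helper)."""
    if argument:
        # An explicit argument is already expressed in seconds.
        return float(argument)
    value = environ.get("OTCLIB_SHUTDOWNDELAY")
    if value:
        # The environment variable is expressed in milliseconds.
        return int(value) / 1000.0
    return DEFAULT_DELAY


no_override = effective_delay(0, {})
from_environ = effective_delay(0, {"OTCLIB_SHUTDOWNDELAY": "2500"})
from_argument = effective_delay(0.25, {"OTCLIB_SHUTDOWNDELAY": "2500"})
print(no_override, from_environ, from_argument)
```

A setting of "2500" in the environment therefore corresponds to an argument of "2.5" in a call to "scheduleShutdown()".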

Using a time delay is a useful starting point, as it provides a means of defining an upper bound on the amount of time you wish to allow the system to run before it is stopped. Having a small delay and ensuring everything is done in that time is preferable, as in certain circumstances, such as the operating system sending a "SIGTERM" to an application on system shutdown, the operating system will usually forcibly shut down your application using "SIGKILL" after 5 seconds if it doesn't exit of its own accord.

Although getting away from the goal of having only one mechanism for shutting down a program, in this circumstance, it may still be preferable to separately identify a SIGTERM signal and deal with it differently. Here you might only do anything that is absolutely essential and stop the process immediately. What is the best approach will depend on the specific application in question.

If the problem of a "SIGTERM" signal is ignored, a further mechanism for delaying actual shutdown of a process is also provided. If upon receiving notification of a pending shutdown, an agent knows it needs to wait for some event to occur first, it can call the "suspendShutdown()" member function. If this is done, although the shutdown delay may expire, actual program shutdown will not occur until a corresponding call to the "resumeShutdown()" member function. If more than one agent calls "suspendShutdown()", actual shutdown will not occur until "resumeShutdown()" has been called a matching number of times.
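The matching of "suspendShutdown()" and "resumeShutdown()" calls amounts to a counter that must return to zero before shutdown can complete. The class below is a hypothetical model of that rule, written so it runs on its own. It is not how the dispatcher is implemented, and the method names are invented for the sketch.

```python
class ShutdownGate(object):
    """Hypothetical model of the suspend/resume counting rule."""

    def __init__(self):
        self._suspensions = 0
        self._delay_expired = False
        self.stopped = False

    def suspend(self):
        self._suspensions += 1

    def resume(self):
        self._suspensions -= 1
        self._check()

    def delay_expired(self):
        # Called when the shutdown delay has run out.
        self._delay_expired = True
        self._check()

    def _check(self):
        # Shutdown completes only once the delay has expired and
        # every suspension has been matched by a resume.
        if self._delay_expired and self._suspensions == 0:
            self.stopped = True


gate = ShutdownGate()
gate.suspend()            # first agent is waiting on a reply
gate.suspend()            # second agent is flushing a file
gate.delay_expired()
after_delay = gate.stopped
gate.resume()
after_one_resume = gate.stopped
gate.resume()
after_both_resumes = gate.stopped
print(after_delay, after_one_resume, after_both_resumes)
```

Shutdown completes only after the second "resume()", even though the delay expired earlier.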

Although it is possible to suspend the shutdown process in this way, it is not possible to cancel it completely. If an agent never calls "resumeShutdown()", shutdown would in effect never occur, but this is unlikely to be useful, as by then other parts of the application will more than likely have placed themselves into a dormant state.

Finally, as scheduling program shutdown upon a signal occurring would be done in practically all programs, support for this has been factored into the actual dispatcher. Thus, instead of dedicating a specific agent to catch any signals, the main program file can contain:

    import signal
    import netsvc

    dispatcher = netsvc.Dispatcher()
    dispatcher.monitor(signal.SIGINT)
    dispatcher.monitor(signal.SIGTERM)

If this interface is used however, the only means of overriding the delay between shutdown being scheduled and actual shutdown is by the "OTCLIB_SHUTDOWNDELAY" environment variable.

The dispatcher also provides the member function "shutdown()". This behaves much the same as the "scheduleShutdown()" member function of the "Agent" class. The presence of the "shutdown()" member function in the dispatcher allows code which is distinct from an agent to also schedule a program shutdown.

Note that whatever mechanism is used to initiate program shutdown using these features, messages will be displayed via the logger indicating that shutdown has been scheduled and that it has arrived. Additional messages will be displayed via the logger when the shutdown process is suspended and resumed.



OSE/Python/EventFramework (last edited 2006-09-04 11:45:39 by GrahamDumpleton)