Service Requests

The ability within the service agent framework to find out what services exist and the ability of a service agent to publish reports can be useful in itself, but more often than not one wants to make a specific request of a service to perform some action. In most cases such an action would result in a response, be it the return of data related to the request being made or an error indication. Referred to as request/reply, this is probably the most fundamental feature of message oriented middleware software.

As with the implementation of the publish/subscribe feature, simplicity has been a major driving force in the design. As a result, the implementation of the request/reply feature should not be likened to that of point-to-point messaging using persistent message queues. In this implementation, if the service to which you want to send a request doesn't exist, you will not be able to send your request, nor will a service receive any requests sent while it wasn't running.

Although persistent message queues are not a feature of this implementation, the request/reply and publish/subscribe features can be seen as sitting at a lower level of abstraction. As a result, it would be possible to build on top of these features and implement persistent message queues and guaranteed modes of delivery if required. For many systems such features aren't required though, so they are not implemented, the aim being to make the software as simple as possible to use and understand.

Sending a Request

In order to send a request to a service you need to first obtain a service endpoint object. This is a special Python object which holds an internal reference to the service binding object for the service and which will automatically dispatch your request for you. To obtain a service endpoint, the member function "serviceEndPoint()" is used.

When invoking "serviceEndPoint()", the member function needs to be supplied with either a service binding object for the particular service agent to which you wish to send the request, or the name of the service. When a service name is supplied, a lookup will be made against the service registry and the first service agent found with that service name will be used.

To invoke the request against the remote service agent, the service endpoint object is used as a proxy. That is, a member function call is made against the object as if it were the actual service object you wished to call. The only difference is that the call isn't synchronous but asynchronous. This means that the result is not returned immediately.

As to the parameters to the call, multiple arguments can be supplied. Any of the basic Python scalar types, lists, tuples, dictionaries and "None" can be used, as well as a number of extended types. User defined scalar types can also be used provided that appropriate encoders are available. Note that keyword arguments cannot be used and will be ignored.

    import netsvc

    class PagerClient(netsvc.Service):
        def __init__(self, number, message):
            netsvc.Service.__init__(self, "", "")
            # Use the first service agent registered under the name "SMS".
            service = self.serviceEndPoint("SMS")
            if service:
                service.send(number, message)

When a service endpoint is created by using a service name, you should always check whether a service agent with that name could actually be found. This is done by performing a truth test on the service endpoint object or comparing it to "None". Note that although it may compare equal to "None", the service endpoint object is a distinct object in its own right. If you don't check the validity of the service endpoint object and still make a request against the service, a special exception indicating that such a service isn't available will be raised.

    class PagerClient(netsvc.Service):
        def __init__(self, number, message):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            try:
                service.send(number, message)
            except netsvc.ServiceUnavailable:
                # ...
                pass
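To make the proxy behaviour described above more concrete, the following is a minimal, self-contained sketch of how such an endpoint object could work. It does not use the real library: "EndPointSketch", the "outbox" list, the format of the conversation id and the local "ServiceUnavailable" class are all hypothetical stand-ins chosen for illustration.

```python
class ServiceUnavailable(Exception):
    """Stand-in for the exception raised when no service agent was found."""


class EndPointSketch(object):
    """Hypothetical proxy: any method name becomes a queued request."""

    def __init__(self, binding, outbox):
        self._binding = binding  # None when the lookup found no service agent
        self._outbox = outbox    # list standing in for the message dispatcher
        self._counter = 0

    def __bool__(self):
        # Truth test used by "if service:".
        return self._binding is not None

    __nonzero__ = __bool__  # Python 2 spelling of the same hook

    def __getattr__(self, method):
        # Only reached for names not found normally, i.e. remote method names.
        if method.startswith("_"):
            raise AttributeError(method)

        def send(*args):
            if self._binding is None:
                raise ServiceUnavailable(method)
            self._counter += 1
            conversation_id = "%s/%d" % (self._binding, self._counter)
            self._outbox.append((conversation_id, method, args))
            return conversation_id  # returned at once; any reply comes later

        return send


outbox = []
service = EndPointSketch("SMS", outbox)
request_id = service.send("0400123456", "hello")

missing = EndPointSketch(None, outbox)  # as if no "SMS" agent was registered
```

The call returns as soon as the request has been queued, which is why the value handed back is only an identifier for the conversation and not the result itself.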

Obviously the service name by itself can only be used if you don't care which instance of a service is used when there is more than one. If you wanted to select a specific service agent, or wanted to be able to send a request to all service agents with the same service name, you would need to perform a lookup against the service registry to obtain the full list of service agents.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            # Send the request to every service agent with this name.
            bindings = self.lookupServiceName(name)
            for binding in bindings:
                service = self.serviceEndPoint(binding)
                if service:
                    service.reset()

Handling a Response

When you send a service request, you do not get an immediate response back, as the call is asynchronous. To capture any response generated from a request, you need the conversation id associated with the request, which is the value returned when you make the call against the service endpoint object. Having obtained the conversation id, you then register a callback to handle the response using the member function "processResponse()". If you also want to be notified that the request has failed, you will need to set up a separate callback using the "processFailure()" member function.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                self.processFailure(self.uptimeFailure, id)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self):
            print("failure")

The callbacks which you put in place to handle the result and/or failure will be automatically deregistered when a response is received. This will be the case whether the response is valid or was a failure indication. Prior to having received a response, if you decide you are no longer interested in the response, you can call the member function "ignoreResponse()" supplying the conversation id. If you are submitting multiple requests in one go, you must call the "processResponse()" and/or "processFailure()" member functions for a conversation id before you send any subsequent request.
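The one-shot nature of these callbacks can be pictured with a small registry keyed by conversation id. This is only a sketch of the behaviour described above, not the library's implementation: the class "PendingCallbacks" and its method names are hypothetical.

```python
class PendingCallbacks(object):
    """Hypothetical registry of response/failure callbacks per conversation id."""

    def __init__(self):
        self._response = {}
        self._failure = {}

    def process_response(self, callback, conversation_id):
        self._response[conversation_id] = callback

    def process_failure(self, callback, conversation_id):
        self._failure[conversation_id] = callback

    def ignore_response(self, conversation_id):
        self._response.pop(conversation_id, None)
        self._failure.pop(conversation_id, None)

    def deliver(self, conversation_id, result=None, failed=False):
        # Either outcome removes both callbacks, so each fires at most once.
        on_response = self._response.pop(conversation_id, None)
        on_failure = self._failure.pop(conversation_id, None)
        callback = on_failure if failed else on_response
        if callback is None:
            return False  # nobody registered interest; the message is dropped
        if failed:
            callback()
        else:
            callback(result)
        return True


seen = []
pending = PendingCallbacks()
pending.process_response(seen.append, "id-1")
pending.process_failure(lambda: seen.append("failure"), "id-1")
first = pending.deliver("id-1", result=42)
second = pending.deliver("id-1", result=43)  # already deregistered

pending.process_response(seen.append, "id-2")
pending.ignore_response("id-2")
third = pending.deliver("id-2", result=44)   # interest was withdrawn
```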

Note that prior to the release of OSE 7.0pl5, instead of using the "processResponse()" and "processFailure()" member functions, one would use the "monitorResponse()" method. This method in effect combined the operation of both of the new methods, albeit with subtle differences in the arguments the callback would be passed and the functionality it implemented. Using the new methods it is possible to register separate callbacks for handling of the result versus a failure. It is even possible to register interest in only one or the other of the result or a failure notification. The "monitorResponse()" member function should as a result now be viewed as deprecated and should not be used.

Identifying a Response

If a callback is being registered to handle the response from multiple service requests, you will most likely need to be able to identify which request a response belongs to. To get the conversation id of the original request, the "conversationId()" member function can be called.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            bindings = self.lookupServiceName(name)
            for binding in bindings:
                service = self.serviceEndPoint(binding)
                if service:
                    id = service.uptime()
                    self.processResponse(self.uptimeResponse, id)
                    print("request %s %s" % (binding.agentIdentity(), id))
        def uptimeResponse(self, result):
            # Ask the framework which request this response belongs to.
            id = self.conversationId()
            print("result %s %s" % (id, result))

Instead of requesting the conversation id, it is also possible to define your callback so as to take two arguments instead of one, these being the conversation id and the result instead of just the result.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            bindings = self.lookupServiceName(name)
            for binding in bindings:
                service = self.serviceEndPoint(binding)
                if service:
                    id = service.uptime()
                    self.processResponse(self.uptimeResponse, id)
                    print("request %s %s" % (binding.agentIdentity(), id))
        def uptimeResponse(self, id, result):
            print("result %s %s" % (id, result))

These are not keyword arguments, but positional parameters which the code calling the callback function supplies or not based on the number of arguments the callback accepts. In other words, the callback must accept the appropriate number of arguments in the specified order. If you know that the remote method being called doesn't actually return a valid response, i.e., it returns a void or null response, you can even leave out the parameters altogether.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            bindings = self.lookupServiceName(name)
            for binding in bindings:
                service = self.serviceEndPoint(binding)
                if service:
                    id = service.reset()
                    self.processResponse(self.resetResponse, id)
                    print("request %s %s" % (binding.agentIdentity(), id))
        def resetResponse(self):
            print("result")
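How a caller might choose which positional arguments to supply can be sketched as follows. This is an illustration only, and how the library itself inspects the callback is not stated here. The helper "call_response_callback" is hypothetical, and the sketch relies on "inspect.signature()" from Python 3.

```python
import inspect


def call_response_callback(callback, conversation_id, result):
    """Pass (id, result), (result,) or nothing, depending on the callback."""
    # For a bound method the implicit "self" is not counted.
    count = len(inspect.signature(callback).parameters)
    if count >= 2:
        return callback(conversation_id, result)
    if count == 1:
        return callback(result)
    return callback()


class Handlers(object):
    def both(self, id, result):
        return ("both", id, result)

    def result_only(self, result):
        return ("result", result)

    def nothing(self):
        return ("none",)


handlers = Handlers()
with_both = call_response_callback(handlers.both, "id-1", 10)
with_result = call_response_callback(handlers.result_only, "id-1", 10)
with_nothing = call_response_callback(handlers.nothing, "id-1", 10)
```

Because the decision is made purely on the number of parameters, the order in which the callback declares them matters while their names do not.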

In addition to "conversationId()", the member function "currentResponse()" is also available. This member function returns an object providing both the "conversationId()" and "sender()" member functions. If you need the service binding object for the service agent which sent the response, you can perform a lookup against the service registry using the service address provided by "sender()". Note though that you shouldn't assume that the service binding object will be available, as the remote service may have been withdrawn by the time you make your query.

Detecting a Failure

If you send a service request to a service agent and you need to detect if a failure occurs, you will need to have registered a callback using the "processFailure()" member function. A failure may occur due to the service not supplying a method to handle the request you made, an incorrect number of arguments being supplied, an error within the method being called or because the remote service agent was withdrawn before a response was received.

When a failure does occur, the details of the failure can be obtained in a number of ways. If the callback you provide doesn't take any arguments, you can obtain a failure object detailing the error which occurred by calling the "currentFailure()" member function. The member functions provided by the failure object are "error()", "description()", "origin()" and "details()". The conversation id associated with the request which failed can be obtained using the member function "conversationId()".

Of the member functions provided by the failure object, the "error()" member function returns an integer error code. The "description()" member function returns a text description of the error. The "origin()" member function returns a string which in some way identifies the origin of the error, and "details()" may contain, as text, extra details relating to the error which has occurred.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                self.processFailure(self.uptimeFailure, id)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self):
            id = self.conversationId()
            failure = self.currentFailure()
            if failure.origin() == "netsvc" and \
                    failure.error() == netsvc.SERVER_METHOD_UNAVAILABLE:
                # method didn't exist
                pass

As an alternative to using the "conversationId()" member function to obtain the conversation id of the failed request, if the callback accepts a single argument, the conversation id will be passed as an argument to the callback function.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                self.processFailure(self.uptimeFailure, id)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self, id):
            failure = self.currentFailure()
            if failure.origin() == "netsvc" and \
                    failure.error() == netsvc.SERVER_METHOD_UNAVAILABLE:
                # method didn't exist
                pass

This ability to have details of the failure supplied as arguments to the callback function also extends to the contents of the failure object, if the callback function accepts four further parameters after the conversation id.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                self.processFailure(self.uptimeFailure, id)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self, id, error, description, origin, details):
            if origin == "netsvc" and error == netsvc.SERVER_METHOD_UNAVAILABLE:
                # method didn't exist
                pass

These are not keyword arguments, but positional parameters which the code calling the callback function supplies or not based on the number of arguments the callback accepts. In other words, the callback must accept the appropriate number of arguments in the specified order.

Lack of Response

When you send a request, there is no guarantee that the remote service agent hasn't been destroyed even before it receives your request. If a remote service agent delays sending a response to your request, the problem might also arise that the remote service agent is destroyed before it completes the response. Finally, an intermediate process relaying your request might be shut down or crash, meaning either the request or the response is lost.

In order to handle these situations, when the "processFailure()" member function is used to register interest in the failure of a request, it will automatically set up a subscription on the existence of the remote service agent against which the request has been made. In the event that the remote service agent becomes unavailable before a response is received, an application error will be returned as a failure to provide notification of this occurring.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                self.processFailure(self.uptimeFailure, id)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self, id, error, description, origin, details):
            if origin == "netsvc" and error == netsvc.SERVER_APPLICATION_ERROR:
                # request has failed, possibly because no response was received
                pass
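The effect of subscribing to the existence of the remote service agent can be sketched with a table of outstanding requests that is swept whenever an agent is withdrawn. This is a simplified model and not the library's code: "WithdrawalWatcher", its method names and the agent strings are hypothetical.

```python
class WithdrawalWatcher(object):
    """Hypothetical tracker failing pending requests when their agent goes away."""

    def __init__(self):
        self._pending = {}  # conversation id -> (agent, failure callback)

    def expect_reply(self, conversation_id, agent, on_failure):
        self._pending[conversation_id] = (agent, on_failure)

    def reply_received(self, conversation_id):
        self._pending.pop(conversation_id, None)

    def agent_withdrawn(self, agent):
        # Collect first, so the dictionary is not modified while iterating.
        lost = [cid for cid, (target, _) in self._pending.items()
                if target == agent]
        for cid in sorted(lost):
            _, on_failure = self._pending.pop(cid)
            on_failure(cid)
        return len(lost)


failures = []
watcher = WithdrawalWatcher()
watcher.expect_reply("id-1", "SMS@host-a", failures.append)
watcher.expect_reply("id-2", "SMS@host-b", failures.append)
watcher.expect_reply("id-3", "SMS@host-a", failures.append)
watcher.reply_received("id-3")               # answered in time
failed_now = watcher.agent_withdrawn("SMS@host-a")
failed_again = watcher.agent_withdrawn("SMS@host-a")
```

Only the request still outstanding against the withdrawn agent is failed; the request that was already answered and the one sent to another agent are left alone.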

Note that the "monitorResponse()" member function, which has been deprecated as of OSE 7.0pl5, does not set up a subscription to the existence of the remote service agent. Thus, if you are using this older member function to catch the failure of a request, you will not get any failure notification in these circumstances.

Although the "processFailure()" member function will ensure that a failure is returned if the remote service agent becomes unavailable before a response is received, programming errors or external communications failures in code associated with the remote service agent might still result in no response being received even though the remote service agent still exists. If this is an issue and you want a failure to be reported when no response has been received within a certain period of time, a timeout value can be supplied when you call the "processFailure()" member function.

    class Client(netsvc.Service):
        def __init__(self, name):
            netsvc.Service.__init__(self, "", "")
            service = self.serviceEndPoint("SMS")
            if service:
                id = service.uptime()
                self.processResponse(self.uptimeResponse, id)
                # Report a failure if no response arrives within 60 seconds.
                self.processFailure(self.uptimeFailure, id, 60)
        def uptimeResponse(self, result):
            print(result)
        def uptimeFailure(self, id, error, description, origin, details):
            if origin == "netsvc" and error == netsvc.CLIENT_REQUEST_TIMEOUT:
                # timeout occurred
                pass

When a timeout occurs, it will be notified as a request failure. The timeout should be the maximum number of seconds to wait. The callback will be automatically deregistered and if the response did subsequently arrive it would be ignored. If you wanted a timeout to occur but didn't want the callback to be deregistered, you would need to create your own timer. If that timer uses the conversation id corresponding to the request as the timer name, the timer will be automatically stopped if a response does actually arrive. You should not use the conversation id to set up a timer if you have already defined a timeout when calling the member function "processFailure()" as internally it will use the conversation id for its own timer.
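The interaction between a timeout and a late response can be modelled with a table of deadlines. The sketch below is an assumption-laden illustration: "TimeoutTable" and its methods are hypothetical, and the clock is passed in explicitly as a number of seconds so that the behaviour is easy to follow.

```python
class TimeoutTable(object):
    """Hypothetical table of request deadlines driven by an explicit clock."""

    def __init__(self):
        self._deadline = {}  # conversation id -> (expiry time, failure callback)

    def process_failure(self, callback, conversation_id, now, timeout):
        self._deadline[conversation_id] = (now + timeout, callback)

    def response_arrived(self, conversation_id):
        # True if the response is still wanted; a late response gives False.
        return self._deadline.pop(conversation_id, None) is not None

    def tick(self, now):
        expired = [cid for cid, (when, _) in self._deadline.items()
                   if when <= now]
        for cid in sorted(expired):
            _, callback = self._deadline.pop(cid)
            callback(cid)  # reported in the same way as a request failure


timeouts = []
table = TimeoutTable()
table.process_failure(timeouts.append, "id-1", now=0, timeout=60)
table.process_failure(timeouts.append, "id-2", now=0, timeout=60)
in_time = table.response_arrived("id-2")  # answered before the deadline
table.tick(59)
before_deadline = list(timeouts)
table.tick(60)
late = table.response_arrived("id-1")     # arrives after the timeout fired
```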

Servicing a Request

When you send a request, if the remote service agent is implemented using the Python interface, not just any member function of the service can be called. In order that a member function of a service can be called, the service agent must have exported it as a public method. This is done by calling the member function "exportMethod()" and it would normally be done from within the constructor of the service agent.

    class PagingService(netsvc.Service):
        def __init__(self, name="SMS"):
            netsvc.Service.__init__(self, name)
            # Only exported methods can be called by remote clients.
            self.exportMethod(self.time)
            self.exportMethod(self.uptime)
            self.exportMethod(self.send)
        def time(self):
            return netsvc.DateTime()
        def uptime(self):
            # ...
            pass
        def send(self, number, message):
            # ...
            pass

By default the method name associated with the member function will be its actual name. If you wish to export a member function under a different method name, the method name can be supplied as an extra argument to the "exportMethod()" member function.

    class PagingService(netsvc.Service):
        def __init__(self, name="SMS"):
            netsvc.Service.__init__(self, name)
            # Export sendMessage() under the method name "send".
            self.exportMethod(self.sendMessage, "send")
        def sendMessage(self, number, message):
            # ...
            pass

The reason for requiring that methods be explicitly exported is that it would usually be quite dangerous to allow open access to all member functions of a class. This is because any class is likely to implement methods to assist in intermediate stages of processing a request. Providing automatic access to such member functions could compromise the operation of the class.
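The idea of an explicit export table can be illustrated without the library. In this sketch "ExportingService", "export_method()" and "dispatch()" are hypothetical stand-ins, and the way the real framework rejects a request for an unexported method is not modelled beyond raising an exception.

```python
class ExportingService(object):
    """Hypothetical stand-in showing an explicit table of callable methods."""

    def __init__(self):
        self._exported = {}

    def export_method(self, function, name=None):
        # Default to the function's own name when no method name is given.
        self._exported[name or function.__name__] = function

    def dispatch(self, name, params):
        if name not in self._exported:
            raise LookupError("method %r is not exported" % name)
        return self._exported[name](*params)


class Pager(ExportingService):
    def __init__(self):
        ExportingService.__init__(self)
        self.sent = []
        self.export_method(self.send_message, "send")

    def send_message(self, number, message):
        self.sent.append(self.format(number, message))
        return len(self.sent)

    def format(self, number, message):
        # Internal helper; never exported, so remote callers cannot reach it.
        return "%s: %s" % (number, message)


pager = Pager()
count = pager.dispatch("send", ("0400123456", "hello"))
```

Requests are resolved through the table only, so a helper such as "format()" stays private even though it is an ordinary member function of the class.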

When a method is invoked as a result of a service request, the default behaviour will be that the value returned from the method will be what is returned to the caller as the response. If necessary, a method may explicitly indicate that a failure response should instead be returned. A method can also indicate that a delayed response will be sent. This latter case is useful when the service needs to do something first in order to generate a suitable response.

Generating a Failure

If a method encounters an error and raises an exception this will be caught by the service agent framework and a failure response will be generated. The value of the origin for this type of failure will be "netsvc" and the value of the error code will be "SERVER_APPLICATION_ERROR". If you want to generate a failure response which is specific to your application, you should catch any exceptions and indicate the type of failure response by calling the member function "abortResponse()".

    import MySQLdb

    class Database(netsvc.Service):
        def __init__(self, name="database", **kw):
            netsvc.Service.__init__(self, name)
            self._database = MySQLdb.connect(**kw)
            self.exportMethod(self.execute)
        def execute(self, query, args=None):
            try:
                cursor = self._database.cursor()
                cursor.execute(query, args)
                if cursor.description is None:
                    return cursor.rowcount
                return cursor.fetchall()
            except MySQLdb.ProgrammingError as exception:
                details = netsvc.exceptionDetails()
                self.abortResponse(1, "Programming Error", "db", details)
            except MySQLdb.Error as exception:
                # MySQLdb errors normally carry (error code, description).
                error, description = exception.args
                self.abortResponse(error, description, "mysql")

The four arguments to the member function "abortResponse()" are the error code, the description of the error, the origin and any additional details. It is recommended that an origin which clearly identifies the source of the error, or the namespace from which the error codes are derived, always be used. If an origin is not used, it becomes impossible to programmatically deal with an error when different aspects of a service generate overlapping error code sets.

Note that the "abortResponse()" member function will in turn raise its own special exception. When this exception is caught by the service agent framework, it will be translated into a failure response as described by the arguments used to call "abortResponse()". As a new exception is raised, you should avoid an except clause which catches all exceptions in any code which encloses code which might call "abortResponse()". Alternatively, you should explicitly pass on exceptions of type "ServiceFailure".

    try:
        self.execute(...)
    except netsvc.ServiceFailure:
        raise
    except:
        self.abortResponse(...)
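Why a catch-all except clause is a problem can be shown with a small model of the mechanism. The names below are local stand-ins and not the library's: "ServiceFailure" and "abort_response()" imitate the behaviour described above, "run_method()" plays the role of the framework, and "APPLICATION_ERROR" is a placeholder value.

```python
APPLICATION_ERROR = -1  # placeholder, not the library's actual error code


class ServiceFailure(Exception):
    """Stand-in for the exception raised by abortResponse()."""

    def __init__(self, error, description="", origin="", details=""):
        Exception.__init__(self, error, description, origin, details)
        self.failure = (error, description, origin, details)


def abort_response(error, description="", origin="", details=""):
    raise ServiceFailure(error, description, origin, details)


def run_method(method, *params):
    """Return ("result", value) or ("failure", failure tuple)."""
    try:
        return ("result", method(*params))
    except ServiceFailure as exception:
        return ("failure", exception.failure)
    except Exception as exception:
        # Any other exception becomes a generic application error.
        return ("failure",
                (APPLICATION_ERROR, "Application Error", "sketch", str(exception)))


def careful(value):
    try:
        if value < 0:
            abort_response(1, "Negative Value", "demo")
        return value * 2
    except ServiceFailure:
        raise  # pass the abort through untouched
    except Exception:
        abort_response(2, "Unexpected Error", "demo")


def careless(value):
    try:
        if value < 0:
            abort_response(1, "Negative Value", "demo")
        return value * 2
    except Exception:
        # Swallows the abort above and replaces it with a different failure.
        abort_response(2, "Unexpected Error", "demo")


ok = run_method(careful, 3)
intended = run_method(careful, -1)
masked = run_method(careless, -1)
```

In the careless version the caller is told about an unexpected error, even though the method had already decided on a more specific failure.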

If many of the public methods of a service generate the same type of exceptions, rather than provide code to catch the exceptions in every method, it is possible to override the member function "executeMethod()". This member function is called by the service agent framework to call the actual member function referred to by a service request. It is important to preserve the existing functionality of this method otherwise service requests will not execute correctly.

    class Database(netsvc.Service):
        # ...
        def executeMethod(self, name, method, params):
            try:
                # Defer to the base class so requests still execute normally.
                return netsvc.Service.executeMethod(self, name, method, params)
            except MySQLdb.ProgrammingError as exception:
                details = netsvc.exceptionDetails()
                self.abortResponse(1, "Programming Error", "db", details)
            except MySQLdb.Error as exception:
                # MySQLdb errors normally carry (error code, description).
                error, description = exception.args
                self.abortResponse(error, description, "mysql")

The member function "executeMethod()" might also be overridden if you want to track what requests are being made against a service. The arguments to the member function are the name of the method, the actual member function and the parameters to be supplied when the member function is called.
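Tracking requests in this way might look something like the following. So that the sketch can run on its own, "BaseService" is a stand-in for "netsvc.Service" which assumes that the base implementation simply calls the member function with the supplied parameters. "TrackedService" and its "calls" list are hypothetical.

```python
class BaseService(object):
    """Stand-in for netsvc.Service; only the dispatch hook is modelled."""

    def executeMethod(self, name, method, params):
        return method(*params)


class TrackedService(BaseService):
    def __init__(self):
        self.calls = []

    def executeMethod(self, name, method, params):
        # Record the request, then defer to the base class so that the
        # normal dispatch behaviour is preserved.
        self.calls.append((name, tuple(params)))
        return BaseService.executeMethod(self, name, method, params)

    def uptime(self):
        return 42


tracked = TrackedService()
value = tracked.executeMethod("uptime", tracked.uptime, [])
```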

Delaying a Response

In a distributed application, it is sometimes the case that when a method is called it doesn't have the information necessary to generate an immediate response. This may be the case where it needs to initiate its own service requests to accumulate the data needed to generate the result. Because the service agent framework is based on an event driven system, it is not possible for the method to simply block waiting for its own data. This is because the method must return before anything else can execute.

To deal with this, the member functions "suspendResponse()" and "resumeResponse()" are provided. If the "suspendResponse()" member function is called, it will raise an exception which will be caught by the service agent framework. The name of this exception is "DelayedResponse" and lets the service agent framework know that a response will be sent at a later time.

When the member function "suspendResponse()" is called, a callback function which finalises the request and returns the appropriate result should be supplied as an argument. The callback passed to "suspendResponse()" will only be called when the "resumeResponse()" method is called at some later point in time. In particular, you would call "resumeResponse()" once you have collected together all the data which forms the result for the original request.

    class DatabaseProxy(netsvc.Service):
        def __init__(self, name="database-proxy"):
            netsvc.Service.__init__(self, name)
            self.exportMethod(self.tablesRequest, "tables")
            self._request = {}  # downstream conversation id -> original id
            self._result = {}   # original conversation id -> cached result
        def tablesRequest(self):
            service = self.serviceEndPoint("database")
            id = service.execute("show tables")
            self.processResponse(self.queryResponse, id)
            self.processFailure(self.queryFailure, id)
            self._request[id] = self.conversationId()
            self.suspendResponse(self.tablesResult)
        def tablesResult(self):
            request = self.conversationId()
            result = self._result[request]
            del self._result[request]
            return result
        def queryResponse(self, id, result):
            if id in self._request:
                request = self._request[id]
                self._result[request] = result
                del self._request[id]
                self.resumeResponse(request)
        def queryFailure(self, id, error, description, origin, details):
            if id in self._request:
                request = self._request[id]
                del self._request[id]
                self.cancelResponse(request, error, description, origin, details)

As can be seen, it will be necessary to save away state information about a suspended request so it can be later resumed. In this example the conversation id of the original request is associated with the conversation id of the downstream request. When the result of the downstream request is received, it can be saved away and the original request resumed with the cached result being returned. In the event that the downstream request fails, the "cancelResponse()" method is used to abort the original request.
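The control flow behind suspending and resuming a response can be modelled in a few lines. This is a sketch under stated assumptions and not the framework itself: "SuspendingService" and "handle_request()" are hypothetical, "DelayedResponse" is a local stand-in for the library's exception, and replies are simply collected in a list.

```python
class DelayedResponse(Exception):
    """Stand-in for the exception raised by suspendResponse()."""


class SuspendingService(object):
    """Hypothetical framework core: runs methods and parks suspended requests."""

    def __init__(self):
        self._suspended = {}  # conversation id -> callback producing the result
        self._current = None
        self.replies = []     # (conversation id, result) pairs sent to callers

    def conversationId(self):
        return self._current

    def suspendResponse(self, callback):
        self._suspended[self._current] = callback
        raise DelayedResponse()

    def resumeResponse(self, conversation_id):
        callback = self._suspended.pop(conversation_id)
        self._current = conversation_id
        try:
            self.replies.append((conversation_id, callback()))
        finally:
            self._current = None

    def handle_request(self, conversation_id, method):
        self._current = conversation_id
        try:
            self.replies.append((conversation_id, method()))
        except DelayedResponse:
            pass  # no reply yet; one follows when resumeResponse() is called
        finally:
            self._current = None


class Slow(SuspendingService):
    def __init__(self):
        SuspendingService.__init__(self)
        self._result = {}

    def report(self):
        self.suspendResponse(self.report_result)

    def report_result(self):
        return self._result.pop(self.conversationId())

    def data_ready(self, conversation_id, data):
        self._result[conversation_id] = data
        self.resumeResponse(conversation_id)


slow = Slow()
slow.handle_request("req-1", slow.report)
replies_while_suspended = list(slow.replies)
slow.data_ready("req-1", ["a", "b"])
```

The method returns to the event loop by way of the exception, and the reply is only generated once the saved callback is run for the same conversation id.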

As with the "abortResponse()" member function, if "suspendResponse()" is being called from within a method, it will be necessary for any code to be explicit about what exceptions it catches, or to at least catch the "DelayedResponse" exception and pass it on as is.

Note that "suspendResponse()" and "resumeResponse()" were only added in OSE 7.0pl5 and are a layer on top of the "delayResponse()" method which only performed the single operation of raising the exception of type "DelayedResponse". The newer functions should make the task of implementing a delayed response easier, so if you are using "delayResponse()" you should change your code to use the newer functions.

Identity of the Sender

Normally it is not necessary to know the identity of the sender of a request. If a means of identifying who has initiated the request is required however, the details of the current request can be queried to obtain the address of the sender. This can be useful where a separate session object in the form of a new service is created to manage interaction with a particular client. To obtain the request object for the current request the "currentRequest()" member function is used.

By calling the "sender()" member function of a request object, the service address of the service agent initiating the request can be obtained. Having created a separate session for that client, all requests for that session can be authenticated as being from the same service agent. Such a scheme may even have as a prelude a log in mechanism to ensure that a service agent making the request has sufficient privileges to initiate a separate session.

Whether or not a login and password are required, the idea is that the method used to initiate the session returns the name of the service created to manage the session. Such a session object should monitor the existence of the service agent which initiated the session, so that the session can be destroyed automatically when the owner is withdrawn.

    class Session(netsvc.Service):
        def __init__(self, name, client):
            netsvc.Service.__init__(self, name)
            self._client = client
            # Watch the client so the session can be destroyed with it.
            self.subscribeServiceAddress(self.announce, client)
            self.exportMethod(self.close)
        def announce(self, binding, status):
            if status == netsvc.SERVICE_WITHDRAWN:
                self.destroyAgent()
        def close(self):
            # Only the service agent which owns the session may close it.
            client = self.currentRequest().sender()
            if client != self._client:
                self.abortResponse(1, "Not Owner of Session")
            self.destroyAgent()

    class Service(netsvc.Service):
        def __init__(self, name="service"):
            netsvc.Service.__init__(self, name)