Message Exchange
The features of the service agent framework may be used standalone within a single process or across a set of connected processes. That is, use of the service agent framework does not depend on a process being able to connect to a central message exchange process. When combined with the HTTP servlet framework and RPC over HTTP interface, a single process may be more than adequate for many applications, especially simple web-based services.
If such a service starts to outgrow the bounds of a single process, however, the application can easily be split up across multiple processes or machines. This enables services to be distributed based on load or proximity to required resources. Being able to split up the application in this way also makes it easier to introduce distinct components written in C++ rather than Python.
Unlike most message oriented middleware packages, there is no dedicated message exchange process. Instead, the client and server components used to implement the distributed service agent framework are directly accessible. This means that it is possible to take an existing application and embed within it a message exchange server endpoint. Growing your application then becomes a simple matter of creating new processes which incorporate a message exchange client endpoint and having them connect to your original application.
The major classes in the OSE C++ class library used to provide this functionality are the OTC_Exchange, OTC_InetClient and OTC_InetListener classes. Note that the Python interface only provides the ability to create connections between processes using the INET socket protocol. When the C++ interface is used directly on a UNIX platform, there is also the option of using the UNIX socket protocol.
Exchange Initialisation
To create a message exchange endpoint in a process, the Exchange class is used. When creating an instance of the Exchange class it is necessary to specify whether it is performing the role of a message exchange server or that of a client. A message exchange server is a process which takes on the role of being a hub for message distribution. That is, a message exchange server is a process which accepts connections from one or more message exchange clients and distributes messages between the client processes as appropriate.
Two different approaches can be taken in regard to the message exchange server. The first is to embed the message exchange server component within an existing application, with new clients attaching to that existing application. Alternatively, a separate process can be created which embeds just the message exchange server component. The existing application, now acting as a client, and any new clients then all connect to this new process.
In both server configurations, initialisation of the message exchange server endpoint is the same. After initialisation, the endpoint is directed to listen on a specific port for client connections.
import netsvc

port = 11111
exchange = netsvc.Exchange(netsvc.EXCHANGE_SERVER)
exchange.listen(port)
In the case of a message exchange client, instead of listening for connections, the endpoint is directed to connect to a message exchange server.
import netsvc

host = "localhost"
port = 11111
retry = 5  # seconds between connection attempts
exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect(host, port, retry)
Because the message exchange server may not be available, a retry delay can be specified. When supplied, this results in successive attempts to connect to the server until a connection is established. The retry delay is specified in seconds.
Note that if a connection to the server is lost, the client will also attempt to reconnect automatically after the retry delay has expired. The effect is that a client will always try to stay connected to its server without you needing to take any specific action. Your process will not be prematurely shut down if a connection cannot be established or if a connection is lost.
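To make the retry behaviour concrete, the following is a minimal pure-Python model of it. It does not use netsvc, which performs this logic internally. The function name connect_with_retry, the attempt callable and the max_attempts cut-off are hypothetical and exist only for illustration. As with the framework, a negative delay (the default of -1 when no retry delay is given) means a single attempt.

```python
import time


def connect_with_retry(attempt, retry=-1, sleep=time.sleep, max_attempts=0):
    """Model of the client connection policy described above (not netsvc code).

    attempt      -- hypothetical callable making one connection attempt; True on success
    retry        -- delay in seconds between attempts; a negative value means one attempt only
    sleep        -- function used to wait, replaceable for testing
    max_attempts -- limit for this sketch only; 0 means keep trying indefinitely
    """
    attempts = 0
    while True:
        attempts += 1
        if attempt():
            return True
        if retry < 0:
            # No retry delay supplied: a single attempt, then give up.
            return False
        if max_attempts and attempts >= max_attempts:
            return False
        # Wait for the retry delay before trying again.
        sleep(retry)
```

The same loop describes reconnection after a lost connection. The framework simply starts attempting again once the retry delay has expired.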
Service Availability
Unless the service audience of a service agent has been set so as to restrict its visibility, a service will automatically become visible within connected processes through the service registry of the remote process. That is, if a particular service is located within the same process as the message exchange server endpoint and a new client connects, a subscriber to that service in the client will be notified that the service is available. Similarly, any service within a client will become visible from the server as well as other connected clients.
Although the service is located in a separate process, the same service registry interface is used to subscribe to the presence of the service. Subscription to reports produced by the service and the issuing of requests against that service are also mediated through the same interface as previously described. The only exception is that the LocalService proxy class cannot be used to communicate with a service in a remote process, as it is restricted to services implemented in Python within the same process.
Apart from the LocalService proxy class, the interface for communicating with a service is the same whether the service is in the same or a remote process. This makes it a simple matter to split an application across multiple processes. If a distinct message exchange server process is used, all that is required is that each process embed a message exchange client and connect to the message exchange server.
As an example, a process supporting a service which publishes periodic reports would be written as follows.
import signal
import netsvc

class Publisher(netsvc.Service):
    def __init__(self):
        netsvc.Service.__init__(self, "publisher")
        # Lifetime -1: the report is retained so subscribers appearing later also receive it.
        self.publishReport("system.ctime", netsvc.DateTime(), -1)
        self.startTimer(self.timeout, 10, "heartbeat")
    def timeout(self, tag):
        # Heartbeat report, republished every 10 seconds.
        self.publishReport("system.time", netsvc.DateTime())
        self.startTimer(self.timeout, 10, "heartbeat")

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)
exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)
publisher = Publisher()
dispatcher.run()
The process containing the corresponding subscriber to this service would then be written as follows.
import signal
import netsvc

class Subscriber(netsvc.Service):
    def __init__(self):
        netsvc.Service.__init__(self)
        self.monitorReports(self.report, "publisher", "system.*")
    def report(self, service, subject, content):
        name = service.serviceName()
        identity = service.agentIdentity()
        publisher = "(%s/%s)" % (repr(name), identity)
        if subject == "system.ctime":
            now = str(netsvc.DateTime())
            print("%s became available at %s" % (publisher, now))
            print("%s originally started at %s" % (publisher, str(content)))
        elif subject == "system.time":
            print("%s was still alive at %s" % (publisher, str(content)))

dispatcher = netsvc.Dispatcher()
dispatcher.monitor(signal.SIGINT)
exchange = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
exchange.connect("localhost", 11111, 5)
subscriber = Subscriber()
dispatcher.run()
The only difference is that a message exchange client has been added to each process. The services themselves are identical to what they were when used in the same process.
In regard to announcements of service availability and their subsequent withdrawal, when everything is in the same process such an announcement means that the service has been created or destroyed. In the context of a distributed system, such an announcement means that a service is now visible or is no longer visible. A withdrawal doesn't necessarily mean that the service was destroyed, as it could be that the message exchange server process was shut down. Thus the service could still exist; it just may not be reachable.
Because services may become unavailable or connections may be lost, and because connections between processes will automatically be re-established when possible, it is important that client services take notice of announcements regarding the availability of the services they are using. A client service should not assume that a service it is using will always be available and should be programmed to accommodate this fact.
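One way a client service might accommodate this is to keep its own record of which remote agents are currently visible, and update it from whatever availability callback it registered with the service registry. The sketch below is a hypothetical helper class in plain Python, not part of netsvc. The agent identities are arbitrary strings.

```python
class AvailabilityTracker:
    """Hypothetical helper recording which remote service agents are currently visible."""

    def __init__(self):
        self._visible = set()

    def announce(self, agent_identity, available):
        # An availability announcement means the agent is now reachable.
        # A withdrawal only means it is no longer reachable, not that it was destroyed.
        if available:
            self._visible.add(agent_identity)
        else:
            self._visible.discard(agent_identity)

    def choose(self):
        """Return a reachable agent (lowest identity first), or None if none is reachable."""
        return min(self._visible) if self._visible else None
```

A withdrawal simply removes the agent from the set. A later re-announcement after an automatic reconnection restores it without any special handling.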
Connection Announcements
Monitoring the existence of services gives precise information about when such services become available. This, however, may be more detail than is needed. If a client process merely needs to know when a connection has been established to the message exchange server, it is possible to create a derived version of the Exchange class and override the handleConnection() member function.
This member function will be called when a client has successfully connected to a server, when that connection is subsequently lost, or when an initial connection attempt fails. On the server side, the member function is called when a connection is accepted and when it is lost.
import netsvc

class Exchange(netsvc.Exchange):
    def __init__(self, type):
        netsvc.Exchange.__init__(self, type)
    def handleConnection(self, announcement):
        state = "INACTIVE"
        if announcement.state() == netsvc.CONNECTION_ACTIVE:
            state = "ACTIVE"
        process = announcement.remoteProcess()
        address = announcement.remoteAddress()
        message = "%s %s (%s)" % (state, process, address)
        # 'logger' is assumed to be a logger object created elsewhere in the program.
        logger.notify(netsvc.LOG_NOTICE, message)
Overriding this method can be useful purely for logging purposes. It might also be used in a client process to activate the normal operation of the process once a connection becomes active. Conversely, the operation of a client process could be suspended, or the process shut down, when no connection could be established or the connection was lost.
This latter mode of operation would be necessary when a retry delay is not specified when connecting a message exchange client to a server. In this situation the retry delay defaults to "-1", indicating that one and only one connection attempt should be made. If this is used, a client should monitor whether the connection attempt fails and shut down the process if it does. Similarly, if it does manage to connect to the server, the process should again be shut down when that connection is subsequently lost.
Note that creating a one-off connection currently consumes resources that cannot be reclaimed. This is a limitation of the Python interface and is not present when using the OSE C++ class library directly, which has a way of reclaiming the resources. As the message exchange framework is intended for permanent connections, this is not seen as too problematic at this time and will only be addressed at some point in the future.
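The decision logic described in the last two paragraphs can be sketched as a small state holder. A handleConnection() override could delegate to it by mapping a state of netsvc.CONNECTION_ACTIVE to ACTIVE and anything else to INACTIVE. The class, its attributes and the string constants are hypothetical. How the process is actually suspended or stopped is deliberately left out.

```python
ACTIVE, INACTIVE = "ACTIVE", "INACTIVE"


class ClientConnectionPolicy:
    """Hypothetical decision logic driven by connection announcements."""

    def __init__(self, retry=-1):
        self.retry = retry      # retry delay used when connecting; -1 means one attempt only
        self.running = False    # whether the process should be doing its normal work
        self.shutdown = False   # whether the process should now exit

    def on_connection(self, state):
        if state == ACTIVE:
            # Connection established: start (or resume) normal operation.
            self.running = True
        elif self.retry < 0:
            # Initial attempt failed, or the only connection was lost, and no
            # reconnection will be attempted: the process should shut down.
            self.running = False
            self.shutdown = True
        else:
            # A retry delay was given, so the framework reconnects by itself;
            # just suspend work until the connection becomes active again.
            self.running = False
```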
Authorisation of Clients
As the message exchange framework provides direct access into an application, it may be desirable to restrict which hosts can connect to an application. If this type of control is required, it can be implemented by creating a derived version of the Exchange class and overriding the member function authorise(). For each client connection that a server receives, this member function will be called with the IP address of the host the client is located on. The server may then accept or reject the connection.
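import netsvc

class Exchange(netsvc.Exchange):
    def __init__(self, type, hosts=()):
        netsvc.Exchange.__init__(self, type)
        # IP addresses of the hosts permitted to connect.
        self._allow = list(hosts)
    def authorise(self, host):
        # Called with the client's IP address; return true to accept.
        return host in self._allow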
To accept a connection the member function should return a true value and false otherwise. When a connection is rejected, the client will see it as a failed connection attempt.
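Matching exact addresses becomes awkward when a whole subnet should be allowed. One possible refinement, assuming Python 3 where the standard ipaddress module is available, is to compare the address passed to authorise() against a list of networks. The function name and the network list are illustrative only.

```python
import ipaddress


def is_authorised(host, allowed_networks):
    """Return True if the client's IP address lies inside any allowed network.

    host             -- the IP address string handed to authorise()
    allowed_networks -- e.g. ["127.0.0.1/32", "192.168.1.0/24"] (hypothetical configuration)
    """
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Anything that is not a valid IP address is rejected.
        return False
    # An IPv4 address is never "in" an IPv6 network and vice versa.
    return any(address in ipaddress.ip_network(net) for net in allowed_networks)
```

An authorise() override could then simply return is_authorised(host, self._networks), where self._networks is a hypothetical attribute set in the constructor.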
Distributed Exchange Server
When an application is distributed across multiple machines, it may be undesirable for processes on one machine to have to connect to a message exchange server located on another machine. The problem is that if the machine hosting the message exchange server is shut down, none of the processes located on the other machines will be able to communicate with each other. In essence, there is a single point of failure.
When an application is distributed across multiple machines, it is often the case that even if one machine were shut down, the processes on a different machine could quite happily keep operating, so long as they could still communicate. To support this, a means of setting up a distributed version of the message exchange server is provided.
In this arrangement, each machine has its own message exchange server, with each message exchange server connected to all others. If a machine is shut down or connections to one machine are lost, the other machines will still be able to communicate with processes on any machines which are still accessible. That is, loss of the message exchange server on one machine will only directly impact that machine.
To set up a distributed exchange server, the message exchange server endpoint is created as before. The difference is that, as well as listening on a port for new connections, client-like connections are created to the other message exchange servers. The aim is to create a fully connected network between the message exchange servers. That is, each message exchange server has a connection to all other message exchange servers.
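The difference in failure behaviour between a single central server and one server per machine can be checked with a simple reachability model. This is an illustration only. Machines are nodes and direct connections between message exchange servers are edges. It is assumed that two machines can communicate if any path of live connections joins them, as happens when clients on different machines relay through a central server.

```python
from itertools import combinations


def reachable_pairs(links, machines, down):
    """Return the pairs of surviving machines that can still exchange messages.

    links    -- set of frozenset({a, b}) direct connections between machines
    machines -- names of all machines
    down     -- name of the machine that has been shut down
    """
    alive = [m for m in machines if m != down]
    # Build adjacency over the surviving machines only.
    adjacent = {m: set() for m in alive}
    for link in links:
        a, b = tuple(link)
        if a != down and b != down:
            adjacent[a].add(b)
            adjacent[b].add(a)
    pairs = set()
    for a, b in combinations(sorted(alive), 2):
        # Breadth-first search from a to see whether b is still reachable.
        seen, frontier = {a}, [a]
        while frontier:
            next_frontier = []
            for node in frontier:
                for peer in adjacent[node] - seen:
                    seen.add(peer)
                    next_frontier.append(peer)
            frontier = next_frontier
        if b in seen:
            pairs.add((a, b))
    return pairs
```

With a central server on "hub", losing "hub" leaves no machine pair able to communicate. With a server on every machine connected to every other, losing one machine leaves the rest fully connected.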
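import netsvc

port = 11111
exchange = netsvc.Exchange(netsvc.EXCHANGE_SERVER)
exchange.listen(port)

# Other machines running a message exchange server (example names only).
hosts = ["host2", "host3"]
delay = 5
for host in hosts:
    exchange.connect(host, port, delay)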
Note that since connections are bidirectional, it is not necessary for each pair of message exchange servers to connect to each other mutually. That is, if you have two message exchange servers, it is only necessary for one to connect to the other. In other words, the list of remote hosts in one would be empty, whereas the list of remote hosts in the other would contain the reciprocal host. If two message exchange servers do connect to each other, this will be detected and one connection will be ignored. This should nevertheless be avoided.
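When the same list of hosts is distributed to every machine, one simple way to ensure each pair of servers connects exactly once is to have a host connect only to hosts whose names sort after its own. This ordering rule is an assumption made for the sketch, not something the framework requires. Any rule that picks exactly one side of each pair works.

```python
def peers_to_connect(local_host, all_hosts):
    """Return the exchange servers this host should actively connect to.

    Connections are bidirectional, so each pair needs only one of them.
    Here the host whose name sorts first makes the connection (illustrative rule).
    """
    return sorted(h for h in set(all_hosts) if h > local_host)
```

A server would then loop over peers_to_connect(local_host, hosts) instead of hosts when calling exchange.connect(). The local host name must be spelt exactly as it appears in the shared list.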
Multiple Exchange Groups
When creating a service agent, the default service audience is "*", indicating that knowledge of the service should be distributed as widely as possible. One alternative is to set the service audience to the empty string, which will always result in the service only being visible within its own process. What occurs for other values of the service audience property depends on the exchange group assigned to a message exchange endpoint.
By default, the exchange group of a message exchange endpoint is empty, but it may be set by an optional argument when initialising the class. A message exchange endpoint can only connect to a complementary message exchange endpoint which is a member of the same group. That is, a message exchange client endpoint can only connect to a message exchange server endpoint with the same exchange group.
With respect to service visibility, a message exchange endpoint will only pass on information about services if the service audience is "*", or if the service audience is the same as the exchange group. The only exception is when the exchange group is empty. Although an empty service audience then matches the exchange group, it still restricts visibility of the service to its own process.
By using multiple exchange groups within an application, it becomes possible to segment an application into parts and restrict visibility of services to those parts of the application which need to see them. As an example, a service may act as a front end for multiple back end services which do the real work and which need not be visible elsewhere.
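The rules for connections and visibility can be restated as two small predicates. These are a plain-Python restatement of the rules as described above, useful for reasoning about a configuration. netsvc applies the rules internally and is not assumed to expose functions with these names.

```python
def can_connect(client_group, server_group):
    """A client endpoint may only connect to a server endpoint in the same exchange group."""
    return client_group == server_group


def is_exported(audience, group):
    """Would an endpoint in exchange group `group` pass on a service with this audience?"""
    if audience == "":
        # An empty audience always keeps the service inside its own process,
        # even though it would otherwise "match" an empty exchange group.
        return False
    return audience == "*" or audience == group
```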
In this example, the process containing the front end service, as well as creating a message exchange client endpoint for the default exchange group, would create its own message exchange server endpoint. The default name for this exchange group would be overridden and a different port used for connections. Back end processes would then connect to this new port, with all services in the back end processes having a service audience matching that of the new exchange group.
import netsvc

class FrontEnd(netsvc.Service):
    def __init__(self, name="database"):
        netsvc.Service.__init__(self, name)
        self.subscribeServiceGroup(self.announce, "backend")
    def announce(self, binding, group, status):
        if binding.serviceAudience() == "database":
            # this is one of ours
            pass

default = netsvc.Exchange(netsvc.EXCHANGE_CLIENT)
default.connect("localhost", 11111, 5)
backend = netsvc.Exchange(netsvc.EXCHANGE_SERVER, "database")
backend.listen(11112)
The front end service would use subscription to a service group to know about the existence of any back end services. Each of the back end services would in turn add themselves to the same group so the front end is aware of their existence. The front end service can check the service audience for a service to know for sure that it is one of its back end services and not an imposter visible through the default exchange group.
import netsvc

class BackEnd(netsvc.Service):
    def __init__(self, name="", audience="database"):
        netsvc.Service.__init__(self, name, audience)
        self.joinGroup("backend")

backend = netsvc.Exchange(netsvc.EXCHANGE_CLIENT, "database")
backend.connect("localhost", 11112, 5)
Having done this, any services within the back end process will only be visible from other back end processes and the front end process. The services in the back end process will not be visible within any process reachable from the front end process over the original message exchange client endpoint attached to the default exchange group. Back end services will still be able to see any services on the default exchange group which had a service audience of "*".
Note that different exchange groups should not overlap. That is, they should only ever share at most one process with any other exchange group. In effect, exchange groups when used should form a hierarchy. The only time that loops are allowed within the way processes are connected is when creating a distributed exchange server for a specific exchange group.
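Before deploying, a configuration can be checked against the overlap rule. The sketch below models each exchange group as a set of process names, a hypothetical description kept outside netsvc, and reports any pair of groups sharing more than one process. It checks only this pairwise rule. It would not detect a loop running through three or more groups that each share a different single process.

```python
from itertools import combinations


def check_groups(groups):
    """Return (group, group, shared processes) for every pair sharing more than one process.

    groups -- mapping of exchange group name to the set of process names in it
    """
    problems = []
    for (g1, p1), (g2, p2) in combinations(sorted(groups.items()), 2):
        shared = p1 & p2
        if len(shared) > 1:
            problems.append((g1, g2, sorted(shared)))
    return problems
```

For the front end and back end arrangement, the default group and the "database" group share only the front end process, so the check passes.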
Scalability of the Framework
Because there is no dedicated message exchange server process serving as the sole repository of service information, the service registry in each process will contain a record of every service it can see. As an application grows to have a very large number of services, this may cause what should otherwise be a small process to grow unnecessarily in size.
There are currently a couple of approaches that can be taken to reduce this problem. However, if you know that you will have very large numbers of services, and specifically of publishers and subscribers, you may be better off purchasing one of the commercial products specifically designed and targeted at such large-scale systems. Such products might not support the concept of distinct services and instead implement a flat name space for subscriptions, but they are more likely to scale better.
In other words, the design of the service agent framework and the message exchange framework lends itself to small to medium size systems. Don't expect to be able to run the whole of the New York stock market data feeds through this system as it will more than likely not suit your requirements.
Having made this disclaimer, the first thing you can do to reduce growth in the size of the service registry in each process is not to export a service beyond the scope of a process unless you really need to. That is, if a service only needs to be visible within its own process, set its service audience to the empty string.
Such a service will not be visible outside of the process and that service will not be able to subscribe to services outside of the process, but in most cases the service will still be able to make a request against a remote service. Restricting the visibility of a service to its own process will also cut down on traffic between processes relating to the existence and withdrawal of services.
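The effect of restricting visibility can be estimated with some simple arithmetic. The model below assumes a single exchange group and the same number of services in every process. It also assumes that exported services are seen by every process while the remainder are seen only in their own process. These assumptions are simplifications chosen for illustration, not measurements of OSE.

```python
def registry_records(processes, services_per_process, exported_per_process):
    """Estimate the total number of service registry entries across all processes."""
    if not 0 <= exported_per_process <= services_per_process:
        raise ValueError("exported services must be between 0 and the total per process")
    # Each process records all of its own services ...
    own = services_per_process
    # ... plus the exported services of every other process.
    remote = (processes - 1) * exported_per_process
    return processes * (own + remote)
```

For example, take 10 processes each hosting 100 services. Exporting every service gives 10 × (100 + 9 × 100) = 10,000 registry entries in total, whereas exporting only 10 services per process gives 10 × (100 + 9 × 10) = 1,900. The exported term grows with the square of the number of processes, so keeping services local matters more as the application grows.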
The next thing which can be done is to look more closely at the relationships that exist between services. If there is a group of related services which only need to talk to each other, locate them together. This can be done by putting them in the same process and restricting visibility to that process, or by separating them from the remainder of the application by creating a distinct exchange group. In both cases, make only those services which need to be public globally visible.
