Agent Remoting
JBoss/Remoting
The communications layer used by the agent (and by the server as well) has JBoss/Remoting at its foundation. The RHQ comm layer builds on top of JBoss/Remoting - adding features like guaranteed delivery, throttling, and remote POJO invocations.
Commands/Command Responses/Command Services
Deep down in the RHQ comm layer (sitting on top of JBoss/Remoting) is the Command layer. All messages flowing into and out of the agent are Command objects. When a command reaches the remote server, it is processed by what is known as a Command Service which in turn responds with a Command Response object. This Command/CommandResponse framework is the basic building block upon which the other aspects of the communications framework is built upon.
Remote POJO Invocation Layer
Sitting higher up in the comm stack is the remote POJO invocation layer. This layer provides most of the communications API used by the rest of the subsystems. Developers write their POJOs as a normal Java object, remote them via the RHQ Remoting subsystem and they immediately become available for remote clients to call. Using the remote POJO invocation layer allows development to proceed without the developer having to know anything about the remoting layer while at the same time being given functionality such as guaranteed delivery, asynchronous invocations, failover, etc.
Writing new commands / command services
Here's what you need to do to add new commands to the agent.
- Create a Command
- Extend AbstractCommand
- Define the Command Type (the command name and version)
- Define the Command parameters
- Create methods that return strongly typed parameters
- Define constructors that delegate to all superclass' constructors
- EXAMPLE: see EchoCommand.java
- Create a Command Response
- Extend AbstractCommandResponse
- Create a method that returns the result object as a strongly typed object
- Define constructors that delegate to all superclass' constructors
- EXAMPLE see EchoCommandResponse.java
- Create a Command Service (the thing that executes the command on the server)
- Extend CommandService or MultipleCommandService (if you want to more easily support multiple commands in a single service)
- Override method getSupportedCommandTypes(), or getSupportedCommandTypeExecutors() if using MultipleCommandService, to define the commands this command service can execute
- Implement the execute method (or write CommandExecutor objects if using MultipleCommandService)
- EXAMPLE EchoCommandService
You don't technically need Commands and Command Responses - you can use GenericCommand/GenericCommandResponse - however, these are weakly typed and requires you to perform casting to actually process the commands in a more specific way (as opposed to generically processing commands, like the CmdlineClient does).
After these are defined, you add them to the agent. You can do this statically by adding your CommandService class name to communications.properties (the rhq.communications.command-services property). Or you can dynamically add command services at runtime. Requires rhq.communications.command-service-directory.allow-dynamic-discovery to be set to true in communications.properties (which it is in the default props file). Do add the services, you call ServiceContainer.addCommandService() passing in either a String (which is the classname of your service) or an Object (which is the command service instance).
RPC / remote POJOs
You can write POJOs that are remoteable. Use ServiceContainer.addRemotePojo() to make your pojo instance remoted. Use ClientRemotePojoInvoker.getRemotePojo() to get a proxy to the remoted POJO. This floats over the command infrastructure, thus giving it all the functionality that the command stuff provides (like setting timeout for commands, guaranteed delivery, concurrent, asynchronous sending, synchronous sending, etc. etc.)
ServiceContainer
This singleton contains the server-side services. You can dynamically add new command services (so new commands can be processed) and you can add new network detection listeners so you can detect new servers (RHQ Agents or RHQ Servers both act as "comm servers") that come on and off line.
ClientCommandSender
This provides a client that can send commands to any remote server (RHQ Agent/RHQ Server). It can:
- send concurrent messages
- queue up commands when the maximum number of concurrent commands are in process of sending
- can send asynchronously (with a callback mechanism to tell you when its done) or synchronously.
- you can define what timeout to use - this is the time to wait before aborting a command. A command can define its own timeout or can just use the default timeout that is configured (agent.properties rhq.agent.client.command-timeout-msecs)
- you can enable throttling - the sender can be told that it can only send a maximum number of commands in a particular time period
- guaranteed delivery such that we persist the command to disk and only after we send it do we purge from disk
You can get an instance from the agent (AgentMain.getClientCommandSender() which sends the commands to the RHQ Server) or you can instantiate your own (requires you to know the remote endpoint or the RHQ Server or whatever remote server you want to send to).
When writing agent-side plugins, all of this remoting infrastructure and pojo registration is handled for you by the agent and its plugin container.
PersistentFifo
There is a class that persists data in a FIFO queue in a single file - the class is PersistentFifo. I have a bunch of tests that proves it works 
It persists byte[] data - but its API allows you to persist objects and take objects from the queue (it simply (de)serializes the object):
- void put(byte[])
- byte[] take()
- void putObject(Serializable object)
- Object takeObject()
You provide 3 things to the constructor - the File where the data is written, the maximum file size limit and a purge percentage. The maximum file size indicates how big we will allow the file to get before we need to shrink it somehow. If you put an entry on the FIFO, and that entry causes the file size to go over that max file size, this will trigger a purge. A purge will force the file to reduce in size such that it goes below the percentage of the max file size as defined by purge_percentage parameter (for example, if the maximum file size is 100KB and the purge percentage is 90, then a purge will shink the file down to no more than 90KB). A purge performs two things:
- it compresses the file by removing all entries that have already been taken off the queue but still exist in the file
- if compressing the file doesn't bring the file size down below the max, the purge will begin to remove entries from the queue (the oldest entries are removed first).
See the class javadocs for details on how this is implemented.
I think we can use this for both my needs (to persist commands for guaranteed delivery) and for the plugin container to use if it wants to squirrel away metric data to a local spool file, for example. You can squirrel away double[] arrays using putObject and takeObject for example.
Also plan to refactor this or add a new class to allow persisting the messages to a database, as opposed to a filesystem.