2.17.2: 2011-06-17

net.sf.basedb.clients.jobagent
Class Agent

java.lang.Object
  extended by net.sf.basedb.clients.jobagent.Agent

public class Agent
extends Object

This is the job agent application. It delegates the checking of the job queue to the JobQueueChecker class. It includes a listener service that can be used for remote control of the agent; for example, it is possible to send start, stop and pause requests.

This class is responsible for creating a JobExecutor object and delegating the actual execution of a job to it. The agent keeps track of the running jobs and makes sure that no more than the configured number of jobs are running at the same time.

The agent is configured at construction time with parameters from a Properties object.

Parameter Default value Description
agent.user - The username to use for logging in to BASE (required)
agent.password - The password to use for logging in to BASE (required)
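agent.id - The external ID of the corresponding JobAgent item in the BASE database (required)
agent.name - The name of the corresponding JobAgent item in the BASE database
agent.description - The description of the corresponding JobAgent item in the BASE database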
agent.port 47822 The port the remote control service listener is listening to
agent.remotecontrol - A comma-separated list of computers that are allowed remote control. It is recommended that the web server is put in this list. The local host is always allowed control and doesn't have to be in this list.
agent.allowremote.stop false If stop requests should be allowed from remote hosts or not. Note! A stop request shuts down the agent making it impossible to start it again using remote control.
agent.allowremote.start true If start requests should be allowed from remote hosts or not
agent.allowremote.pause true If pause requests should be allowed from remote hosts or not
agent.request-handler.* - One or more entries that can be used to register custom remote control request handlers. The * should be replaced with the name of the protocol which means that requests should take the form: protocol://custom-data....

The value is the name of a class that implements the CustomRequestHandler interface. The implementation must provide a public no-argument constructor. The implementation must be thread-safe and be able to handle multiple requests at the same time.

agent.executor.class ProcessJobExecutor The name of a class that is responsible for starting a job once the agent has determined that it is allowed to be executed. The class must implement the JobExecutor interface and provide a public no-argument constructor. Note that only one instance of this class exists for an agent. It must be thread-safe since the jobs are executed in parallel threads.
agent.checkinterval 30 Number of seconds between checks to the database for new jobs.
agent.shortest.slots 1 Number of slots to reserve for jobs that take < 1 minute to execute
agent.shortest.priority 4 The thread priority of jobs in this slot. See Thread.setPriority(int).
agent.short.slots 1 Number of slots to reserve for jobs that take < 10 minutes to execute
agent.short.priority 4 The thread priority of jobs in this slot. See Thread.setPriority(int).
agent.medium.slots 2 Number of slots to reserve for jobs that take < 1 hour to execute
agent.medium.priority 3 The thread priority of jobs in this slot. See Thread.setPriority(int).
agent.long.slots 2 Number of slots to reserve for jobs that take > 1 hour to execute
agent.long.priority 3 The thread priority of jobs in this slot. See Thread.setPriority(int).
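
The parameters above can be collected in a java.util.Properties object before the agent is constructed. The following is a minimal sketch and not part of the BASE API: the class name AgentConfigSketch, the helper methods and all values are placeholders, and the check for missing keys only mirrors the three parameters marked as required in the table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class AgentConfigSketch {
    /** Build a Properties object with the documented keys; all values are placeholders. */
    static Properties exampleProperties() {
        Properties p = new Properties();
        p.setProperty("agent.user", "jobagent");          // required
        p.setProperty("agent.password", "secret");        // required
        p.setProperty("agent.id", "agent-1");             // required
        p.setProperty("agent.port", "47822");
        p.setProperty("agent.remotecontrol", "192.0.2.10");
        p.setProperty("agent.allowremote.stop", "false");
        p.setProperty("agent.checkinterval", "30");
        p.setProperty("agent.shortest.slots", "1");
        p.setProperty("agent.long.priority", "3");
        return p;
    }

    /** Return the required keys that are missing or blank (hypothetical helper). */
    static List<String> missingRequired(Properties p) {
        List<String> missing = new ArrayList<>();
        for (String key : new String[] {"agent.user", "agent.password", "agent.id"}) {
            String value = p.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties p = exampleProperties();
        System.out.println("Missing required keys: " + missingRequired(p));
        // With the job agent classes on the classpath, the object would then be
        // passed to the documented constructor: new Agent(p)
    }
}
```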

Version:
2.0
Author:
nicklas
Last modified
$Date: 2010-10-19 13:02:17 +0200 (Tue, 19 Oct 2010) $

Field Summary
private  Set<JobInfo> activeJobs
           
private  boolean allowRemotePause
           
private  boolean allowRemoteStart
           
private  boolean allowRemoteStop
           
private  long checkInterval
           
private  int closeTimeout
          Timeout to wait for jobs to act on the ABORT signal when stopping.
private  Map<String,Class<? extends CustomRequestHandler>> customRequestHandlerClasses
           
static int DEFAULT_CHECK_INTERVAL
          The default check interval in seconds.
static Class<? extends JobExecutor> DEFAULT_JOB_EXECUTOR
          The default job executor to use if none has been specified in the configuration file.
static Class<? extends SlotManager> DEFAULT_SLOT_MANAGER
          The default slot manager to use if none has been specified in the configuration file.
private  String description
           
private  Class<? extends JobExecutor> executorClass
           
private  String externalId
           
private  boolean isRunning
           
private  JobExecutor jobExecutor
           
private  TimerTask jobQueueChecker
           
private static Logger log
          Log job agent events.
private  String login
           
private static Logger logServer
          Log job agent server events.
private  String name
           
private  String password
           
private  Integer port
           
private  Map<Job.ExecutionTime,Integer> priorities
          The thread priority to use when executing jobs in each slot.
private  Properties properties
           
private  Set<InetAddress> remote
           
private  MultiProtocolRequestHandler requestHandler
           
private  ThreadGroup runnersGroup
          The group where all job runners are placed.
private  SessionControl sc
           
private  JobAgentServerConnection server
           
private  InetAddress serverAddress
           
private  AgentSignalReceiver signalReceiver
           
private  SlotManager slotManager
           
private  Class<? extends SlotManager> slotManagerClass
           
 
Constructor Summary
Agent(Properties properties)
          Create a new job agent.
 
Method Summary
private  void closeJobExecutor()
          Close the job executor.
private  void closeJobQueueChecker()
          Close the job queue checker.
private  void closeServer()
          Close the service listener.
private  void closeSlotManager()
          Close the slot manager.
private  JobExecutor createJobExecutor()
          Create a job executor and initialise it.
private  TimerTask createJobQueueChecker()
          Create a job queue checker and register it with the BASE core scheduler.
private  SlotManager createSlotManager()
          Create a slot manager and initialize it.
private  Map<String,Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses()
           
 String getDescription()
          Get the agent.description configuration value.
 String getId()
          Get the agent.id configuration value.
 SessionControl getImpersonatedSessionControl(Job job)
          Get a session control where the owner of the job has been impersonated and the active project has been set if needed.
 JobAgent getJobAgent(DbControl dc)
          Get the JobAgent item corresponding to this agent.
private  Class<? extends JobExecutor> getJobExecutorClass(String className)
          Get the class object for the configured job executor.
 String getLogin()
          Get the agent.user configuration value.
 String getName()
          Get the agent.name configuration value.
 String getPassword()
          Get the agent.password configuration value.
 int getPort()
          Get the agent.port configuration value.
 String getProperty(String key)
          Get a configuration property.
private  Set<InetAddress> getRemoteAddresses(String config)
          Split the string at commas and try to create an InetAddress for each part.
 Set<JobInfo> getRunningJobs()
          Get a set containing the ID:s of the jobs that are currently being executed by this job agent.
 String getServerName()
          Get the host name of the server where the job agent is running.
 SessionControl getSessionControl()
          Get a session control with the configured user logged in.
 AgentSignalReceiver getSignalReceiver()
          Get the signal receiver that is processing signal messages on behalf of this job agent.
(package private)  Slot getSlot(Job job)
          Find a free slot for executing a job.
private  Class<? extends SlotManager> getSlotManagerClass(String className)
          Get the class object for the configured slot manager.
 int getThreadPriority(Job.ExecutionTime slot)
          Get the thread priority to use for the specified execution time slot.
 boolean isAllowedControl(InetAddress remote, String cmd)
          Check if the computer specified by the given address is allowed to control this job agent.
 boolean isRunning()
          Check if the job agent is running or not.
(package private)  void jobDone(Job job, Slot usedSlot)
          Used by JobRunner to tell that a job has finished executing and that the used slot should be released.
private  void maybeStopRunningJobs()
          Try to stop running jobs by interrupting the threads they are executing in.
 void pause()
          Pause the job agent.
private  void registerCustomRequestHandlers(MultiProtocolRequestHandler master)
           
 void registerRequestHandler(RequestHandler handler, String... protocols)
          Register a request handler for one or more protocols.
 void service(RequestHandler defaultHandler)
          Start the listener service that listens for control commands such as start, stop, pause and info.
 void start()
          Start the job agent.
 void startJob(Job job, JobAgentSettings settings)
          Start a job.
 void stop()
          Stop the job agent.
 void unregisterRequestHandler(String... protocols)
          Unregister one or more protocols.
private  void validateConfiguration()
          Validate that all required configuration parameters have been specified.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_JOB_EXECUTOR

public static final Class<? extends JobExecutor> DEFAULT_JOB_EXECUTOR
The default job executor to use if none has been specified in the configuration file.


DEFAULT_SLOT_MANAGER

public static final Class<? extends SlotManager> DEFAULT_SLOT_MANAGER
The default slot manager to use if none has been specified in the configuration file.


DEFAULT_CHECK_INTERVAL

public static final int DEFAULT_CHECK_INTERVAL
The default check interval in seconds.

See Also:
Constant Field Values

log

private static final Logger log
Log job agent events.


logServer

private static final Logger logServer
Log job agent server events.


properties

private final Properties properties

login

private final String login

password

private final String password

externalId

private final String externalId

name

private final String name

description

private final String description

port

private final Integer port

remote

private final Set<InetAddress> remote

allowRemoteStop

private final boolean allowRemoteStop

allowRemotePause

private final boolean allowRemotePause

allowRemoteStart

private final boolean allowRemoteStart

customRequestHandlerClasses

private final Map<String,Class<? extends CustomRequestHandler>> customRequestHandlerClasses

checkInterval

private final long checkInterval

executorClass

private final Class<? extends JobExecutor> executorClass

slotManagerClass

private final Class<? extends SlotManager> slotManagerClass

serverAddress

private InetAddress serverAddress

server

private JobAgentServerConnection server

requestHandler

private MultiProtocolRequestHandler requestHandler

signalReceiver

private AgentSignalReceiver signalReceiver

jobExecutor

private JobExecutor jobExecutor

slotManager

private SlotManager slotManager

jobQueueChecker

private TimerTask jobQueueChecker

isRunning

private boolean isRunning

sc

private SessionControl sc

priorities

private final Map<Job.ExecutionTime,Integer> priorities
The thread priority to use when executing jobs in each slot.


activeJobs

private final Set<JobInfo> activeJobs

closeTimeout

private int closeTimeout
Timeout to wait for jobs to act on the ABORT signal when stopping. Default value is 20 seconds.


runnersGroup

private final ThreadGroup runnersGroup
The group where all job runners are placed.

Constructor Detail

Agent

public Agent(Properties properties)
Create a new job agent. See class documentation for information about configuration parameters.

Parameters:
properties - A properties object containing the configuration parameters for the agent
Method Detail

getProperty

public String getProperty(String key)
Get a configuration property.

Parameters:
key - The key
Returns:
The configured value or null if not found

getId

public String getId()
Get the agent.id configuration value.

Returns:
The ID

getLogin

public String getLogin()
Get the agent.user configuration value.

Returns:
The login

getPassword

public String getPassword()
Get the agent.password configuration value.

Returns:
The password

getName

public String getName()
Get the agent.name configuration value.

Returns:
The name or the ID if the name is null

getDescription

public String getDescription()
Get the agent.description configuration value.

Returns:
The description

getPort

public int getPort()
Get the agent.port configuration value.

Returns:
The port number

getThreadPriority

public int getThreadPriority(Job.ExecutionTime slot)
Get the thread priority to use for the specified execution time slot.

Parameters:
slot - The execution time slot
Returns:
The priority as an int. A higher number means a higher priority.

getServerName

public String getServerName()
Get the host name of the server where the job agent is running.

Returns:
Server's name as a String.
See Also:
SocketUtil.getPublicLocalHost()

getRemoteAddresses

private Set<InetAddress> getRemoteAddresses(String config)
Split the string at commas and try to create an InetAddress for each part. Parts that turn out to be invalid are skipped and a warning message is logged.

Parameters:
config - The string to split
Returns:
A set of InetAddress objects
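
The parsing described above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: the class name RemoteAddressSketch and the use of java.util.logging are placeholders, and skipping empty parts is an assumption. The only library behaviour relied on is that InetAddress.getByName throws UnknownHostException for a host it cannot resolve.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Logger;

public class RemoteAddressSketch {
    private static final Logger LOG =
        Logger.getLogger(RemoteAddressSketch.class.getName());

    /** Split at commas and resolve each part; invalid parts are logged and skipped. */
    static Set<InetAddress> parse(String config) {
        Set<InetAddress> result = new LinkedHashSet<>();
        if (config == null) {
            return result;
        }
        for (String part : config.split(",")) {
            String host = part.trim();
            if (host.isEmpty()) {
                continue; // assumption: empty entries are ignored silently
            }
            try {
                result.add(InetAddress.getByName(host));
            } catch (UnknownHostException ex) {
                LOG.warning("Skipping invalid remote control address: " + host);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The last entry is malformed and is skipped with a warning.
        System.out.println(parse("127.0.0.1, 192.0.2.10, [bad"));
    }
}
```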

getCustomRequestHandlerClasses

private Map<String,Class<? extends CustomRequestHandler>> getCustomRequestHandlerClasses()

registerCustomRequestHandlers

private void registerCustomRequestHandlers(MultiProtocolRequestHandler master)

getJobExecutorClass

private Class<? extends JobExecutor> getJobExecutorClass(String className)
Get the class object for the configured job executor. If the specified class can't be found or doesn't implement the JobExecutor interface a warning message is logged and the DEFAULT_JOB_EXECUTOR is used instead.

Parameters:
className - The name of the job executor class
Returns:
The class object for that class or the default job executor

getSlotManagerClass

private Class<? extends SlotManager> getSlotManagerClass(String className)
Get the class object for the configured slot manager. If the specified class can't be found or doesn't implement the SlotManager interface a warning message is logged and the DEFAULT_SLOT_MANAGER is used instead.

Parameters:
className - The name of the slot manager class
Returns:
The class object for that class or the default slot manager

validateConfiguration

private void validateConfiguration()
                            throws InvalidDataException
Validate that all required configuration parameters have been specified.

Throws:
InvalidDataException - If parameters are missing or have incorrect values

service

public void service(RequestHandler defaultHandler)
             throws IOException
Start the listener service that listens for control commands such as start, stop, pause and info. The listener service is only required if the job agent needs to respond to remote control commands.

Note! The AgentController, which is the default controller for agents, always uses the listener service for communicating with the job agent.

Note! The listener service is started in a separate thread and this method returns as soon as the network connections are set up.

Note! The default handler supplied as an argument is used as a fallback handler for unregistered protocols. Additional request handlers can be set up by calling registerRequestHandler(RequestHandler, String...).

Parameters:
defaultHandler - A RequestHandler that handles the incoming requests, or null to use the DefaultRequestHandler
Throws:
IOException - If there is an error when starting the service

start

public void start()
Start the job agent. This method will register a JobQueueChecker object with the BASE core scheduler (see Application.getScheduler()). The timer will call the JobQueueChecker.run() method at intervals specified by the agent.checkinterval configuration setting.

Note! The JobQueueChecker will run in a separate thread and this method returns immediately after registering the object with the scheduler.

Note! This method also creates a single instance of a JobExecutor. The actual class to use is specified by the agent.executor.class configuration setting. The default job executor is ProcessJobExecutor.
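
The scheduling pattern described above (register a task with a timer, return immediately, let the timer call run() at a fixed interval) can be illustrated with the standard java.util.Timer. This is only a sketch: the BASE core scheduler is not used here, the class name QueueCheckSketch and its method are hypothetical, and the latch exists only so that the example can observe the runs.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class QueueCheckSketch {
    /**
     * Schedule a periodic task and wait until it has run the given number
     * of times. Returns false if the timeout expires first.
     */
    static boolean runPeriodically(long intervalMillis, int runs, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(runs);
        Timer timer = new Timer("queue-checker", true); // daemon timer thread
        TimerTask checker = new TimerTask() {
            @Override
            public void run() {
                // A real checker would look for new jobs in the queue here.
                latch.countDown();
            }
        };
        // schedule() returns immediately; the task runs on the timer thread.
        timer.schedule(checker, 0L, intervalMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            checker.cancel();
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        // The agent default is 30 seconds; a short interval keeps the demo quick.
        System.out.println("Completed: " + runPeriodically(50L, 3, 5000L));
    }
}
```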


stop

public void stop()
Stop the job agent. Unless other things are running in the same virtual machine as this job agent, the virtual machine should exit as a result of calling this method.


pause

public void pause()
Pause the job agent. This method will not try to stop the running jobs or shut down the BASE application. Calling start() again will start the job agent again.

See Also:
stop(), start()

isRunning

public boolean isRunning()
Check if the job agent is running or not.

Returns:
TRUE if the job agent is running, FALSE otherwise

getRunningJobs

public Set<JobInfo> getRunningJobs()
Get a set containing the ID:s of the jobs that are currently being executed by this job agent.

Returns:
A set of JobInfo objects
Since:
2.6

isAllowedControl

public boolean isAllowedControl(InetAddress remote,
                                String cmd)
Check if the computer specified by the given address is allowed to control this job agent. A computer is allowed to control this job agent if its name or IP address is listed in the agent.remotecontrol property. This method is called from the DefaultRequestHandler.handleCmd(java.net.Socket, String) method to determine if a service request should be accepted or not.

Note! The stop, start and pause commands are only allowed from the local host unless the agent.allowremote.stop, agent.allowremote.start and agent.allowremote.pause properties, respectively, are set to true.

Note! The local host doesn't have to be listed in the agent.remotecontrol property. Requests are always allowed from the local host.

Parameters:
remote - The address to the remote computer
cmd - The command the remote computer wants to execute
Returns:
TRUE if the computer is allowed, FALSE otherwise
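
The rules above can be summarised in a small, self-contained sketch. It is not the actual implementation: the class RemoteControlSketch is hypothetical, treating a loopback address as "the local host" is an assumption, and so is allowing other commands (such as info) for every listed host.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;

public class RemoteControlSketch {
    private final Set<InetAddress> allowedHosts;
    private final boolean allowRemoteStop;
    private final boolean allowRemoteStart;
    private final boolean allowRemotePause;

    RemoteControlSketch(Set<InetAddress> allowedHosts, boolean allowRemoteStop,
            boolean allowRemoteStart, boolean allowRemotePause) {
        this.allowedHosts = allowedHosts;
        this.allowRemoteStop = allowRemoteStop;
        this.allowRemoteStart = allowRemoteStart;
        this.allowRemotePause = allowRemotePause;
    }

    boolean isAllowed(InetAddress remote, String cmd) {
        if (remote.isLoopbackAddress()) {
            return true; // assumption: loopback stands in for the local host
        }
        if (!allowedHosts.contains(remote)) {
            return false; // not listed in agent.remotecontrol
        }
        switch (cmd) {
            case "stop":  return allowRemoteStop;
            case "start": return allowRemoteStart;
            case "pause": return allowRemotePause;
            default:      return true; // assumption: e.g. info is always allowed
        }
    }

    /** Build an IPv4 address from its four parts without any DNS lookup. */
    static InetAddress ip(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(
                new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
        } catch (UnknownHostException ex) {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args) {
        InetAddress web = ip(192, 0, 2, 10);
        // Documented defaults: stop=false, start=true, pause=true.
        RemoteControlSketch rc = new RemoteControlSketch(Set.of(web), false, true, true);
        System.out.println("remote stop allowed:  " + rc.isAllowed(web, "stop"));
        System.out.println("remote pause allowed: " + rc.isAllowed(web, "pause"));
    }
}
```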

registerRequestHandler

public void registerRequestHandler(RequestHandler handler,
                                   String... protocols)
Register a request handler for one or more protocols. See MultiProtocolRequestHandler for more information. Calls to this method are ignored if the job agent has no listener service. See service(RequestHandler).

Parameters:
handler - The request handler
protocols - The names of the protocols the handler should handle
Since:
2.16

unregisterRequestHandler

public void unregisterRequestHandler(String... protocols)
Unregister one or more protocols. See MultiProtocolRequestHandler for more information. Calls to this method are ignored if the job agent has no listener service. See service(RequestHandler).

Parameters:
protocols - The names of the protocols to unregister
Since:
2.16
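
As an illustration of per-protocol registration with a fallback handler, the sketch below routes requests of the form protocol://custom-data to the handler registered for that protocol and everything else to a default handler. The Handler interface and the class ProtocolDispatchSketch are hypothetical stand-ins; they are not the BASE RequestHandler or MultiProtocolRequestHandler types, whose signatures are not shown in this document.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProtocolDispatchSketch {
    /** Hypothetical stand-in for a request handler. */
    interface Handler {
        String handle(String request);
    }

    // A concurrent map, since requests may arrive on several threads.
    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();
    private final Handler fallback;

    ProtocolDispatchSketch(Handler fallback) {
        this.fallback = fallback;
    }

    void register(Handler handler, String... protocols) {
        for (String protocol : protocols) {
            handlers.put(protocol, handler);
        }
    }

    void unregister(String... protocols) {
        for (String protocol : protocols) {
            handlers.remove(protocol);
        }
    }

    /** Route "protocol://data" to its handler, anything else to the fallback. */
    String dispatch(String request) {
        int idx = request.indexOf("://");
        Handler handler = idx > 0 ? handlers.get(request.substring(0, idx)) : null;
        return (handler != null ? handler : fallback).handle(request);
    }

    public static void main(String[] args) {
        ProtocolDispatchSketch d = new ProtocolDispatchSketch(r -> "default:" + r);
        d.register(r -> "echo:" + r.substring(r.indexOf("://") + 3), "echo");
        System.out.println(d.dispatch("echo://hello"));
        System.out.println(d.dispatch("pause"));
    }
}
```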

getSignalReceiver

public AgentSignalReceiver getSignalReceiver()
Get the signal receiver that is processing signal messages on behalf of this job agent.

Since:
2.6

getSessionControl

public SessionControl getSessionControl()
Get a session control with the configured user logged in.

Returns:
A session control

getImpersonatedSessionControl

public SessionControl getImpersonatedSessionControl(Job job)
Get a session control where the owner of the job has been impersonated and the active project has been set if needed.

Parameters:
job - The job to get the impersonated session control for
Returns:
A session control object

getJobAgent

public JobAgent getJobAgent(DbControl dc)
Get the JobAgent item corresponding to this agent. The job agent is looked up by the JobAgent.getByExternalId(DbControl, String) method where the external ID is given by the agent.id property.

Parameters:
dc - The DbControl to use for database access
Returns:
A JobAgent item

getSlot

Slot getSlot(Job job)
Find a free slot for executing a job. If there is a free slot in the requested pool, that slot is returned; otherwise the method checks the slower-running pools for free slots and returns one of those. If no free slot is available, null is returned.

Note! This method reserves the slot for the job. It is important that the jobDone(Job, Slot) method is called once the job has completed to return the slot to the pool. Failure to do so may cause the agent to think that all slots are in use when they are not.

Parameters:
job - The job that wants a slot
Returns:
The assigned slot or null if no slot is available
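
The fallback from the requested pool to slower-running pools can be sketched like this. The example is a simplification under stated assumptions: the enum Pool is a hypothetical stand-in for Job.ExecutionTime, a slot is represented only by the pool it came from, and the real behaviour depends on the configured SlotManager.

```java
import java.util.EnumMap;
import java.util.Map;

public class SlotPoolSketch {
    /** Pools ordered from fastest to slowest (hypothetical stand-in). */
    enum Pool { SHORTEST, SHORT, MEDIUM, LONG }

    private final Map<Pool, Integer> free = new EnumMap<>(Pool.class);

    SlotPoolSketch(int shortestSlots, int shortSlots, int mediumSlots, int longSlots) {
        free.put(Pool.SHORTEST, shortestSlots);
        free.put(Pool.SHORT, shortSlots);
        free.put(Pool.MEDIUM, mediumSlots);
        free.put(Pool.LONG, longSlots);
    }

    /** Reserve a slot in the requested pool or, failing that, in a slower pool. */
    synchronized Pool getSlot(Pool requested) {
        Pool[] pools = Pool.values();
        for (int i = requested.ordinal(); i < pools.length; i++) {
            int available = free.get(pools[i]);
            if (available > 0) {
                free.put(pools[i], available - 1);
                return pools[i];
            }
        }
        return null; // no free slot in the requested pool or any slower pool
    }

    /** Return a previously reserved slot to its pool. */
    synchronized void jobDone(Pool used) {
        free.merge(used, 1, Integer::sum);
    }

    public static void main(String[] args) {
        // Slot counts taken from the documented defaults: 1, 1, 2, 2.
        SlotPoolSketch slots = new SlotPoolSketch(1, 1, 2, 2);
        System.out.println(slots.getSlot(Pool.SHORTEST));
        System.out.println(slots.getSlot(Pool.SHORTEST));
    }
}
```

Forgetting to call jobDone in this sketch has the same effect as described in the note above: the counters never recover and getSlot eventually returns null for every request.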

startJob

public void startJob(Job job,
                     JobAgentSettings settings)
Start a job. This method will create a new thread for running the job and return immediately. If the job for some reason can't be started, for example if there are no available slots, a log message will be written but nothing else will happen. No exception will be thrown to the caller of this method.

Parameters:
job - The job to start
settings - Settings for the job agent to use.
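
The "start in a new thread, return immediately, log instead of throwing" behaviour can be sketched as below. This is an assumption-laden illustration: the class StartJobSketch is hypothetical, a Semaphore replaces the slot manager, a Runnable replaces the Job and JobAgentSettings arguments, and the method returns the started thread (or null) purely so that the demo can observe the outcome, whereas the documented method returns void.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;

public class StartJobSketch {
    private static final Logger LOG = Logger.getLogger(StartJobSketch.class.getName());
    private final ThreadGroup runners = new ThreadGroup("job-runners");
    private final Semaphore slots;

    StartJobSketch(int slotCount) {
        this.slots = new Semaphore(slotCount);
    }

    /** Start the job in its own thread, or log and return null if no slot is free. */
    Thread startJob(Runnable job, int priority) {
        if (!slots.tryAcquire()) {
            LOG.warning("No free slot; job not started");
            return null;
        }
        Thread runner = new Thread(runners, () -> {
            try {
                job.run();
            } catch (RuntimeException ex) {
                LOG.warning("Job failed: " + ex);
            } finally {
                slots.release(); // corresponds to releasing the slot when done
            }
        });
        runner.setPriority(priority); // see Thread.setPriority(int)
        runner.setDaemon(true);
        runner.start();
        return runner;
    }

    /** Run a small scenario with one slot; returns true if it behaved as expected. */
    static boolean demo() {
        StartJobSketch agent = new StartJobSketch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread first = agent.startJob(() -> {
            try {
                release.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, 4);
        Thread second = agent.startJob(() -> { }, 4); // the only slot is still taken
        release.countDown();
        try {
            first.join(5000L);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        Thread third = agent.startJob(() -> { }, 4); // the slot has been released
        return second == null && third != null;
    }

    public static void main(String[] args) {
        System.out.println("Behaved as expected: " + demo());
    }
}
```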

jobDone

void jobDone(Job job,
             Slot usedSlot)
Used by JobRunner to tell that a job has finished executing and that the used slot should be released.

Parameters:
job - The job that has finished
usedSlot - The slot that was used

closeServer

private void closeServer()
Close the service listener.


createJobQueueChecker

private TimerTask createJobQueueChecker()
Create a job queue checker and register it with the BASE core scheduler.

See Also:
Application.getScheduler(), JobQueueChecker

closeJobQueueChecker

private void closeJobQueueChecker()
Close the job queue checker.


createJobExecutor

private JobExecutor createJobExecutor()
Create a job executor and initialise it.

Returns:
A JobExecutor instance or null if none could be created

closeJobExecutor

private void closeJobExecutor()
Close the job executor.


createSlotManager

private SlotManager createSlotManager()
Create a slot manager and initialize it.

Returns:
A SlotManager instance
Since:
2.16

closeSlotManager

private void closeSlotManager()
Close the slot manager.


maybeStopRunningJobs

private void maybeStopRunningJobs()
Try to stop running jobs by interrupting the threads they are executing in.

