Class Agent
public class Agent extends Object
JobQueueChecker
class.
It includes a listener service that can be used for remote control of the agent. It is for example possible to send start, stop and pause requests.
This class is responsible for creating a JobExecutor object and for delegating the actual execution of a job to it. The agent keeps track of the running jobs and makes sure that no more than the configured number of jobs are running at the same time.
The agent is configured at construction time with parameters from a Properties object.
Parameter | Default value | Description |
---|---|---|
agent.user | - | The username to use for logging in to BASE (required) |
agent.password | - | The password to use for logging in to BASE (required) |
agent.id | - | The external ID (required), name and description properties of the corresponding JobAgent item in the BASE database |
agent.name | - | |
agent.description | - | |
agent.port | 47822 | The port the remote control service listener is listening to |
agent.remotecontrol | - | A comma-separated list of computers that are allowed remote control. It is recommended that the web server is put in this list. The local host is always allowed control and doesn't have to be in this list. |
agent.allowremote.stop | false | If stop requests should be allowed from remote hosts or not. Note! A stop request shuts down the agent, making it impossible to start it again using remote control. |
agent.allowremote.start | true | If start requests should be allowed from remote hosts or not |
agent.allowremote.pause | true | If pause requests should be allowed from remote hosts or not |
agent.request-handler.* | - | One or more entries that can be used to register custom remote control request handlers. The * should be replaced with the name of the protocol, which means that requests should take the form: protocol://custom-data.... The value is the name of a class that implements the |
agent.executor.class | ProcessJobExecutor | The name of a class that is responsible for starting a job once the agent has determined that it is allowed to be executed. The class must implement the JobExecutor interface and provide a public no-argument constructor. Note that only one instance of this class exists for an agent. It must be thread-safe since the jobs are executed in parallel threads. |
agent.checkinterval | 30 | Number of seconds between checks to the database for new jobs. |
agent.shortest.slots | 1 | Number of slots to reserve for jobs that take < 1 minute to execute |
agent.shortest.priority | 4 | The thread priority of jobs in this slot. See Thread.setPriority(int) . |
agent.short.slots | 1 | Number of slots to reserve for jobs that take < 10 minutes to execute |
agent.short.priority | 4 | The thread priority of jobs in this slot. See Thread.setPriority(int) . |
agent.medium.slots | 2 | Number of slots to reserve for jobs that take < 1 hour to execute |
agent.medium.priority | 3 | The thread priority of jobs in this slot. See Thread.setPriority(int) . |
agent.long.slots | 2 | Number of slots to reserve for jobs that take > 1 hour to execute |
agent.long.priority | 3 | The thread priority of jobs in this slot. See Thread.setPriority(int) . |
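As a minimal sketch of the configuration described in the table above, the following builds a Properties object with the required keys and a few optional ones. All values (jobagent, secret, agent-1, 192.168.0.10) are placeholders, and the class name AgentConfigExample is hypothetical. With the BASE job agent on the classpath, the resulting object would be passed to the Agent(Properties) constructor.

```java
import java.util.Properties;

class AgentConfigExample
{
    /** Build a configuration using keys from the parameter table; all values are placeholders. */
    static Properties createConfig()
    {
        Properties config = new Properties();
        // Required parameters
        config.setProperty("agent.user", "jobagent");
        config.setProperty("agent.password", "secret");
        config.setProperty("agent.id", "agent-1");
        // Optional parameters; these values repeat the documented defaults
        config.setProperty("agent.port", "47822");
        config.setProperty("agent.checkinterval", "30");
        config.setProperty("agent.allowremote.stop", "false");
        // Hosts that may send remote control requests (placeholder address)
        config.setProperty("agent.remotecontrol", "192.168.0.10");
        return config;
    }

    public static void main(String[] args)
    {
        Properties config = createConfig();
        // With the BASE job agent available this would continue with:
        // Agent agent = new Agent(config);
        System.out.println(config.getProperty("agent.id"));
    }
}
```

Parameters that are left out, such as agent.name in this sketch, fall back to the defaults listed in the table.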
- Version:
- 2.0
- Author:
- nicklas
- Last modified
- $Date: 2021-06-04 12:40:45 +0200 (Fri, 04 Jun 2021) $
-
Field Summary
Modifier and Type | Field | Description |
---|---|---|
private Set<JobInfo> | activeJobs | |
private boolean | allowRemotePause | |
private boolean | allowRemoteStart | |
private boolean | allowRemoteStop | |
private long | checkInterval | |
private int | closeTimeout | Timeout to wait for jobs to act on the ABORT signal when stopping. |
private Map<String,Class<? extends CustomRequestHandler>> | customRequestHandlerClasses | |
static int | DEFAULT_CHECK_INTERVAL | The default check interval in seconds. |
static Class<? extends JobExecutor> | DEFAULT_JOB_EXECUTOR | The default job executor to use if none has been specified in the configuration file. |
static Class<? extends SlotManager> | DEFAULT_SLOT_MANAGER | The default slot manager to use if none has been specified in the configuration file. |
private String | description | |
private Class<? extends JobExecutor> | executorClass | |
private String | externalId | |
private boolean | isRunning | |
private JobExecutor | jobExecutor | |
private TimerTask | jobQueueChecker | |
private static org.slf4j.Logger | log | Log job agent events. |
private String | login | |
private static org.slf4j.Logger | logServer | Log job agent server events. |
private String | name | |
private String | password | |
private Integer | port | |
private Map<Job.ExecutionTime,Integer> | priorities | The thread priority to use when executing jobs in each slot. |
private Properties | properties | |
private Set<InetAddress> | remote | |
private MultiProtocolRequestHandler | requestHandler | |
private ThreadGroup | runnersGroup | The group where all job runners are placed. |
private SessionControl | sc | |
private JobAgentServerConnection | server | |
private InetAddress | serverAddress | |
private AgentSignalReceiver | signalReceiver | |
private SlotManager | slotManager | |
private Class<? extends SlotManager> | slotManagerClass | |
-
Constructor Summary
Constructor | Description |
---|---|
Agent(Properties properties) | Create a new job agent. |
-
Method Summary
Modifier and Type | Method | Description |
---|---|---|
private void | closeJobExecutor() | Close the job executor. |
private void | closeJobQueueChecker() | Close the job queue checker. |
private void | closeServer() | Close the service listener. |
private void | closeSlotManager() | Close the slot manager. |
private JobExecutor | createJobExecutor() | Create a job executor and initialise it. |
private TimerTask | createJobQueueChecker() | Create a job queue checker and register it with the BASE core scheduler. |
private SlotManager | createSlotManager() | Create a slot manager and initialize it. |
private Map<String,Class<? extends CustomRequestHandler>> | getCustomRequestHandlerClasses() | |
String | getDescription() | Get the agent.description configuration value. |
String | getId() | Get the agent.id configuration value. |
SessionControl | getImpersonatedSessionControl(Job job) | Get a session control where the owner of the job has been impersonated and the active project has been set if needed. |
JobAgent | getJobAgent(DbControl dc) | Get the JobAgent item corresponding to this agent. |
private Class<? extends JobExecutor> | getJobExecutorClass(String className) | Get the class object for the configured job executor. |
String | getLogin() | Get the agent.user configuration value. |
String | getName() | Get the agent.name configuration value. |
String | getPassword() | Get the agent.password configuration value. |
int | getPort() | Get the agent.port configuration value. |
String | getProperty(String key) | Get a configuration property. |
private Set<InetAddress> | getRemoteAddresses(String config) | Split the string at commas and try to create an InetAddress for each part. |
Set<JobInfo> | getRunningJobs() | Get a set containing the IDs of the jobs that are currently being executed by this job agent. |
String | getServerName() | Get the host name of the server where the job agent is running. |
SessionControl | getSessionControl() | Get a session control with the configured user logged in. |
AgentSignalReceiver | getSignalReceiver() | Get the signal receiver that is processing signal messages on behalf of this job agent. |
(package private) Slot | getSlot(Job job) | Find a free slot for executing a job. |
private Class<? extends SlotManager> | getSlotManagerClass(String className) | Get the class object for the configured slot manager. |
int | getThreadPriority(Job.ExecutionTime slot) | Get the thread priority to use for the specified execution time slot. |
boolean | isAllowedControl(InetAddress remote, String cmd) | Check if the computer specified by the given address is allowed to control this job agent. |
boolean | isRunning() | Check if the job agent is running or not. |
(package private) void | jobDone(Job job, Slot usedSlot) | Used by JobRunner to tell that a job has finished executing and that the used slot should be released. |
private void | maybeStopRunningJobs() | Try to stop running jobs by interrupting the threads they are executing in. |
void | pause() | Pause the job agent. |
private void | registerCustomRequestHandlers(MultiProtocolRequestHandler master) | |
void | registerRequestHandler(RequestHandler handler, String... protocols) | Register a request handler for one or more protocols. |
void | service(RequestHandler defaultHandler) | Start the listener service that listens for control commands such as start, stop, pause and info. |
void | start() | Start the job agent. |
void | startJob(Job job, JobAgentSettings settings) | Start a job. |
void | stop() | Stop the job agent. |
void | unregisterRequestHandler(String... protocols) | Unregister one or more protocols. |
private void | validateConfiguration() | Validate that all required configuration parameters have been specified. |
-
Field Details
-
DEFAULT_JOB_EXECUTOR
The default job executor to use if none has been specified in the configuration file.
-
DEFAULT_SLOT_MANAGER
The default slot manager to use if none has been specified in the configuration file.
-
DEFAULT_CHECK_INTERVAL
public static final int DEFAULT_CHECK_INTERVAL
The default check interval in seconds.
- See Also:
  - Constant Field Values
-
log
private static final org.slf4j.Logger log
Log job agent events.
-
logServer
private static final org.slf4j.Logger logServer
Log job agent server events.
-
properties
-
login
-
password
-
externalId
-
name
-
description
-
port
-
remote
-
allowRemoteStop
private final boolean allowRemoteStop
-
allowRemotePause
private final boolean allowRemotePause
-
allowRemoteStart
private final boolean allowRemoteStart
-
customRequestHandlerClasses
-
checkInterval
private final long checkInterval
-
executorClass
-
slotManagerClass
-
serverAddress
-
server
-
requestHandler
-
signalReceiver
-
jobExecutor
-
slotManager
-
jobQueueChecker
-
isRunning
private boolean isRunning
-
sc
-
priorities
The thread priority to use when executing jobs in each slot.
-
activeJobs
-
closeTimeout
private int closeTimeout
Timeout to wait for jobs to act on the ABORT signal when stopping. Default value is 20 seconds.
-
runnersGroup
The group where all job runners are placed.
-
-
Constructor Details
-
Agent
Create a new job agent. See class documentation for information about configuration parameters.
- Parameters:
  - properties - A properties object containing the configuration parameters for the agent
-
-
Method Details
-
getProperty
Get a configuration property.
- Parameters:
  - key - The key
- Returns:
  - The configured value or null if not found
-
getId
Get the agent.id configuration value.
- Returns:
  - The ID
-
getLogin
Get the agent.user configuration value.
- Returns:
  - The login
-
getPassword
Get the agent.password configuration value.
- Returns:
  - The password
-
getName
Get the agent.name configuration value.
- Returns:
  - The name or the ID if the name is null
-
getDescription
Get the agent.description configuration value.
- Returns:
  - The description
-
getPort
public int getPort()
Get the agent.port configuration value.
- Returns:
  - The port number
-
getThreadPriority
Get the thread priority to use for the specified execution time slot.
- Returns:
  - The priority as an int. A higher number means a higher priority.
-
getServerName
Get the host name of the server where the job agent is running.
- Returns:
  - The server's name as a String
- See Also:
  - SocketUtil.getPublicLocalHost()
-
getRemoteAddresses
Split the string at commas and try to create an InetAddress for each part. Parts that turn out to be invalid are skipped and a warning message is logged.
- Parameters:
  - config - The string to split
- Returns:
  - A set of InetAddress objects
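The splitting behaviour described for getRemoteAddresses can be sketched roughly as follows. This is not the agent's actual code: the class name RemoteAddressParser is hypothetical, and writing to System.err stands in for the warning that the agent logs through its own logger.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedHashSet;
import java.util.Set;

class RemoteAddressParser
{
    /** Split at commas and resolve each part; parts that cannot be resolved are skipped. */
    static Set<InetAddress> parse(String config)
    {
        Set<InetAddress> result = new LinkedHashSet<>();
        if (config == null) return result;
        for (String part : config.split(","))
        {
            String host = part.trim();
            if (host.isEmpty()) continue;
            try
            {
                result.add(InetAddress.getByName(host));
            }
            catch (UnknownHostException ex)
            {
                // Stand-in for the warning message mentioned in the documentation
                System.err.println("Skipping invalid address: " + host);
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        // IP literals are parsed without a DNS lookup
        for (InetAddress address : parse("127.0.0.1, 10.0.0.5"))
        {
            System.out.println(address.getHostAddress());
        }
    }
}
```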
-
getCustomRequestHandlerClasses
-
registerCustomRequestHandlers
-
getJobExecutorClass
Get the class object for the configured job executor. If the specified class can't be found or doesn't implement the JobExecutor interface, a warning message is logged and the DEFAULT_JOB_EXECUTOR is used instead.
- Parameters:
  - className - The name of the job executor class
- Returns:
  - The class object for that class or the default job executor
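A lookup with a fallback of the kind described here could look roughly like the sketch below. The class ClassLookup and its method loadOrDefault are hypothetical names, System.err stands in for the agent's logger, and returning the fallback for a null class name is an assumption. The example uses Runnable and standard library classes in place of JobExecutor so that it runs without BASE.

```java
import java.util.TimerTask;

class ClassLookup
{
    /**
     * Load the named class and check that it is a subtype of requiredType.
     * Returns the fallback if the name is null, the class can't be found,
     * or the class has the wrong type.
     */
    static <T> Class<? extends T> loadOrDefault(String className, Class<T> requiredType, Class<? extends T> fallback)
    {
        if (className == null) return fallback;
        try
        {
            Class<?> candidate = Class.forName(className);
            if (requiredType.isAssignableFrom(candidate))
            {
                return candidate.asSubclass(requiredType);
            }
            System.err.println(className + " does not implement " + requiredType.getName());
        }
        catch (ClassNotFoundException ex)
        {
            System.err.println("Class not found: " + className);
        }
        return fallback;
    }

    public static void main(String[] args)
    {
        // Thread implements Runnable, so it is accepted
        System.out.println(loadOrDefault("java.lang.Thread", Runnable.class, TimerTask.class).getName());
        // String does not implement Runnable, so the fallback is used
        System.out.println(loadOrDefault("java.lang.String", Runnable.class, TimerTask.class).getName());
    }
}
```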
-
getSlotManagerClass
Get the class object for the configured slot manager. If the specified class can't be found or doesn't implement the SlotManager interface, a warning message is logged and the DEFAULT_SLOT_MANAGER is used instead.
- Parameters:
  - className - The name of the slot manager class
- Returns:
  - The class object for that class or the default slot manager
-
validateConfiguration
Validate that all required configuration parameters have been specified.
- Throws:
  - InvalidDataException - If parameters are missing or have incorrect values
-
service
Start the listener service that listens for control commands such as start, stop, pause and info. The listener service is only required if the job agent needs to respond to remote control commands.
Note! The AgentController, which is the default controller for agents, always uses the listener service for communicating with the job agent.
Note! The listener service is started in a separate thread and this method returns as soon as the network connections are set up.
Note! The default handler supplied as an argument is used as a fallback handler for unregistered protocols. Additional request handlers can be set up by calling registerRequestHandler(RequestHandler, String...).
- Parameters:
  - defaultHandler - A RequestHandler that handles the incoming requests, or null to use the DefaultRequestHandler
- Throws:
  - IOException - If there is an error when starting the service
-
start
public void start()
Start the job agent. This method will register a JobQueueChecker object with the BASE core scheduler (Application.getScheduler()). The timer will call the JobQueueChecker.run() method at intervals specified by the agent.checkinterval configuration setting.
Note! The JobQueueChecker will run in a separate thread and this method returns immediately after registering the object with the scheduler.
Note! This method also creates a single instance of a JobExecutor. The actual class to use is specified by the agent.executor.class configuration setting. The default job executor is ProcessJobExecutor.
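The periodic check described for start() can be illustrated with a plain java.util.Timer. This is only a sketch: the API of the BASE core scheduler is not shown in this document, so java.util.Timer is used as a stand-in, and the class name QueueCheckScheduling and its schedule method are hypothetical.

```java
import java.util.Timer;
import java.util.TimerTask;

class QueueCheckScheduling
{
    /**
     * Schedule a check that runs immediately and then repeats at the given interval.
     * The returned TimerTask can be cancelled later, which is what stop() and
     * pause() are described as doing with the JobQueueChecker.
     */
    static TimerTask schedule(Timer timer, Runnable check, long intervalMillis)
    {
        TimerTask task = new TimerTask()
        {
            @Override
            public void run()
            {
                check.run();
            }
        };
        timer.schedule(task, 0, intervalMillis);
        return task;
    }

    public static void main(String[] args) throws InterruptedException
    {
        Timer timer = new Timer(true); // daemon thread, so the JVM can exit
        // A real agent would use agent.checkinterval (seconds) * 1000; 100 ms keeps the demo short
        TimerTask task = schedule(timer, () -> System.out.println("Checking job queue"), 100);
        Thread.sleep(350);
        task.cancel();
        timer.cancel();
    }
}
```

The call to schedule returns at once while the checks run on the timer thread, matching the note that start() returns immediately after registering the checker.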
stop
public void stop()
Stop the job agent. This method will:
- Cancel the JobQueueChecker that was registered with the BASE core scheduler by the start() method.
- Call the JobExecutor.close() method on the job executor created by the start() method.
- Close the service listener started by the service(RequestHandler) method.
- Try to stop all running jobs by calling Thread.interrupt() on all job threads.
- Log out from BASE.
-
pause
public void pause()
Pause the job agent. This method will:
- Cancel the JobQueueChecker that was registered with the BASE core scheduler by the start() method.
- Call the JobExecutor.close() method on the job executor created by the start() method.
- Log out and stop the BASE Application.
Calling start() again will start the job agent again.
-
isRunning
public boolean isRunning()
Check if the job agent is running or not.
- Returns:
  - TRUE if the job agent is running, FALSE otherwise
-
getRunningJobs
Get a set containing the IDs of the jobs that are currently being executed by this job agent.
- Returns:
  - A set of JobInfo objects
- Since:
  - 2.6
-
isAllowedControl
Check if the computer specified by the given address is allowed to control this job agent. A computer is allowed to control this job agent if its name or IP address is listed in the agent.remotecontrol property. This method is called from the DefaultRequestHandler.handleCmd(java.net.Socket, String) method to determine if a service request should be accepted or not.
Note! The stop, start and pause commands are only allowed from the local host unless agent.allowremote.stop, agent.allowremote.start and agent.allowremote.pause, respectively, are set to true.
Note! The local host doesn't have to be listed in the agent.remotecontrol property. Requests are always allowed from the local host.
- Parameters:
  - remote - The address of the remote computer
  - cmd - The command the remote computer wants to execute
- Returns:
  - TRUE if the computer is allowed, FALSE otherwise
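The rules above can be condensed into a small decision function. The sketch below is an interpretation, not the agent's implementation: RemoteControlPolicy and its members are hypothetical names, and treating only loopback addresses as the local host is a simplifying assumption.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Set;

class RemoteControlPolicy
{
    private final Set<InetAddress> allowedRemotes;
    private final boolean allowStop;
    private final boolean allowStart;
    private final boolean allowPause;

    RemoteControlPolicy(Set<InetAddress> allowedRemotes, boolean allowStop, boolean allowStart, boolean allowPause)
    {
        this.allowedRemotes = allowedRemotes;
        this.allowStop = allowStop;
        this.allowStart = allowStart;
        this.allowPause = allowPause;
    }

    boolean isAllowed(InetAddress remote, String cmd)
    {
        if (remote == null || cmd == null) return false;
        // Assumption: the local host is recognised by its loopback address
        if (remote.isLoopbackAddress()) return true;
        // Remote hosts must be listed in agent.remotecontrol
        if (!allowedRemotes.contains(remote)) return false;
        switch (cmd)
        {
            case "stop": return allowStop;
            case "start": return allowStart;
            case "pause": return allowPause;
            default: return true; // for example info
        }
    }

    /** Helper that builds an IPv4 address without a DNS lookup. */
    static InetAddress ip(int a, int b, int c, int d)
    {
        try
        {
            return InetAddress.getByAddress(new byte[] { (byte)a, (byte)b, (byte)c, (byte)d });
        }
        catch (UnknownHostException ex)
        {
            throw new IllegalArgumentException(ex);
        }
    }

    public static void main(String[] args)
    {
        // Documented defaults: stop=false, start=true, pause=true
        RemoteControlPolicy policy = new RemoteControlPolicy(Collections.singleton(ip(10, 0, 0, 5)), false, true, true);
        System.out.println(policy.isAllowed(ip(10, 0, 0, 5), "stop"));
        System.out.println(policy.isAllowed(InetAddress.getLoopbackAddress(), "stop"));
    }
}
```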
-
registerRequestHandler
Register a request handler for one or more protocols. See MultiProtocolRequestHandler for more information. Calls to this method are ignored if the job agent has no listener service. See service(RequestHandler).
- Parameters:
  - handler - The request handler
  - protocols - The names of the protocols the handler should handle
- Since:
  - 2.16
-
unregisterRequestHandler
Unregister one or more protocols. See MultiProtocolRequestHandler for more information. Calls to this method are ignored if the job agent has no listener service. See service(RequestHandler).
- Parameters:
  - protocols - The names of the protocols to unregister
- Since:
  - 2.16
-
getSignalReceiver
Get the signal receiver that is processing signal messages on behalf of this job agent.
- Since:
  - 2.6
-
getSessionControl
Get a session control with the configured user logged in.
- Returns:
  - A session control
-
getImpersonatedSessionControl
Get a session control where the owner of the job has been impersonated and the active project has been set if needed.
- Parameters:
  - job - The job to get the impersonated session control for
- Returns:
  - A session control object
-
getJobAgent
Get the JobAgent item corresponding to this agent. The job agent is looked up by the JobAgent.getByExternalId(DbControl, String) method where the external ID is given by the agent.id property.
- Parameters:
  - dc - The DbControl to use for database access
- Returns:
  - A JobAgent item
-
getSlot
Find a free slot for executing a job. If there is a free slot in the requested pool, the requested slot is returned; otherwise the method checks the slower-running pools for free slots and returns one of those. If no free slot is available, null is returned.
Note! This method reserves the slot for the job. It is important that the jobDone(Job, Slot) method is called once the job has completed to return the slot to the pool. Failure to do so may cause the agent to think that all slots are used when they are not.
- Parameters:
  - job - The job that wants a slot
- Returns:
  - The assigned slot or null if no slot is available
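The reserve-and-release pattern described for getSlot and jobDone can be sketched as below. This is an illustration under assumptions, not the SlotManager used by the agent: the class SlotPools is hypothetical, its ExecutionTime enum is a local stand-in for Job.ExecutionTime, and "slower-running pools" is read as the pools for longer execution times.

```java
import java.util.EnumMap;
import java.util.Map;

class SlotPools
{
    /** Local stand-in for Job.ExecutionTime, ordered from fastest to slowest. */
    enum ExecutionTime { SHORTEST, SHORT, MEDIUM, LONG }

    private final Map<ExecutionTime, Integer> free = new EnumMap<>(ExecutionTime.class);

    SlotPools(int shortest, int shortSlots, int medium, int longSlots)
    {
        free.put(ExecutionTime.SHORTEST, shortest);
        free.put(ExecutionTime.SHORT, shortSlots);
        free.put(ExecutionTime.MEDIUM, medium);
        free.put(ExecutionTime.LONG, longSlots);
    }

    /**
     * Reserve a slot in the requested pool, or in the first slower pool
     * that has a free slot. Returns null if every candidate pool is full.
     */
    synchronized ExecutionTime reserve(ExecutionTime requested)
    {
        ExecutionTime[] pools = ExecutionTime.values();
        for (int i = requested.ordinal(); i < pools.length; i++)
        {
            int available = free.get(pools[i]);
            if (available > 0)
            {
                free.put(pools[i], available - 1);
                return pools[i];
            }
        }
        return null;
    }

    /** Return a slot to its pool; corresponds to the role of jobDone(Job, Slot). */
    synchronized void release(ExecutionTime used)
    {
        free.merge(used, 1, Integer::sum);
    }

    public static void main(String[] args)
    {
        // Documented default slot counts: 1, 1, 2, 2
        SlotPools pools = new SlotPools(1, 1, 2, 2);
        System.out.println(pools.reserve(ExecutionTime.SHORTEST));
        System.out.println(pools.reserve(ExecutionTime.SHORTEST));
    }
}
```

If release is never called for a reserved slot, the free count stays too low, which is the failure mode the note about jobDone(Job, Slot) warns about.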
-
startJob
Start a job. This method will create a new thread for running the job and return immediately. If the job for some reason can't be started, for example if there are no available slots, a log message will be written but nothing else will happen. No exception will be thrown to the caller of this method.
- Parameters:
  - job - The job to start
  - settings - Settings for the job agent to use
-
jobDone
Used by JobRunner to tell that a job has finished executing and that the used slot should be released.
- Parameters:
  - job - The job that has finished
  - usedSlot - The slot that was used
-
closeServer
private void closeServer()
Close the service listener.
-
createJobQueueChecker
Create a job queue checker and register it with the BASE core scheduler.
- See Also:
  - Application.getScheduler(), JobQueueChecker
-
closeJobQueueChecker
private void closeJobQueueChecker()
Close the job queue checker.
-
createJobExecutor
Create a job executor and initialise it.
- Returns:
  - A JobExecutor instance or null if none could be created
-
closeJobExecutor
private void closeJobExecutor()
Close the job executor.
-
createSlotManager
Create a slot manager and initialize it.
- Returns:
  - A SlotManager instance
- Since:
  - 2.16
-
closeSlotManager
private void closeSlotManager()
Close the slot manager.
-
maybeStopRunningJobs
private void maybeStopRunningJobs()
Try to stop running jobs by interrupting the threads they are executing in.
-