|
Gaudi Framework, version v22r2 |
| Home | Generated: Tue May 10 2011 |
Data structure used for sending monitoring data to a MonaLisa module. More...
#include <ApMon.h>

Public Member Functions | |
| ApMon (char *initsource) throw (runtime_error) | |
| Initializes an ApMon object from a configuration file or URL. | |
| ApMon (int nDestinations, char **destinationsList) throw (runtime_error) | |
| Initializes an ApMon data structure from a vector of strings. | |
| ApMon (int nDestinations, char **destAddresses, int *destPorts, char **destPasswds) throw (runtime_error) | |
| Initializes an ApMon data structure, using arrays instead of a file. | |
| ~ApMon () | |
| ApMon destructor. | |
| int | sendParameter (char *clusterName, char *nodeName, char *paramName, int valueType, char *paramValue) throw (runtime_error) |
| Sends a parameter and its value to the MonALISA module. | |
| int | sendTimedParameter (char *clusterName, char *nodeName, char *paramName, int valueType, char *paramValue, int timestamp) throw (runtime_error) |
| Sends a parameter and its value to the MonALISA module, together with a timestamp. | |
| int | sendParameter (char *clusterName, char *nodeName, char *paramName, int paramValue) throw (runtime_error) |
| Sends an integer parameter and its value to the MonALISA module. | |
| int | sendParameter (char *clusterName, char *nodeName, char *paramName, float paramValue) throw (runtime_error) |
| Sends a parameter of type float and its value to the MonALISA module. | |
| int | sendParameter (char *clusterName, char *nodeName, char *paramName, double paramValue) throw (runtime_error) |
| Sends a parameter of type double and its value to the MonALISA module. | |
| int | sendParameter (char *clusterName, char *nodeName, char *paramName, char *paramValue) throw (runtime_error) |
| Sends a parameter of type string and its value to the MonALISA module. | |
| int | sendParameters (char *clusterName, char *nodeName, int nParams, char **paramNames, int *valueTypes, char **paramValues) throw (runtime_error) |
| Sends a set of parameters and their values to the MonALISA module. | |
| int | sendTimedParameters (char *clusterName, char *nodeName, int nParams, char **paramNames, int *valueTypes, char **paramValues, int timestamp) throw (runtime_error) |
| Sends a set of parameters and their values to the MonALISA module, together with a timestamp. | |
| bool | getConfCheck () |
| Returns the value of the confCheck flag. | |
| long | getRecheckInterval () |
| Returns the value of the time interval (in seconds) between two recheck operations for the configuration files. | |
| void | setRecheckInterval (long val) |
| Sets the value of the time interval (in seconds) between two recheck operations for the configuration files. | |
| void | setConfRecheck (bool confRecheck, long interval) |
| Enables/disables the periodical check for changes in the configuration files/URLs. | |
| void | setConfRecheck (bool confRecheck) |
| Enables/disables the periodical check for changes in the configuration files/URLs. | |
| void | setJobMonitoring (bool jobMonitoring, long interval) |
| Enables/disables the periodical sending of datagrams with job monitoring information. | |
| void | setJobMonitoring (bool jobMonitoring) |
| Enables/disables the job monitoring. | |
| long | getJobMonitorInterval () |
| Returns the interval at which job monitoring datagrams are sent. | |
| bool | getJobMonitoring () |
| Returns true if the job monitoring is enabled, and false otherwise. | |
| void | setSysMonitoring (bool sysMonitoring, long interval) |
| Enables/disables the periodical sending of datagrams with system monitoring information. | |
| void | setSysMonitoring (bool sysMonitoring) |
| Enables/disables the system monitoring. | |
| long | getSysMonitorInterval () |
| Returns the interval at which system monitoring datagrams are sent. | |
| bool | getSysMonitoring () |
| Returns true if the system monitoring is enabled, and false otherwise. | |
| void | setGenMonitoring (bool genMonitoring, int nIntervals) |
| Enables/disables the periodical sending of datagrams with general system information. | |
| void | setGenMonitoring (bool genMonitoring) |
| Enables/disables the sending of datagrams with general system information. | |
| bool | getGenMonitoring () |
| Returns true if the sending of general system information is enabled and false otherwise. | |
| void | addJobToMonitor (long pid, char *workdir, char *clusterName, char *nodeName) throw (runtime_error) |
| Adds a new job to the list of the jobs monitored by ApMon. | |
| void | removeJobToMonitor (long pid) throw (runtime_error) |
| Removes a job from the list of the jobs monitored by ApMon. | |
| void | setSysMonClusterNode (char *clusterName, char *nodeName) |
| This function is called by the user to set the cluster name and the node name for the system monitoring datagrams. | |
| void | setMaxMsgRate (int maxRate) |
| Sets the maximum number of messages that are sent to MonALISA in one second. | |
Static Public Member Functions | |
| static void | setLogLevel (char *newLevel_s) |
| Sets the ApMon logging level. | |
| static void | errExit (char *msg) |
| Displays an error message and exits with -1 as return value. | |
Protected Member Functions | |
| void | initialize (char *filename, bool firstTime) throw (runtime_error) |
| Initializes an ApMon object from a configuration file. | |
| void | constructFromList (int nDestinations, char **destinationsList) throw (runtime_error) |
| Initializes an ApMon object from a list with URLs and destination hosts. | |
| void | initialize (int nDestinations, char **destList, bool firstTime) throw (runtime_error) |
| Initializes an ApMon object from a list with URLs and destination hosts. | |
| void | loadFile (char *filename, int *nDestinations, char **destAddresses, int *destPorts, char **destPasswds) throw (runtime_error) |
| Parses a configuration file which contains addresses, ports and passwords for the destination hosts and puts the results in the vectors given as parameters. | |
| void | arrayInit (int nDestinations, char **destAddresses, int *destPorts, char **destPasswds) throw (runtime_error) |
| Internal function that initializes an ApMon data structure. | |
| void | arrayInit (int nDestinations, char **destAddresses, int *destPorts, char **destPasswds, bool firstTime) throw (runtime_error) |
| Internal function that initializes an ApMon data structure. | |
| void | addToDestinations (char *line, int *nDestinations, char *destAddresses[], int destPorts[], char *destPasswds[]) |
| Parses the string line, which has the form hostname:port, and adds the hostname and the port to the lists given as parameters. | |
| void | getDestFromWeb (char *url, int *nDestinations, char *destAddresses[], int destPorts[], char *destPasswds[], ConfURLs &confURLs) throw (runtime_error) |
| Gets a configuration file from a web location and adds the destination addresses and ports to the lists given as parameters. | |
| void | encodeParams (int nParams, char **paramNames, int *valueTypes, char **paramValues, int timestamp) throw (runtime_error) |
| Encodes in the XDR format the data from a ApMon structure. | |
| void | initMonitoring () |
| Initializes the monitoring configurations and the names of the parameters included in the monitoring datagrams. | |
| void | sendJobInfo () |
| Sends datagrams containing information about the jobs that are currently being monitored. | |
| void | sendOneJobInfo (MonitoredJob job) |
| Sends datagrams with monitoring information about the specified job to all the destination hosts. | |
| void | updateJobInfo (MonitoredJob job) |
| Update the monitoring information regarding the specified job. | |
| void | sendSysInfo () |
| Sends datagrams with system monitoring information to all the destination hosts. | |
| void | updateSysInfo () |
| Updates the system monitoring information with new values obtained from the /proc filesystem. | |
| void | sendGeneralInfo () |
| Sends datagrams with general system monitoring information to all the destination hosts. | |
| void | updateGeneralInfo () |
| Update the general monitoring information. | |
| void | setBackgroundThread (bool val) |
| Sets the value of the confCheck flag. | |
| long | getCrtRecheckInterval () |
| Returns the actual value of the time interval (in seconds) between two recheck operations for the configuration files. | |
| void | setCrtRecheckInterval (long val) |
| void | freeConf () |
| Frees the data structures needed to hold the configuration settings. | |
| void | parseXApMonLine (char *line) |
| Parses an xApMon line from the configuration file and sets the corresponding parameters in the ApMon object. | |
| void | initSocket () throw (runtime_error) |
| Initializes the UDP socket used to send the datagrams. | |
| void | parseConf (FILE *fp, int *nDestinations, char **destAddresses, int *destPorts, char **destPasswds) throw (runtime_error) |
| Parses the contents of a configuration file. | |
| bool | shouldSend () |
| Decides if the current datagram should be sent (so that the maximum number of datagrams per second is respected in average). | |
Protected Attributes | |
| char * | clusterName |
| The name of the monitored cluster. | |
| char * | nodeName |
| The name of the monitored node. | |
| char * | sysMonCluster |
| The cluster name used when sending system monitoring datagrams. | |
| char * | sysMonNode |
| The node name used when sending system monitoring datagrams. | |
| int | nDestinations |
| The number of destinations to send the results to. | |
| char ** | destAddresses |
| The IP addresses where the results will be sent. | |
| int * | destPorts |
| The ports where the destination hosts listen. | |
| char ** | destPasswds |
| Passwords for the MonALISA hosts. | |
| char * | buf |
| The buffer which holds the message data (encoded in XDR). | |
| int | dgramSize |
| The size of the data inside the datagram (header not included) | |
| int | sockfd |
| Socket descriptor. | |
| bool | confCheck |
| If this flag is true, the configuration file / URLs are periodically rechecked for changes. | |
| int | nInitSources |
| The number of initialization sources. | |
| char ** | initSources |
| The name(s) of the initialization source(s) (file or list). | |
| int | initType |
| long | recheckInterval |
| The configuration file and the URLs are checked for changes at this number of seconds (this value is requested by the user and will be used if no errors appear when reloading the configuration). | |
| long | crtRecheckInterval |
| If the configuration URLs cannot be reloaded, the interval until the next attempt will be increased. | |
| pthread_t | bkThread |
| Background thread which periodically rechecks the configuration and sends monitoring information. | |
| pthread_mutex_t | mutex |
| Used to protect the general ApMon data structures. | |
| pthread_mutex_t | mutexBack |
| Used to protect the variables needed by the background thread. | |
| pthread_mutex_t | mutexCond |
| Used for the condition variable confChangedCond. | |
| pthread_cond_t | confChangedCond |
| Used to notify changes in the monitoring configuration. | |
| bool | recheckChanged |
| These flags indicate changes in the monitoring configuration. | |
| bool | jobMonChanged |
| bool | sysMonChanged |
| bool | haveBkThread |
| If this flag is true, the background thread is created (but not necessarily started). | |
| bool | bkThreadStarted |
| If this flag is true, the background thread is started. | |
| bool | stopBkThread |
| If this flag is true, there was a request to stop the background thread. | |
| bool | autoDisableMonitoring |
| If this flag is set to true, when the value of a parameter cannot be read from proc/, ApMon will not attempt to include that value in the next datagrams. | |
| bool | sysMonitoring |
| If this flag is true, packets with system information taken from /proc are periodically sent to MonALISA. | |
| bool | jobMonitoring |
| If this flag is true, packets with job information taken from /proc are periodically sent to MonALISA. | |
| bool | genMonitoring |
| If this flag is true, packets with general system information taken from /proc are periodically sent to MonALISA. | |
| long | jobMonitorInterval |
| Job/System monitoring information obtained from /proc is sent at these time intervals. | |
| long | sysMonitorInterval |
| int | genMonitorIntervals |
| General system monitoring information is sent at a time interval equal to genMonitorIntervals * sysMonitorInterval. | |
| int | nSysMonitorParams |
| Number of parameters that can be enabled/disabled by the user in the system/job/general monitoring datagrams. | |
| int | nJobMonitorParams |
| int | nGenMonitorParams |
| char * | sysMonitorParams [MAX_SYS_PARAMS] |
| char * | genMonitorParams [MAX_GEN_PARAMS] |
| char * | jobMonitorParams [MAX_JOB_PARAMS] |
| int | actSysMonitorParams [MAX_SYS_PARAMS] |
| int | actGenMonitorParams [MAX_GEN_PARAMS] |
| int | actJobMonitorParams [MAX_JOB_PARAMS] |
| ConfURLs | confURLs |
| int | nMonJobs |
| The number of jobs that will be monitored. | |
| MonitoredJob * | monJobs |
| Array which holds information about the jobs to be monitored. | |
| long | lastModifFile |
| The last time when the configuration file was modified. | |
| time_t | lastJobInfoSend |
| char | username [MAX_STRING_LEN] |
| The name of the user who owns this process. | |
| char | groupname [MAX_STRING_LEN] |
| The group to which the user belongs. | |
| char | myHostname [MAX_STRING_LEN] |
| The name of the host on which ApMon currently runs. | |
| char | myIP [MAX_STRING_LEN] |
| The main IP address of the host on which ApMon currently runs. | |
| int | numIPs |
| The number of IP addresses of the host. | |
| char | allMyIPs [20][20] |
| A list with all the IP addresses of the host. | |
| int | numCPUs |
| The number of CPUs on the machine that runs ApMon. | |
| bool | sysInfo_first |
| time_t | lastSysInfoSend |
| The moment when the last system monitoring datagram was sent. | |
| double | lastSysVals [MAX_SYS_PARAMS] |
| double | currentSysVals [MAX_SYS_PARAMS] |
| int | sysRetResults [MAX_SYS_PARAMS] |
| double | currentJobVals [MAX_JOB_PARAMS] |
| int | jobRetResults [MAX_JOB_PARAMS] |
| double | currentGenVals [MAX_GEN_PARAMS] |
| int | genRetResults [MAX_GEN_PARAMS] |
| double | currentProcessStates [NLETTERS] |
| char | cpuVendor [100] |
| char | cpuFamily [100] |
| char | cpuModel [100] |
| char | cpuModelName [200] |
| char | interfaceNames [20][20] |
| The names of the network interfaces. | |
| int | nInterfaces |
| The number of network interfaces. | |
| double | lastBytesSent [20] |
| The total number of bytes sent through each interface, when the previous system monitoring datagram was sent. | |
| double | lastBytesReceived [20] |
| double | lastNetErrs [20] |
| The total number of network errors for each interface, when the previous system monitoring datagram was sent. | |
| double * | currentNetIn |
| The current values for the net_in, net_out, net_errs parameters. | |
| double * | currentNetOut |
| double * | currentNetErrs |
| double | currentNSockets [4] |
| The number of open TCP, UDP, ICMP and Unix sockets. | |
| double | currentSocketsTCP [20] |
| The number of TCP sockets in each possible state (ESTABLISHED, LISTEN, ...) | |
| char * | socketStatesMapTCP [20] |
| Table that associates the names of the TCP sockets states with the symbolic constants. | |
| int | maxMsgRate |
| long | prvTime |
| double | prvSent |
| double | prvDrop |
| long | crtTime |
| long | crtSent |
| long | crtDrop |
| double | hWeight |
| int | instance_id |
| Random number that identifies this instance of ApMon. | |
| int | seq_nr |
| Sequence number for the packets that are sent to MonALISA. | |
Friends | |
| class | ProcUtils |
| void * | bkTask (void *param) |
| This function is executed in a background thread and has two roles: it automatically sends the system/job monitoring parameters (if the user requested) and it checks the configuration file/URLs for changes. | |
Data structure used for sending monitoring data to a MonaLisa module.
The data is packed in UDP datagrams, in XDR format. A datagram has the following structure:
Since v1.6 ApMon has the xApMon extension, which can be configured to send periodically, in a background thread, monitoring information regarding the system and/or some specified jobs.
Definition at line 212 of file ApMon.h.
| ApMon::ApMon | ( | char * | initsource ) | throw (runtime_error) |
Initializes an ApMon object from a configuration file or URL.
| initsource | The name of the file/URL which contains the addresses and the ports of the destination hosts, and also the passwords (see README for details about the structure of this file). |
Definition at line 61 of file ApMon.cpp.
{
  /* A configuration source is mandatory. */
  if (initsource == NULL)
    throw runtime_error("[ ApMon() ] No conf file/URL provided");

  /* A source that begins with "http://" is treated as a one-element
     destination list and handled by the list-based construction path. */
  const bool startsWithHttp =
    (strncmp(initsource, "http://", strlen("http://")) == 0);
  if (startsWithHttp) {
    char *destList[1] = { initsource };
    constructFromList(1, destList);
    return;
  }

  /* Otherwise the source is a configuration file: remember its name as the
     single initialization source ... */
  initType = FILE_INIT;
  nInitSources = 1;
  initSources = (char **)malloc(nInitSources * sizeof(char *));
  if (initSources == NULL)
    throw runtime_error("[ ApMon() ] Error allocating memory.");
  initSources[0] = strdup(initsource);

  /* ... then set up the monitoring defaults and load the file. */
  initMonitoring();
  initialize(initsource, true);
}
| ApMon::ApMon | ( | int | nDestinations, |
| char ** | destinationsList | ||
| ) | throw (runtime_error) |
Initializes an ApMon data structure from a vector of strings.
The strings can be of the form hostname[:port] [passwd] or can be URLs from where the hostnames are to be read.
Definition at line 142 of file ApMon.cpp.
{
/* All the work is delegated to constructFromList(), which is given the
   number of entries and the list of destination strings unchanged. */
constructFromList(nDestinations, destinationsList);
}
| ApMon::ApMon | ( | int | nDestinations, |
| char ** | destAddresses, | ||
| int * | destPorts, | ||
| char ** | destPasswds | ||
| ) | throw (runtime_error) |
Initializes an ApMon data structure, using arrays instead of a file.
| nDestinations | The number of destination hosts where the results will be sent. |
| destAddresses | Array that contains the hostnames or IP addresses of the destination hosts. |
| destPorts | The ports where the MonaLisa modules listen on the destination hosts. |
| destPasswds | The passwords for the MonALISA hosts. |
| ApMon::~ApMon | ( | ) |
ApMon destructor.
Definition at line 568 of file ApMon.cpp.
{
  /* If the background thread was started and job monitoring is on, send one
     last job monitoring datagram which covers the last time interval. */
  if (bkThreadStarted && getJobMonitoring())
    sendJobInfo();

  /* Turn the background thread off while holding its mutex. */
  pthread_mutex_lock(&mutexBack);
  setBackgroundThread(false);
  pthread_mutex_unlock(&mutexBack);

  /* Tear down the synchronization primitives. */
  pthread_mutex_destroy(&mutex);
  pthread_mutex_destroy(&mutexBack);
  pthread_mutex_destroy(&mutexCond);
  pthread_cond_destroy(&confChangedCond);

  /* Release the name strings. */
  free(clusterName);
  free(nodeName);
  free(sysMonCluster);
  free(sysMonNode);

  /* Release the destination configuration and the monitored jobs table. */
  freeConf();
  free(monJobs);

  /* Release the saved initialization sources, then the table itself. */
  for (int idx = 0; idx < nInitSources; ++idx)
    free(initSources[idx]);
  free(initSources);

  /* Release the datagram buffer. */
  free(buf);

  /* Finally close the socket used for sending. */
#ifndef WIN32
  close(sockfd);
#else
  closesocket(sockfd);
  WSACleanup();
#endif
}
| void ApMon::addJobToMonitor | ( | long | pid, |
| char * | workdir, | ||
| char * | clusterName, | ||
| char * | nodeName | ||
| ) | throw (runtime_error) |
Adds a new job to the list of the jobs monitored by ApMon.
| pid | The job's PID. |
| workdir | The working directory of the job. If it is NULL or if it has a zero length, directory monitoring will be disabled for this job. |
| clusterName | The cluster name associated with the monitoring data for this job in MonALISA. |
| nodeName | The node name associated with the monitoring data for this job in MonALISA. |
Definition at line 1292 of file ApMon.cpp.
{
  /* Refuse to register more jobs than the monitoring table can hold. */
  if (nMonJobs >= MAX_MONITORED_JOBS)
    throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded.");

  MonitoredJob job;
  job.pid = pid;

  /* Pick the effective values: a NULL working directory becomes the empty
     string; a NULL or empty cluster/node name falls back to the default
     cluster name and to the local IP address, respectively. */
  const char *srcWorkdir = (workdir != NULL) ? workdir : "";
  const char *srcCluster = (clusterName != NULL && clusterName[0] != 0)
    ? clusterName : "ApMon_JobMon";
  const char *srcNode = (nodeName != NULL && nodeName[0] != 0)
    ? nodeName : this -> myIP;

  /* Bounded copies: the strings come from the caller, so an over-long value
     is truncated instead of overflowing the destination field.
     NOTE(review): sizeof() assumes the MonitoredJob string fields are
     fixed-size char arrays (the original strcpy() into a fresh local struct
     implies this) - confirm against the MonitoredJob declaration. */
  strncpy(job.workdir, srcWorkdir, sizeof(job.workdir) - 1);
  job.workdir[sizeof(job.workdir) - 1] = 0;

  strncpy(job.clusterName, srcCluster, sizeof(job.clusterName) - 1);
  job.clusterName[sizeof(job.clusterName) - 1] = 0;

  strncpy(job.nodeName, srcNode, sizeof(job.nodeName) - 1);
  job.nodeName[sizeof(job.nodeName) - 1] = 0;

  /* Append the job to the table of monitored jobs. */
  monJobs[nMonJobs++] = job;
}
| void ApMon::addToDestinations | ( | char * | line, |
| int * | nDestinations, | ||
| char * | destAddresses[], | ||
| int | destPorts[], | ||
| char * | destPasswds[] | ||
| ) | [protected] |
Parses the string line, which has the form hostname:port, and adds the hostname and the port to the lists given as parameters.
| line | The line to be parsed. |
| nDestinations | The number of destination hosts in the lists. Will be modified (incremented) in the function. |
| destAddresses | The list with IP addresses or hostnames. |
| destPorts | The list of corresponding ports. |
| destPasswds | Passwords for the destination hosts. |
Definition at line 235 of file ApMon.cpp.
{
  char *addr, *port, *passwd;
  const char *sep1 = " \t";
  const char *sep2 = ":";
  char *firstToken;
  char *tmp;
  char *newAddr, *newPasswd;

  if (line == NULL) {
    logger(WARNING, "[ addToDestinations() ] NULL destination line - skipping");
    return;
  }

  /* strtok() modifies its argument, so work on a private copy of the line.
     Note that strtok() keeps internal static state and is not reentrant. */
  tmp = strdup(line);
  if (tmp == NULL) {
    logger(WARNING, "[ addToDestinations() ] Error allocating memory - skipping destination");
    return;
  }

  /* the address & port are separated from the password with spaces */
  firstToken = strtok(tmp, sep1);
  if (firstToken == NULL) {
    /* empty or blank line: there is nothing to add */
    logger(WARNING, "[ addToDestinations() ] Empty destination line - skipping");
    free(tmp);
    return;
  }
  /* the password must be extracted before firstToken is re-tokenized,
     because the next strtok(firstToken, ...) call resets the scan state */
  passwd = strtok(NULL, sep1);

  /* the address and the port are separated with ":" */
  addr = strtok(firstToken, sep2);
  if (addr == NULL) {
    /* the first token consisted only of ':' characters */
    logger(WARNING, "[ addToDestinations() ] Destination line without a host name - skipping");
    free(tmp);
    return;
  }
  port = strtok(NULL, sep2);

  /* duplicate both strings first, so that the lists are only modified when
     the whole entry can be stored */
  newAddr = strdup(addr);
  newPasswd = strdup((passwd == NULL) ? "" : passwd);
  if (newAddr == NULL || newPasswd == NULL) {
    logger(WARNING, "[ addToDestinations() ] Error allocating memory - skipping destination");
    free(newAddr);
    free(newPasswd);
    free(tmp);
    return;
  }

  destAddresses[*nDestinations] = newAddr;
  /* a missing port selects the default one */
  if (port == NULL)
    destPorts[*nDestinations] = DEFAULT_PORT;
  else
    destPorts[*nDestinations] = atoi(port);
  /* a missing password is stored as the empty string */
  destPasswds[*nDestinations] = newPasswd;
  (*nDestinations)++;

  free(tmp);
}
| void ApMon::arrayInit | ( | int | nDestinations, |
| char ** | destAddresses, | ||
| int * | destPorts, | ||
| char ** | destPasswds | ||
| ) | throw (runtime_error) [protected] |
Internal function that initializes an ApMon data structure.
| nDestinations | The number of destination hosts where the results will be sent. |
| destAddresses | Array that contains the hostnames or IP addresses of the destination hosts. |
| destPorts | The ports where the MonaLisa modules listen on the destination hosts. |
| destPasswds | Passwords for the destination hosts. |
Definition at line 365 of file ApMon.cpp.
{
  /* Forward to the five-argument overload, marking this call as the first
     initialization of the object. */
  const bool isFirstInit = true;
  arrayInit(nDestinations, destAddresses, destPorts, destPasswds, isFirstInit);
}
| void ApMon::arrayInit | ( | int | nDestinations, |
| char ** | destAddresses, | ||
| int * | destPorts, | ||
| char ** | destPasswds, | ||
| bool | firstTime | ||
| ) | throw (runtime_error) [protected] |
Internal function that initializes an ApMon data structure.
| nDestinations | The number of destination hosts where the results will be sent. |
| destAddresses | Array that contains the hostnames or IP addresses of the destination hosts. |
| destPorts | The ports where the MonaLisa modules listen on the destination hosts. |
| destPasswds | Passwords for the destination hosts. |
| firstTime | If it is true, all the initializations will be done (the object is being constructed now). Else, only some structures will be reinitialized. |
Definition at line 372 of file ApMon.cpp.
{
  int i, j;
  int ret;
  char *ipAddr, logmsg[100];
  bool found, havePublicIP;
  int tmpNDestinations;
  char **tmpAddresses, **tmpPasswds;
  int *tmpPorts;

  /* a non-positive destination count is as unusable as missing arrays */
  if (destAddresses == NULL || destPorts == NULL || nDestinations <= 0)
    throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided");

  /* initializations that we have to do only once */
  if (firstTime) {
    //this -> appPID = getpid();
    this -> nMonJobs = 0;
    this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS *
                                             sizeof(MonitoredJob));
    if (this -> monJobs == NULL)
      throw runtime_error("[ arrayInit() ] Error allocating memory");

    try {
      this -> numCPUs = ProcUtils::getNumCPUs();
    } catch (procutils_error &err) {
      logger(WARNING, err.what());
      this -> numCPUs = 0;
    }

    /* get the names of the network interfaces */
    this -> nInterfaces = 0;
    try {
      ProcUtils::getNetworkInterfaces(this -> nInterfaces,
                                      this -> interfaceNames);
    } catch (procutils_error &err) {
      logger(WARNING, err.what());
      this -> nInterfaces = 0;
    }

    /* get the hostname of the machine */
    ret = gethostname(this -> myHostname, MAX_STRING_LEN -1);
    if (ret < 0) {
      logger(WARNING, "Could not obtain the local hostname");
      strcpy(myHostname, "unknown");
    } else
      myHostname[MAX_STRING_LEN - 1] = 0;

    /* get the IPs of the machine */
    this -> numIPs = 0; havePublicIP = false;
    strcpy(this -> myIP, "unknown");

    /* default values for cluster name and node name */
    this -> clusterName = strdup("ApMon_UserSend");
    this -> nodeName = strdup(myHostname);

#ifndef WIN32
    /* helper socket, used only for the SIOCGIFADDR queries below */
    int sockd = socket(PF_INET, SOCK_STREAM, 0);
    if(sockd < 0){
      logger(WARNING, "Could not obtain local IP addresses");
    } else {
      for (i = 0; i < this -> nInterfaces; i++) {
        struct ifreq ifr;
        memset(&ifr, 0, sizeof(ifr));
        strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1);
        if(ioctl(sockd, SIOCGIFADDR, &ifr)<0)
          continue; /* the address of this interface could not be read */
        char ip[4], tmp_s[20];
#ifdef __APPLE__
        memcpy(ip, ifr.ifr_addr.sa_data+2, 4);
#else
        memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4);
#endif
        strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip));
        sprintf(logmsg, "Found local IP address: %s", tmp_s);
        logger(FINE, logmsg);
        /* keep the first non-loopback address as the main IP until a
           public (non-private) one has been seen */
        if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) {
          strcpy(this -> myIP, tmp_s);
          if (!isPrivateAddress(tmp_s))
            havePublicIP = true;
        }
        strcpy(this -> allMyIPs[this -> numIPs], tmp_s);
        this -> numIPs++;
      }
      /* the helper socket is not needed any more; without this the
         descriptor would leak */
      close(sockd);
    }
#else
    struct hostent *hptr;
    if ((hptr = gethostbyname(myHostname))!= NULL) {
      i = 0;
      struct in_addr addr;
      while ((hptr -> h_addr_list)[i] != NULL) {
        memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4);
        ipAddr = inet_ntoa(addr);
        if (strcmp(ipAddr, "127.0.0.1") != 0) {
          strcpy(this -> myIP, ipAddr);
          if (!isPrivateAddress(ipAddr))
            break;
        }
        i++;
      }
    }
#endif

    this -> sysMonCluster = strdup("ApMon_SysMon");
    this -> sysMonNode = strdup(this -> myIP);

    /* state used for limiting the message rate */
    this -> prvTime = 0;
    this -> prvSent = 0;
    this -> prvDrop = 0;
    this -> crtTime = 0;
    this -> crtSent = 0;
    this -> crtDrop = 0;
    this -> hWeight = exp(-5.0/60.0);

    srand(time(NULL));

    /* initialize buffer for XDR encoding */
    this -> buf = (char *)malloc(MAX_DGRAM_SIZE);
    if (this -> buf == NULL)
      throw runtime_error("[ arrayInit() ] Error allocating memory");
    this -> dgramSize = 0;

    /*create the socket & set options*/
    initSocket();

    /* initialize the sender ID and the sequence number */
    instance_id = rand();
    seq_nr = 0;
  }

  /* put the destination addresses, ports & passwords in some temporary
     buffers (because we don't want to lock mutex while making DNS
     requests)
  */
  tmpNDestinations = 0;
  tmpPorts = (int *)malloc(nDestinations * sizeof(int));
  tmpAddresses = (char **)malloc(nDestinations * sizeof(char *));
  tmpPasswds = (char **)malloc(nDestinations * sizeof(char *));
  if (tmpPorts == NULL || tmpAddresses == NULL ||
      tmpPasswds == NULL) {
    /* release whichever buffers were obtained (free(NULL) is a no-op) */
    free(tmpPorts);
    free(tmpAddresses);
    free(tmpPasswds);
    throw runtime_error("[ arrayInit() ] Error allocating memory");
  }

  for (i = 0; i < nDestinations; i++) {
    try {
      ipAddr = findIP(destAddresses[i]);
    } catch (runtime_error &err) {
      /* an address that cannot be resolved is reported and skipped */
      logger(FATAL, err.what());
      continue;
    }

    /* make sure this address is not already in the list */
    found = false;
    for (j = 0; j < tmpNDestinations; j++) {
      if (!strcmp(ipAddr, tmpAddresses[j])) {
        found = true;
        break;
      }
    }

    /* add the address to the list */
    if (!found) {
      /* a missing password list or entry is stored as the empty string */
      const char *passwd = (destPasswds != NULL && destPasswds[i] != NULL)
        ? destPasswds[i] : "";
      tmpAddresses[tmpNDestinations] = ipAddr;
      tmpPorts[tmpNDestinations] = destPorts[i];
      tmpPasswds[tmpNDestinations] = strdup(passwd);
      sprintf(logmsg, "Adding destination host: %s - port %d",
              tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]);
      logger(INFO, logmsg);
      tmpNDestinations++;
    } else {
      /* duplicate destination: the string is not stored, so release it.
         NOTE(review): assumes findIP() returns a heap-allocated string,
         which is implied by the stored entries being released later through
         freeMat() - confirm against findIP(). */
      free(ipAddr);
    }
  }

  if (tmpNDestinations == 0) {
    freeMat(tmpAddresses, tmpNDestinations);
    freeMat(tmpPasswds, tmpNDestinations);
    /* the ports buffer is a plain array, so it is released with free() */
    free(tmpPorts);
    throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!");
  }

  /* install the new destination lists under the protection of the mutex;
     on a re-initialization the previous configuration is released first */
  pthread_mutex_lock(&mutex);
  if (!firstTime)
    freeConf();
  this -> nDestinations = tmpNDestinations;
  this -> destAddresses = tmpAddresses;
  this -> destPorts = tmpPorts;
  this -> destPasswds = tmpPasswds;
  pthread_mutex_unlock(&mutex);

  /* start job/system monitoring according to the settings previously read
     from the configuration file */
  setJobMonitoring(jobMonitoring, jobMonitorInterval);
  setSysMonitoring(sysMonitoring, sysMonitorInterval);
  setGenMonitoring(genMonitoring, genMonitorIntervals);
  setConfRecheck(confCheck, recheckInterval);
}
| void ApMon::constructFromList | ( | int | nDestinations, |
| char ** | destinationsList | ||
| ) | throw (runtime_error) [protected] |
Initializes an ApMon object from a list with URLs and destination hosts.
Definition at line 146 of file ApMon.cpp.
{
  /* The list itself is mandatory. */
  if (destinationsList == NULL)
    throw runtime_error("[ constructFromList() ] Null destination list");

  /* Record how this object was initialized. */
#ifdef __APPLE__
  initType = OLIST_INIT;
#else
  initType = LIST_INIT;
#endif

  initMonitoring();

  /* Keep a private copy of every entry of the initialization list. */
  nInitSources = nDestinations;
  initSources = (char **)malloc(nInitSources * sizeof(char*));
  if (initSources == NULL)
    throw runtime_error("[ ApMon() ] Error allocating memory.");
  int idx = 0;
  while (idx < nInitSources) {
    initSources[idx] = strdup(destinationsList[idx]);
    ++idx;
  }

  /* Perform the actual initialization (first time for this object). */
  initialize(nDestinations, destinationsList, true);
}
| void ApMon::encodeParams | ( | int | nParams, |
| char ** | paramNames, | ||
| int * | valueTypes, | ||
| char ** | paramValues, | ||
| int | timestamp | ||
| ) | throw (runtime_error) [protected] |
Encodes in the XDR format the data from a ApMon structure.
Must be called before sending the data over the network.
Definition at line 808 of file ApMon.cpp.
{
  XDR xdrs; /* XDR handle. */
  int i, effectiveNParams;

  if (nParams > 0 && (paramNames == NULL || valueTypes == NULL ||
                      paramValues == NULL))
    throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");

  /* count the number of parameters actually sent in the datagram
     (the parameters with a NULL name or a NULL value are skipped; a NULL
     value cannot be encoded for any type, because the value pointer is
     dereferenced by the XDR routines)
  */
  effectiveNParams = 0;
  for (i = 0; i < nParams; i++) {
    if (paramNames[i] != NULL && paramValues[i] != NULL)
      effectiveNParams++;
  }
  if (effectiveNParams == 0)
    throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");

  /*** estimate the length of the send buffer ***/
  /* add the length of the cluster name & node name */
  dgramSize = xdrSize(XDR_STRING, clusterName) +
    xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL);
  /* add the lengths for the parameters (name + size + value); only the
     parameters that will really be encoded are counted, so that dgramSize
     matches the encoded data */
  for (i = 0; i < nParams; i++) {
    if (paramNames[i] == NULL || paramValues[i] == NULL)
      continue;
    dgramSize += xdrSize(XDR_STRING, paramNames[i]) + xdrSize(XDR_INT32, NULL)
      + xdrSize(valueTypes[i], paramValues[i]);
  }
  /* the optional timestamp is part of the datagram too, so account for it
     before checking the maximum size */
  if (timestamp > 0)
    dgramSize += xdrSize(XDR_INT32, NULL);

  /* check that the maximum datagram size is not exceeded */
  if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE)
    throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded");

  /* initialize the XDR stream */
  xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE);
  try {
    /* encode the cluster name, the node name and the number of parameters */
    if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName)
                    + 1))
      throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name");
    if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1))
      throw runtime_error("[ encodeParams() ] XDR encoding error for the node name");
    if (!xdr_int(&xdrs, &(effectiveNParams)))
      throw runtime_error("[ encodeParams() ] XDR encoding error for the number of parameters");

    /* encode the parameters */
    for (i = 0; i < nParams; i++) {
      if (paramNames[i] == NULL || paramValues[i] == NULL) {
        logger(WARNING, "NULL parameter name or value - skipping parameter...");
        continue;
      }
      /* parameter name */
      if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1))
        throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name");
      /* parameter value type */
      if (!xdr_int(&xdrs, &(valueTypes[i])))
        throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type");
      /* parameter value */
      switch (valueTypes[i]) {
      case XDR_STRING:
        if (!xdr_string(&xdrs, &(paramValues[i]),
                        strlen(paramValues[i]) + 1))
          throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
        break;
      //INT16 is not supported
      /* case XDR_INT16:
           if (!xdr_short(&xdrs, (short *)(paramValues[i])))
             return RET_ERROR;
           break;
      */
      case XDR_INT32:
        if (!xdr_int(&xdrs, (int *)(paramValues[i])))
          throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
        break;
      case XDR_REAL32:
        if (!xdr_float(&xdrs, (float *)(paramValues[i])))
          throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
        break;
      case XDR_REAL64:
        if (!xdr_double(&xdrs, (double *)(paramValues[i])))
          throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
        break;
      default:
        throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding");
      }
    }

    /* encode the timestamp if necessary (its size is already included in
       dgramSize) */
    if (timestamp > 0) {
      if (!xdr_int(&xdrs, &timestamp))
        throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp");
    }
  } catch (runtime_error&) {
    /* release the XDR stream, then rethrow the original exception object
       unchanged */
    xdr_destroy(&xdrs);
    throw;
  }
  xdr_destroy(&xdrs);
}
| static void ApMon::errExit | ( | char * | msg ) | [static] |
Displays an error message and exits with -1 as return value.
| msg | The message to be displayed. |
| void ApMon::freeConf | ( | ) | [protected] |
Frees the data structures needed to hold the configuration settings.
Definition at line 609 of file ApMon.cpp.
{
  /* Release the destination tables first, then every stored configuration
     URL together with its "last modified" string. */
  freeMat(destAddresses, nDestinations);
  freeMat(destPasswds, nDestinations);
  free(destPorts);

  int idx = 0;
  while (idx < confURLs.nConfURLs) {
    free(confURLs.vURLs[idx]);
    free(confURLs.lastModifURLs[idx]);
    ++idx;
  }
}
| bool ApMon::getConfCheck | ( | ) | [inline] |
| long ApMon::getCrtRecheckInterval | ( | ) | [inline, protected] |
Returns the actual value of the time interval (in seconds) between two recheck operations for the configuration files.
Definition at line 956 of file ApMon.h.
{
  /* Plain accessor: the stored value is returned as-is, no lock is taken. */
  const long interval = crtRecheckInterval;
  return interval;
}
| void ApMon::getDestFromWeb | ( | char * | url, |
| int * | nDestinations, | ||
| char * | destAddresses[], | ||
| int | destPorts[], | ||
| char * | destPasswds[], | ||
| ConfURLs & | confURLs | ||
| ) | throw (runtime_error) [protected] |
Gets a configuration file from a web location and adds the destination addresses and ports to the lists given as parameters.
Definition at line 267 of file ApMon.cpp.
{
  /* Downloads the configuration found at `url` into a temporary file, checks
     the HTTP status line and the headers, records the URL (with its
     "Last-Modified" header line) in confURLs and finally hands the page body
     to parseConf().
     Throws runtime_error when the page cannot be fetched, is not answered
     with status 200, or is shorter than its announced Content-Length. The
     temporary file is removed on every exit path after it has been opened. */
  char temp_filename[300];
  FILE *tmp_file;
  char *line, *ret, *tmp = NULL;
  char *urlCopy = NULL, *lastModif = NULL;
  long mypid = getpid();
  char str1[20], str2[20];
  int totalSize, headerSize, contentSize;

#ifndef WIN32
  sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid);
#else
  const char *tmpp = getenv("TEMP");
  if (tmpp == NULL)
    tmpp = getenv("TMP");
  if (tmpp == NULL)
    tmpp = "c:";
  /* the directory part is truncated so that temp_filename cannot overflow */
  sprintf(temp_filename, "%.250s\\apmon_webconf%ld", tmpp, mypid);
#endif

  /* get the configuration file from web and put it in a temporary file */
  totalSize = httpRequest(url, (char*)"GET", temp_filename);

  /* read the configuration from the temporary file */
  tmp_file = fopen(temp_filename, "rt");
  if (tmp_file == NULL)
    throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");

  line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char));
  if (line == NULL) {
    fclose(tmp_file);
    unlink(temp_filename);
    throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
  }

  /* check the HTTP status line to see if we got the page correctly;
     an unreadable or malformed status line is treated like a non-200 answer.
     The field widths keep sscanf() inside the 20-byte buffers. */
  str1[0] = str2[0] = 0;
  if (fgets(line, MAX_STRING_LEN, tmp_file) == NULL ||
      sscanf(line, "%19s %19s", str1, str2) != 2 ||
      atoi(str2) != 200) {
    free(line);
    fclose(tmp_file);
    unlink(temp_filename);
    throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server");
  }

  /* kept in a local until the header was read completely, so that nothing
     leaks when the header turns out to be truncated */
  urlCopy = strdup(url);

  /* check the header for the "Last-Modified" and "Content-Length" lines;
     the header ends at the first line that is empty after trimming */
  contentSize = 0;
  do {
    if (tmp != NULL) {
      free(tmp);
      tmp = NULL;
    }
    ret = fgets(line, MAX_STRING_LEN, tmp_file);
    if (ret == NULL) {
      free(line);
      free(urlCopy);
      free(lastModif);
      fclose(tmp_file);
      unlink(temp_filename);
      throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
    }
    if (strstr(line, "Last-Modified") == line) {
      free(lastModif);            /* keep only the most recent occurrence */
      lastModif = strdup(line);
    }
    if (strstr(line, "Content-Length") == line) {
      sscanf(line, "%19s %d", str1, &contentSize);
    }
    /* the trimmed copy is released with free(), as in the rest of the file */
    tmp = trimString(line);
  } while (strlen(tmp) != 0);
  free(tmp);
  free(line);

  /* register the URL; an empty string marks a missing "Last-Modified" line */
  confURLs.vURLs[confURLs.nConfURLs] = urlCopy;
  confURLs.lastModifURLs[confURLs.nConfURLs] =
    (lastModif != NULL) ? lastModif : strdup("");
  confURLs.nConfURLs++;

  /* everything read so far was header; the rest must cover Content-Length */
  headerSize = ftell(tmp_file);
  if (totalSize - headerSize < contentSize) {
    fclose(tmp_file);
    unlink(temp_filename);
    throw runtime_error("[ getDestFromWeb() ] Web page received incompletely");
  }

  try {
    parseConf(tmp_file, nDestinations, destAddresses, destPorts,
              destPasswds);
  } catch (...) {
    /* NOTE(review): parseConf() closes the stream itself on some of its
       error paths, in which case this is a second fclose() on the same
       FILE* - confirm and settle who owns the stream. */
    fclose(tmp_file);
    unlink(temp_filename);
    throw;
  }

  fclose(tmp_file);
  unlink(temp_filename);
}
| bool ApMon::getGenMonitoring | ( | ) | [inline] |
| bool ApMon::getJobMonitoring | ( | ) | [inline] |
| long ApMon::getJobMonitorInterval | ( | ) | [inline] |
Returns the interval at which job monitoring datagrams are sent.
If the job monitoring is disabled, returns -1.
Definition at line 685 of file ApMon.h.
{
  /* The flag and the interval are read together under mutexBack;
     -1 is reported while job monitoring is switched off. */
  pthread_mutex_lock(&mutexBack);
  const long interval = jobMonitoring ? jobMonitorInterval : -1;
  pthread_mutex_unlock(&mutexBack);
  return interval;
}
| long ApMon::getRecheckInterval | ( | ) | [inline] |
Returns the value of the time interval (in seconds) between two recheck operations for the configuration files.
If error(s) appear when reloading the configuration, the actual interval will be increased (transparently for the user).
Definition at line 639 of file ApMon.h.
{
  /* Accessor for the stored recheck interval. */
  return this -> recheckInterval;
}
| bool ApMon::getSysMonitoring | ( | ) | [inline] |
| long ApMon::getSysMonitorInterval | ( | ) | [inline] |
Returns the interval at which system monitoring datagrams are sent.
If the system monitoring is disabled, returns -1.
Definition at line 721 of file ApMon.h.
{
  /* The flag and the interval are read together under mutexBack;
     -1 is reported while system monitoring is switched off. */
  pthread_mutex_lock(&mutexBack);
  const long interval = sysMonitoring ? sysMonitorInterval : -1;
  pthread_mutex_unlock(&mutexBack);
  return interval;
}
| void ApMon::initialize | ( | int | nDestinations, |
| char ** | destList, | ||
| bool | firstTime | ||
| ) | throw (runtime_error) [protected] |
Initializes an ApMon object from a list with URLs and destination hosts.
| nDestinations | The number of elements in destList. |
| destList | The list with URLs. |
| firstTime | If it is true, all the initializations will be done (the object is being constructed now). Else, only some structures will be reinitialized. |
Definition at line 173 of file ApMon.cpp.
{
  /* Builds the destination tables from the entries of destinationsList:
     entries that start with "http" are downloaded with getDestFromWeb(),
     all the others are passed to addToDestinations(). The collected
     destinations are then installed with arrayInit() and the list of
     configuration URLs is stored in the object.
     When firstTime is false, any error keeps the previous configuration. */
  char *destAddresses[MAX_N_DESTINATIONS];
  int destPorts[MAX_N_DESTINATIONS];
  char *destPasswds[MAX_N_DESTINATIONS];
  char errmsg[200];
  int i, j;
  int cnt = 0;
  ConfURLs confURLs;

  logger(INFO, "Initializing destination addresses & ports:");

  if (nDestinations > MAX_N_DESTINATIONS)
    throw runtime_error("[ initialize() ] Maximum number of destinations exceeded");

  confURLs.nConfURLs = 0;

  for (i = 0; i < nDestinations; i++) {
    try {
      if (strstr(destinationsList[i], "http") == destinationsList[i])
        getDestFromWeb(destinationsList[i], &cnt,
                       destAddresses, destPorts, destPasswds, confURLs);
      else
        addToDestinations(destinationsList[i], &cnt,
                          destAddresses, destPorts, destPasswds);
    } catch (runtime_error &e) {
      /* the reason is truncated so that the message always fits in errmsg */
      sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %.140s", e.what());
      logger(WARNING, errmsg);
      if (!firstTime) {
        /* a separate index is used so the outer loop counter stays intact */
        for (j = 0; j < cnt; j++) {
          free(destAddresses[j]);
          free(destPasswds[j]);
        }
        logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one.");
        return;
      }
      /* on first-time initialization the faulty entry is skipped */
    } // catch
  } // for

  try {
    arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime);
  } catch (runtime_error&) {
    /* NOTE(review): the temporary destination strings are not released on
       this path - confirm whether arrayInit() frees them when it fails. */
    if (firstTime)
      throw;   /* rethrow the original exception object, no copy */
    else {
      logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
      return;
    }
  }

  /* the temporary copies are released once arrayInit() has succeeded */
  for (i = 0; i < cnt; i++) {
    free(destAddresses[i]);
    free(destPasswds[i]);
  }

  pthread_mutex_lock(&mutex);
  this -> confURLs = confURLs;
  pthread_mutex_unlock(&mutex);
}
| void ApMon::initialize | ( | char * | filename, |
| bool | firstTime | ||
| ) | throw (runtime_error) [protected] |
Initializes an ApMon object from a configuration file.
| filename | The name of the file which contains the addresses and the ports of the destination hosts (see README for details about the structure of this file). |
| firstTime | If it is true, all the initializations will be done (the object is being constructed now). Else, only some structures will be reinitialized. |
Definition at line 85 of file ApMon.cpp.
{
  /* Loads the destinations from the configuration file with loadFile() and
     installs them with arrayInit(). The object's list of configuration URLs
     is replaced by an empty one.
     When firstTime is false, any error keeps the previous configuration. */
  char *destAddresses[MAX_N_DESTINATIONS];
  int destPorts[MAX_N_DESTINATIONS];
  char *destPasswds[MAX_N_DESTINATIONS];
  int nDest = 0, i;
  ConfURLs confURLs;

  confURLs.nConfURLs = 0;

  try {
    loadFile(filename, &nDest, destAddresses, destPorts, destPasswds);
    arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime);
  } catch (runtime_error& err) {
    /* NOTE(review): destinations already collected are not released on this
       path - confirm whether the callee frees them when it fails. */
    if (firstTime)
      throw;   /* rethrow the original exception object, no copy */
    else {
      logger(WARNING, err.what());
      logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
      return;
    }
  }

  /* the temporary copies are released once arrayInit() has succeeded */
  for (i = 0; i < nDest; i++) {
    free(destAddresses[i]);
    free(destPasswds[i]);
  }

  pthread_mutex_lock(&mutex);
  this -> confURLs = confURLs;
  pthread_mutex_unlock(&mutex);
}
| void ApMon::initMonitoring | ( | ) | [protected] |
Initializes the monitoring configurations and the names of the parameters included in the monitoring datagrams.
Definition at line 682 of file monitor_utils.cpp.
{
  /* Default switches: all monitoring categories and the configuration
     recheck are off, automatic disabling of parameters is on. */
  autoDisableMonitoring = true;
  sysMonitoring = false;
  jobMonitoring = false;
  genMonitoring = false;
  confCheck = false;

  /* Synchronization objects (pthreads, or their Win32 counterparts). */
#ifndef WIN32
  pthread_mutex_init(&mutex, NULL);
  pthread_mutex_init(&mutexBack, NULL);
  pthread_mutex_init(&mutexCond, NULL);
  pthread_cond_init(&confChangedCond, NULL);
#else
  logger(INFO, "init mutexes...");
  mutex = CreateMutex(NULL, FALSE, NULL);
  mutexBack = CreateMutex(NULL, FALSE, NULL);
  mutexCond = CreateMutex(NULL, FALSE, NULL);
  confChangedCond = CreateEvent(NULL, FALSE, FALSE, NULL);

  // Initialize the Windows Sockets library (version 2.0 is requested)
  WSADATA wsaData;
  WORD wVersionRequested = MAKEWORD( 2, 0 );
  int wsaStatus = WSAStartup( wVersionRequested, &wsaData );
  if ( wsaStatus != 0 ) {
    logger(FATAL, "Could not initialize the Windows Sockets library (WS2_32.dll)");
  }
#endif

  /* Background thread bookkeeping. */
  haveBkThread = false;
  bkThreadStarted = false;
  stopBkThread = false;

  recheckChanged = false;
  jobMonChanged = false;
  sysMonChanged = false;

  /* Default intervals. */
  recheckInterval = RECHECK_INTERVAL;
  crtRecheckInterval = RECHECK_INTERVAL;
  jobMonitorInterval = JOB_MONITOR_INTERVAL;
  sysMonitorInterval = SYS_MONITOR_INTERVAL;

  /* Parameter name tables; each init function returns the table size. */
  nSysMonitorParams = initSysParams(sysMonitorParams);
  nGenMonitorParams = initGenParams(genMonitorParams);
  nJobMonitorParams = initJobParams(jobMonitorParams);

  initSocketStatesMapTCP(socketStatesMapTCP);

  sysInfo_first = true;

  /* The boot time is used as the time of the "previous" system report;
     0 is used when it cannot be obtained. */
  try {
    lastSysInfoSend = ProcUtils::getBootTime();
  } catch (procutils_error& perr) {
    logger(WARNING, perr.what());
    logger(WARNING, "The first system monitoring values may be inaccurate");
    lastSysInfoSend = 0;
  }

  /* Every parameter starts enabled, with a clean result code; the system
     parameters additionally get their last value reset. */
  int k;
  for (k = 0; k < nSysMonitorParams; k++) {
    lastSysVals[k] = 0;
    actSysMonitorParams[k] = 1;
    sysRetResults[k] = RET_SUCCESS;
  }
  //this -> lastUsrTime = this -> lastSysTime = 0;
  //this -> lastNiceTime = this -> lastIdleTime = 0;

  for (k = 0; k < nGenMonitorParams; k++) {
    actGenMonitorParams[k] = 1;
    genRetResults[k] = RET_SUCCESS;
  }

  for (k = 0; k < nJobMonitorParams; k++) {
    actJobMonitorParams[k] = 1;
    jobRetResults[k] = RET_SUCCESS;
  }

  maxMsgRate = MAX_MSG_RATE;
}
| void ApMon::initSocket | ( | ) | throw (runtime_error) [protected] |
Initializes the UDP socket used to send the datagrams.
Definition at line 1366 of file ApMon.cpp.
{
  /* Creates the datagram socket and configures it: address reuse plus a
     20-second timeout for both sending and receiving. */
  sockfd = socket(AF_INET, SOCK_DGRAM, 0);
  if (sockfd < 0)
    throw runtime_error("[ initSocket() ] Error creating socket");

  int reuseAddr = 1;
  const int reuseRet = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
                                  (char *) &reuseAddr, sizeof(reuseAddr));

  /* set connection timeout */
  struct timeval timeout;
  timeout.tv_sec = 20;
  timeout.tv_usec = 0;
  const int sndRet = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO,
                                (char *) &timeout, sizeof(timeout));
  const int rcvRet = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
                                (char *) &timeout, sizeof(timeout));

  /* all three options are always attempted; a failure of any of them is
     reported afterwards.
     NOTE(review): the socket is left open on this path - confirm that the
     caller releases sockfd after the exception. */
  if (!(reuseRet == 0 && sndRet == 0 && rcvRet == 0))
    throw runtime_error("[ initSocket() ] Error initializing socket.");
}
| void ApMon::loadFile | ( | char * | filename, |
| int * | nDestinations, | ||
| char ** | destAddresses, | ||
| int * | destPorts, | ||
| char ** | destPasswds | ||
| ) | throw (runtime_error) [protected] |
Parses a configuration file which contains addresses, ports and passwords for the destination hosts and puts the results in the vectors given as parameters.
| filename | The name of the configuration file. |
| nDestinations | Output parameter, will contain the number of destination hosts. |
| destAddresses | Will contain the destination addresses. |
| destPorts | Will contain the ports from the destination hosts. |
| destPasswds | Will contain the passwords for the destination hosts. |
Definition at line 120 of file ApMon.cpp.
{
  /* Opens the configuration file, records the current time in
     lastModifFile and lets parseConf() fill the destination arrays.
     Throws runtime_error when the file cannot be opened; errors raised by
     parseConf() propagate unchanged. */
  FILE *f;
  char msg[1100];

  /* a missing file name is reported like a file that cannot be opened */
  if (filename == NULL)
    throw runtime_error("[ loadFile() ] Error opening configuration file");

  /* initializations for the destination addresses */
  f = fopen(filename, "rt");
  if (f == NULL) {
    throw runtime_error("[ loadFile() ] Error opening configuration file");
  }

  /* the precision bounds the path so that msg cannot overflow */
  sprintf(msg, "Loading file %.1024s ...", filename);
  logger(INFO, msg);

  lastModifFile = time(NULL);

  /* NOTE(review): when parseConf() throws, the stream is closed only on the
     error paths where parseConf() calls fclose() itself - confirm who owns
     the stream on failure. */
  parseConf(f, nDestinations, destAddresses, destPorts,
            destPasswds);
  fclose(f);
}
| void ApMon::parseConf | ( | FILE * | fp, |
| int * | nDestinations, | ||
| char ** | destAddresses, | ||
| int * | destPorts, | ||
| char ** | destPasswds | ||
| ) | throw (runtime_error) [protected] |
Parses the contents of a configuration file.
The destination addresses and ports are stored in the arrays given as parameters.
Definition at line 1389 of file ApMon.cpp.
{
  /* Reads the configuration stream line by line. Blank lines and lines
     starting with '#' are skipped, "xApMon_loglevel" sets the log level,
     other "xApMon_" lines go to parseXApMonLine(), and every remaining line
     is handed to addToDestinations().
     Throws runtime_error when a line is longer than the line buffer or when
     MAX_N_DESTINATIONS would be exceeded; in these two cases the stream is
     closed here before throwing. */
  int i, ch;
  size_t len;
  char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char));
  char *tmp = NULL;
  char *loglevel_s;
  // char sbuf[30];
  // char *pbuf = sbuf;

  if (line == NULL)
    throw runtime_error("[ parseConf() ] Not enough memory for the line buffer");

  /* parse the input file */
  while(fgets(line, MAX_STRING_LEN, fp) != NULL) {
    if (tmp != NULL) {
      free(tmp);
      tmp = NULL;
    }

    line[MAX_STRING_LEN - 1] = 0;

    /* check if the line was too long */
    ch = fgetc(fp); // see if we are at the end of the file
    ungetc(ch, fp);
    len = strlen(line);
    /* len is checked first so that an empty string is never indexed at -1 */
    if (len > 0 && line[len - 1] != 10 && ch != EOF) {
      /* if the line doesn't end with a \n and we are not at the end
         of file, the line from the file was longer than MAX_STRING_LEN */
      free(line);
      fclose(fp);
      throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file");
    }

    tmp = trimString(line);

    /* skip the blank lines and the comment lines */
    if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp)
      continue;

    if (strstr(tmp, "xApMon_loglevel") == tmp) {
      /* first token is the key, second token is the level name */
      strtok/*_r*/(tmp, "= ");//, &pbuf);
      loglevel_s = strtok/*_r*/(NULL, "= ");//, &pbuf);
      if (loglevel_s != NULL)
        setLogLevel(loglevel_s);
      else
        logger(WARNING, "xApMon_loglevel line without a value - ignored");
      continue;
    }

    if (strstr(tmp, "xApMon_") == tmp) {
      parseXApMonLine(tmp);
      continue;
    }

    if (*nDestinations >= MAX_N_DESTINATIONS) {
      free(line); free(tmp);
      for (i = 0; i < *nDestinations; i++) {
        free(destAddresses[i]);
        free(destPasswds[i]);
      }
      fclose(fp);
      throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded.");
    }

    try {
      addToDestinations(tmp, nDestinations, destAddresses, destPorts,
                        destPasswds);
    } catch (...) {
      /* release the working buffers; the stream is left to the caller */
      free(line);
      free(tmp);
      throw;
    }
  }

  if (tmp != NULL)
    free(tmp);
  free(line);
}
| void ApMon::parseXApMonLine | ( | char * | line ) | [protected] |
Parses an xApMon line from the configuration file and sets the corresponding parameters in the ApMon object.
Definition at line 771 of file monitor_utils.cpp.
{
  /* Interprets one "xApMon_<param> = <value>" configuration line.
     Global switches and intervals are matched by name first; anything else
     is looked up as a system ("sys_"), job ("job_") or general parameter and
     enabled when the value is "on", disabled otherwise. Lines that cannot be
     split into a parameter and a value, and unknown parameter names, are
     reported with a warning and ignored. */
  bool flag, found;
  int ind;
  char tmp[MAX_STRING_LEN], logmsg[200];
  char *param, *value;
  // char sbuf[MAX_STRING_LEN];
  // char *pbuf = sbuf;
  const char *sep = " =";

  /* strtok() modifies its input, so work on a bounded private copy */
  strncpy(tmp, line, MAX_STRING_LEN - 1);
  tmp[MAX_STRING_LEN - 1] = 0;

  /* the caller passes lines that start with "xApMon_"; skip that prefix */
  char *tmp2 = tmp + strlen("xApMon_");
  param = strtok/*_r*/(tmp2, sep);//, &pbuf);
  value = strtok/*_r*/(NULL, sep);//, &pbuf);

  /* a line without a parameter name or without a value cannot be applied */
  if (param == NULL || value == NULL) {
    sprintf(logmsg, "Invalid xApMon line in the configuration file: %.140s",
            line);
    logger(WARNING, logmsg);
    return;
  }

  /* if it is an on/off parameter, this is its value; for the other
     parameters the value of flag doesn't matter */
  flag = (strcmp(value, "on") == 0);

  pthread_mutex_lock(&mutexBack);

  /* global switches and intervals */
  found = true;
  if (strcmp(param, "job_monitoring") == 0)
    this -> jobMonitoring = flag;
  else if (strcmp(param, "sys_monitoring") == 0)
    this -> sysMonitoring = flag;
  else if (strcmp(param, "job_interval") == 0)
    this -> jobMonitorInterval = atol(value);
  else if (strcmp(param, "sys_interval") == 0)
    this -> sysMonitorInterval = atol(value);
  else if (strcmp(param, "general_info") == 0)
    this -> genMonitoring = flag;
  else if (strcmp(param, "conf_recheck") == 0)
    this -> confCheck = flag;
  else if (strcmp(param, "recheck_interval") == 0)
    this -> recheckInterval = this -> crtRecheckInterval = atol(value);
  else if (strcmp(param, "auto_disable") == 0)
    this -> autoDisableMonitoring = flag;
  else if (strcmp(param, "maxMsgRate") == 0)
    this -> maxMsgRate = atoi(value);
  else
    found = false;

  if (found) {
    pthread_mutex_unlock(&mutexBack);
    return;
  }

  /* per-parameter switches: the prefix selects the table to search */
  if (strstr(param, "sys_") == param) {
    ind = getVectIndex(param + strlen("sys_"), sysMonitorParams,
                       nSysMonitorParams);
    if (ind >= 0)
      this -> actSysMonitorParams[ind] = (int)flag;
  } else if (strstr(param, "job_") == param) {
    ind = getVectIndex(param + strlen("job_"), jobMonitorParams,
                       nJobMonitorParams);
    if (ind >= 0)
      this -> actJobMonitorParams[ind] = (int)flag;
  } else {
    ind = getVectIndex(param, genMonitorParams,
                       nGenMonitorParams);
    if (ind >= 0)
      this -> actGenMonitorParams[ind] = (int)flag;
  }

  pthread_mutex_unlock(&mutexBack);

  /* the warning is issued after the lock was released; the name is
     truncated so that the message always fits in logmsg */
  if (ind < 0) {
    sprintf(logmsg, "Invalid parameter name in the configuration file: %.140s",
            param);
    logger(WARNING, logmsg);
  }
}
| void ApMon::removeJobToMonitor | ( | long | pid ) | throw (runtime_error) |
Removes a job from the list of the jobs monitored by ApMon.
| pid | The pid of the job to be removed. |
Definition at line 1315 of file ApMon.cpp.
{
  /* Drops the entry with the given pid from monJobs, keeping the remaining
     entries in order. Throws runtime_error when the list is empty or the
     pid is not in it. */
  if (nMonJobs <= 0)
    throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs.");

  /* locate the job */
  int pos = 0;
  while (pos < nMonJobs && monJobs[pos].pid != pid)
    ++pos;

  if (pos == nMonJobs) {
    /* the job was not found */
    char msg[100];
    sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid);
    throw runtime_error(msg);
  }

  /* found the job: shift the following entries one slot down */
  for (int k = pos + 1; k < nMonJobs; ++k)
    monJobs[k - 1] = monJobs[k];
  nMonJobs--;
}
| void ApMon::sendGeneralInfo | ( | ) | [protected] |
Sends datagrams with general system monitoring information to all the destination hosts.
Definition at line 582 of file monitor_utils.cpp.
{
#ifndef WIN32
  /* Collects the enabled general-information parameters (host name, one
     "ip_<interface>" entry per address, CPU description strings and the
     numeric values in currentGenVals) and sends them in one
     sendParameters() call. Send errors are logged, not propagated. */
  int nParams, maxNParams, i;
  char tmp_s[50];
  char **paramNames, **paramValues;
  int *valueTypes;

  logger(INFO, "Sending general monitoring information...");

  /* GEN_IP expands into numIPs entries, every other parameter into one */
  maxNParams = nGenMonitorParams + numIPs;
  valueTypes = (int *)malloc(maxNParams * sizeof(int));
  paramNames = (char **)malloc(maxNParams * sizeof(char *));
  paramValues = (char **)malloc(maxNParams * sizeof(char *));

  nParams = 0;

  updateGeneralInfo();

  /* names are duplicated (and freed below); values point at member data */
  if (actGenMonitorParams[GEN_HOSTNAME]) {
    paramNames[nParams] = strdup(genMonitorParams[GEN_HOSTNAME]);
    valueTypes[nParams] = XDR_STRING;
    paramValues[nParams] = myHostname;
    nParams++;
  }

  if (actGenMonitorParams[GEN_IP]) {
    for (i = 0; i < this -> numIPs; i++) {
      /* the interface name is truncated so that tmp_s cannot overflow */
      sprintf(tmp_s, "ip_%.40s", interfaceNames[i]);
      paramNames[nParams] = strdup(tmp_s);
      valueTypes[nParams] = XDR_STRING;
      paramValues[nParams] = this -> allMyIPs[i];
      nParams++;
    }
  }

  /* CPU description strings: sent only when enabled and non-empty */
  const int cpuIdx[4] = { GEN_CPU_VENDOR_ID, GEN_CPU_FAMILY,
                          GEN_CPU_MODEL, GEN_CPU_MODEL_NAME };
  char *cpuVals[4] = { cpuVendor, cpuFamily, cpuModel, cpuModelName };
  for (i = 0; i < 4; i++) {
    if (actGenMonitorParams[cpuIdx[i]] && strlen(cpuVals[i]) != 0) {
      paramNames[nParams] = strdup(genMonitorParams[cpuIdx[i]]);
      valueTypes[nParams] = XDR_STRING;
      paramValues[nParams] = cpuVals[i];
      nParams++;
    }
  }

  /* remaining parameters: numeric values, sent as doubles */
  for (i = 0; i < nGenMonitorParams; i++) {
    if (actGenMonitorParams[i] != 1 || i == GEN_IP || i == GEN_HOSTNAME ||
        i == GEN_CPU_VENDOR_ID || i == GEN_CPU_FAMILY || i == GEN_CPU_MODEL
        || i == GEN_CPU_MODEL_NAME)
      continue;

    if (genRetResults[i] == PROCUTILS_ERROR) {
      /* could not read the requested information from /proc, disable this
         parameter */
      if (autoDisableMonitoring)
        actGenMonitorParams[i] = 0;
    } else if (genRetResults[i] != RET_ERROR) {
      paramNames[nParams] = strdup(genMonitorParams[i]);
      paramValues[nParams] = (char *)&currentGenVals[i];
      valueTypes[nParams] = XDR_REAL64;
      nParams++;
    }
  }

  try {
    if (nParams > 0)
      sendParameters(sysMonCluster, sysMonNode, nParams,
                     paramNames, valueTypes, paramValues);
  } catch (runtime_error& err) {
    logger(WARNING, err.what());
  }

  /* only the names were allocated here */
  for (i = 0; i < nParams; i++)
    free(paramNames[i]);
  free(paramNames);
  free(valueTypes);
  free(paramValues);
#endif
}
| void ApMon::sendJobInfo | ( | ) | [protected] |
Sends datagrams containing information about the jobs that are currently being monitored.
Definition at line 48 of file monitor_utils.cpp.
{
#ifndef WIN32
  /* the apMon_free() function calls sendJobInfo() from another thread and
     we need mutual exclusion */
  pthread_mutex_lock(&mutexBack);

  if (nMonJobs != 0) {
    const long now = time(NULL);
    logger(INFO, "Sending job monitoring information...");
    lastJobInfoSend = (time_t)now;

    /* send monitoring information for all the jobs specified by the user */
    for (int j = 0; j < nMonJobs; ++j)
      sendOneJobInfo(monJobs[j]);
  } else {
    logger(WARNING, "There are no jobs to be monitored, not sending job monitoring information.");
  }

  pthread_mutex_unlock(&mutexBack);
#endif
}
| void ApMon::sendOneJobInfo | ( | MonitoredJob | job ) | [protected] |
Sends datagrams with monitoring information about the specified job to all the destination hosts.
Definition at line 152 of file monitor_utils.cpp.
{
  /* Refreshes the values for `job` with updateJobInfo() and sends every
     enabled parameter that was obtained without error, as doubles, in one
     sendParameters() call. Send errors are logged, not propagated. */
  int i;
  int nParams = 0;
  char **paramNames, **paramValues;
  int *valueTypes;

  valueTypes = (int *)malloc(nJobMonitorParams * sizeof(int));
  paramNames = (char **)malloc(nJobMonitorParams * sizeof(char *));
  paramValues = (char **)malloc(nJobMonitorParams * sizeof(char *));

  /* start from a clean state for this job */
  for (i = 0; i < nJobMonitorParams; i++) {
    jobRetResults[i] = RET_SUCCESS;
    currentJobVals[i] = 0;
  }

  updateJobInfo(job);

  for (i = 0; i < nJobMonitorParams; i++) {
    if (actJobMonitorParams[i] && jobRetResults[i] != RET_ERROR) {
      /* names and values are borrowed from member tables, not copied */
      paramNames[nParams] = jobMonitorParams[i];
      paramValues[nParams] = (char *)&currentJobVals[i];
      valueTypes[nParams] = XDR_REAL64;
      nParams++;
    }
    /* don't disable the parameter (maybe for another job it can be
       obtained) */
    /*
      else
      if (autoDisableMonitoring)
      actJobMonitorParams[ind] = 0;
    */
  }

  /* nothing is sent when no parameter qualified */
  if (nParams > 0) {
    try {
      sendParameters(job.clusterName, job.nodeName, nParams,
                     paramNames, valueTypes, paramValues);
    } catch (runtime_error& err) {
      logger(WARNING, err.what());
    }
  }

  free(paramNames);
  free(valueTypes);
  free(paramValues);
}
| int ApMon::sendParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| char * | paramValue | ||
| ) | throw (runtime_error) |
Sends a parameter of type string and its value to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| paramValue | The value of the parameter. |
Definition at line 801 of file ApMon.cpp.
{
  /* A string value is already a char*, so it is forwarded unchanged to the
     typed overload, tagged as XDR_STRING. */
  const int stringType = XDR_STRING;
  return sendParameter(clusterName, nodeName, paramName, stringType, paramValue);
}
| int ApMon::sendParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| double | paramValue | ||
| ) | throw (runtime_error) |
Sends a parameter of type double and its value to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| paramValue | The value of the parameter. |
Definition at line 794 of file ApMon.cpp.
{
  /* The typed overload takes the value by address; the double lives in this
     function's parameter for the duration of the call. */
  return sendParameter(clusterName, nodeName, paramName, XDR_REAL64,
                       (char *)&paramValue);
}
| int ApMon::sendParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| float | paramValue | ||
| ) | throw (runtime_error) |
Sends a parameter of type float and its value to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| paramValue | The value of the parameter. |
Definition at line 787 of file ApMon.cpp.
{
  /* The typed overload takes the value by address; the float lives in this
     function's parameter for the duration of the call. */
  return sendParameter(clusterName, nodeName, paramName, XDR_REAL32,
                       (char *)&paramValue);
}
| int ApMon::sendParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| int | valueType, | ||
| char * | paramValue | ||
| ) | throw (runtime_error) |
Sends a parameter and its value to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| valueType | The value type of the parameter. Can be one of the constants XDR_INT32 (integer), XDR_REAL32 (float), XDR_REAL64 (double), XDR_STRING (null-terminated string). |
| paramValue | Pointer to the value of the parameter. |
Definition at line 764 of file ApMon.cpp.
{
  /* A single parameter is sent as a parameter list of length one: the
     addresses of the three arguments act as one-element arrays. */
  return sendParameters(clusterName, nodeName, 1, &paramName,
                        &valueType, &paramValue);
}
| int ApMon::sendParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| int | paramValue | ||
| ) | throw (runtime_error) |
Sends an integer parameter and its value to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| paramValue | The value of the parameter. |
Definition at line 780 of file ApMon.cpp.
{
  /* The typed overload takes the value by address; the int lives in this
     function's parameter for the duration of the call. */
  return sendParameter(clusterName, nodeName, paramName, XDR_INT32,
                       (char *)&paramValue);
}
| int ApMon::sendParameters | ( | char * | clusterName, |
| char * | nodeName, | ||
| int | nParams, | ||
| char ** | paramNames, | ||
| int * | valueTypes, | ||
| char ** | paramValues | ||
| ) | throw (runtime_error) |
Sends a set of parameters and their values to the MonALISA module.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
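| nParams | The number of parameters to be sent. |
| paramNames | Array with the names of the parameters. |
| valueTypes | Array with the value types of the parameters. Each can be one of the constants XDR_INT32 (integer), XDR_REAL32 (float), XDR_REAL64 (double), XDR_STRING (null-terminated string). |
| paramValues | Array with pointers to the values of the parameters. |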
Definition at line 621 of file ApMon.cpp.
{
  /* Forwards to the timed variant with -1 as the timestamp argument
     (presumably "no explicit timestamp" - confirm in sendTimedParameters). */
  const int noTimestamp = -1;
  return sendTimedParameters(clusterName, nodeName, nParams, paramNames,
                             valueTypes, paramValues, noTimestamp);
}
| void ApMon::sendSysInfo | ( | ) | [protected] |
Sends datagrams with system monitoring information to all the destination hosts.
Definition at line 371 of file monitor_utils.cpp.
{
#ifndef WIN32
  /* Refreshes the system values with updateSysInfo() and sends every enabled
     parameter that was obtained without error in one sendParameters() call:
     the plain numeric parameters, per-interface network counters
     ("<if>_in", "<if>_out", "<if>_errs"), process state counts, socket
     counts and TCP socket state counts. Send errors are logged, not
     propagated. */
  int nParams = 0, maxNParams;
  int i;
  long crtTime;
  int *valueTypes;
  char **paramNames, **paramValues;

  crtTime = time(NULL);
  logger(INFO, "Sending system monitoring information...");

  /* make some initializations only the first time this
     function is called */
  if (this -> sysInfo_first) {
    for (i = 0; i < this -> nInterfaces; i++) {
      this -> lastBytesSent[i] = this -> lastBytesReceived[i] = 0.0;
      this -> lastNetErrs[i] = 0;
    }
    this -> sysInfo_first = FALSE;
  }

  /* the maximum number of parameters that can be included in a datagram */
  /* (the last three terms are for: parameters corresponding to each possible
     state of the processes, parameters corresponding to the types of open
     sockets, parameters corresponding to each possible state of the TCP
     sockets.)
     The network counters need three entries per interface (in, out, errs),
     so 3 * nInterfaces slots are reserved for them. */
  maxNParams = nSysMonitorParams + 3 * nInterfaces + 15 + 4 +
    N_TCP_STATES;

  valueTypes = (int *)malloc(maxNParams * sizeof(int));
  paramNames = (char **)malloc(maxNParams * sizeof(char *));
  paramValues = (char **)malloc(maxNParams * sizeof(char *));

  for (i = 0; i < nSysMonitorParams; i++) {
    if (actSysMonitorParams[i] > 0) /* if the parameter is enabled */
      sysRetResults[i] = RET_SUCCESS;
    else /* mark it with RET_ERROR so that it will be not included in the
            datagram */
      sysRetResults[i] = RET_ERROR;
  }

  updateSysInfo();

  /* plain numeric parameters; the composite ones are handled below */
  for (i = 0; i < nSysMonitorParams; i++) {
    if (i == SYS_NET_IN || i == SYS_NET_OUT || i == SYS_NET_ERRS ||
        i == SYS_NET_SOCKETS || i == SYS_NET_TCP_DETAILS || i == SYS_PROCESSES)
      continue;

    if (sysRetResults[i] == PROCUTILS_ERROR) {
      /* could not read the requested information from /proc, disable this
         parameter */
      if (autoDisableMonitoring)
        actSysMonitorParams[i] = 0;
    } else if (sysRetResults[i] != RET_ERROR) {
      /* the parameter is enabled and there were no errors obtaining it */
      paramNames[nParams] = strdup(sysMonitorParams[i]);
      paramValues[nParams] = (char *)&currentSysVals[i];
      valueTypes[nParams] = XDR_REAL64;
      nParams++;
    }
  }

  /* per-interface counters: each name buffer is sized for the interface
     name plus its suffix (sizeof of the literal includes the NUL) */
  if (actSysMonitorParams[SYS_NET_IN] == 1) {
    if (sysRetResults[SYS_NET_IN] == PROCUTILS_ERROR) {
      if (autoDisableMonitoring)
        actSysMonitorParams[SYS_NET_IN] = 0;
    } else if (sysRetResults[SYS_NET_IN] != RET_ERROR) {
      for (i = 0; i < nInterfaces; i++) {
        paramNames[nParams] =
          (char *)malloc(strlen(interfaceNames[i]) + sizeof("_in"));
        strcpy(paramNames[nParams], interfaceNames[i]);
        strcat(paramNames[nParams], "_in");
        paramValues[nParams] = (char *)&currentNetIn[i];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  if (actSysMonitorParams[SYS_NET_OUT] == 1) {
    /* the result code of SYS_NET_OUT itself decides, not the one of
       SYS_NET_IN */
    if (sysRetResults[SYS_NET_OUT] == PROCUTILS_ERROR) {
      if (autoDisableMonitoring)
        actSysMonitorParams[SYS_NET_OUT] = 0;
    } else if (sysRetResults[SYS_NET_OUT] != RET_ERROR) {
      for (i = 0; i < nInterfaces; i++) {
        paramNames[nParams] =
          (char *)malloc(strlen(interfaceNames[i]) + sizeof("_out"));
        strcpy(paramNames[nParams], interfaceNames[i]);
        strcat(paramNames[nParams], "_out");
        paramValues[nParams] = (char *)&currentNetOut[i];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  if (actSysMonitorParams[SYS_NET_ERRS] == 1) {
    if (sysRetResults[SYS_NET_ERRS] == PROCUTILS_ERROR) {
      if (autoDisableMonitoring)
        actSysMonitorParams[SYS_NET_ERRS] = 0;
    } else if (sysRetResults[SYS_NET_ERRS] != RET_ERROR) {
      for (i = 0; i < nInterfaces; i++) {
        paramNames[nParams] =
          (char *)malloc(strlen(interfaceNames[i]) + sizeof("_errs"));
        strcpy(paramNames[nParams], interfaceNames[i]);
        strcat(paramNames[nParams], "_errs");
        paramValues[nParams] = (char *)&currentNetErrs[i];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  if (actSysMonitorParams[SYS_PROCESSES] == 1) {
    if (sysRetResults[SYS_PROCESSES] != RET_ERROR) {
      char act_states[] = {'D', 'R', 'S', 'T', 'Z'};
      for (i = 0; i < 5; i++) {
        paramNames[nParams] = (char *)malloc(20 * sizeof(char));
        sprintf(paramNames[nParams], "processes_%c", act_states[i]);
        /* the table is indexed by the state letter's offset from 'A' (65) */
        paramValues[nParams] = (char *)&currentProcessStates[act_states[i] - 65];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  if (actSysMonitorParams[SYS_NET_SOCKETS] == 1) {
    if (sysRetResults[SYS_NET_SOCKETS] != RET_ERROR) {
      const char *socket_types[] = {"tcp", "udp", "icm", "unix"};
      for (i = 0; i < 4; i++) {
        paramNames[nParams] = (char *)malloc(30 * sizeof(char));
        sprintf(paramNames[nParams], "sockets_%s", socket_types[i]);
        paramValues[nParams] = (char *)&currentNSockets[i];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  if (actSysMonitorParams[SYS_NET_TCP_DETAILS] == 1) {
    if (sysRetResults[SYS_NET_TCP_DETAILS] != RET_ERROR) {
      for (i = 0; i < N_TCP_STATES; i++) {
        paramNames[nParams] = (char *)malloc(30 * sizeof(char));
        sprintf(paramNames[nParams], "sockets_tcp_%s", socketStatesMapTCP[i]);
        paramValues[nParams] = (char *)&currentSocketsTCP[i];
        valueTypes[nParams] = XDR_REAL64;
        nParams++;
      }
    }
  }

  try {
    if (nParams > 0)
      sendParameters(sysMonCluster, sysMonNode, nParams,
                     paramNames, valueTypes, paramValues);
  } catch (runtime_error& err) {
    logger(WARNING, err.what());
  }

  this -> lastSysInfoSend = crtTime;

  /* NOTE(review): the three network arrays are released only when
     SYS_NET_IN succeeded - presumably updateSysInfo() allocates them in
     that case; confirm there. */
  if (sysRetResults[SYS_NET_IN] == RET_SUCCESS) {
    free(currentNetIn);
    free(currentNetOut);
    free(currentNetErrs);
  }

  /* every name was allocated here (strdup or malloc) */
  for (i = 0; i < nParams; i++)
    free(paramNames[i]);
  free(paramNames);
  free(valueTypes);
  free(paramValues);
#endif
}
| int ApMon::sendTimedParameter | ( | char * | clusterName, |
| char * | nodeName, | ||
| char * | paramName, | ||
| int | valueType, | ||
| char * | paramValue, | ||
| int | timestamp | ||
| ) | throw (runtime_error) |
Sends a parameter and its value to the MonALISA module, together with a timestamp.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| paramName | The name of the parameter. |
| valueType | The value type of the parameter. Can be one of the constants XDR_INT32 (integer), XDR_REAL32 (float), XDR_REAL64 (double), XDR_STRING (null-terminated string). |
| paramValue | Pointer to the value of the parameter. |
| timestamp | The associated timestamp (in seconds). |
Definition at line 772 of file ApMon.cpp.
{
  // Single-parameter convenience wrapper: forwards to sendTimedParameters()
  // with a parameter count of 1, passing the address of each argument so it
  // acts as a one-element array. The return value and any runtime_error come
  // straight from sendTimedParameters().
  return sendTimedParameters(clusterName, nodeName, 1, &paramName,
                             &valueType, &paramValue, timestamp);
}
| int ApMon::sendTimedParameters | ( | char * | clusterName, |
| char * | nodeName, | ||
| int | nParams, | ||
| char ** | paramNames, | ||
| int * | valueTypes, | ||
| char ** | paramValues, | ||
| int | timestamp | ||
| ) | throw (runtime_error) |
Sends a set of parameters and their values to the MonALISA module, together with a timestamp.
| clusterName | The name of the cluster that is monitored. If it is NULL, we keep the same cluster and node name as in the previous datagram. |
| nodeName | The name of the node from the cluster from which the value was taken. |
| nParams | The number of parameters to be sent. |
| paramNames | Array with the parameter names. |
| valueTypes | Array with the value types represented as integers. |
| paramValues | Array with the parameter values. |
| timestamp | The timestamp (in seconds) associated with the data. |
Definition at line 628 of file ApMon.cpp.
{
  /*
   * Encodes the given parameters once and sends the resulting datagram to
   * every configured destination. Each datagram is prefixed by a header made
   * of the version string, the destination's password, the instance id and
   * the sequence number.
   *
   * Returns RET_NOT_SENT when shouldSend() rejects the datagram and
   * RET_SUCCESS after the loop over all destinations has completed.
   * Throws runtime_error when the cluster/node name is missing, when the
   * parameters or the header cannot be encoded, when the header buffer cannot
   * be allocated, when an address cannot be packed (WIN32) or when sendto()
   * fails. On every one of these paths the mutex is released and the
   * temporary header buffer is freed before the exception is thrown.
   */
  int i;
  int ret, ret1, ret2;
  /* msg receives the log/exception texts built below; the longest one has
     about 85 literal characters plus three integers and an address, which
     does not fit into 100 bytes */
  char msg[256], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE];
#ifdef WIN32
  /* "address:port" - a dotted-quad address with a 5-digit port already needs
     22 bytes including the terminator */
  char crtAddr[64];
#endif
  char *headerTmp;
  char header[MAX_HEADER_LENGTH] = "v:";
  strcat(header, APMON_VERSION);
  strcat(header, "_cpp"); // to indicate this is the C++ version
  strcat(header, "p:");

  pthread_mutex_lock(&mutex);

  /* rate limiting: silently drop the datagram if too many were sent lately */
  if(!shouldSend()) {
    pthread_mutex_unlock(&mutex);
    return RET_NOT_SENT;
  }

  if (clusterName != NULL) { // don't keep the cached values for cluster name
    // and node name
    free(this -> clusterName);
    this -> clusterName = strdup(clusterName);
    if (nodeName != NULL) {  /* the user provided a name */
      free(this -> nodeName);
      this -> nodeName = strdup(nodeName);
    }
    else {  /* no node name given: fall back to myHostname */
      free(this -> nodeName);
      this -> nodeName = strdup(this -> myHostname);
    } // else
  } // if

  if (this -> clusterName == NULL || this -> nodeName == NULL) {
    pthread_mutex_unlock(&mutex);
    throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name");
  }

  //sortParams(nParams, paramNames, valueTypes, paramValues);

  /* try to encode the parameters */
  try {
    encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp);
  } catch (runtime_error& err) {
    pthread_mutex_unlock(&mutex);
    throw;  /* rethrow the original exception object without slicing it */
  }

  headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char));
  if (headerTmp == NULL) {
    pthread_mutex_unlock(&mutex);
    throw runtime_error("[ sendTimedParameters() ] Error allocating memory for the header");
  }

  /* for each destination */
  for (i = 0; i < nDestinations; i++) {
    XDR xdrs;
    struct sockaddr_in destAddr;

    /* initialize the destination address */
    memset(&destAddr, 0, sizeof(destAddr));
    destAddr.sin_family = AF_INET;
    destAddr.sin_port = htons(destPorts[i]);
#ifndef WIN32
    inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr);
#else
    int dummy = sizeof(destAddr);
    sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]);
    ret = WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy);
    if(ret){
      ret = WSAGetLastError();
      sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret);
      /* release everything acquired so far, as the other error paths do */
      free(headerTmp);
      pthread_mutex_unlock(&mutex);
      throw runtime_error(msg);
    }
#endif

    /* add the header (which is different for each destination) */
    strcpy(headerTmp, header);
    strcat(headerTmp, destPasswds[i]);

    /* initialize the XDR stream to encode the header */
    xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE);

    /* encode the header */
    ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1);
    /* add the instance ID and the sequence number */
    ret1 = xdr_int(&xdrs, &(instance_id));
    ret2 = xdr_int(&xdrs, &(seq_nr));

    if (!ret || !ret1 || !ret2) {
      free(headerTmp);
      pthread_mutex_unlock(&mutex);
      throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header");
    }

    /* concatenate the header and the rest of the datagram */
    int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL);
    memcpy(newBuf, buf2, buf2Length);
    memcpy(newBuf + buf2Length, buf, dgramSize);

    /* send the buffer */
    ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0,
                 (struct sockaddr *)&destAddr, sizeof(destAddr));
    if (ret == RET_ERROR) {
      free(headerTmp);
      pthread_mutex_unlock(&mutex);

      /* re-initialize the socket */
#ifndef WIN32
      close(sockfd);
#else
      closesocket(sockfd);
#endif
      initSocket();

      /* throw exception because the datagram was not sent */
      sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ",
              destAddresses[i]);
      throw runtime_error(msg);
    }
    else {
      sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:",
              ret, instance_id, seq_nr, destAddresses[i]);
      logger(FINE, msg);
      logParameters(FINE, nParams, paramNames, valueTypes, paramValues);
    }
    xdr_destroy(&xdrs);
  }

  /* one sequence number per call, shared by all destinations */
  seq_nr = (seq_nr + 1) % TWO_BILLION;
  free(headerTmp);
  pthread_mutex_unlock(&mutex);
  return RET_SUCCESS;
}
| void ApMon::setBackgroundThread | ( | bool | val ) | [protected] |
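Starts or stops the background thread.
If val is true, the background thread is created if it does not exist yet; otherwise the existing thread is signalled that the settings have changed. If val is false, an existing background thread is asked to stop and is joined.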
Definition at line 1250 of file ApMon.cpp.
{
// Starts (val == true) or stops (val == false) the background thread that
// runs bkTask().
// Precondition kept from the original note: the caller holds mutexBack.
// mutexBack is locked
if (val == true) {
if (!haveBkThread) {
// no thread yet: create one running bkTask() with this object as argument
#ifndef WIN32
pthread_create(&bkThread, NULL, &bkTask, this);
#else
DWORD dummy;
bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy);
#endif
haveBkThread = true;
} else {
// thread already exists: signal confChangedCond so a waiting thread wakes up
pthread_mutex_lock(&mutexCond);
pthread_cond_signal(&confChangedCond);
pthread_mutex_unlock(&mutexCond);
}
}
if (val == false) {
//if (bkThreadStarted) {
if (haveBkThread) {
// request termination; bkTask() reads stopBkThread while holding mutexBack,
// so the mutex is released here until the thread has been joined
stopBkThread = true;
pthread_mutex_unlock(&mutexBack);
#ifndef WIN32
pthread_mutex_lock(&mutexCond);
#endif
// wake the thread in case it is waiting on the condition
pthread_cond_signal(&confChangedCond);
logger(INFO, "[Stopping the background thread...]");
#ifndef WIN32
pthread_mutex_unlock(&mutexCond);
pthread_join(bkThread, NULL);
#else
WaitForSingleObject(bkThread, INFINITE);
#endif
// re-acquire mutexBack so the caller's locking state is as on entry
pthread_mutex_lock(&mutexBack);
// logger(INFO, "bk thread stopped!");
// reset the bookkeeping flags so a new thread can be started later
haveBkThread = false;
bkThreadStarted = false;
stopBkThread = false;
}
}
}
| void ApMon::setConfRecheck | ( | bool | confRecheck ) | [inline] |
Enables/disables the periodical check for changes in the configuration files/URLs.
If enabled, the verifications will be done at the default time interval.
Definition at line 663 of file ApMon.h.
{
  // Same as the two-argument overload, with RECHECK_INTERVAL as the interval.
  this -> setConfRecheck(confRecheck, RECHECK_INTERVAL);
}
| void ApMon::setConfRecheck | ( | bool | confRecheck, |
| long | interval | ||
| ) |
Enables/disables the periodical check for changes in the configuration files/URLs.
| confRecheck | If it is true, the periodical checking is enabled. |
| interval | The time interval at which the verifications are done. If it is negative, a default value will be used. |
Definition at line 1126 of file ApMon.cpp.
{
  /*
   * Turns the periodical configuration recheck on or off and starts/stops
   * the background thread accordingly. Does nothing except log a warning
   * when the object was initialized directly (initType == DIRECT_INIT),
   * because there is no file/URL to reload in that case.
   *
   * NOTE(review): the documentation names the first parameter confRecheck,
   * while this body refers to confCheck. If the definition does not declare
   * a parameter called confCheck, the identifier resolves to the member and
   * the assignment below is a no-op - confirm against the actual signature.
   */
  char logmsg[100];
  if (confCheck) {
    sprintf(logmsg, "Enabling configuration reloading (interval %ld)",
            interval);
    logger(INFO, logmsg);
  }

  pthread_mutex_lock(&mutexBack);
  if (initType == DIRECT_INIT) { // no need to reload the configuration
    logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload.");
    /* release the mutex on this early exit too; otherwise the next caller
       that locks mutexBack blocks forever */
    pthread_mutex_unlock(&mutexBack);
    return;
  }

  this -> confCheck = confCheck;
  /* tell the background thread to recompute its next recheck time */
  this -> recheckChanged = true;
  if (confCheck) {
    /* a non-positive interval selects the default */
    if (interval > 0) {
      this -> recheckInterval = interval;
      this -> crtRecheckInterval = interval;
    } else {
      this -> recheckInterval = RECHECK_INTERVAL;
      this -> crtRecheckInterval = RECHECK_INTERVAL;
    }
    setBackgroundThread(true);
  }
  else {
    /* stop the background thread only if nothing else needs it */
    if (jobMonitoring == false && sysMonitoring == false)
      setBackgroundThread(false);
  }
  pthread_mutex_unlock(&mutexBack);
}
| void ApMon::setCrtRecheckInterval | ( | long | val ) | [protected] |
Definition at line 1169 of file ApMon.cpp.
{
  // Store the new current recheck interval while holding mutexBack.
  pthread_mutex_lock(&(this -> mutexBack));
  this -> crtRecheckInterval = val;
  pthread_mutex_unlock(&(this -> mutexBack));
}
| void ApMon::setGenMonitoring | ( | bool | genMonitoring ) | [inline] |
Enables/disables the sending of datagrams with general system information.
A default value is used for the number of time intervals at which the datagrams are sent.
| void ApMon::setGenMonitoring | ( | bool | genMonitoring, |
| int | nIntervals | ||
| ) |
Enables/disables the periodical sending of datagrams with general system information.
| genMonitoring | If it is true, enables the sending of the datagrams. |
| nIntervals | The number of time intervals at which the datagrams are sent (considering the interval for sending system monitoring information). If it is not positive, a default value will be used. |
Definition at line 1225 of file ApMon.cpp.
{
  /* Log the requested state, then record it under mutexBack. Enabling
     general monitoring also enables system monitoring if it is off. */
  char logmsg[100];
  sprintf(logmsg, "Setting general information monitoring to %s ",
          boolStrings[(int)genMonitoring]);
  logger(INFO, logmsg);

  pthread_mutex_lock(&mutexBack);
  this -> genMonitoring = genMonitoring;
  this -> sysMonChanged = true;

  if (genMonitoring) {
    /* a non-positive count selects the default */
    this -> genMonitorIntervals =
      (nIntervals > 0) ? nIntervals : GEN_MONITOR_INTERVALS;

    if (!(this -> sysMonitoring)) {
      /* setSysMonitoring() takes mutexBack itself, so release it around
         the call */
      pthread_mutex_unlock(&mutexBack);
      setSysMonitoring(true);
      pthread_mutex_lock(&mutexBack);
    }
  }
  // TODO: else check if we can stop the background thread (if no
  // system parameters are enabled for monitoring)

  pthread_mutex_unlock(&mutexBack);
}
| void ApMon::setJobMonitoring | ( | bool | jobMonitoring ) | [inline] |
Enables/disables the job monitoring.
If the job monitoring is enabled, the datagrams will be sent at the default time interval.
| void ApMon::setJobMonitoring | ( | bool | jobMonitoring, |
| long | interval | ||
| ) |
Enables/disables the periodical sending of datagrams with job monitoring information.
| jobMonitoring | If it is true, the job monitoring is enabled |
| interval | The time interval at which the datagrams are sent. If it is negative, a default value will be used. |
Definition at line 1175 of file ApMon.cpp.
{
  /* Announce the requested change first, then apply it under mutexBack. */
  if (!jobMonitoring) {
    logger(INFO, "Disabling job monitoring...");
  } else {
    char logmsg[100];
    sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval);
    logger(INFO, logmsg);
  }

  pthread_mutex_lock(&mutexBack);
  this -> jobMonitoring = jobMonitoring;
  this -> jobMonChanged = true;

  if (!jobMonitoring) {
    // disable the background thread if it is not needed anymore
    const bool threadStillNeeded = this -> sysMonitoring || this -> confCheck;
    if (!threadStillNeeded)
      setBackgroundThread(false);
  } else {
    /* a non-positive interval selects the default */
    this -> jobMonitorInterval =
      (interval > 0) ? interval : JOB_MONITOR_INTERVAL;
    setBackgroundThread(true);
  }

  pthread_mutex_unlock(&mutexBack);
}
| void ApMon::setLogLevel | ( | char * | newLevel_s ) | [static] |
Sets the ApMon logging level.
The level is given by name; possible values are "FATAL" (0), "WARNING" (1), "INFO" (2), "FINE" (3) and "DEBUG" (4).
Definition at line 1343 of file ApMon.cpp.
{
  /*
   * Maps the level name in newLevel_s ("FATAL", "WARNING", "INFO", "FINE",
   * "DEBUG"; case-sensitive) to its index and hands it to logger() as the new
   * level. An unknown or NULL name only produces a warning; the level is
   * left unchanged.
   */
  int newLevel;
  const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"};
  char logmsg[100];

  /* strcmp() and "%s" must not be given a null pointer */
  if (newLevel_s == NULL) {
    logger(WARNING, "[ setLogLevel() ] Invalid level value: (null)");
    return;
  }

  for (newLevel = 0; newLevel < 5; newLevel++)
    if (strcmp(newLevel_s, levels[newLevel]) == 0)
      break;

  if (newLevel >= 5) {
    /* the precision caps how much of the caller's string is copied, so the
       message always fits into logmsg */
    sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %.50s", newLevel_s);
    logger(WARNING, logmsg);
  }
  else
    logger(0, NULL, newLevel);
}
| void ApMon::setMaxMsgRate | ( | int | maxRate ) |
This sets the maximum number of messages that are sent to MonALISA in one second.
By default, this number is 50.
Definition at line 1361 of file ApMon.cpp.
{
  // Non-positive rates are ignored; the current limit stays in effect.
  if (maxRate <= 0)
    return;
  this -> maxMsgRate = maxRate;
}
| void ApMon::setRecheckInterval | ( | long | val ) |
Sets the value of the time interval (in seconds) between two recheck operations for the configuration files.
The default value is 5 min. If the value is zero or negative, the configuration rechecking is turned off. If errors appear when reloading the configuration, the actual interval will be increased (transparently for the user).
Definition at line 1160 of file ApMon.cpp.
{
  // A positive interval enables rechecking; anything else disables it.
  // The value is passed through unchanged in both cases.
  const bool enableRecheck = (val > 0);
  setConfRecheck(enableRecheck, val);
}
| void ApMon::setSysMonClusterNode | ( | char * | clusterName, |
| char * | nodeName | ||
| ) |
This function is called by the user to set the cluster name and the node name for the system monitoring datagrams.
Definition at line 1337 of file ApMon.cpp.
{
  /*
   * Replaces the cluster and node names used for the system monitoring
   * datagrams with copies of the given strings. A NULL argument (or a failed
   * allocation) leaves the corresponding name unchanged.
   */
  /* duplicate before freeing: the arguments may point at the strings that
     are currently stored, and strdup() must not be given a null pointer */
  char *newCluster = (clusterName != NULL) ? strdup(clusterName) : NULL;
  char *newNode = (nodeName != NULL) ? strdup(nodeName) : NULL;

  if (newCluster != NULL) {
    free(sysMonCluster);
    sysMonCluster = newCluster;
  }
  if (newNode != NULL) {
    free(sysMonNode);
    sysMonNode = newNode;
  }
}
| void ApMon::setSysMonitoring | ( | bool | sysMonitoring, |
| long | interval | ||
| ) |
Enables/disables the periodical sending of datagrams with system monitoring information.
| sysMonitoring | If it is true, the system monitoring is enabled |
| interval | The time interval at which the datagrams are sent. If it is negative, a default value will be used. |
Definition at line 1200 of file ApMon.cpp.
{
  /* Announce the requested change first, then apply it under mutexBack. */
  if (!sysMonitoring) {
    logger(INFO, "Disabling system monitoring...");
  } else {
    char logmsg[100];
    sprintf(logmsg, "Enabling system monitoring, time interval %ld s... ", interval);
    logger(INFO, logmsg);
  }

  pthread_mutex_lock(&mutexBack);
  this -> sysMonitoring = sysMonitoring;
  this -> sysMonChanged = true;

  if (!sysMonitoring) {
    // disable the background thread if it is not needed anymore
    const bool threadStillNeeded = this -> jobMonitoring || this -> confCheck;
    if (!threadStillNeeded)
      setBackgroundThread(false);
  } else {
    /* a non-positive interval selects the default */
    this -> sysMonitorInterval =
      (interval > 0) ? interval : SYS_MONITOR_INTERVAL;
    setBackgroundThread(true);
  }

  pthread_mutex_unlock(&mutexBack);
}
| void ApMon::setSysMonitoring | ( | bool | sysMonitoring ) | [inline] |
Enables/disables the system monitoring.
If the system monitoring is enabled, the datagrams will be sent at the default time interval.
| bool ApMon::shouldSend | ( | ) | [protected] |
Decides if the current datagram should be sent (so that the maximum number of datagrams per second is respected in average).
This decision is based on the number of messages previously sent.
new time, update previous counters;
reset current counter
compute the history
when we should start dropping messages
counting sent and dropped messages
Definition at line 1456 of file ApMon.cpp.
{
  /*
   * Rate limiter: returns true if the current datagram may be sent and false
   * if it should be dropped. The decision uses a weighted mix (hWeight) of
   * the rate recorded for earlier seconds (prvSent) and the number of
   * datagrams sent in the current second (crtSent). The crtSent/crtDrop
   * counters are updated as a side effect.
   */
  long now = time(NULL);
  bool doSend;
  char msg[200];

  //printf("now %ld crtTime %ld\n", now, crtTime);

  if (now != crtTime){
    /** new time, update previous counters; */
    prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime);
    prvTime = crtTime;
    sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop);
    logger(DEBUG, msg);
    /** reset current counter */
    crtTime = now;
    crtSent = 0;
    crtDrop = 0;
    //printf("\n");
  }

  /** compute the history */
  int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight));

  doSend = true;
  /** when we should start dropping messages */
  int level = this -> maxMsgRate - this -> maxMsgRate / 10;

  if (valSent > (this -> maxMsgRate - level)) {
    /* maxMsgRate / 10 is 0 for rates below 10, and a modulo by zero is
       undefined; a window of 1 makes rnd 0, so messages are sent while
       valSent stays below maxMsgRate */
    int dropWindow = this -> maxMsgRate / 10;
    if (dropWindow < 1)
      dropWindow = 1;
    int rnd = rand() % dropWindow;
    doSend = (rnd < (this -> maxMsgRate - valSent));
  }

  /** counting sent and dropped messages */
  if (doSend) {
    crtSent++;
    //printf("#");
  } else {
    crtDrop++;
    //printf(".");
  }

  return doSend;
}
| void ApMon::updateGeneralInfo | ( | ) | [protected] |
Update the general monitoring information.
Definition at line 546 of file monitor_utils.cpp.
{
  /* Start from empty CPU identification strings. */
  cpuVendor[0] = '\0';
  cpuFamily[0] = '\0';
  cpuModel[0] = '\0';
  cpuModelName[0] = '\0';

  /* CPU details are read only if at least one CPU parameter is active. */
  const bool wantCpuInfo =
    actGenMonitorParams[GEN_CPU_MHZ] == 1 ||
    actGenMonitorParams[GEN_BOGOMIPS] == 1 ||
    actGenMonitorParams[GEN_CPU_VENDOR_ID] == 1 ||
    actGenMonitorParams[GEN_CPU_FAMILY] == 1 ||
    actGenMonitorParams[GEN_CPU_MODEL] == 1 ||
    actGenMonitorParams[GEN_CPU_MODEL_NAME] == 1;
  if (wantCpuInfo) {
    try {
      ProcUtils::getCPUInfo(*this);
    } catch (procutils_error& err) {
      logger(WARNING, err.what());
      genRetResults[GEN_BOGOMIPS] = PROCUTILS_ERROR;
      genRetResults[GEN_CPU_MHZ] = PROCUTILS_ERROR;
    }
  }

  /* Total memory and swap are read together. */
  const bool wantMemInfo =
    actGenMonitorParams[GEN_TOTAL_MEM] == 1 ||
    actGenMonitorParams[GEN_TOTAL_SWAP] == 1;
  if (wantMemInfo) {
    try {
      ProcUtils::getSysMem(currentGenVals[GEN_TOTAL_MEM],
                           currentGenVals[GEN_TOTAL_SWAP]);
    } catch (procutils_error& perr) {
      logger(WARNING, perr.what());
      genRetResults[GEN_TOTAL_SWAP] = PROCUTILS_ERROR;
      genRetResults[GEN_TOTAL_MEM] = PROCUTILS_ERROR;
    }
  }

  /* The CPU count comes from the numCPUs member; a non-positive value is
     reported as an error. */
  if (this -> numCPUs <= 0)
    genRetResults[GEN_NO_CPUS] = PROCUTILS_ERROR;
  else
    currentGenVals[GEN_NO_CPUS] = this -> numCPUs;
}
| void ApMon::updateJobInfo | ( | MonitoredJob | job ) | [protected] |
Update the monitoring information regarding the specified job.
Definition at line 75 of file monitor_utils.cpp.
{
  /*
   * Refreshes currentJobVals for the given job: process statistics first,
   * then disk usage, each only if at least one of the related parameters is
   * active. A failure is logged and recorded as RET_ERROR in jobRetResults.
   * If the error text says the process "does not exist", the job is removed
   * from the monitored set and the disk information is skipped.
   */
  bool needJobInfo, needDiskInfo;
  bool jobExists = true;

  PsInfo jobInfo;
  JobDirInfo dirInfo;

  /**** runtime, CPU & memory usage information ****/
  needJobInfo = actJobMonitorParams[JOB_RUN_TIME]
    || actJobMonitorParams[JOB_CPU_TIME]
    || actJobMonitorParams[JOB_CPU_USAGE]
    || actJobMonitorParams[JOB_MEM_USAGE]
    || actJobMonitorParams[JOB_VIRTUALMEM]
    || actJobMonitorParams[JOB_RSS]
    || actJobMonitorParams[JOB_OPEN_FILES];
  if (needJobInfo) {
    try {
      readJobInfo(job.pid, jobInfo);
      currentJobVals[JOB_RUN_TIME] = jobInfo.etime;
      currentJobVals[JOB_CPU_TIME] = jobInfo.cputime;
      currentJobVals[JOB_CPU_USAGE] = jobInfo.pcpu;
      currentJobVals[JOB_MEM_USAGE] = jobInfo.pmem;
      currentJobVals[JOB_VIRTUALMEM] = jobInfo.vsz;
      currentJobVals[JOB_RSS] = jobInfo.rsz;

      /* a negative descriptor count is flagged as an error; the value is
         stored regardless */
      if (jobInfo.open_fd < 0)
        jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
      currentJobVals[JOB_OPEN_FILES] = jobInfo.open_fd;

    } catch (runtime_error &err) {
      logger(WARNING, err.what());
      jobRetResults[JOB_RUN_TIME] = jobRetResults[JOB_CPU_TIME] =
        jobRetResults[JOB_CPU_USAGE] = jobRetResults[JOB_MEM_USAGE] =
        jobRetResults[JOB_VIRTUALMEM] = jobRetResults[JOB_RSS] =
        jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
      /* search the message in place; copying it into a fixed-size buffer
         first could overflow on a long message */
      if (strstr(err.what(), "does not exist") != NULL)
        jobExists = false;
    }
  }

  /* if the monitored job has terminated, remove it */
  if (!jobExists) {
    try {
      removeJobToMonitor(job.pid);
    } catch (runtime_error &err) {
      logger(WARNING, err.what());
    }
    return;
  }

  /* disk usage information */
  needDiskInfo = actJobMonitorParams[JOB_DISK_TOTAL]
    || actJobMonitorParams[JOB_DISK_USED]
    || actJobMonitorParams[JOB_DISK_FREE]
    || actJobMonitorParams[JOB_DISK_USAGE]
    || actJobMonitorParams[JOB_WORKDIR_SIZE];
  if (needDiskInfo) {
    try {
      readJobDiskUsage(job, dirInfo);
      currentJobVals[JOB_WORKDIR_SIZE] = dirInfo.workdir_size;
      currentJobVals[JOB_DISK_TOTAL] = dirInfo.disk_total;
      currentJobVals[JOB_DISK_USED] = dirInfo.disk_used;
      currentJobVals[JOB_DISK_USAGE] = dirInfo.disk_usage;
      currentJobVals[JOB_DISK_FREE] = dirInfo.disk_free;
    } catch (runtime_error& err) {
      logger(WARNING, err.what());
      jobRetResults[JOB_WORKDIR_SIZE] = jobRetResults[JOB_DISK_TOTAL]
        = jobRetResults[JOB_DISK_USED]
        = jobRetResults[JOB_DISK_USAGE]
        = jobRetResults[JOB_DISK_FREE]
        = RET_ERROR;
    }
  }
}
| void ApMon::updateSysInfo | ( | ) | [protected] |
Update the system monitoring information with new values obtained from the /proc filesystem.
Definition at line 207 of file monitor_utils.cpp.
{
  /*
   * Refreshes the system monitoring values. Each group of parameters (CPU,
   * paging/swapping, load, processes, memory, network, sockets, uptime) is
   * read only if at least one parameter of the group is active. A failure is
   * logged and recorded in sysRetResults: PROCUTILS_ERROR when a
   * procutils_error is caught, RET_ERROR when a runtime_error is caught.
   */
  int needCPUInfo, needSwapPagesInfo, needLoadInfo, needMemInfo,
    needNetInfo, needUptime, needProcessesInfo, needNetstatInfo;

  /**** CPU usage information ****/
  needCPUInfo = actSysMonitorParams[SYS_CPU_USAGE]
    || actSysMonitorParams[SYS_CPU_USR]
    || actSysMonitorParams[SYS_CPU_SYS]
    || actSysMonitorParams[SYS_CPU_NICE]
    || actSysMonitorParams[SYS_CPU_IDLE];
  if (needCPUInfo) {
    try {
      ProcUtils::getCPUUsage(*this, currentSysVals[SYS_CPU_USAGE],
                             currentSysVals[SYS_CPU_USR],
                             currentSysVals[SYS_CPU_SYS],
                             currentSysVals[SYS_CPU_NICE],
                             currentSysVals[SYS_CPU_IDLE], numCPUs);
    } catch (procutils_error &perr) {
      /* "permanent" error (the parameters could not be obtained) */
      logger(WARNING, perr.what());
      sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS] =
        sysRetResults[SYS_CPU_USR] = sysRetResults[SYS_CPU_NICE] =
        sysRetResults[SYS_CPU_IDLE] = PROCUTILS_ERROR;
    } catch (runtime_error &err) {
      /* temporary error (next time we might be able to get the parameters) */
      logger(WARNING, err.what());
      sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS]
        = sysRetResults[SYS_CPU_USR]
        = sysRetResults[SYS_CPU_NICE]
        = sysRetResults[SYS_CPU_IDLE]
        = RET_ERROR;
    }
  }

  /**** paging and swapping activity ****/
  needSwapPagesInfo = actSysMonitorParams[SYS_PAGES_IN]
    || actSysMonitorParams[SYS_PAGES_OUT]
    || actSysMonitorParams[SYS_SWAP_IN]
    || actSysMonitorParams[SYS_SWAP_OUT];
  if (needSwapPagesInfo) {
    try {
      ProcUtils::getSwapPages(*this, currentSysVals[SYS_PAGES_IN],
                              currentSysVals[SYS_PAGES_OUT],
                              currentSysVals[SYS_SWAP_IN],
                              currentSysVals[SYS_SWAP_OUT]);
    } catch (procutils_error &perr) {
      /* "permanent" error (the parameters could not be obtained) */
      logger(WARNING, perr.what());
      sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT] =
        sysRetResults[SYS_SWAP_OUT] = sysRetResults[SYS_SWAP_IN] = PROCUTILS_ERROR;
    } catch (runtime_error &err) {
      /* temporary error (next time we might be able to get the parameters) */
      logger(WARNING, err.what());
      sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT]
        = sysRetResults[SYS_SWAP_IN]
        = sysRetResults[SYS_SWAP_OUT]
        = RET_ERROR;
    }
  }

  /**** load averages ****/
  needLoadInfo = actSysMonitorParams[SYS_LOAD1]
    || actSysMonitorParams[SYS_LOAD5]
    || actSysMonitorParams[SYS_LOAD15];
  if (needLoadInfo) {
    double dummyVal;
    try {
      /* the number of processes is now obtained with the getProcesses()
         function, not with getLoad() */
      ProcUtils::getLoad(currentSysVals[SYS_LOAD1], currentSysVals[SYS_LOAD5],
                         currentSysVals[SYS_LOAD15],dummyVal);
    } catch (procutils_error& perr) {
      /* "permanent" error (the parameters could not be obtained) */
      logger(WARNING, perr.what());
      sysRetResults[SYS_LOAD1] = sysRetResults[SYS_LOAD5]
        = sysRetResults[SYS_LOAD15]
        = PROCUTILS_ERROR;
    }
  }

  /**** get statistics about the current processes ****/
  needProcessesInfo = actSysMonitorParams[SYS_PROCESSES];
  if (needProcessesInfo) {
    try {
      ProcUtils::getProcesses(currentSysVals[SYS_PROCESSES],
                              currentProcessStates);
    } catch (runtime_error& err) {
      logger(WARNING, err.what());
      sysRetResults[SYS_PROCESSES] = RET_ERROR;
    }
  }

  /**** get the amount of memory currently in use ****/
  needMemInfo = actSysMonitorParams[SYS_MEM_USED]
    || actSysMonitorParams[SYS_MEM_FREE]
    || actSysMonitorParams[SYS_SWAP_USED]
    || actSysMonitorParams[SYS_SWAP_FREE]
    || actSysMonitorParams[SYS_MEM_USAGE]
    || actSysMonitorParams[SYS_SWAP_USAGE];
  if (needMemInfo) {
    try {
      ProcUtils::getMemUsed(currentSysVals[SYS_MEM_USED],
                            currentSysVals[SYS_MEM_FREE],
                            currentSysVals[SYS_SWAP_USED],
                            currentSysVals[SYS_SWAP_FREE]);
      /* usage percentages; a zero total (e.g. used + free swap both 0)
         would otherwise yield a non-finite value, so report 0 instead */
      const double memTotal =
        currentSysVals[SYS_MEM_USED] + currentSysVals[SYS_MEM_FREE];
      currentSysVals[SYS_MEM_USAGE] = (memTotal > 0)
        ? 100 * currentSysVals[SYS_MEM_USED] / memTotal
        : 0;
      const double swapTotal =
        currentSysVals[SYS_SWAP_USED] + currentSysVals[SYS_SWAP_FREE];
      currentSysVals[SYS_SWAP_USAGE] = (swapTotal > 0)
        ? 100 * currentSysVals[SYS_SWAP_USED] / swapTotal
        : 0;
    } catch (procutils_error &perr) {
      logger(WARNING, perr.what());
      sysRetResults[SYS_MEM_USED] = sysRetResults[SYS_MEM_FREE] =
        sysRetResults[SYS_SWAP_USED] = sysRetResults[SYS_SWAP_FREE] =
        sysRetResults[SYS_MEM_USAGE] = sysRetResults[SYS_SWAP_USAGE] =
        PROCUTILS_ERROR;
    }
  }

  /**** network monitoring information ****/
  needNetInfo = actSysMonitorParams[SYS_NET_IN] ||
    actSysMonitorParams[SYS_NET_OUT] || actSysMonitorParams[SYS_NET_ERRS];
  if (needNetInfo && this -> nInterfaces > 0) {
    try {
      /* getNetInfo() receives the addresses of the three per-interface
         array pointers */
      ProcUtils::getNetInfo(*this, &currentNetIn, &currentNetOut,
                            &currentNetErrs);
    } catch (procutils_error &perr) {
      logger(WARNING, perr.what());
      sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] =
        sysRetResults[SYS_NET_ERRS] = PROCUTILS_ERROR;
    } catch (runtime_error &err) {
      logger(WARNING, err.what());
      sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] =
        sysRetResults[SYS_NET_ERRS] = RET_ERROR;
    }
  }

  /**** socket statistics ****/
  needNetstatInfo = actSysMonitorParams[SYS_NET_SOCKETS] ||
    actSysMonitorParams[SYS_NET_TCP_DETAILS];
  if (needNetstatInfo) {
    try {
      ProcUtils::getNetstatInfo(*this, this -> currentNSockets,
                                this -> currentSocketsTCP);
    } catch (runtime_error &err) {
      logger(WARNING, err.what());
      sysRetResults[SYS_NET_SOCKETS] = sysRetResults[SYS_NET_TCP_DETAILS] =
        RET_ERROR;
    }
  }

  /**** uptime ****/
  needUptime = actSysMonitorParams[SYS_UPTIME];
  if (needUptime) {
    try {
      currentSysVals[SYS_UPTIME] = ProcUtils::getUpTime();
    } catch (procutils_error &perr) {
      logger(WARNING, perr.what());
      sysRetResults[SYS_UPTIME] = PROCUTILS_ERROR;
    }
  }
}
| void* bkTask | ( | void * | param ) | [friend] |
This function is executed in a background thread and has two roles: it automatically sends the system/job monitoring parameters (if the user requested) and it checks the configuration file/URLs for changes.
(this is done in a separate thread).
Definition at line 918 of file ApMon.cpp.
{
#else
DWORD WINAPI bkTask(void *param) {
#endif
// Body of the background thread. param is the ApMon object the thread works
// for. The loop below repeatedly picks the earliest pending operation
// (configuration recheck, job info send, system info send), waits until it
// is due or until confChangedCond is signalled, and then performs it.
// The loop ends when stopBkThread is set.
struct stat st;
#ifndef WIN32
struct timespec delay;
#else
DWORD delay;
#endif
bool resourceChanged, haveChange;
int nextOp = -1, i, ret;
int generalInfoCount;
time_t crtTime, timeRemained;
// absolute times of the next operations; values <= 0 mean "not scheduled"
time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0;
ApMon *apm = (ApMon *)param;
char logmsg[200];
logger(INFO, "[Starting background thread...]");
apm -> bkThreadStarted = true;
crtTime = time(NULL);
// initial schedule, read from the settings under mutexBack
pthread_mutex_lock(&(apm -> mutexBack));
if (apm -> confCheck) {
nextRecheck = crtTime + apm -> crtRecheckInterval;
//sprintf(logmsg, "###1 crt %ld interv %ld recheck %ld ", crtTime,
// apm -> crtRecheckInterval, nextRecheck);
//logger(FINE, logmsg);
//fflush(stdout);
}
if (apm -> jobMonitoring)
nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
if (apm -> sysMonitoring)
nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
pthread_mutex_unlock(&(apm -> mutexBack));
timeRemained = -1;
generalInfoCount = 0;
while (1) {
// leave the loop as soon as a stop was requested
pthread_mutex_lock(&apm -> mutexBack);
if (apm -> stopBkThread) {
// printf("### stopBkThread \n");
pthread_mutex_unlock(&apm -> mutexBack);
break;
}
pthread_mutex_unlock(&apm -> mutexBack);
//sprintf(logmsg, "### 2 recheck %ld sys %ld ", nextRecheck,
// nextSysInfoSend);
//logger(FINE, logmsg);
/* determine the next operation that must be performed */
// i.e. the scheduled operation with the smallest due time; on equal times
// the recheck wins over both sends, and the job send wins over the system send
if (nextRecheck > 0 && (nextJobInfoSend <= 0 ||
nextRecheck <= nextJobInfoSend)) {
if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) {
nextOp = RECHECK_CONF;
timeRemained = nextRecheck - crtTime;
} else {
nextOp = SYS_INFO_SEND;
timeRemained = nextSysInfoSend - crtTime;
}
} else {
if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 ||
nextJobInfoSend <= nextSysInfoSend)) {
nextOp = JOB_INFO_SEND;
timeRemained = nextJobInfoSend - crtTime;
} else if (nextSysInfoSend > 0) {
nextOp = SYS_INFO_SEND;
timeRemained = nextSysInfoSend - crtTime;
}
}
// timeRemained still has its initial value -1 when nothing was selected above
if (timeRemained == -1)
timeRemained = RECHECK_INTERVAL;
#ifndef WIN32
/* the moment when the next operation should be performed */
delay.tv_sec = crtTime + timeRemained;
delay.tv_nsec = 0;
#else
delay = (/*crtTime +*/ timeRemained) * 1000; // this is in millis
#endif
// mutexBack protects the settings, mutexCond the condition variable;
// mutexCond stays locked until the wait below (or the "continue")
pthread_mutex_lock(&(apm -> mutexBack));
pthread_mutex_lock(&(apm -> mutexCond));
/* check for changes in the settings */
haveChange = false;
if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged)
haveChange = true;
// for each changed setting, reschedule (or unschedule with -1) its operation
// and clear the change flag
if (apm -> jobMonChanged) {
if (apm -> jobMonitoring)
nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
else
nextJobInfoSend = -1;
apm -> jobMonChanged = false;
}
if (apm -> sysMonChanged) {
if (apm -> sysMonitoring)
nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
else
nextSysInfoSend = -1;
apm -> sysMonChanged = false;
}
if (apm -> recheckChanged) {
if (apm -> confCheck) {
nextRecheck = crtTime + apm -> crtRecheckInterval;
}
else
nextRecheck = -1;
apm -> recheckChanged = false;
}
pthread_mutex_unlock(&(apm -> mutexBack));
// the schedule changed: start over so the next operation is chosen again
if (haveChange) {
pthread_mutex_unlock(&(apm -> mutexCond));
continue;
}
/* wait until the next operation should be performed or until
a change in the settings occurs */
#ifndef WIN32
ret = pthread_cond_timedwait(&(apm -> confChangedCond),
&(apm -> mutexCond), &delay);
pthread_mutex_unlock(&(apm -> mutexCond));
#else
pthread_mutex_unlock(&(apm -> mutexCond));
ret = WaitForSingleObject(apm->confChangedCond, delay);
#endif
// NOTE(review): on WIN32 ret comes from WaitForSingleObject() but is compared
// with ETIMEDOUT below - confirm how ETIMEDOUT is defined for that build
if (ret == ETIMEDOUT) {
// printf("### ret TIMEDOUT\n");
/* now perform the operation */
if (nextOp == JOB_INFO_SEND) {
apm -> sendJobInfo();
crtTime = time(NULL);
nextJobInfoSend = crtTime + apm -> getJobMonitorInterval();
}
if (nextOp == SYS_INFO_SEND) {
apm -> sendSysInfo();
// general info is sent only while the counter is 0 or 1; the counter wraps
// around after genMonitorIntervals system sends
if (apm -> getGenMonitoring()) {
if (generalInfoCount <= 1)
apm -> sendGeneralInfo();
generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals;
}
crtTime = time(NULL);
nextSysInfoSend = crtTime + apm -> getSysMonitorInterval();
}
if (nextOp == RECHECK_CONF) {
//logger(FINE, "### recheck conf");
resourceChanged = false;
try {
if (apm -> initType == FILE_INIT) {
sprintf(logmsg, "Checking for modifications for file %s ",
apm -> initSources[0]);
logger(INFO, logmsg);
// NOTE(review): the return value of stat() is not checked, so st may be
// unset if the call fails - confirm whether that can happen here
stat(apm -> initSources[0], &st);
if (st.st_mtime > apm -> lastModifFile) {
sprintf(logmsg, "File %s modified ", apm -> initSources[0]);
logger(INFO, logmsg);
resourceChanged = true;
}
}
// check the configuration URLs
// (stops at the first URL reported as modified)
for (i = 0; i < apm -> confURLs.nConfURLs; i++) {
sprintf(logmsg, "[Checking for modifications for URL %s ] ",
apm -> confURLs.vURLs[i]);
logger(INFO, logmsg);
if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) {
sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]);
logger(INFO, logmsg);
resourceChanged = true;
break;
}
}
if (resourceChanged) {
logger(INFO, "Reloading configuration...");
if (apm -> initType == FILE_INIT)
apm -> initialize(apm -> initSources[0], false);
else
apm -> initialize(apm -> nInitSources, apm -> initSources, false);
}
// no exception: use the regular recheck interval for the next check
apm -> setCrtRecheckInterval(apm -> getRecheckInterval());
} catch (runtime_error &err) {
// a runtime_error was caught: wait five times the regular interval before
// the next check
logger(WARNING, err.what());
logger(WARNING, "Increasing the time interval for reloading the configuration...");
apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5);
}
crtTime = time(NULL);
nextRecheck = crtTime + apm -> getCrtRecheckInterval();
//sleep(apm -> getCrtRecheckInterval());
}
}
} // while
#ifndef WIN32
return NULL; // it doesn't matter what we return here
#else
return 0;
#endif
}
int ApMon::actGenMonitorParams[MAX_GEN_PARAMS] [protected] |
int ApMon::actJobMonitorParams[MAX_JOB_PARAMS] [protected] |
int ApMon::actSysMonitorParams[MAX_SYS_PARAMS] [protected] |
char ApMon::allMyIPs[20][20] [protected] |
bool ApMon::autoDisableMonitoring [protected] |
pthread_t ApMon::bkThread [protected] |
bool ApMon::bkThreadStarted [protected] |
char* ApMon::buf [protected] |
char* ApMon::clusterName [protected] |
pthread_cond_t ApMon::confChangedCond [protected] |
bool ApMon::confCheck [protected] |
ConfURLs ApMon::confURLs [protected] |
char ApMon::cpuFamily[100] [protected] |
char ApMon::cpuModel[100] [protected] |
char ApMon::cpuModelName[200] [protected] |
char ApMon::cpuVendor[100] [protected] |
long ApMon::crtDrop [protected] |
long ApMon::crtRecheckInterval [protected] |
long ApMon::crtSent [protected] |
long ApMon::crtTime [protected] |
double ApMon::currentGenVals[MAX_GEN_PARAMS] [protected] |
double ApMon::currentJobVals[MAX_JOB_PARAMS] [protected] |
double* ApMon::currentNetErrs [protected] |
double* ApMon::currentNetIn [protected] |
double* ApMon::currentNetOut [protected] |
double ApMon::currentNSockets[4] [protected] |
double ApMon::currentProcessStates[NLETTERS] [protected] |
double ApMon::currentSocketsTCP[20] [protected] |
double ApMon::currentSysVals[MAX_SYS_PARAMS] [protected] |
char** ApMon::destAddresses [protected] |
char** ApMon::destPasswds [protected] |
int* ApMon::destPorts [protected] |
int ApMon::dgramSize [protected] |
bool ApMon::genMonitoring [protected] |
int ApMon::genMonitorIntervals [protected] |
char* ApMon::genMonitorParams[MAX_GEN_PARAMS] [protected] |
int ApMon::genRetResults[MAX_GEN_PARAMS] [protected] |
char ApMon::groupname[MAX_STRING_LEN] [protected] |
bool ApMon::haveBkThread [protected] |
double ApMon::hWeight [protected] |
char** ApMon::initSources [protected] |
int ApMon::initType [protected] |
int ApMon::instance_id [protected] |
char ApMon::interfaceNames[20][20] [protected] |
bool ApMon::jobMonChanged [protected] |
bool ApMon::jobMonitoring [protected] |
long ApMon::jobMonitorInterval [protected] |
char* ApMon::jobMonitorParams[MAX_JOB_PARAMS] [protected] |
int ApMon::jobRetResults[MAX_JOB_PARAMS] [protected] |
double ApMon::lastBytesReceived[20] [protected] |
double ApMon::lastBytesSent[20] [protected] |
time_t ApMon::lastJobInfoSend [protected] |
long ApMon::lastModifFile [protected] |
double ApMon::lastNetErrs[20] [protected] |
time_t ApMon::lastSysInfoSend [protected] |
double ApMon::lastSysVals[MAX_SYS_PARAMS] [protected] |
int ApMon::maxMsgRate [protected] |
MonitoredJob* ApMon::monJobs [protected] |
pthread_mutex_t ApMon::mutex [protected] |
pthread_mutex_t ApMon::mutexBack [protected] |
pthread_mutex_t ApMon::mutexCond [protected] |
char ApMon::myHostname[MAX_STRING_LEN] [protected] |
char ApMon::myIP[MAX_STRING_LEN] [protected] |
int ApMon::nDestinations [protected] |
int ApMon::nGenMonitorParams [protected] |
int ApMon::nInitSources [protected] |
int ApMon::nInterfaces [protected] |
int ApMon::nJobMonitorParams [protected] |
int ApMon::nMonJobs [protected] |
char* ApMon::nodeName [protected] |
int ApMon::nSysMonitorParams [protected] |
int ApMon::numCPUs [protected] |
int ApMon::numIPs [protected] |
double ApMon::prvDrop [protected] |
double ApMon::prvSent [protected] |
long ApMon::prvTime [protected] |
bool ApMon::recheckChanged [protected] |
long ApMon::recheckInterval [protected] |
int ApMon::seq_nr [protected] |
char* ApMon::socketStatesMapTCP[20] [protected] |
int ApMon::sockfd [protected] |
bool ApMon::stopBkThread [protected] |
bool ApMon::sysInfo_first [protected] |
bool ApMon::sysMonChanged [protected] |
char* ApMon::sysMonCluster [protected] |
bool ApMon::sysMonitoring [protected] |
long ApMon::sysMonitorInterval [protected] |
char* ApMon::sysMonitorParams[MAX_SYS_PARAMS] [protected] |
char* ApMon::sysMonNode [protected] |
int ApMon::sysRetResults[MAX_SYS_PARAMS] [protected] |
char ApMon::username[MAX_STRING_LEN] [protected] |