Gaudi Framework, version v22r0

Home   Generated: 9 Feb 2011

ApMon.cpp

Go to the documentation of this file.
00001 
00006 /*
00007  * ApMon - Application Monitoring Tool
00008  * Version: 2.2.0
00009  *
00010  * Copyright (C) 2006 California Institute of Technology
00011  *
00012  * Permission is hereby granted, free of charge, to use, copy and modify
00013  * this software and its documentation (the "Software") for any
00014  * purpose, provided that existing copyright notices are retained in
00015  * all copies and that this notice is included verbatim in any distributions
00016  * or substantial portions of the Software.
00017  * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
00018  * Users of the Software are asked to feed back problems, benefits,
00019  * and/or suggestions about the software to the MonALISA Development Team
00020  * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
00021  * incorporation of new features - is done on a best effort basis. All bug
00022  * fixes and enhancements will be made available under the same terms and
00023  * conditions as the original software,
00024 
00025  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00026  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
00027  * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
00028  * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029 
00030  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
00031  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
00032  * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
00033  * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
00034  * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
00035  * MODIFICATIONS.
00036  */
00037 
00038 #include "ApMon.h"
00039 #include "utils.h"
00040 #include "proc_utils.h"
00041 #include "monitor_utils.h"
00042 
00043 #include <math.h>
00044 
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047 
/* Small integer action codes.
   NOTE(review): presumably these select what a periodic/background task
   does next (re-read the configuration, send system information, send job
   information) - their users are outside this excerpt, please confirm. */
#define RECHECK_CONF 0
#define SYS_INFO_SEND 1
#define JOB_INFO_SEND 2

/* Printable names of the boolean values, indexed by the value itself
   (index 0 -> "false", index 1 -> "true"). */
char boolStrings[][10] = { "false", "true" };
00053 
00054 #ifdef __ICC
00055 // disable icc remark #2259: non-pointer conversion from "X" to "Y" may lose significant bits
00056 #pragma warning(disable:2259)
00057 #endif
00058 
00059 //========= Implementations of the functions ===================
00060 
00061 ApMon::ApMon(char *initsource)
00062   throw(runtime_error) {
00063 
00064   if (initsource == NULL)
00065     throw runtime_error("[ ApMon() ]  No conf file/URL provided");
00066 
00067   if (strstr(initsource, "http://") == initsource) {
00068     char *destList[1];
00069     destList[0] = initsource;
00070     constructFromList(1, destList);
00071   } else {
00072     nInitSources = 1;
00073     initType = FILE_INIT;
00074     initSources = (char **)malloc(nInitSources * sizeof(char *));
00075     if (initSources == NULL)
00076       throw runtime_error("[ ApMon() ] Error allocating memory.");
00077 
00078     initSources[0] = strdup(initsource);
00079     initMonitoring();
00080 
00081     initialize(initsource, true);
00082   }
00083 }
00084 
00085 void ApMon::initialize(char *filename, bool firstTime)
00086   throw(runtime_error) {
00087 
00088   char *destAddresses[MAX_N_DESTINATIONS];
00089   int destPorts[MAX_N_DESTINATIONS];
00090   char *destPasswds[MAX_N_DESTINATIONS];
00091   int nDest = 0, i;
00092   ConfURLs confURLs;
00093 
00094   confURLs.nConfURLs = 0;
00095 
00096   try {
00097     loadFile(filename, &nDest, destAddresses, destPorts, destPasswds);
00098 
00099     arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime);
00100   } catch (runtime_error& err) {
00101     if (firstTime)
00102       throw err;
00103     else {
00104       logger(WARNING, err.what());
00105       logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00106       return;
00107     }
00108   }
00109 
00110   for (i = 0; i < nDest; i++) {
00111     free(destAddresses[i]);
00112     free(destPasswds[i]);
00113   }
00114 
00115   pthread_mutex_lock(&mutex);
00116   this -> confURLs = confURLs;
00117   pthread_mutex_unlock(&mutex);
00118 }
00119 
00120 void ApMon::loadFile(char *filename, int *nDestinations, char **destAddresses,
00121                      int *destPorts, char **destPasswds)
00122   throw(runtime_error) {
00123   FILE *f;
00124   char msg[100];
00125 
00126   /* initializations for the destination addresses */
00127   f = fopen(filename, "rt");
00128   if (f == NULL) {
00129     throw runtime_error("[ loadFile() ] Error opening configuration file");
00130   }
00131 
00132   sprintf(msg, "Loading file %s ...", filename);
00133   logger(INFO, msg);
00134 
00135   lastModifFile = time(NULL);
00136 
00137   parseConf(f, nDestinations, destAddresses, destPorts,
00138             destPasswds);
00139   fclose(f);
00140 }
00141 
00142 ApMon::ApMon(int nDestinations, char **destinationsList) throw(runtime_error){
00143   constructFromList(nDestinations, destinationsList);
00144 }
00145 
00146 void ApMon::constructFromList(int nDestinations, char **destinationsList)
00147   throw(runtime_error) {
00148   int i;
00149 
00150   if (destinationsList == NULL)
00151     throw runtime_error("[ constructFromList() ] Null destination list");
00152 
00153 #ifdef __APPLE__
00154   initType = OLIST_INIT;
00155 #else
00156   initType = LIST_INIT;
00157 #endif
00158 
00159   initMonitoring();
00160 
00161   /* save the initialization list */
00162   nInitSources = nDestinations;
00163   initSources = (char **)malloc(nInitSources * sizeof(char*));
00164   if (initSources == NULL)
00165     throw runtime_error("[ ApMon() ] Error allocating memory.");
00166 
00167   for (i = 0; i < nInitSources; i++)
00168     initSources[i] = strdup(destinationsList[i]);
00169 
00170   initialize(nDestinations, destinationsList, true);
00171 }
00172 
00173 void ApMon::initialize(int nDestinations, char **destinationsList,
00174                        bool firstTime) throw(runtime_error) {
00175   char *destAddresses[MAX_N_DESTINATIONS];
00176   int destPorts[MAX_N_DESTINATIONS];
00177   char *destPasswds[MAX_N_DESTINATIONS];
00178   char errmsg[200];
00179   int i;
00180   int cnt = 0;
00181   ConfURLs confURLs;
00182 
00183   logger(INFO, "Initializing destination addresses & ports:");
00184 
00185   if (nDestinations > MAX_N_DESTINATIONS)
00186     throw runtime_error("[ initialize() ] Maximum number of destinations exceeded");
00187 
00188 
00189   confURLs.nConfURLs = 0;
00190 
00191   for (i = 0; i < nDestinations; i++) {
00192     try {
00193       if (strstr(destinationsList[i], "http") == destinationsList[i])
00194         getDestFromWeb(destinationsList[i], &cnt,
00195                        destAddresses, destPorts, destPasswds, confURLs);
00196       else
00197         addToDestinations(destinationsList[i], &cnt,
00198                         destAddresses, destPorts, destPasswds);
00199 
00200     } catch (runtime_error &e) {
00201       sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %s", e.what());
00202       logger(WARNING, errmsg);
00203       if (!firstTime) {
00204         for (i = 0; i < cnt; i++) {
00205           free(destAddresses[i]);
00206           free(destPasswds[i]);
00207         }
00208         logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one.");
00209         return;
00210       }
00211     }  // catch
00212   }  // for
00213 
00214   try {
00215     arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime);
00216   } catch (runtime_error& err) {
00217     if (firstTime)
00218       throw err;
00219     else {
00220       logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00221       return;
00222     }
00223   }
00224 
00225   for (i = 0; i < cnt; i++) {
00226     free(destAddresses[i]);
00227     free(destPasswds[i]);
00228   }
00229 
00230   pthread_mutex_lock(&mutex);
00231   this -> confURLs = confURLs;
00232   pthread_mutex_unlock(&mutex);
00233 }
00234 
00235 void ApMon::addToDestinations(char *line, int *nDestinations,
00236                char *destAddresses[], int destPorts[], char *destPasswds[]) {
00237   char *addr, *port, *passwd;
00238   const char *sep1 = " \t";
00239   const char *sep2 = ":";
00240 
00241   char *tmp = strdup(line);
00242   char *firstToken;
00243 //  char buf[MAX_STRING_LEN];
00244 //  char *pbuf = buf;
00245 
00246   /* the address & port are separated from the password with spaces */
00247   firstToken = strtok/*_r*/(tmp, sep1);//, &pbuf);
00248   passwd = strtok/*_r*/(NULL, sep1);//, &pbuf);
00249 
00250   /* the address and the port are separated with ":" */
00251   addr = strtok/*_r*/(firstToken, sep2);//, &pbuf);
00252   port = strtok/*_r*/(NULL, sep2);//, &pbuf);
00253   destAddresses[*nDestinations] = strdup(addr);
00254   if (port == NULL)
00255     destPorts[*nDestinations] = DEFAULT_PORT;
00256   else
00257     destPorts[*nDestinations] = atoi(port);
00258   if (passwd == NULL)
00259     destPasswds[*nDestinations] = strdup("");
00260   else
00261     destPasswds[*nDestinations] = strdup(passwd);
00262   (*nDestinations)++;
00263 
00264   free(tmp);
00265 }
00266 
00267 void ApMon::getDestFromWeb(char *url, int *nDestinations,
00268   char *destAddresses[], int destPorts[], char *destPasswds[],
00269                            ConfURLs& confURLs) throw(runtime_error) {
00270   char temp_filename[300];
00271   FILE *tmp_file;
00272   char *line, *ret, *tmp = NULL;
00273   bool modifLineFound;
00274   long mypid = getpid();
00275   char str1[20], str2[20];
00276   int totalSize, headerSize, contentSize;
00277 
00278 #ifndef WIN32
00279   sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid);
00280 #else
00281   char *tmpp = getenv("TEMP");
00282   if(tmpp == NULL)
00283           tmpp = getenv("TMP");
00284   if(tmpp == NULL)
00285           tmpp = "c:";
00286   sprintf(temp_filename, "%s\\apmon_webconf%ld", tmpp, mypid);
00287 #endif
00288   /* get the configuration file from web and put it in a temporary file */
00289   totalSize = httpRequest(url, (char*)"GET", temp_filename);
00290 
00291   /* read the configuration from the temporary file */
00292   tmp_file = fopen(temp_filename, "rt");
00293   if (tmp_file == NULL)
00294     throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00295 
00296   line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char));
00297 
00298   //check the HTTP header to see if we got the page correctly
00299   fgets(line, MAX_STRING_LEN, tmp_file);
00300   sscanf(line, "%s %s", str1, str2);
00301   if (atoi(str2) != 200) {
00302     free(line);
00303     fclose(tmp_file);
00304     throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server");
00305   }
00306 
00307   confURLs.vURLs[confURLs.nConfURLs] = strdup(url);
00308 
00309   // check the  header for the "Last-Modified" and "Content-Length" lines
00310   modifLineFound = false;
00311   contentSize = 0;
00312   do {
00313     if (tmp != NULL)
00314       free(tmp);
00315     ret = fgets(line, MAX_STRING_LEN, tmp_file);
00316     if (ret == NULL) {
00317       free(line); fclose(tmp_file);
00318       throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00319     }
00320     if (strstr(line, "Last-Modified") == line) {
00321       modifLineFound = true;
00322       confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(line);
00323     }
00324 
00325     if (strstr(line, "Content-Length") == line) {
00326       sscanf(line, "%s %d", str1, &contentSize);
00327     }
00328 
00329     tmp = trimString(line);
00330   } while (strlen(tmp) != 0);
00331   free(tmp); free(line);
00332 
00333   if (!modifLineFound)
00334     confURLs.lastModifURLs[confURLs.nConfURLs] = strdup("");
00335   confURLs.nConfURLs++;
00336 
00337   headerSize = ftell(tmp_file);
00338   if (totalSize - headerSize < contentSize) {
00339     fclose(tmp_file);
00340     throw runtime_error("[ getDestFromWeb() ] Web page received incompletely");
00341   }
00342 
00343   try {
00344     parseConf(tmp_file, nDestinations, destAddresses, destPorts,
00345               destPasswds);
00346   } catch (...) {
00347     fclose(tmp_file);
00348     unlink(temp_filename);
00349     throw;
00350   }
00351 
00352   fclose(tmp_file);
00353   unlink(temp_filename);
00354 }
00355 
00356 
00357 ApMon::ApMon(int nDestinations, char **destAddresses, int *destPorts,
00358              char **destPasswds)
00359 throw(runtime_error) {
00360   initMonitoring();
00361 
00362   arrayInit(nDestinations, destAddresses, destPorts, destPasswds);
00363 }
00364 
00365 void ApMon::arrayInit(int nDestinations, char **destAddresses,
00366                       int *destPorts, char **destPasswds)
00367   throw(runtime_error) {
00368         arrayInit(nDestinations, destAddresses, destPorts, destPasswds, true);
00369 }
00370 
00371 
00372 void ApMon::arrayInit(int nDestinations, char **destAddresses, int *destPorts,
00373                       char **destPasswds, bool firstTime)
00374 throw(runtime_error) {
00375   int i, j;
00376   int ret;
00377   char *ipAddr, logmsg[100];
00378   bool found, havePublicIP;
00379   int tmpNDestinations;
00380   char **tmpAddresses, **tmpPasswds;
00381   int *tmpPorts;
00382 
00383   if (destAddresses == NULL || destPorts == NULL || nDestinations == 0)
00384     throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided");
00385 
00386   /* initializations that we have to do only once */
00387   if (firstTime) {
00388     //this -> appPID = getpid();
00389 
00390     this -> nMonJobs = 0;
00391     this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS *
00392                                             sizeof(MonitoredJob));
00393 
00394     try {
00395       this -> numCPUs = ProcUtils::getNumCPUs();
00396     } catch (procutils_error &err) {
00397       logger(WARNING, err.what());
00398       this -> numCPUs = 0;
00399     }
00400 
00401     /* get the names of the network interfaces */
00402     this -> nInterfaces = 0;
00403     try {
00404       ProcUtils::getNetworkInterfaces(this -> nInterfaces,
00405                                      this -> interfaceNames);
00406     } catch (procutils_error &err) {
00407       logger(WARNING, err.what());
00408       this -> nInterfaces = 0;
00409     }
00410 
00411     /* get the hostname of the machine */
00412     ret = gethostname(this -> myHostname, MAX_STRING_LEN -1);
00413     if (ret < 0) {
00414       logger(WARNING, "Could not obtain the local hostname");
00415       strcpy(myHostname, "unknown");
00416     } else
00417       myHostname[MAX_STRING_LEN - 1] = 0;
00418 
00419     /* get the IPs of the machine */
00420     this -> numIPs = 0; havePublicIP = false;
00421     strcpy(this -> myIP, "unknown");
00422 
00423     /* default values for cluster name and node name */
00424     this -> clusterName = strdup("ApMon_UserSend");
00425     this -> nodeName = strdup(myHostname);
00426 
00427 #ifndef WIN32
00428     int sockd = socket(PF_INET, SOCK_STREAM, 0);
00429     if(sockd < 0){
00430       logger(WARNING, "Could not obtain local IP addresses");
00431     } else {
00432       for (i = 0; i < this -> nInterfaces; i++) {
00433     struct ifreq ifr;
00434         memset(&ifr, 0, sizeof(ifr));
00435         strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1);
00436         if(ioctl(sockd, SIOCGIFADDR, &ifr)<0)
00437           continue;     //????????
00438         char ip[4], tmp_s[20];
00439 #ifdef __APPLE__
00440         memcpy(ip, ifr.ifr_addr.sa_data+2, 4);
00441 #else
00442         memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4);
00443 #endif
00444         strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip));
00445         sprintf(logmsg, "Found local IP address: %s", tmp_s);
00446         logger(FINE, logmsg);
00447         if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) {
00448           strcpy(this -> myIP, tmp_s);
00449           if (!isPrivateAddress(tmp_s))
00450             havePublicIP = true;
00451         }
00452         strcpy(this -> allMyIPs[this -> numIPs], tmp_s);
00453         this -> numIPs++;
00454       }
00455     }
00456 #else
00457         struct hostent *hptr;
00458     if ((hptr = gethostbyname(myHostname))!= NULL) {
00459       i = 0;
00460           struct in_addr addr;
00461           while ((hptr -> h_addr_list)[i] != NULL) {
00462             memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4);
00463         ipAddr = inet_ntoa(addr);
00464             if (strcmp(ipAddr, "127.0.0.1") != 0) {
00465               strcpy(this -> myIP, ipAddr);
00466               if (!isPrivateAddress(ipAddr))
00467                     break;
00468                 }
00469             i++;
00470           }
00471         }
00472 #endif
00473 
00474     this -> sysMonCluster = strdup("ApMon_SysMon");
00475     this -> sysMonNode = strdup(this -> myIP);
00476 
00477     this -> prvTime = 0;
00478     this -> prvSent = 0;
00479     this -> prvDrop = 0;
00480     this -> crtTime = 0;
00481     this -> crtSent = 0;
00482     this -> crtDrop = 0;
00483     this -> hWeight = exp(-5.0/60.0);
00484 
00485     srand(time(NULL));
00486 
00487     /* initialize buffer for XDR encoding */
00488     this -> buf = (char *)malloc(MAX_DGRAM_SIZE);
00489     if (this -> buf == NULL)
00490       throw runtime_error("[ arrayInit() ] Error allocating memory");
00491     this -> dgramSize = 0;
00492 
00493     /*create the socket & set options*/
00494     initSocket();
00495 
00496     /* initialize the sender ID and the sequence number */
00497     instance_id = rand();
00498     seq_nr = 0;
00499   }
00500 
00501   /* put the destination addresses, ports & passwords in some temporary
00502      buffers (because we don't want to lock mutex while making DNS
00503      requests)
00504   */
00505   tmpNDestinations = 0;
00506   tmpPorts = (int *)malloc(nDestinations * sizeof(int));
00507   tmpAddresses = (char **)malloc(nDestinations * sizeof(char *));
00508   tmpPasswds = (char **)malloc(nDestinations * sizeof(char *));
00509   if (tmpPorts == NULL || tmpAddresses == NULL ||
00510       tmpPasswds == NULL)
00511     throw runtime_error("[ arrayInit() ] Error allocating memory");
00512 
00513   for (i = 0; i < nDestinations; i++) {
00514     try {
00515       ipAddr = findIP(destAddresses[i]);
00516     } catch (runtime_error &err) {
00517       logger(FATAL, err.what());
00518       continue;
00519     }
00520 
00521     /* make sure this address is not already in the list */
00522     found = false;
00523     for (j = 0; j < tmpNDestinations; j++) {
00524       if (!strcmp(ipAddr, tmpAddresses[j])) {
00525         found = true;
00526         break;
00527       }
00528     }
00529 
00530     /* add the address to the list */
00531     if (!found) {
00532       tmpAddresses[tmpNDestinations] = ipAddr;
00533       tmpPorts[tmpNDestinations] = destPorts[i];
00534       tmpPasswds[tmpNDestinations] = strdup(destPasswds[i]);
00535 
00536       sprintf(logmsg, "Adding destination host: %s  - port %d",
00537              tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]);
00538       logger(INFO, logmsg);
00539 
00540       tmpNDestinations++;
00541     }
00542   }
00543 
00544   if (tmpNDestinations == 0) {
00545     freeMat(tmpAddresses, tmpNDestinations);
00546     freeMat(tmpPasswds, tmpNDestinations);
00547     throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!");
00548   }
00549 
00550   pthread_mutex_lock(&mutex);
00551   if (!firstTime)
00552       freeConf();
00553   this -> nDestinations = tmpNDestinations;
00554   this -> destAddresses = tmpAddresses;
00555   this -> destPorts = tmpPorts;
00556   this -> destPasswds = tmpPasswds;
00557   pthread_mutex_unlock(&mutex);
00558 
00559   /* start job/system monitoring according to the settings previously read
00560      from the configuration file */
00561   setJobMonitoring(jobMonitoring, jobMonitorInterval);
00562   setSysMonitoring(sysMonitoring, sysMonitorInterval);
00563   setGenMonitoring(genMonitoring, genMonitorIntervals);
00564   setConfRecheck(confCheck, recheckInterval);
00565 }
00566 
00567 
00568 ApMon::~ApMon() {
00569   int i;
00570 
00571   if (bkThreadStarted) {
00572     if (getJobMonitoring()) {
00573       /* send a datagram with job monitoring information which covers
00574          the last time interval */
00575       sendJobInfo();
00576     }
00577   }
00578 
00579   pthread_mutex_lock(&mutexBack);
00580   setBackgroundThread(false);
00581   pthread_mutex_unlock(&mutexBack);
00582 
00583   pthread_mutex_destroy(&mutex);
00584   pthread_mutex_destroy(&mutexBack);
00585   pthread_mutex_destroy(&mutexCond);
00586   pthread_cond_destroy(&confChangedCond);
00587 
00588   free(clusterName);
00589   free(nodeName);
00590   free(sysMonCluster); free(sysMonNode);
00591 
00592   freeConf();
00593 
00594   free(monJobs);
00595   for (i = 0; i < nInitSources; i++) {
00596     free(initSources[i]);
00597   }
00598   free(initSources);
00599 
00600   free(buf);
00601 #ifndef WIN32
00602   close(sockfd);
00603 #else
00604   closesocket(sockfd);
00605   WSACleanup();
00606 #endif
00607 }
00608 
00609 void ApMon::freeConf() {
00610   int i;
00611   freeMat(destAddresses, nDestinations);
00612   freeMat(destPasswds, nDestinations);
00613   free(destPorts);
00614 
00615   for (i = 0; i < confURLs.nConfURLs; i++) {
00616       free(confURLs.vURLs[i]);
00617       free(confURLs.lastModifURLs[i]);
00618   }
00619 }
00620 
00621 int ApMon::sendParameters(char *clusterName, char *nodeName,
00622                int nParams, char **paramNames, int *valueTypes,
00623                          char **paramValues) throw(runtime_error){
00624  return sendTimedParameters(clusterName, nodeName, nParams,
00625                             paramNames, valueTypes, paramValues, -1);
00626 }
00627 
00628 int ApMon::sendTimedParameters(char *clusterName, char *nodeName,
00629                int nParams, char **paramNames, int *valueTypes,
00630                char **paramValues, int timestamp) throw(runtime_error){
00631   int i;
00632   int ret, ret1, ret2;
00633   char msg[100], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE];
00634 #ifdef WIN32
00635   char crtAddr[20];
00636 #endif
00637   char *headerTmp;
00638   char header[MAX_HEADER_LENGTH] = "v:";
00639   strcat(header, APMON_VERSION);
00640   strcat(header, "_cpp"); // to indicate this is the C++ version
00641   strcat(header, "p:");
00642 
00643   pthread_mutex_lock(&mutex);
00644 
00645   if(!shouldSend()) {
00646      pthread_mutex_unlock(&mutex);
00647      return RET_NOT_SENT;
00648   }
00649 
00650   if (clusterName != NULL) { // don't keep the cached values for cluster name
00651     // and node name
00652     free(this -> clusterName);
00653     this -> clusterName = strdup(clusterName);
00654 
00655     if (nodeName != NULL) {  /* the user provided a name */
00656       free(this -> nodeName);
00657       this -> nodeName = strdup(nodeName);
00658     }
00659     else { /* set the node name to the node's IP */
00660       free(this -> nodeName);
00661       this -> nodeName = strdup(this -> myHostname);
00662     }  // else
00663   } // if
00664 
00665   if (this -> clusterName == NULL || this -> nodeName == NULL) {
00666     pthread_mutex_unlock(&mutex);
00667     throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name");
00668   }
00669 
00670   //sortParams(nParams, paramNames, valueTypes, paramValues);
00671 
00672   /* try to encode the parameters */
00673   try {
00674     encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp);
00675   } catch (runtime_error& err) {
00676     pthread_mutex_unlock(&mutex);
00677     throw err;
00678   }
00679 
00680   headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char));
00681   /* for each destination */
00682   for (i = 0; i < nDestinations; i++) {
00683     XDR xdrs;
00684     struct sockaddr_in destAddr;
00685 
00686     /* initialize the destination address */
00687     memset(&destAddr, 0, sizeof(destAddr));
00688     destAddr.sin_family = AF_INET;
00689     destAddr.sin_port = htons(destPorts[i]);
00690 #ifndef WIN32
00691     inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr);
00692 #else
00693     int dummy = sizeof(destAddr);
00694     sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]);
00695     ret = WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy);
00696     if(ret){
00697       ret = WSAGetLastError();
00698       sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret);
00699       throw runtime_error(msg);
00700     }
00701 #endif
00702     /* add the header (which is different for each destination) */
00703 
00704     strcpy(headerTmp, header);
00705     strcat(headerTmp, destPasswds[i]);
00706 
00707     /* initialize the XDR stream to encode the header */
00708     xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE);
00709 
00710     /* encode the header */
00711     ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1);
00712     /* add the instance ID and the sequence number */
00713     ret1 = xdr_int(&xdrs, &(instance_id));
00714     ret2 = xdr_int(&xdrs, &(seq_nr));
00715 
00716     if (!ret || !ret1 || !ret2) {
00717       free(headerTmp);
00718           pthread_mutex_unlock(&mutex);
00719       throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header");
00720     }
00721 
00722     /* concatenate the header and the rest of the datagram */
00723     int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL);
00724     memcpy(newBuf, buf2, buf2Length);
00725     memcpy(newBuf + buf2Length, buf, dgramSize);
00726 
00727     /* send the buffer */
00728     ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0,
00729                  (struct sockaddr *)&destAddr, sizeof(destAddr));
00730     if (ret == RET_ERROR) {
00731       free(headerTmp);
00732       pthread_mutex_unlock(&mutex);
00733 
00734       /*re-initialize the socket */
00735 #ifndef WIN32
00736       close(sockfd);
00737 #else
00738       closesocket(sockfd);
00739 #endif
00740       initSocket();
00741 
00742       /* throw exception because the datagram was not sent */
00743       sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ",
00744               destAddresses[i]);
00745       throw runtime_error(msg);
00746     }
00747     else {
00748       sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:",
00749              ret, instance_id, seq_nr, destAddresses[i]);
00750       logger(FINE, msg);
00751       logParameters(FINE, nParams, paramNames, valueTypes, paramValues);
00752     }
00753     xdr_destroy(&xdrs);
00754 
00755   }
00756 
00757   seq_nr = (seq_nr + 1) % TWO_BILLION;
00758   free(headerTmp);
00759   pthread_mutex_unlock(&mutex);
00760   return RET_SUCCESS;
00761 }
00762 
00763 
00764 int ApMon::sendParameter(char *clusterName, char *nodeName,
00765                         char *paramName, int valueType, char *paramValue)
00766  throw(runtime_error){
00767 
00768   return sendParameters(clusterName, nodeName, 1, &paramName,
00769                               &valueType, &paramValue);
00770 }
00771 
00772 int ApMon::sendTimedParameter(char *clusterName, char *nodeName,
00773               char *paramName, int valueType, char *paramValue, int timestamp)
00774  throw(runtime_error){
00775 
00776   return sendTimedParameters(clusterName, nodeName, 1, &paramName,
00777                               &valueType, &paramValue, timestamp);
00778 }
00779 
00780 int ApMon::sendParameter(char *clusterName, char *nodeName,
00781                 char *paramName, int paramValue) throw(runtime_error) {
00782 
00783   return sendParameter(clusterName, nodeName, paramName, XDR_INT32,
00784                     (char *)&paramValue);
00785 }
00786 
00787 int ApMon::sendParameter(char *clusterName, char *nodeName,
00788                 char *paramName, float paramValue) throw(runtime_error) {
00789 
00790   return sendParameter(clusterName, nodeName, paramName, XDR_REAL32,
00791                     (char *)&paramValue);
00792 }
00793 
00794 int ApMon::sendParameter(char *clusterName, char *nodeName,
00795                 char *paramName, double paramValue) throw(runtime_error) {
00796 
00797   return sendParameter(clusterName, nodeName, paramName, XDR_REAL64,
00798                     (char *)&paramValue);
00799 }
00800 
00801 int ApMon::sendParameter(char *clusterName, char *nodeName,
00802                 char *paramName, char *paramValue) throw(runtime_error) {
00803 
00804   return sendParameter(clusterName, nodeName, paramName, XDR_STRING,
00805                     paramValue);
00806 }
00807 
00808 void ApMon::encodeParams(int nParams, char **paramNames, int *valueTypes,
00809                         char **paramValues, int timestamp)
00810   throw(runtime_error){
00811   XDR xdrs; /* XDR handle. */
00812   int i, effectiveNParams;
00813 
00814   /* count the number of parameters actually sent in the datagram
00815      (the parameters with a NULL name and the string parameters
00816      with a NULL value are skipped)
00817   */
00818   effectiveNParams = nParams;
00819   for (i = 0; i < nParams; i++) {
00820       if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00821                                     paramValues[i] == NULL)) {
00822         effectiveNParams--;
00823       }
00824   }
00825   if (effectiveNParams == 0)
00826       throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");
00827 
00828   /*** estimate the length of the send buffer ***/
00829 
00830   /* add the length of the cluster name & node name */
00831   dgramSize =  xdrSize(XDR_STRING, clusterName) +
00832       xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL);
00833   /* add the lengths for the parameters (name + size + value) */
00834   for (i = 0; i < nParams; i++) {
00835     dgramSize += xdrSize(XDR_STRING, paramNames[i]) +  xdrSize(XDR_INT32, NULL) +
00836       + xdrSize(valueTypes[i], paramValues[i]);
00837   }
00838 
00839   /* check that the maximum datagram size is not exceeded */
00840   if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE)
00841     throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded");
00842 
00843   /* initialize the XDR stream */
00844   xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE);
00845 
00846   try {
00847     /* encode the cluster name, the node name and the number of parameters */
00848     if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName)
00849                     + 1))
00850       throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name");
00851 
00852     if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1))
00853       throw runtime_error("[ encodeParams() ] XDR encoding error for the node name");
00854 
00855     if (!xdr_int(&xdrs, &(effectiveNParams)))
00856       throw runtime_error("[ encodeParams() ] XDR encoding error for the number of parameters");
00857 
00858     /* encode the parameters */
00859     for (i = 0; i < nParams; i++) {
00860       if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00861                                     paramValues[i] == NULL)) {
00862         logger(WARNING, "NULL parameter name or value - skipping parameter...");
00863         continue;
00864       }
00865 
00866       /* parameter name */
00867       if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1))
00868         throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name");
00869 
00870       /* parameter value type */
00871       if (!xdr_int(&xdrs, &(valueTypes[i])))
00872         throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type");
00873 
00874       /* parameter value */
00875       switch (valueTypes[i]) {
00876       case XDR_STRING:
00877         if (!xdr_string(&xdrs, &(paramValues[i]),
00878                         strlen(paramValues[i]) + 1))
00879           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00880         break;
00881         //INT16 is not supported
00882         /*    case XDR_INT16:
00883               if (!xdr_short(&xdrs, (short *)(paramValues[i])))
00884               return RET_ERROR;
00885               break;
00886         */    case XDR_INT32:
00887                 if (!xdr_int(&xdrs, (int *)(paramValues[i])))
00888                   throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00889                 break;
00890       case XDR_REAL32:
00891         if (!xdr_float(&xdrs, (float *)(paramValues[i])))
00892           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00893         break;
00894       case XDR_REAL64:
00895         if (!xdr_double(&xdrs, (double *)(paramValues[i])))
00896           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00897         break;
00898       default:
00899         throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding");
00900       }
00901     }
00902 
00903     /* encode the timestamp if necessary */
00904     if (timestamp > 0) {
00905       if (!xdr_int(&xdrs, &timestamp))
00906         throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp");
00907       dgramSize += xdrSize(XDR_INT32, NULL);
00908     }
00909   } catch (runtime_error& err) {
00910     xdr_destroy(&xdrs);
00911     throw err;
00912   }
00913 
00914   xdr_destroy(&xdrs);
00915 }
00916 
00917 #ifndef WIN32
00918 void *bkTask(void *param) {
00919 #else
00920 DWORD WINAPI bkTask(void *param) {
00921 #endif
00922   struct stat st;
00923 #ifndef WIN32
00924   struct timespec delay;
00925 #else
00926   DWORD delay;
00927 #endif
00928   bool resourceChanged, haveChange;
00929   int nextOp = -1, i, ret;
00930   int generalInfoCount;
00931   time_t crtTime, timeRemained;
00932   time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0;
00933   ApMon *apm = (ApMon *)param;
00934   char logmsg[200];
00935 
00936   logger(INFO, "[Starting background thread...]");
00937   apm -> bkThreadStarted = true;
00938 
00939   crtTime = time(NULL);
00940 
00941   pthread_mutex_lock(&(apm -> mutexBack));
00942   if (apm -> confCheck) {
00943     nextRecheck = crtTime + apm -> crtRecheckInterval;
00944     //sprintf(logmsg, "###1 crt %ld interv %ld recheck %ld ", crtTime,
00945     //   apm -> crtRecheckInterval, nextRecheck);
00946     //logger(FINE, logmsg);
00947     //fflush(stdout);
00948   }
00949   if (apm -> jobMonitoring)
00950     nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
00951   if (apm -> sysMonitoring)
00952     nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
00953   pthread_mutex_unlock(&(apm -> mutexBack));
00954 
00955   timeRemained = -1;
00956   generalInfoCount = 0;
00957 
00958   while (1) {
00959     pthread_mutex_lock(&apm -> mutexBack);
00960     if (apm -> stopBkThread) {
00961 //      printf("### stopBkThread \n");
00962       pthread_mutex_unlock(&apm -> mutexBack);
00963       break;
00964     }
00965     pthread_mutex_unlock(&apm -> mutexBack);
00966 
00967     //sprintf(logmsg, "### 2 recheck %ld sys %ld ", nextRecheck,
00968     //    nextSysInfoSend);
00969     //logger(FINE, logmsg);
00970 
00971     /* determine the next operation that must be performed */
00972     if (nextRecheck > 0 && (nextJobInfoSend <= 0 ||
00973                             nextRecheck <= nextJobInfoSend)) {
00974       if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) {
00975         nextOp = RECHECK_CONF;
00976         timeRemained = nextRecheck - crtTime;
00977       } else {
00978         nextOp = SYS_INFO_SEND;
00979         timeRemained = nextSysInfoSend - crtTime;
00980       }
00981     } else {
00982       if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 ||
00983                                   nextJobInfoSend <= nextSysInfoSend)) {
00984         nextOp = JOB_INFO_SEND;
00985         timeRemained = nextJobInfoSend - crtTime;
00986       } else if (nextSysInfoSend > 0) {
00987         nextOp = SYS_INFO_SEND;
00988         timeRemained = nextSysInfoSend - crtTime;
00989       }
00990     }
00991 
00992     if (timeRemained == -1)
00993       timeRemained = RECHECK_INTERVAL;
00994 
00995 #ifndef WIN32
00996     /* the moment when the next operation should be performed */
00997     delay.tv_sec = crtTime + timeRemained;
00998     delay.tv_nsec = 0;
00999 #else
01000     delay = (/*crtTime +*/ timeRemained) * 1000;  // this is in millis
01001 #endif
01002 
01003     pthread_mutex_lock(&(apm -> mutexBack));
01004 
01005     pthread_mutex_lock(&(apm -> mutexCond));
01006     /* check for changes in the settings */
01007     haveChange = false;
01008     if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged)
01009       haveChange = true;
01010     if (apm -> jobMonChanged) {
01011       if (apm -> jobMonitoring)
01012         nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
01013       else
01014         nextJobInfoSend = -1;
01015       apm -> jobMonChanged = false;
01016     }
01017     if (apm -> sysMonChanged) {
01018       if (apm -> sysMonitoring)
01019         nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
01020       else
01021         nextSysInfoSend = -1;
01022       apm -> sysMonChanged = false;
01023     }
01024     if (apm -> recheckChanged) {
01025       if (apm -> confCheck) {
01026         nextRecheck = crtTime + apm -> crtRecheckInterval;
01027       }
01028       else
01029         nextRecheck = -1;
01030       apm -> recheckChanged = false;
01031     }
01032     pthread_mutex_unlock(&(apm -> mutexBack));
01033 
01034     if (haveChange) {
01035       pthread_mutex_unlock(&(apm -> mutexCond));
01036       continue;
01037     }
01038 
01039     /* wait until the next operation should be performed or until
01040        a change in the settings occurs */
01041 #ifndef WIN32
01042     ret = pthread_cond_timedwait(&(apm -> confChangedCond),
01043                                 &(apm -> mutexCond), &delay);
01044     pthread_mutex_unlock(&(apm -> mutexCond));
01045 #else
01046     pthread_mutex_unlock(&(apm -> mutexCond));
01047     ret = WaitForSingleObject(apm->confChangedCond, delay);
01048 #endif
01049     if (ret == ETIMEDOUT) {
01050 //      printf("### ret TIMEDOUT\n");
01051       /* now perform the operation */
01052       if (nextOp == JOB_INFO_SEND) {
01053         apm -> sendJobInfo();
01054         crtTime = time(NULL);
01055         nextJobInfoSend = crtTime + apm -> getJobMonitorInterval();
01056       }
01057 
01058       if (nextOp == SYS_INFO_SEND) {
01059         apm -> sendSysInfo();
01060         if (apm -> getGenMonitoring()) {
01061           if (generalInfoCount <= 1)
01062             apm -> sendGeneralInfo();
01063           generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals;
01064         }
01065         crtTime = time(NULL);
01066         nextSysInfoSend = crtTime + apm -> getSysMonitorInterval();
01067       }
01068 
01069       if (nextOp == RECHECK_CONF) {
01070         //logger(FINE, "### recheck conf");
01071         resourceChanged = false;
01072         try {
01073           if (apm -> initType == FILE_INIT) {
01074             sprintf(logmsg, "Checking for modifications for file %s ",
01075                     apm -> initSources[0]);
01076             logger(INFO, logmsg);
01077             stat(apm -> initSources[0], &st);
01078             if (st.st_mtime > apm -> lastModifFile) {
01079               sprintf(logmsg, "File %s modified ", apm -> initSources[0]);
01080               logger(INFO, logmsg);
01081               resourceChanged = true;
01082             }
01083           }
01084 
01085           // check the configuration URLs
01086           for (i = 0; i < apm -> confURLs.nConfURLs; i++) {
01087             sprintf(logmsg, "[Checking for modifications for URL %s ] ",
01088                    apm -> confURLs.vURLs[i]);
01089             logger(INFO, logmsg);
01090             if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) {
01091               sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]);
01092               logger(INFO, logmsg);
01093               resourceChanged = true;
01094               break;
01095             }
01096           }
01097 
01098           if (resourceChanged) {
01099             logger(INFO, "Reloading configuration...");
01100             if (apm -> initType == FILE_INIT)
01101               apm -> initialize(apm -> initSources[0], false);
01102             else
01103               apm -> initialize(apm -> nInitSources, apm -> initSources, false);
01104           }
01105           apm -> setCrtRecheckInterval(apm -> getRecheckInterval());
01106         } catch (runtime_error &err) {
01107           logger(WARNING, err.what());
01108           logger(WARNING, "Increasing the time interval for reloading the configuration...");
01109           apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5);
01110         }
01111         crtTime = time(NULL);
01112         nextRecheck = crtTime + apm -> getCrtRecheckInterval();
01113         //sleep(apm -> getCrtRecheckInterval());
01114       }
01115     }
01116 
01117   } // while
01118 
01119 #ifndef WIN32
01120   return NULL; // it doesn't matter what we return here
01121 #else
01122   return 0;
01123 #endif
01124 }
01125 
01126 void ApMon::setConfRecheck(bool confCheck, long interval) {
01127   char logmsg[100];
01128   if (confCheck) {
01129     sprintf(logmsg, "Enabling configuration reloading (interval %ld)",
01130             interval);
01131     logger(INFO, logmsg);
01132   }
01133 
01134   pthread_mutex_lock(&mutexBack);
01135   if (initType == DIRECT_INIT) { // no need to reload the configuration
01136     logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload.");
01137     return;
01138   }
01139 
01140   this -> confCheck = confCheck;
01141   this -> recheckChanged = true;
01142   if (confCheck) {
01143     if (interval > 0) {
01144       this -> recheckInterval = interval;
01145       this -> crtRecheckInterval = interval;
01146     } else {
01147       this -> recheckInterval = RECHECK_INTERVAL;
01148       this -> crtRecheckInterval = RECHECK_INTERVAL;
01149     }
01150     setBackgroundThread(true);
01151   }
01152   else {
01153     if (jobMonitoring == false && sysMonitoring == false)
01154       setBackgroundThread(false);
01155   }
01156   pthread_mutex_unlock(&mutexBack);
01157 
01158 }
01159 
01160 void ApMon::setRecheckInterval(long val) {
01161   if (val > 0) {
01162     setConfRecheck(true, val);
01163   }
01164   else {
01165     setConfRecheck(false, val);
01166   }
01167 }
01168 
01169 void ApMon::setCrtRecheckInterval(long val) {
01170   pthread_mutex_lock(&mutexBack);
01171   crtRecheckInterval = val;
01172   pthread_mutex_unlock(&mutexBack);
01173 }
01174 
01175 void ApMon::setJobMonitoring(bool jobMonitoring, long interval) {
01176   char logmsg[100];
01177   if (jobMonitoring) {
01178     sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval);
01179     logger(INFO, logmsg);
01180   } else
01181     logger(INFO, "Disabling job monitoring...");
01182 
01183   pthread_mutex_lock(&mutexBack);
01184   this -> jobMonitoring = jobMonitoring;
01185   this -> jobMonChanged = true;
01186   if (jobMonitoring == true) {
01187     if (interval > 0)
01188       this -> jobMonitorInterval = interval;
01189     else
01190       this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
01191     setBackgroundThread(true);
01192   } else {
01193     // disable the background thread if it is not needed anymore
01194     if (this -> sysMonitoring == false && this -> confCheck == false)
01195       setBackgroundThread(false);
01196   }
01197   pthread_mutex_unlock(&mutexBack);
01198 }
01199 
01200 void ApMon::setSysMonitoring(bool sysMonitoring, long interval) {
01201   char logmsg[100];
01202   if (sysMonitoring) {
01203     sprintf(logmsg, "Enabling system monitoring, time interval %ld s... ", interval);
01204     logger(INFO, logmsg);
01205   } else
01206     logger(INFO, "Disabling system monitoring...");
01207 
01208   pthread_mutex_lock(&mutexBack);
01209   this -> sysMonitoring = sysMonitoring;
01210   this -> sysMonChanged = true;
01211   if (sysMonitoring == true) {
01212     if (interval > 0)
01213       this -> sysMonitorInterval = interval;
01214     else
01215       this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
01216     setBackgroundThread(true);
01217   }  else {
01218     // disable the background thread if it is not needed anymore
01219     if (this -> jobMonitoring == false && this -> confCheck == false)
01220       setBackgroundThread(false);
01221   }
01222   pthread_mutex_unlock(&mutexBack);
01223 }
01224 
01225 void ApMon::setGenMonitoring(bool genMonitoring, int nIntervals) {
01226   char logmsg[100];
01227   sprintf(logmsg, "Setting general information monitoring to %s ",
01228          boolStrings[(int)genMonitoring]);
01229   logger(INFO, logmsg);
01230 
01231   pthread_mutex_lock(&mutexBack);
01232   this -> genMonitoring = genMonitoring;
01233   this -> sysMonChanged = true;
01234   if (genMonitoring == true) {
01235     if (nIntervals > 0)
01236       this -> genMonitorIntervals = nIntervals;
01237     else
01238       this -> genMonitorIntervals = GEN_MONITOR_INTERVALS;
01239 
01240     if (this -> sysMonitoring == false) {
01241       pthread_mutex_unlock(&mutexBack);
01242       setSysMonitoring(true);
01243       pthread_mutex_lock(&mutexBack);
01244     }
01245   } // TODO: else check if we can stop the background thread (if no
01246   // system parameters are enabled for monitoring)
01247   pthread_mutex_unlock(&mutexBack);
01248 }
01249 
01250 void ApMon::setBackgroundThread(bool val) {
01251   // mutexBack is locked
01252   if (val == true) {
01253     if (!haveBkThread) {
01254 #ifndef WIN32
01255       pthread_create(&bkThread, NULL, &bkTask, this);
01256 #else
01257       DWORD dummy;
01258       bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy);
01259 #endif
01260       haveBkThread = true;
01261     } else {
01262       pthread_mutex_lock(&mutexCond);
01263       pthread_cond_signal(&confChangedCond);
01264       pthread_mutex_unlock(&mutexCond);
01265     }
01266   }
01267   if (val == false) {
01268     //if (bkThreadStarted) {
01269     if (haveBkThread) {
01270       stopBkThread = true;
01271       pthread_mutex_unlock(&mutexBack);
01272 #ifndef WIN32
01273       pthread_mutex_lock(&mutexCond);
01274 #endif
01275       pthread_cond_signal(&confChangedCond);
01276       logger(INFO, "[Stopping the background thread...]");
01277 #ifndef WIN32
01278       pthread_mutex_unlock(&mutexCond);
01279       pthread_join(bkThread, NULL);
01280 #else
01281       WaitForSingleObject(bkThread, INFINITE);
01282 #endif
01283       pthread_mutex_lock(&mutexBack);
01284 //      logger(INFO, "bk thread stopped!");
01285       haveBkThread = false;
01286       bkThreadStarted = false;
01287       stopBkThread = false;
01288     }
01289   }
01290 }
01291 
01292 void ApMon::addJobToMonitor(long pid, char *workdir, char *clusterName,
01293                             char *nodeName) throw(runtime_error) {
01294   if (nMonJobs >= MAX_MONITORED_JOBS)
01295     throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded.");
01296   MonitoredJob job;
01297   job.pid = pid;
01298   if (workdir == NULL)
01299     strcpy(job.workdir, "");
01300   else
01301     strcpy(job.workdir, workdir);
01302 
01303  if (clusterName == NULL || strlen(clusterName) == 0)
01304     strcpy(job.clusterName, "ApMon_JobMon");
01305   else
01306     strcpy(job.clusterName, clusterName);
01307  if (nodeName == NULL || strlen(nodeName) == 0)
01308     strcpy(job.nodeName, this -> myIP);
01309   else
01310     strcpy(job.nodeName, nodeName);
01311 
01312   monJobs[nMonJobs++] = job;
01313 }
01314 
01315 void ApMon::removeJobToMonitor(long pid) throw(runtime_error) {
01316   int i, j;
01317   char msg[100];
01318 
01319   if (nMonJobs <= 0)
01320     throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs.");
01321 
01322   for (i = 0; i < nMonJobs; i++) {
01323     if (monJobs[i].pid == pid) {
01324       /* found the job, now remove it */
01325       for (j = i; j < nMonJobs - 1; j++)
01326         monJobs[j] = monJobs[j + 1];
01327       nMonJobs--;
01328       return;
01329     }
01330   }
01331 
01332   /* the job was not found */
01333   sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid);
01334   throw runtime_error(msg);
01335 }
01336 
01337 void ApMon::setSysMonClusterNode(char *clusterName, char *nodeName) {
01338   free (sysMonCluster); free(sysMonNode);
01339   sysMonCluster = strdup(clusterName);
01340   sysMonNode = strdup(nodeName);
01341 }
01342 
01343 void ApMon::setLogLevel(char *newLevel_s) {
01344   int newLevel;
01345   const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"};
01346   char logmsg[100];
01347 
01348   for (newLevel = 0; newLevel < 5; newLevel++)
01349     if (strcmp(newLevel_s, levels[newLevel]) == 0)
01350       break;
01351 
01352   if (newLevel >= 5) {
01353     sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %s", newLevel_s);
01354     logger(WARNING, logmsg);
01355   }
01356   else
01357     logger(0, NULL, newLevel);
01358 }
01359 
01360 
01361 void ApMon::setMaxMsgRate(int maxRate) {
01362   if (maxRate > 0)
01363     this -> maxMsgRate  = maxRate;
01364 }
01365 
01366 void ApMon::initSocket() throw(runtime_error) {
01367   int optval1 = 1;
01368   struct timeval optval2;
01369   int ret1, ret2, ret3;
01370 
01371   sockfd = socket(AF_INET, SOCK_DGRAM, 0);
01372   if (sockfd < 0)
01373     throw runtime_error("[ initSocket() ] Error creating socket");
01374   ret1 = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *) &optval1,
01375                     sizeof(optval1));
01376 
01377   /* set connection timeout */
01378   optval2.tv_sec = 20;
01379   optval2.tv_usec = 0;
01380   ret2 = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *) &optval2,
01381                     sizeof(optval2));
01382   ret3 = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *) &optval2,
01383                     sizeof(optval2));
01384   if (ret1 != 0 || ret2 != 0 || ret3 != 0)
01385     throw runtime_error("[ initSocket() ] Error initializing socket.");
01386 }
01387 
01388 
01389 void ApMon::parseConf(FILE *fp, int *nDestinations, char **destAddresses,
01390                      int *destPorts, char **destPasswds)
01391   throw(runtime_error) {
01392   int i, ch;
01393   char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char));
01394   char *tmp = NULL;
01395   char *loglevel_s;
01396 //  char sbuf[30];
01397 //  char *pbuf = sbuf;
01398 
01399   /* parse the input file */
01400   while(fgets(line, MAX_STRING_LEN, fp) != NULL) {
01401 
01402     if (tmp != NULL) {
01403       free(tmp);
01404       tmp = NULL;
01405     }
01406 
01407     line[MAX_STRING_LEN - 1] = 0;
01408     /* check if the line was too long */
01409     ch = fgetc(fp); // see if we are at the end of the file
01410     ungetc(ch, fp);
01411     if (line[strlen(line) - 1] != 10 && ch != EOF) {
01412       /* if the line doesn't end with a \n and we are not at the end
01413          of file, the line from the file was longer than MAX_STRING_LEN */
01414       fclose(fp);
01415       throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file");
01416     }
01417 
01418     tmp = trimString(line);
01419 
01420     /* skip the blank lines and the comment lines */
01421     if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp)
01422       continue;
01423 
01424     if (strstr(tmp, "xApMon_loglevel") == tmp) {
01425       char *tmp2 = tmp;
01426       strtok/*_r*/(tmp2, "= ");//, &pbuf);
01427       loglevel_s = strtok/*_r*/(NULL, "= ");//, &pbuf);
01428       setLogLevel(loglevel_s);
01429       continue;
01430     }
01431 
01432     if (strstr(tmp, "xApMon_") == tmp) {
01433       parseXApMonLine(tmp);
01434       continue;
01435     }
01436 
01437     if (*nDestinations >= MAX_N_DESTINATIONS) {
01438       free(line); free(tmp);
01439       for (i = 0; i < *nDestinations; i++) {
01440         free(destAddresses[i]);
01441         free(destPasswds[i]);
01442       }
01443       fclose(fp);
01444       throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded.");
01445     }
01446 
01447     addToDestinations(tmp, nDestinations, destAddresses, destPorts,
01448                       destPasswds);
01449   }
01450 
01451   if (tmp != NULL)
01452     free(tmp);
01453   free(line);
01454 }
01455 
01456 bool ApMon::shouldSend() {
01457 
01458   long now = time(NULL);
01459   bool doSend;
01460   char msg[200];
01461 
01462   //printf("now %ld crtTime %ld\n", now, crtTime);
01463 
01464   if (now != crtTime){
01466     prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime);
01467     prvTime = crtTime;
01468     sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop);
01469     logger(DEBUG, msg);
01471     crtTime = now;
01472     crtSent = 0;
01473     crtDrop = 0;
01474     //printf("\n");
01475   }
01476 
01478   int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight));
01479 
01480   doSend = true;
01482   int level = this -> maxMsgRate - this -> maxMsgRate / 10;
01483 
01484 
01485   if (valSent > (this -> maxMsgRate - level)) {
01486     //int max10 = this -> maxMsgRate / 10;
01487     int rnd  = rand() % (this -> maxMsgRate / 10);
01488     doSend = (rnd <  (this -> maxMsgRate - valSent));
01489   }
01491   if (doSend) {
01492     crtSent++;
01493     //printf("#");
01494   } else {
01495     crtDrop++;
01496     //printf(".");
01497   }
01498 
01499   return doSend;
01500 }
01501 
01502 
01503 
01504 
01505 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Wed Feb 9 16:24:57 2011 for Gaudi Framework, version v22r0 by Doxygen version 1.6.2 written by Dimitri van Heesch, © 1997-2004