Gaudi Framework, version v21r9

Home   Generated: 3 May 2010

ApMon.cpp

Go to the documentation of this file.
00001 
00006 /*
00007  * ApMon - Application Monitoring Tool
00008  * Version: 2.2.0
00009  *
00010  * Copyright (C) 2006 California Institute of Technology
00011  *
00012  * Permission is hereby granted, free of charge, to use, copy and modify 
00013  * this software and its documentation (the "Software") for any
00014  * purpose, provided that existing copyright notices are retained in 
00015  * all copies and that this notice is included verbatim in any distributions
00016  * or substantial portions of the Software. 
00017  * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
00018  * Users of the Software are asked to feed back problems, benefits,
00019  * and/or suggestions about the software to the MonALISA Development Team
00020  * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
00021  * incorporation of new features - is done on a best effort basis. All bug
00022  * fixes and enhancements will be made available under the same terms and
00023  * conditions as the original software,
00024 
00025  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00026  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
00027  * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
00028  * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029 
00030  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
00031  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
00032  * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
00033  * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
00034  * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
00035  * MODIFICATIONS.
00036  */
00037 
00038 #include "ApMon.h"
00039 #include "utils.h"
00040 #include "proc_utils.h"
00041 #include "monitor_utils.h"
00042 
00043 #include <math.h>
00044 
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047 
/* Numeric action codes. They are not referenced in the visible part of this
   file; judging by the names they identify the periodic actions (config
   re-check, system info, job info) -- TODO confirm against their users. */
#define RECHECK_CONF   0
#define SYS_INFO_SEND  1
#define JOB_INFO_SEND  2

/* Textual form of a boolean, indexed by its value: boolStrings[0] is "false",
   boolStrings[1] is "true". */
char boolStrings[2][10] = { "false", "true" };
00053 
00054 //========= Implementations of the functions ===================
00055 
00056 ApMon::ApMon(char *initsource) 
00057   throw(runtime_error) {
00058 
00059   if (initsource == NULL)
00060     throw runtime_error("[ ApMon() ]  No conf file/URL provided");
00061 
00062   if (strstr(initsource, "http://") == initsource) {
00063     char *destList[1];
00064     destList[0] = initsource;
00065     constructFromList(1, destList);
00066   } else {
00067     nInitSources = 1;
00068     initType = FILE_INIT;
00069     initSources = (char **)malloc(nInitSources * sizeof(char *));
00070     if (initSources == NULL)
00071       throw runtime_error("[ ApMon() ] Error allocating memory.");
00072     
00073     initSources[0] = strdup(initsource);
00074     initMonitoring();
00075 
00076     initialize(initsource, true);
00077   }
00078 }
00079 
00080 void ApMon::initialize(char *filename, bool firstTime) 
00081   throw(runtime_error) {
00082 
00083   char *destAddresses[MAX_N_DESTINATIONS];
00084   int destPorts[MAX_N_DESTINATIONS];
00085   char *destPasswds[MAX_N_DESTINATIONS];
00086   int nDest = 0, i; 
00087   ConfURLs confURLs;
00088   
00089   confURLs.nConfURLs = 0;
00090 
00091   try {
00092     loadFile(filename, &nDest, destAddresses, destPorts, destPasswds);
00093 
00094     arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime);
00095   } catch (runtime_error& err) {
00096     if (firstTime)
00097       throw err;
00098     else {
00099       logger(WARNING, err.what());
00100       logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00101       return;
00102     }
00103   }
00104 
00105   for (i = 0; i < nDest; i++) {
00106     free(destAddresses[i]);
00107     free(destPasswds[i]);
00108   }
00109 
00110   pthread_mutex_lock(&mutex);
00111   this -> confURLs = confURLs;
00112   pthread_mutex_unlock(&mutex);
00113 }
00114 
00115 void ApMon::loadFile(char *filename, int *nDestinations, char **destAddresses, 
00116                      int *destPorts, char **destPasswds)
00117   throw(runtime_error) {
00118   FILE *f;
00119   char msg[100];
00120  
00121   /* initializations for the destination addresses */
00122   f = fopen(filename, "rt");
00123   if (f == NULL) {
00124     throw runtime_error("[ loadFile() ] Error opening configuration file");
00125   }
00126 
00127   sprintf(msg, "Loading file %s ...", filename);
00128   logger(INFO, msg);
00129 
00130   lastModifFile = time(NULL);
00131   
00132   parseConf(f, nDestinations, destAddresses, destPorts, 
00133             destPasswds);
00134   fclose(f);
00135 }
00136 
00137 ApMon::ApMon(int nDestinations, char **destinationsList) throw(runtime_error){
00138   constructFromList(nDestinations, destinationsList);
00139 }
00140 
00141 void ApMon::constructFromList(int nDestinations, char **destinationsList)
00142   throw(runtime_error) {
00143   int i;
00144 
00145   if (destinationsList == NULL) 
00146     throw runtime_error("[ constructFromList() ] Null destination list");
00147  
00148 #ifdef __APPLE__
00149   initType = OLIST_INIT;
00150 #else
00151   initType = LIST_INIT;
00152 #endif
00153 
00154   initMonitoring();
00155 
00156   /* save the initialization list */
00157   nInitSources = nDestinations;
00158   initSources = (char **)malloc(nInitSources * sizeof(char*));
00159   if (initSources == NULL)
00160     throw runtime_error("[ ApMon() ] Error allocating memory.");
00161 
00162   for (i = 0; i < nInitSources; i++)
00163     initSources[i] = strdup(destinationsList[i]);
00164 
00165   initialize(nDestinations, destinationsList, true);
00166 }
00167 
00168 void ApMon::initialize(int nDestinations, char **destinationsList,
00169                        bool firstTime) throw(runtime_error) { 
00170   char *destAddresses[MAX_N_DESTINATIONS];
00171   int destPorts[MAX_N_DESTINATIONS];
00172   char *destPasswds[MAX_N_DESTINATIONS];
00173   char errmsg[200];
00174   int i;
00175   int cnt = 0;
00176   ConfURLs confURLs;
00177 
00178   logger(INFO, "Initializing destination addresses & ports:");
00179 
00180   if (nDestinations > MAX_N_DESTINATIONS)
00181     throw runtime_error("[ initialize() ] Maximum number of destinations exceeded");
00182 
00183   
00184   confURLs.nConfURLs = 0;
00185 
00186   for (i = 0; i < nDestinations; i++) {
00187     try { 
00188       if (strstr(destinationsList[i], "http") == destinationsList[i])
00189         getDestFromWeb(destinationsList[i], &cnt, 
00190                        destAddresses, destPorts, destPasswds, confURLs);
00191       else 
00192         addToDestinations(destinationsList[i], &cnt, 
00193                         destAddresses, destPorts, destPasswds);
00194 
00195     } catch (runtime_error &e) {
00196       sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %s", e.what());
00197       logger(WARNING, errmsg);
00198       if (!firstTime) {
00199         for (i = 0; i < cnt; i++) {
00200           free(destAddresses[i]);
00201           free(destPasswds[i]);
00202         }
00203         logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one.");
00204         return; 
00205       }     
00206     }  // catch
00207   }  // for
00208 
00209   try {
00210     arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime);
00211   } catch (runtime_error& err) {
00212     if (firstTime)
00213       throw err;
00214     else {
00215       logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00216       return;
00217     }
00218   }
00219   
00220   for (i = 0; i < cnt; i++) {
00221     free(destAddresses[i]);
00222     free(destPasswds[i]);
00223   }
00224 
00225   pthread_mutex_lock(&mutex);
00226   this -> confURLs = confURLs;
00227   pthread_mutex_unlock(&mutex);
00228 }
00229 
00230 void ApMon::addToDestinations(char *line, int *nDestinations, 
00231                char *destAddresses[], int destPorts[], char *destPasswds[]) {
00232   char *addr, *port, *passwd;
00233   const char *sep1 = " \t";
00234   const char *sep2 = ":";
00235  
00236   char *tmp = strdup(line);
00237   char *firstToken;
00238 //  char buf[MAX_STRING_LEN];
00239 //  char *pbuf = buf;
00240 
00241   /* the address & port are separated from the password with spaces */
00242   firstToken = strtok/*_r*/(tmp, sep1);//, &pbuf);
00243   passwd = strtok/*_r*/(NULL, sep1);//, &pbuf);
00244 
00245   /* the address and the port are separated with ":" */
00246   addr = strtok/*_r*/(firstToken, sep2);//, &pbuf);
00247   port = strtok/*_r*/(NULL, sep2);//, &pbuf);
00248   destAddresses[*nDestinations] = strdup(addr);
00249   if (port == NULL)
00250     destPorts[*nDestinations] = DEFAULT_PORT;
00251   else
00252     destPorts[*nDestinations] = atoi(port);
00253   if (passwd == NULL)
00254     destPasswds[*nDestinations] = strdup("");
00255   else
00256     destPasswds[*nDestinations] = strdup(passwd);
00257   (*nDestinations)++;
00258 
00259   free(tmp);
00260 }
00261   
00262 void ApMon::getDestFromWeb(char *url, int *nDestinations, 
00263   char *destAddresses[], int destPorts[], char *destPasswds[],
00264                            ConfURLs& confURLs) throw(runtime_error) {
00265   char temp_filename[300];
00266   FILE *tmp_file;
00267   char *line, *ret, *tmp = NULL;
00268   bool modifLineFound;
00269   long mypid = getpid();
00270   char str1[20], str2[20];
00271   int totalSize, headerSize, contentSize;
00272 
00273 #ifndef WIN32
00274   sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid);
00275 #else
00276   char *tmpp = getenv("TEMP");
00277   if(tmpp == NULL)
00278           tmpp = getenv("TMP");
00279   if(tmpp == NULL)
00280           tmpp = "c:";
00281   sprintf(temp_filename, "%s\\apmon_webconf%ld", tmpp, mypid);
00282 #endif 
00283   /* get the configuration file from web and put it in a temporary file */
00284   totalSize = httpRequest(url, (char*)"GET", temp_filename);
00285 
00286   /* read the configuration from the temporary file */
00287   tmp_file = fopen(temp_filename, "rt");
00288   if (tmp_file == NULL)
00289     throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00290 
00291   line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char));
00292 
00293   //check the HTTP header to see if we got the page correctly
00294   fgets(line, MAX_STRING_LEN, tmp_file);
00295   sscanf(line, "%s %s", str1, str2);
00296   if (atoi(str2) != 200) {
00297     free(line);
00298     fclose(tmp_file);
00299     throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server");
00300   }
00301 
00302   confURLs.vURLs[confURLs.nConfURLs] = strdup(url);
00303 
00304   // check the  header for the "Last-Modified" and "Content-Length" lines
00305   modifLineFound = false; 
00306   contentSize = 0;
00307   do {
00308     if (tmp != NULL)
00309       free(tmp);
00310     ret = fgets(line, MAX_STRING_LEN, tmp_file);
00311     if (ret == NULL) {
00312       free(line); fclose(tmp_file);
00313       throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00314     }
00315     if (strstr(line, "Last-Modified") == line) {
00316       modifLineFound = true;
00317       confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(line);
00318     }
00319 
00320     if (strstr(line, "Content-Length") == line) {
00321       sscanf(line, "%s %d", str1, &contentSize);
00322     }
00323 
00324     tmp = trimString(line);
00325   } while (strlen(tmp) != 0);
00326   free(tmp); free(line);
00327 
00328   if (!modifLineFound)
00329     confURLs.lastModifURLs[confURLs.nConfURLs] = strdup("");
00330   confURLs.nConfURLs++;
00331 
00332   headerSize = ftell(tmp_file);
00333   if (totalSize - headerSize < contentSize) {
00334     fclose(tmp_file);
00335     throw runtime_error("[ getDestFromWeb() ] Web page received incompletely");
00336   }
00337 
00338   try {
00339     parseConf(tmp_file, nDestinations, destAddresses, destPorts, 
00340               destPasswds);
00341   } catch (...) {
00342     fclose(tmp_file);
00343     unlink(temp_filename);
00344     throw;
00345   } 
00346 
00347   fclose(tmp_file);
00348   unlink(temp_filename);
00349 }
00350 
00351 
00352 ApMon::ApMon(int nDestinations, char **destAddresses, int *destPorts, 
00353              char **destPasswds) 
00354 throw(runtime_error) {
00355   initMonitoring();
00356 
00357   arrayInit(nDestinations, destAddresses, destPorts, destPasswds);
00358 }
00359 
00360 void ApMon::arrayInit(int nDestinations, char **destAddresses, 
00361                       int *destPorts, char **destPasswds) 
00362   throw(runtime_error) {
00363         arrayInit(nDestinations, destAddresses, destPorts, destPasswds, true);
00364 }
00365 
00366 
00367 void ApMon::arrayInit(int nDestinations, char **destAddresses, int *destPorts,
00368                       char **destPasswds, bool firstTime) 
00369 throw(runtime_error) {
00370   int i, j;
00371   int ret;
00372   char *ipAddr, logmsg[100];
00373   bool found, havePublicIP;
00374   int tmpNDestinations;
00375   char **tmpAddresses, **tmpPasswds;
00376   int *tmpPorts;
00377 
00378   if (destAddresses == NULL || destPorts == NULL || nDestinations == 0)
00379     throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided");
00380 
00381   /* initializations that we have to do only once */
00382   if (firstTime) {
00383     //this -> appPID = getpid();
00384 
00385     this -> nMonJobs = 0;
00386     this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS * 
00387                                             sizeof(MonitoredJob));
00388    
00389     try {
00390       this -> numCPUs = ProcUtils::getNumCPUs();
00391     } catch (procutils_error &err) {
00392       logger(WARNING, err.what());
00393       this -> numCPUs = 0;
00394     }
00395 
00396     /* get the names of the network interfaces */
00397     this -> nInterfaces = 0;
00398     try {
00399       ProcUtils::getNetworkInterfaces(this -> nInterfaces, 
00400                                      this -> interfaceNames);
00401     } catch (procutils_error &err) {
00402       logger(WARNING, err.what());
00403       this -> nInterfaces = 0;
00404     } 
00405      
00406     /* get the hostname of the machine */
00407     ret = gethostname(this -> myHostname, MAX_STRING_LEN -1);
00408     if (ret < 0) {
00409       logger(WARNING, "Could not obtain the local hostname");
00410       strcpy(myHostname, "unknown");
00411     } else
00412       myHostname[MAX_STRING_LEN - 1] = 0;
00413 
00414     /* get the IPs of the machine */
00415     this -> numIPs = 0; havePublicIP = false;
00416     strcpy(this -> myIP, "unknown");
00417    
00418     /* default values for cluster name and node name */
00419     this -> clusterName = strdup("ApMon_UserSend");
00420     this -> nodeName = strdup(myHostname);
00421 
00422 #ifndef WIN32
00423     int sockd = socket(PF_INET, SOCK_STREAM, 0);
00424     if(sockd < 0){
00425       logger(WARNING, "Could not obtain local IP addresses");
00426     } else {
00427       for (i = 0; i < this -> nInterfaces; i++) {
00428     struct ifreq ifr;
00429         memset(&ifr, 0, sizeof(ifr));
00430         strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1); 
00431         if(ioctl(sockd, SIOCGIFADDR, &ifr)<0)
00432           continue;     //????????
00433         char ip[4], tmp_s[20];
00434 #ifdef __APPLE__
00435         memcpy(ip, ifr.ifr_addr.sa_data+2, 4);
00436 #else
00437         memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4);
00438 #endif
00439         strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip));
00440         sprintf(logmsg, "Found local IP address: %s", tmp_s);
00441         logger(FINE, logmsg);
00442         if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) {
00443           strcpy(this -> myIP, tmp_s);
00444           if (!isPrivateAddress(tmp_s))
00445             havePublicIP = true;
00446         }
00447         strcpy(this -> allMyIPs[this -> numIPs], tmp_s);
00448         this -> numIPs++;
00449       }
00450     }
00451 #else
00452         struct hostent *hptr;
00453     if ((hptr = gethostbyname(myHostname))!= NULL) {
00454       i = 0;
00455           struct in_addr addr;
00456           while ((hptr -> h_addr_list)[i] != NULL) {
00457             memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4);
00458         ipAddr = inet_ntoa(addr);
00459             if (strcmp(ipAddr, "127.0.0.1") != 0) {
00460               strcpy(this -> myIP, ipAddr);
00461               if (!isPrivateAddress(ipAddr))
00462                     break;
00463                 }
00464             i++;
00465           }
00466         }
00467 #endif
00468 
00469     this -> sysMonCluster = strdup("ApMon_SysMon");
00470     this -> sysMonNode = strdup(this -> myIP);
00471 
00472     this -> prvTime = 0;
00473     this -> prvSent = 0;
00474     this -> prvDrop = 0;
00475     this -> crtTime = 0;
00476     this -> crtSent = 0;
00477     this -> crtDrop = 0;
00478     this -> hWeight = exp(-5.0/60.0);
00479 
00480     srand(time(NULL));
00481 
00482     /* initialize buffer for XDR encoding */
00483     this -> buf = (char *)malloc(MAX_DGRAM_SIZE);
00484     if (this -> buf == NULL)
00485       throw runtime_error("[ arrayInit() ] Error allocating memory");
00486     this -> dgramSize = 0;
00487 
00488     /*create the socket & set options*/
00489     initSocket();
00490 
00491     /* initialize the sender ID and the sequence number */
00492     instance_id = rand();
00493     seq_nr = 0;
00494   }
00495   
00496   /* put the destination addresses, ports & passwords in some temporary
00497      buffers (because we don't want to lock mutex while making DNS
00498      requests)
00499   */
00500   tmpNDestinations = 0;
00501   tmpPorts = (int *)malloc(nDestinations * sizeof(int));
00502   tmpAddresses = (char **)malloc(nDestinations * sizeof(char *));
00503   tmpPasswds = (char **)malloc(nDestinations * sizeof(char *));
00504   if (tmpPorts == NULL || tmpAddresses == NULL || 
00505       tmpPasswds == NULL)
00506     throw runtime_error("[ arrayInit() ] Error allocating memory");
00507 
00508   for (i = 0; i < nDestinations; i++) {
00509     try {
00510       ipAddr = findIP(destAddresses[i]);
00511     } catch (runtime_error &err) {
00512       logger(FATAL, err.what());
00513       continue;
00514     }
00515     
00516     /* make sure this address is not already in the list */
00517     found = false;
00518     for (j = 0; j < tmpNDestinations; j++) {
00519       if (!strcmp(ipAddr, tmpAddresses[j])) {
00520         found = true;
00521         break;
00522       }
00523     }
00524 
00525     /* add the address to the list */
00526     if (!found) {
00527       tmpAddresses[tmpNDestinations] = ipAddr;
00528       tmpPorts[tmpNDestinations] = destPorts[i];
00529       tmpPasswds[tmpNDestinations] = strdup(destPasswds[i]);
00530 
00531       sprintf(logmsg, "Adding destination host: %s  - port %d", 
00532              tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]);
00533       logger(INFO, logmsg);
00534 
00535       tmpNDestinations++;
00536     }
00537   }
00538 
00539   if (tmpNDestinations == 0) {
00540     freeMat(tmpAddresses, tmpNDestinations);
00541     freeMat(tmpPasswds, tmpNDestinations);  
00542     throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!");
00543   }
00544 
00545   pthread_mutex_lock(&mutex);
00546   if (!firstTime)
00547       freeConf();
00548   this -> nDestinations = tmpNDestinations;
00549   this -> destAddresses = tmpAddresses;
00550   this -> destPorts = tmpPorts;
00551   this -> destPasswds = tmpPasswds;
00552   pthread_mutex_unlock(&mutex);
00553 
00554   /* start job/system monitoring according to the settings previously read 
00555      from the configuration file */
00556   setJobMonitoring(jobMonitoring, jobMonitorInterval);
00557   setSysMonitoring(sysMonitoring, sysMonitorInterval);
00558   setGenMonitoring(genMonitoring, genMonitorIntervals);
00559   setConfRecheck(confCheck, recheckInterval);
00560 }
00561 
00562 
00563 ApMon::~ApMon() {
00564   int i;
00565 
00566   if (bkThreadStarted) {
00567     if (getJobMonitoring()) {
00568       /* send a datagram with job monitoring information which covers
00569          the last time interval */
00570       sendJobInfo();
00571     }
00572   }
00573 
00574   pthread_mutex_lock(&mutexBack);
00575   setBackgroundThread(false);
00576   pthread_mutex_unlock(&mutexBack);
00577 
00578   pthread_mutex_destroy(&mutex);
00579   pthread_mutex_destroy(&mutexBack);
00580   pthread_mutex_destroy(&mutexCond);
00581   pthread_cond_destroy(&confChangedCond);
00582 
00583   free(clusterName);
00584   free(nodeName);
00585   free(sysMonCluster); free(sysMonNode);
00586 
00587   freeConf();
00588 
00589   free(monJobs);
00590   for (i = 0; i < nInitSources; i++) {
00591     free(initSources[i]);
00592   }
00593   free(initSources);
00594   
00595   free(buf);
00596 #ifndef WIN32
00597   close(sockfd);
00598 #else
00599   closesocket(sockfd);
00600   WSACleanup();
00601 #endif
00602 }
00603 
00604 void ApMon::freeConf() {
00605   int i;
00606   freeMat(destAddresses, nDestinations);
00607   freeMat(destPasswds, nDestinations);
00608   free(destPorts);
00609 
00610   for (i = 0; i < confURLs.nConfURLs; i++) {
00611       free(confURLs.vURLs[i]);
00612       free(confURLs.lastModifURLs[i]);
00613   }
00614 }
00615 
00616 int ApMon::sendParameters(char *clusterName, char *nodeName,
00617                int nParams, char **paramNames, int *valueTypes, 
00618                          char **paramValues) throw(runtime_error){
00619  return sendTimedParameters(clusterName, nodeName, nParams, 
00620                             paramNames, valueTypes, paramValues, -1);
00621 }
00622 
00623 int ApMon::sendTimedParameters(char *clusterName, char *nodeName,
00624                int nParams, char **paramNames, int *valueTypes, 
00625                char **paramValues, int timestamp) throw(runtime_error){
00626   int i;
00627   int ret, ret1, ret2;
00628   char msg[100], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE];
00629 #ifdef WIN32
00630   char crtAddr[20];
00631 #endif
00632   char *headerTmp;
00633   char header[MAX_HEADER_LENGTH] = "v:";
00634   strcat(header, APMON_VERSION);
00635   strcat(header, "_cpp"); // to indicate this is the C++ version
00636   strcat(header, "p:");
00637 
00638   pthread_mutex_lock(&mutex);
00639 
00640   if(!shouldSend()) {
00641      pthread_mutex_unlock(&mutex);
00642      return RET_NOT_SENT;
00643   }
00644                 
00645   if (clusterName != NULL) { // don't keep the cached values for cluster name
00646     // and node name
00647     free(this -> clusterName);
00648     this -> clusterName = strdup(clusterName);
00649     
00650     if (nodeName != NULL) {  /* the user provided a name */
00651       free(this -> nodeName);
00652       this -> nodeName = strdup(nodeName);
00653     }
00654     else { /* set the node name to the node's IP */
00655       free(this -> nodeName);
00656       this -> nodeName = strdup(this -> myHostname);
00657     }  // else
00658   } // if
00659   
00660   if (this -> clusterName == NULL || this -> nodeName == NULL) {
00661     pthread_mutex_unlock(&mutex);
00662     throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name");
00663   }
00664 
00665   //sortParams(nParams, paramNames, valueTypes, paramValues);
00666 
00667   /* try to encode the parameters */
00668   try {
00669     encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp);
00670   } catch (runtime_error& err) {
00671     pthread_mutex_unlock(&mutex);
00672     throw err;
00673   }
00674 
00675   headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char)); 
00676   /* for each destination */
00677   for (i = 0; i < nDestinations; i++) {
00678     XDR xdrs;
00679     struct sockaddr_in destAddr;
00680 
00681     /* initialize the destination address */
00682     memset(&destAddr, 0, sizeof(destAddr));
00683     destAddr.sin_family = AF_INET;
00684     destAddr.sin_port = htons(destPorts[i]);
00685 #ifndef WIN32
00686     inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr);
00687 #else
00688     int dummy = sizeof(destAddr);
00689     sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]);
00690     ret = WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy);
00691     if(ret){
00692       ret = WSAGetLastError();
00693       sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret);
00694       throw runtime_error(msg);
00695     }
00696 #endif
00697     /* add the header (which is different for each destination) */
00698    
00699     strcpy(headerTmp, header);
00700     strcat(headerTmp, destPasswds[i]);
00701 
00702     /* initialize the XDR stream to encode the header */
00703     xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE); 
00704 
00705     /* encode the header */
00706     ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1);
00707     /* add the instance ID and the sequence number */
00708     ret1 = xdr_int(&xdrs, &(instance_id));
00709     ret2 = xdr_int(&xdrs, &(seq_nr));
00710 
00711     if (!ret || !ret1 || !ret2) {
00712       free(headerTmp);
00713           pthread_mutex_unlock(&mutex);
00714       throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header");
00715     }
00716 
00717     /* concatenate the header and the rest of the datagram */
00718     int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL);
00719     memcpy(newBuf, buf2, buf2Length);
00720     memcpy(newBuf + buf2Length, buf, dgramSize);
00721 
00722     /* send the buffer */
00723     ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0, 
00724                  (struct sockaddr *)&destAddr, sizeof(destAddr));
00725     if (ret == RET_ERROR) {
00726       free(headerTmp);
00727       pthread_mutex_unlock(&mutex);
00728 
00729       /*re-initialize the socket */
00730 #ifndef WIN32
00731       close(sockfd);
00732 #else
00733       closesocket(sockfd);
00734 #endif
00735       initSocket();
00736 
00737       /* throw exception because the datagram was not sent */
00738       sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ", 
00739               destAddresses[i]);
00740       throw runtime_error(msg);
00741     }
00742     else {
00743       sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:", 
00744              ret, instance_id, seq_nr, destAddresses[i]);
00745       logger(FINE, msg);
00746       logParameters(FINE, nParams, paramNames, valueTypes, paramValues);
00747     }
00748     xdr_destroy(&xdrs);
00749     
00750   }
00751 
00752   seq_nr = (seq_nr + 1) % TWO_BILLION;
00753   free(headerTmp);
00754   pthread_mutex_unlock(&mutex);
00755   return RET_SUCCESS;
00756 }
00757 
00758 
00759 int ApMon::sendParameter(char *clusterName, char *nodeName,
00760                         char *paramName, int valueType, char *paramValue) 
00761  throw(runtime_error){
00762 
00763   return sendParameters(clusterName, nodeName, 1, &paramName, 
00764                               &valueType, &paramValue);
00765 }
00766 
00767 int ApMon::sendTimedParameter(char *clusterName, char *nodeName,
00768               char *paramName, int valueType, char *paramValue, int timestamp) 
00769  throw(runtime_error){
00770 
00771   return sendTimedParameters(clusterName, nodeName, 1, &paramName, 
00772                               &valueType, &paramValue, timestamp);
00773 }
00774 
00775 int ApMon::sendParameter(char *clusterName, char *nodeName,
00776                 char *paramName, int paramValue) throw(runtime_error) {
00777   
00778   return sendParameter(clusterName, nodeName, paramName, XDR_INT32, 
00779                     (char *)&paramValue);
00780 }
00781 
00782 int ApMon::sendParameter(char *clusterName, char *nodeName,
00783                 char *paramName, float paramValue) throw(runtime_error) {
00784   
00785   return sendParameter(clusterName, nodeName, paramName, XDR_REAL32, 
00786                     (char *)&paramValue);
00787 }
00788 
00789 int ApMon::sendParameter(char *clusterName, char *nodeName,
00790                 char *paramName, double paramValue) throw(runtime_error) {
00791   
00792   return sendParameter(clusterName, nodeName, paramName, XDR_REAL64, 
00793                     (char *)&paramValue);
00794 }
00795 
00796 int ApMon::sendParameter(char *clusterName, char *nodeName,
00797                 char *paramName, char *paramValue) throw(runtime_error) {
00798   
00799   return sendParameter(clusterName, nodeName, paramName, XDR_STRING, 
00800                     paramValue);
00801 }
00802 
00803 void ApMon::encodeParams(int nParams, char **paramNames, int *valueTypes, 
00804                         char **paramValues, int timestamp) 
00805   throw(runtime_error){
00806   XDR xdrs; /* XDR handle. */
00807   int i, effectiveNParams;
00808 
00809   /* count the number of parameters actually sent in the datagram
00810      (the parameters with a NULL name and the string parameters
00811      with a NULL value are skipped)
00812   */
00813   effectiveNParams = nParams;
00814   for (i = 0; i < nParams; i++) {
00815       if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING && 
00816                                     paramValues[i] == NULL)) {
00817         effectiveNParams--;
00818       }
00819   }
00820   if (effectiveNParams == 0)
00821       throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");
00822 
00823   /*** estimate the length of the send buffer ***/
00824 
00825   /* add the length of the cluster name & node name */
00826   dgramSize =  xdrSize(XDR_STRING, clusterName) + 
00827       xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL);
00828   /* add the lengths for the parameters (name + size + value) */
00829   for (i = 0; i < nParams; i++) {
00830     dgramSize += xdrSize(XDR_STRING, paramNames[i]) +  xdrSize(XDR_INT32, NULL) +
00831       + xdrSize(valueTypes[i], paramValues[i]);
00832   }
00833 
00834   /* check that the maximum datagram size is not exceeded */
00835   if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE) 
00836     throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded");
00837 
00838   /* initialize the XDR stream */
00839   xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE); 
00840 
00841   try {
00842     /* encode the cluster name, the node name and the number of parameters */
00843     if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName) 
00844                     + 1))
00845       throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name");
00846 
00847     if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1))
00848       throw runtime_error("[ encodeParams() ] XDR encoding error for the node name");
00849     
00850     if (!xdr_int(&xdrs, &(effectiveNParams)))
00851       throw runtime_error("[ encodeParams() ] XDR encoding error for the number of parameters");
00852 
00853     /* encode the parameters */
00854     for (i = 0; i < nParams; i++) {
00855       if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING && 
00856                                     paramValues[i] == NULL)) {
00857         logger(WARNING, "NULL parameter name or value - skipping parameter...");
00858         continue;
00859       }
00860 
00861       /* parameter name */
00862       if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1))
00863         throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name");
00864     
00865       /* parameter value type */
00866       if (!xdr_int(&xdrs, &(valueTypes[i])))  
00867         throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type");
00868 
00869       /* parameter value */
00870       switch (valueTypes[i]) {
00871       case XDR_STRING:
00872         if (!xdr_string(&xdrs, &(paramValues[i]), 
00873                         strlen(paramValues[i]) + 1))
00874           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00875         break;
00876         //INT16 is not supported
00877         /*    case XDR_INT16:  
00878               if (!xdr_short(&xdrs, (short *)(paramValues[i])))
00879               return RET_ERROR;
00880               break;
00881         */    case XDR_INT32:
00882                 if (!xdr_int(&xdrs, (int *)(paramValues[i])))  
00883                   throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00884                 break;
00885       case XDR_REAL32:
00886         if (!xdr_float(&xdrs, (float *)(paramValues[i])))
00887           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00888         break;
00889       case XDR_REAL64:
00890         if (!xdr_double(&xdrs, (double *)(paramValues[i])))
00891           throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00892         break;
00893       default:
00894         throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding");
00895       }
00896     }
00897    
00898     /* encode the timestamp if necessary */
00899     if (timestamp > 0) {
00900       if (!xdr_int(&xdrs, &timestamp))  
00901         throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp");
00902       dgramSize += xdrSize(XDR_INT32, NULL);
00903     }
00904   } catch (runtime_error& err) {
00905     xdr_destroy(&xdrs);
00906     throw err;
00907   }
00908 
00909   xdr_destroy(&xdrs);
00910 }
00911 
00912 #ifndef WIN32
00913 void *bkTask(void *param) { 
00914 #else
00915 DWORD WINAPI bkTask(void *param) {
00916 #endif
00917   struct stat st;
00918 #ifndef WIN32
00919   struct timespec delay;
00920 #else
00921   DWORD delay;
00922 #endif
00923   bool resourceChanged, haveChange;
00924   int nextOp = -1, i, ret;
00925   int generalInfoCount;
00926   time_t crtTime, timeRemained;
00927   time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0;
00928   ApMon *apm = (ApMon *)param;
00929   char logmsg[200];
00930 
00931   logger(INFO, "[Starting background thread...]");
00932   apm -> bkThreadStarted = true;
00933 
00934   crtTime = time(NULL);
00935 
00936   pthread_mutex_lock(&(apm -> mutexBack));
00937   if (apm -> confCheck) {
00938     nextRecheck = crtTime + apm -> crtRecheckInterval;
00939     //sprintf(logmsg, "###1 crt %ld interv %ld recheck %ld ", crtTime,
00940     //   apm -> crtRecheckInterval, nextRecheck);
00941     //logger(FINE, logmsg);
00942     //fflush(stdout);
00943   }
00944   if (apm -> jobMonitoring)
00945     nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
00946   if (apm -> sysMonitoring)
00947     nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
00948   pthread_mutex_unlock(&(apm -> mutexBack));
00949   
00950   timeRemained = -1;
00951   generalInfoCount = 0;
00952 
00953   while (1) {
00954     pthread_mutex_lock(&apm -> mutexBack);
00955     if (apm -> stopBkThread) {
00956 //      printf("### stopBkThread \n");
00957       pthread_mutex_unlock(&apm -> mutexBack);
00958       break;
00959     }
00960     pthread_mutex_unlock(&apm -> mutexBack);
00961 
00962     //sprintf(logmsg, "### 2 recheck %ld sys %ld ", nextRecheck, 
00963     //    nextSysInfoSend);
00964     //logger(FINE, logmsg);
00965 
00966     /* determine the next operation that must be performed */
00967     if (nextRecheck > 0 && (nextJobInfoSend <= 0 || 
00968                             nextRecheck <= nextJobInfoSend)) {
00969       if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) {
00970         nextOp = RECHECK_CONF;
00971         timeRemained = nextRecheck - crtTime;
00972       } else {
00973         nextOp = SYS_INFO_SEND;
00974         timeRemained = nextSysInfoSend - crtTime;
00975       }
00976     } else {
00977       if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 || 
00978                                   nextJobInfoSend <= nextSysInfoSend)) {
00979         nextOp = JOB_INFO_SEND;
00980         timeRemained = nextJobInfoSend - crtTime;
00981       } else if (nextSysInfoSend > 0) {
00982         nextOp = SYS_INFO_SEND;
00983         timeRemained = nextSysInfoSend - crtTime;
00984       }
00985     }
00986 
00987     if (timeRemained == -1)
00988       timeRemained = RECHECK_INTERVAL;
00989 
00990 #ifndef WIN32
00991     /* the moment when the next operation should be performed */
00992     delay.tv_sec = crtTime + timeRemained;
00993     delay.tv_nsec = 0;
00994 #else
00995     delay = (/*crtTime +*/ timeRemained) * 1000;  // this is in millis
00996 #endif
00997 
00998     pthread_mutex_lock(&(apm -> mutexBack));
00999 
01000     pthread_mutex_lock(&(apm -> mutexCond));
01001     /* check for changes in the settings */
01002     haveChange = false;
01003     if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged)
01004       haveChange = true;
01005     if (apm -> jobMonChanged) {
01006       if (apm -> jobMonitoring) 
01007         nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
01008       else
01009         nextJobInfoSend = -1;
01010       apm -> jobMonChanged = false;
01011     }
01012     if (apm -> sysMonChanged) {
01013       if (apm -> sysMonitoring) 
01014         nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
01015       else
01016         nextSysInfoSend = -1;
01017       apm -> sysMonChanged = false;
01018     }
01019     if (apm -> recheckChanged) {
01020       if (apm -> confCheck) {
01021         nextRecheck = crtTime + apm -> crtRecheckInterval;
01022       }
01023       else
01024         nextRecheck = -1;
01025       apm -> recheckChanged = false;
01026     }
01027     pthread_mutex_unlock(&(apm -> mutexBack));
01028 
01029     if (haveChange) {
01030       pthread_mutex_unlock(&(apm -> mutexCond));
01031       continue;
01032     }
01033     
01034     /* wait until the next operation should be performed or until
01035        a change in the settings occurs */
01036 #ifndef WIN32
01037     ret = pthread_cond_timedwait(&(apm -> confChangedCond), 
01038                                 &(apm -> mutexCond), &delay);
01039     pthread_mutex_unlock(&(apm -> mutexCond));
01040 #else
01041     pthread_mutex_unlock(&(apm -> mutexCond));
01042     ret = WaitForSingleObject(apm->confChangedCond, delay);
01043 #endif
01044     if (ret == ETIMEDOUT) {
01045 //      printf("### ret TIMEDOUT\n");
01046       /* now perform the operation */
01047       if (nextOp == JOB_INFO_SEND) {
01048         apm -> sendJobInfo();
01049         crtTime = time(NULL);
01050         nextJobInfoSend = crtTime + apm -> getJobMonitorInterval();
01051       }
01052       
01053       if (nextOp == SYS_INFO_SEND) {
01054         apm -> sendSysInfo();
01055         if (apm -> getGenMonitoring()) {
01056           if (generalInfoCount <= 1)
01057             apm -> sendGeneralInfo();
01058           generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals;
01059         }
01060         crtTime = time(NULL);
01061         nextSysInfoSend = crtTime + apm -> getSysMonitorInterval();
01062       }
01063 
01064       if (nextOp == RECHECK_CONF) {
01065         //logger(FINE, "### recheck conf");
01066         resourceChanged = false;
01067         try {
01068           if (apm -> initType == FILE_INIT) {
01069             sprintf(logmsg, "Checking for modifications for file %s ", 
01070                     apm -> initSources[0]);
01071             logger(INFO, logmsg);
01072             stat(apm -> initSources[0], &st);
01073             if (st.st_mtime > apm -> lastModifFile) {
01074               sprintf(logmsg, "File %s modified ", apm -> initSources[0]);
01075               logger(INFO, logmsg);
01076               resourceChanged = true;
01077             }
01078           }
01079 
01080           // check the configuration URLs
01081           for (i = 0; i < apm -> confURLs.nConfURLs; i++) {
01082             sprintf(logmsg, "[Checking for modifications for URL %s ] ", 
01083                    apm -> confURLs.vURLs[i]);
01084             logger(INFO, logmsg);
01085             if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) {
01086               sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]);
01087               logger(INFO, logmsg);
01088               resourceChanged = true;
01089               break;
01090             }
01091           }
01092 
01093           if (resourceChanged) {
01094             logger(INFO, "Reloading configuration...");
01095             if (apm -> initType == FILE_INIT)
01096               apm -> initialize(apm -> initSources[0], false);
01097             else
01098               apm -> initialize(apm -> nInitSources, apm -> initSources, false);
01099           }
01100           apm -> setCrtRecheckInterval(apm -> getRecheckInterval());
01101         } catch (runtime_error &err) {
01102           logger(WARNING, err.what());
01103           logger(WARNING, "Increasing the time interval for reloading the configuration...");
01104           apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5);
01105         }
01106         crtTime = time(NULL);
01107         nextRecheck = crtTime + apm -> getCrtRecheckInterval();
01108         //sleep(apm -> getCrtRecheckInterval());
01109       }
01110     }
01111  
01112   } // while
01113 
01114 #ifndef WIN32
01115   return NULL; // it doesn't matter what we return here
01116 #else
01117   return 0;
01118 #endif
01119 }
01120 
01121 void ApMon::setConfRecheck(bool confCheck, long interval) {
01122   char logmsg[100];
01123   if (confCheck) {
01124     sprintf(logmsg, "Enabling configuration reloading (interval %ld)", 
01125             interval);
01126     logger(INFO, logmsg);
01127   }
01128 
01129   pthread_mutex_lock(&mutexBack);
01130   if (initType == DIRECT_INIT) { // no need to reload the configuration
01131     logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload.");
01132     return;
01133   }
01134 
01135   this -> confCheck = confCheck;
01136   this -> recheckChanged = true;
01137   if (confCheck) {
01138     if (interval > 0) {
01139       this -> recheckInterval = interval;
01140       this -> crtRecheckInterval = interval;
01141     } else {
01142       this -> recheckInterval = RECHECK_INTERVAL;
01143       this -> crtRecheckInterval = RECHECK_INTERVAL;
01144     }
01145     setBackgroundThread(true);
01146   }
01147   else {
01148     if (jobMonitoring == false && sysMonitoring == false)
01149       setBackgroundThread(false);
01150   }
01151   pthread_mutex_unlock(&mutexBack);
01152     
01153 }
01154 
01155 void ApMon::setRecheckInterval(long val) {
01156   if (val > 0) {
01157     setConfRecheck(true, val);
01158   }
01159   else {
01160     setConfRecheck(false, val);
01161   }
01162 }
01163 
01164 void ApMon::setCrtRecheckInterval(long val) {
01165   pthread_mutex_lock(&mutexBack);
01166   crtRecheckInterval = val;
01167   pthread_mutex_unlock(&mutexBack);
01168 }
01169 
01170 void ApMon::setJobMonitoring(bool jobMonitoring, long interval) {
01171   char logmsg[100];
01172   if (jobMonitoring) {
01173     sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval);
01174     logger(INFO, logmsg);
01175   } else
01176     logger(INFO, "Disabling job monitoring...");
01177 
01178   pthread_mutex_lock(&mutexBack);
01179   this -> jobMonitoring = jobMonitoring;
01180   this -> jobMonChanged = true;
01181   if (jobMonitoring == true) {
01182     if (interval > 0)
01183       this -> jobMonitorInterval = interval;
01184     else
01185       this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
01186     setBackgroundThread(true);
01187   } else {
01188     // disable the background thread if it is not needed anymore
01189     if (this -> sysMonitoring == false && this -> confCheck == false)
01190       setBackgroundThread(false);
01191   }
01192   pthread_mutex_unlock(&mutexBack);
01193 }
01194 
01195 void ApMon::setSysMonitoring(bool sysMonitoring, long interval) {
01196   char logmsg[100];
01197   if (sysMonitoring) {
01198     sprintf(logmsg, "Enabling system monitoring, time interval %ld s... ", interval);
01199     logger(INFO, logmsg);
01200   } else
01201     logger(INFO, "Disabling system monitoring...");
01202 
01203   pthread_mutex_lock(&mutexBack);
01204   this -> sysMonitoring = sysMonitoring;
01205   this -> sysMonChanged = true;
01206   if (sysMonitoring == true) {
01207     if (interval > 0)
01208       this -> sysMonitorInterval = interval;
01209     else 
01210       this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
01211     setBackgroundThread(true);
01212   }  else {
01213     // disable the background thread if it is not needed anymore
01214     if (this -> jobMonitoring == false && this -> confCheck == false)
01215       setBackgroundThread(false);
01216   }
01217   pthread_mutex_unlock(&mutexBack);
01218 }
01219   
01220 void ApMon::setGenMonitoring(bool genMonitoring, int nIntervals) {
01221   char logmsg[100];
01222   sprintf(logmsg, "Setting general information monitoring to %s ", 
01223          boolStrings[(int)genMonitoring]);
01224   logger(INFO, logmsg);
01225 
01226   pthread_mutex_lock(&mutexBack);
01227   this -> genMonitoring = genMonitoring;
01228   this -> sysMonChanged = true;
01229   if (genMonitoring == true) {
01230     if (nIntervals > 0)
01231       this -> genMonitorIntervals = nIntervals;
01232     else 
01233       this -> genMonitorIntervals = GEN_MONITOR_INTERVALS; 
01234     
01235     if (this -> sysMonitoring == false) {
01236       pthread_mutex_unlock(&mutexBack);
01237       setSysMonitoring(true);
01238       pthread_mutex_lock(&mutexBack);
01239     }
01240   } // TODO: else check if we can stop the background thread (if no
01241   // system parameters are enabled for monitoring)
01242   pthread_mutex_unlock(&mutexBack);
01243 }
01244 
01245 void ApMon::setBackgroundThread(bool val) {
01246   // mutexBack is locked
01247   if (val == true) {
01248     if (!haveBkThread) {
01249 #ifndef WIN32
01250       pthread_create(&bkThread, NULL, &bkTask, this);
01251 #else
01252       DWORD dummy;
01253       bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy);
01254 #endif
01255       haveBkThread = true;
01256     } else {
01257       pthread_mutex_lock(&mutexCond);
01258       pthread_cond_signal(&confChangedCond);
01259       pthread_mutex_unlock(&mutexCond);
01260     }
01261   }
01262   if (val == false) {
01263     //if (bkThreadStarted) {
01264     if (haveBkThread) {
01265       stopBkThread = true;
01266       pthread_mutex_unlock(&mutexBack);
01267 #ifndef WIN32
01268       pthread_mutex_lock(&mutexCond);
01269 #endif
01270       pthread_cond_signal(&confChangedCond);
01271       logger(INFO, "[Stopping the background thread...]");
01272 #ifndef WIN32
01273       pthread_mutex_unlock(&mutexCond);
01274       pthread_join(bkThread, NULL);
01275 #else
01276       WaitForSingleObject(bkThread, INFINITE);
01277 #endif
01278       pthread_mutex_lock(&mutexBack);
01279 //      logger(INFO, "bk thread stopped!");
01280       haveBkThread = false;
01281       bkThreadStarted = false;
01282       stopBkThread = false;
01283     }
01284   }
01285 }
01286 
01287 void ApMon::addJobToMonitor(long pid, char *workdir, char *clusterName,
01288                             char *nodeName) throw(runtime_error) {
01289   if (nMonJobs >= MAX_MONITORED_JOBS)
01290     throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded.");
01291   MonitoredJob job;
01292   job.pid = pid;
01293   if (workdir == NULL) 
01294     strcpy(job.workdir, "");
01295   else
01296     strcpy(job.workdir, workdir);
01297 
01298  if (clusterName == NULL || strlen(clusterName) == 0) 
01299     strcpy(job.clusterName, "ApMon_JobMon");
01300   else
01301     strcpy(job.clusterName, clusterName);
01302  if (nodeName == NULL || strlen(nodeName) == 0) 
01303     strcpy(job.nodeName, this -> myIP);
01304   else
01305     strcpy(job.nodeName, nodeName);
01306 
01307   monJobs[nMonJobs++] = job;
01308 }
01309 
01310 void ApMon::removeJobToMonitor(long pid) throw(runtime_error) {
01311   int i, j;
01312   char msg[100];
01313 
01314   if (nMonJobs <= 0)
01315     throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs.");
01316   
01317   for (i = 0; i < nMonJobs; i++) { 
01318     if (monJobs[i].pid == pid) {
01319       /* found the job, now remove it */
01320       for (j = i; j < nMonJobs - 1; j++)
01321         monJobs[j] = monJobs[j + 1];
01322       nMonJobs--;
01323       return;
01324     }
01325   }
01326 
01327   /* the job was not found */
01328   sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid);
01329   throw runtime_error(msg);
01330 }
01331 
01332 void ApMon::setSysMonClusterNode(char *clusterName, char *nodeName) {
01333   free (sysMonCluster); free(sysMonNode);
01334   sysMonCluster = strdup(clusterName);
01335   sysMonNode = strdup(nodeName);
01336 }
01337 
01338 void ApMon::setLogLevel(char *newLevel_s) {
01339   int newLevel;
01340   const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"};
01341   char logmsg[100];
01342 
01343   for (newLevel = 0; newLevel < 5; newLevel++)
01344     if (strcmp(newLevel_s, levels[newLevel]) == 0)
01345       break;
01346 
01347   if (newLevel >= 5) {
01348     sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %s", newLevel_s);
01349     logger(WARNING, logmsg); 
01350   }
01351   else
01352     logger(0, NULL, newLevel);
01353 }
01354 
01355         
01356 void ApMon::setMaxMsgRate(int maxRate) {
01357   if (maxRate > 0)
01358     this -> maxMsgRate  = maxRate;
01359 }
01360 
01361 void ApMon::initSocket() throw(runtime_error) {
01362   int optval1 = 1;
01363   struct timeval optval2; 
01364   int ret1, ret2, ret3;
01365 
01366   sockfd = socket(AF_INET, SOCK_DGRAM, 0);
01367   if (sockfd < 0) 
01368     throw runtime_error("[ initSocket() ] Error creating socket");
01369   ret1 = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *) &optval1, 
01370                     sizeof(optval1));
01371     
01372   /* set connection timeout */
01373   optval2.tv_sec = 20;
01374   optval2.tv_usec = 0;
01375   ret2 = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *) &optval2, 
01376                     sizeof(optval2));
01377   ret3 = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *) &optval2, 
01378                     sizeof(optval2));
01379   if (ret1 != 0 || ret2 != 0 || ret3 != 0)
01380     throw runtime_error("[ initSocket() ] Error initializing socket.");
01381 }
01382 
01383 
01384 void ApMon::parseConf(FILE *fp, int *nDestinations, char **destAddresses, 
01385                      int *destPorts, char **destPasswds)
01386   throw(runtime_error) {
01387   int i, ch;
01388   char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char));
01389   char *tmp = NULL; 
01390   char *loglevel_s;
01391 //  char sbuf[30];
01392 //  char *pbuf = sbuf;
01393 
01394   /* parse the input file */
01395   while(fgets(line, MAX_STRING_LEN, fp) != NULL) {
01396 
01397     if (tmp != NULL) {
01398       free(tmp);
01399       tmp = NULL;
01400     }
01401 
01402     line[MAX_STRING_LEN - 1] = 0;
01403     /* check if the line was too long */
01404     ch = fgetc(fp); // see if we are at the end of the file
01405     ungetc(ch, fp);
01406     if (line[strlen(line) - 1] != 10 && ch != EOF) {
01407       /* if the line doesn't end with a \n and we are not at the end
01408          of file, the line from the file was longer than MAX_STRING_LEN */
01409       fclose(fp);
01410       throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file");
01411     }
01412 
01413     tmp = trimString(line);
01414       
01415     /* skip the blank lines and the comment lines */
01416     if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp)
01417       continue;
01418     
01419     if (strstr(tmp, "xApMon_loglevel") == tmp) {
01420       char *tmp2 = tmp;
01421       strtok/*_r*/(tmp2, "= ");//, &pbuf);
01422       loglevel_s = strtok/*_r*/(NULL, "= ");//, &pbuf);
01423       setLogLevel(loglevel_s);
01424       continue;
01425     }
01426 
01427     if (strstr(tmp, "xApMon_") == tmp) {
01428       parseXApMonLine(tmp);
01429       continue;
01430     }
01431     
01432     if (*nDestinations >= MAX_N_DESTINATIONS) {
01433       free(line); free(tmp); 
01434       for (i = 0; i < *nDestinations; i++) {
01435         free(destAddresses[i]);
01436         free(destPasswds[i]);
01437       }
01438       fclose(fp);
01439       throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded.");
01440     }
01441 
01442     addToDestinations(tmp, nDestinations, destAddresses, destPorts, 
01443                       destPasswds);
01444   }
01445 
01446   if (tmp != NULL)
01447     free(tmp);
01448   free(line);
01449 }
01450 
01451 bool ApMon::shouldSend() {
01452 
01453   long now = time(NULL);
01454   bool doSend;
01455   char msg[200];
01456 
01457   //printf("now %ld crtTime %ld\n", now, crtTime);
01458 
01459   if (now != crtTime){
01461     prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime);
01462     prvTime = crtTime;
01463     sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop);
01464     logger(DEBUG, msg);
01466     crtTime = now;
01467     crtSent = 0;
01468     crtDrop = 0;
01469     //printf("\n");
01470   }
01471                 
01473   int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight));
01474 
01475   doSend = true;
01477   int level = this -> maxMsgRate - this -> maxMsgRate / 10;
01478 
01479  
01480   if (valSent > (this -> maxMsgRate - level)) {
01481     //int max10 = this -> maxMsgRate / 10;
01482     int rnd  = rand() % (this -> maxMsgRate / 10);
01483     doSend = (rnd <  (this -> maxMsgRate - valSent));
01484   }
01486   if (doSend) {
01487     crtSent++;
01488     //printf("#");
01489   } else {
01490     crtDrop++;
01491     //printf(".");
01492   }
01493   
01494   return doSend;
01495 }
01496         
01497  
01498 
01499 
01500   

Generated at Mon May 3 12:14:37 2010 for Gaudi Framework, version v21r9 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004