// (page header of the generated source listing)
// Generated: 8 Jan 2009
00001 00006 /* 00007 * ApMon - Application Monitoring Tool 00008 * Version: 2.2.0 00009 * 00010 * Copyright (C) 2006 California Institute of Technology 00011 * 00012 * Permission is hereby granted, free of charge, to use, copy and modify 00013 * this software and its documentation (the "Software") for any 00014 * purpose, provided that existing copyright notices are retained in 00015 * all copies and that this notice is included verbatim in any distributions 00016 * or substantial portions of the Software. 00017 * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu). 00018 * Users of the Software are asked to feed back problems, benefits, 00019 * and/or suggestions about the software to the MonALISA Development Team 00020 * (developers@monalisa.cern.ch). Support for this software - fixing of bugs, 00021 * incorporation of new features - is done on a best effort basis. All bug 00022 * fixes and enhancements will be made available under the same terms and 00023 * conditions as the original software, 00024 00025 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR 00026 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 00027 * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, 00028 * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00029 00030 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, 00031 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, 00032 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS 00033 * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO 00034 * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR 00035 * MODIFICATIONS. 
00036 */ 00037 00038 #include "ApMon.h" 00039 #include "utils.h" 00040 #include "proc_utils.h" 00041 #include "monitor_utils.h" 00042 00043 #include <math.h> 00044 00045 using namespace apmon_utils; 00046 using namespace apmon_mon_utils; 00047 00048 #define RECHECK_CONF 0 00049 #define SYS_INFO_SEND 1 00050 #define JOB_INFO_SEND 2 00051 00052 char boolStrings[][10] = {"false", "true"}; 00053 00054 //========= Implementations of the functions =================== 00055 00056 ApMon::ApMon(char *initsource) 00057 throw(runtime_error) { 00058 00059 if (initsource == NULL) 00060 throw runtime_error("[ ApMon() ] No conf file/URL provided"); 00061 00062 if (strstr(initsource, "http://") == initsource) { 00063 char *destList[1]; 00064 destList[0] = initsource; 00065 constructFromList(1, destList); 00066 } else { 00067 nInitSources = 1; 00068 initType = FILE_INIT; 00069 initSources = (char **)malloc(nInitSources * sizeof(char *)); 00070 if (initSources == NULL) 00071 throw runtime_error("[ ApMon() ] Error allocating memory."); 00072 00073 initSources[0] = strdup(initsource); 00074 initMonitoring(); 00075 00076 initialize(initsource, true); 00077 } 00078 } 00079 00080 void ApMon::initialize(char *filename, bool firstTime) 00081 throw(runtime_error) { 00082 00083 char *destAddresses[MAX_N_DESTINATIONS]; 00084 int destPorts[MAX_N_DESTINATIONS]; 00085 char *destPasswds[MAX_N_DESTINATIONS]; 00086 int nDest = 0, i; 00087 ConfURLs confURLs; 00088 00089 confURLs.nConfURLs = 0; 00090 00091 try { 00092 loadFile(filename, &nDest, destAddresses, destPorts, destPasswds); 00093 00094 arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime); 00095 } catch (runtime_error& err) { 00096 if (firstTime) 00097 throw err; 00098 else { 00099 logger(WARNING, err.what()); 00100 logger(WARNING, "Error reloading the configuration. 
Keeping the previous one."); 00101 return; 00102 } 00103 } 00104 00105 for (i = 0; i < nDest; i++) { 00106 free(destAddresses[i]); 00107 free(destPasswds[i]); 00108 } 00109 00110 pthread_mutex_lock(&mutex); 00111 this -> confURLs = confURLs; 00112 pthread_mutex_unlock(&mutex); 00113 } 00114 00115 void ApMon::loadFile(char *filename, int *nDestinations, char **destAddresses, 00116 int *destPorts, char **destPasswds) 00117 throw(runtime_error) { 00118 FILE *f; 00119 char msg[100]; 00120 00121 /* initializations for the destination addresses */ 00122 f = fopen(filename, "rt"); 00123 if (f == NULL) { 00124 throw runtime_error("[ loadFile() ] Error opening configuration file"); 00125 } 00126 00127 sprintf(msg, "Loading file %s ...", filename); 00128 logger(INFO, msg); 00129 00130 lastModifFile = time(NULL); 00131 00132 parseConf(f, nDestinations, destAddresses, destPorts, 00133 destPasswds); 00134 fclose(f); 00135 } 00136 00137 ApMon::ApMon(int nDestinations, char **destinationsList) throw(runtime_error){ 00138 constructFromList(nDestinations, destinationsList); 00139 } 00140 00141 void ApMon::constructFromList(int nDestinations, char **destinationsList) 00142 throw(runtime_error) { 00143 int i; 00144 00145 if (destinationsList == NULL) 00146 throw runtime_error("[ constructFromList() ] Null destination list"); 00147 00148 #ifdef __APPLE__ 00149 initType = OLIST_INIT; 00150 #else 00151 initType = LIST_INIT; 00152 #endif 00153 00154 initMonitoring(); 00155 00156 /* save the initialization list */ 00157 nInitSources = nDestinations; 00158 initSources = (char **)malloc(nInitSources * sizeof(char*)); 00159 if (initSources == NULL) 00160 throw runtime_error("[ ApMon() ] Error allocating memory."); 00161 00162 for (i = 0; i < nInitSources; i++) 00163 initSources[i] = strdup(destinationsList[i]); 00164 00165 initialize(nDestinations, destinationsList, true); 00166 } 00167 00168 void ApMon::initialize(int nDestinations, char **destinationsList, 00169 bool firstTime) 
throw(runtime_error) { 00170 char *destAddresses[MAX_N_DESTINATIONS]; 00171 int destPorts[MAX_N_DESTINATIONS]; 00172 char *destPasswds[MAX_N_DESTINATIONS]; 00173 char errmsg[200]; 00174 int i; 00175 int cnt = 0; 00176 ConfURLs confURLs; 00177 00178 logger(INFO, "Initializing destination addresses & ports:"); 00179 00180 if (nDestinations > MAX_N_DESTINATIONS) 00181 throw runtime_error("[ initialize() ] Maximum number of destinations exceeded"); 00182 00183 00184 confURLs.nConfURLs = 0; 00185 00186 for (i = 0; i < nDestinations; i++) { 00187 try { 00188 if (strstr(destinationsList[i], "http") == destinationsList[i]) 00189 getDestFromWeb(destinationsList[i], &cnt, 00190 destAddresses, destPorts, destPasswds, confURLs); 00191 else 00192 addToDestinations(destinationsList[i], &cnt, 00193 destAddresses, destPorts, destPasswds); 00194 00195 } catch (runtime_error &e) { 00196 sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %s", e.what()); 00197 logger(WARNING, errmsg); 00198 if (!firstTime) { 00199 for (i = 0; i < cnt; i++) { 00200 free(destAddresses[i]); 00201 free(destPasswds[i]); 00202 } 00203 logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one."); 00204 return; 00205 } 00206 } // catch 00207 } // for 00208 00209 try { 00210 arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime); 00211 } catch (runtime_error& err) { 00212 if (firstTime) 00213 throw err; 00214 else { 00215 logger(WARNING, "Error reloading the configuration. 
Keeping the previous one."); 00216 return; 00217 } 00218 } 00219 00220 for (i = 0; i < cnt; i++) { 00221 free(destAddresses[i]); 00222 free(destPasswds[i]); 00223 } 00224 00225 pthread_mutex_lock(&mutex); 00226 this -> confURLs = confURLs; 00227 pthread_mutex_unlock(&mutex); 00228 } 00229 00230 void ApMon::addToDestinations(char *line, int *nDestinations, 00231 char *destAddresses[], int destPorts[], char *destPasswds[]) { 00232 char *addr, *port, *passwd; 00233 const char *sep1 = " \t"; 00234 const char *sep2 = ":"; 00235 00236 char *tmp = strdup(line); 00237 char *firstToken; 00238 // char buf[MAX_STRING_LEN]; 00239 // char *pbuf = buf; 00240 00241 /* the address & port are separated from the password with spaces */ 00242 firstToken = strtok/*_r*/(tmp, sep1);//, &pbuf); 00243 passwd = strtok/*_r*/(NULL, sep1);//, &pbuf); 00244 00245 /* the address and the port are separated with ":" */ 00246 addr = strtok/*_r*/(firstToken, sep2);//, &pbuf); 00247 port = strtok/*_r*/(NULL, sep2);//, &pbuf); 00248 destAddresses[*nDestinations] = strdup(addr); 00249 if (port == NULL) 00250 destPorts[*nDestinations] = DEFAULT_PORT; 00251 else 00252 destPorts[*nDestinations] = atoi(port); 00253 if (passwd == NULL) 00254 destPasswds[*nDestinations] = strdup(""); 00255 else 00256 destPasswds[*nDestinations] = strdup(passwd); 00257 (*nDestinations)++; 00258 00259 free(tmp); 00260 } 00261 00262 void ApMon::getDestFromWeb(char *url, int *nDestinations, 00263 char *destAddresses[], int destPorts[], char *destPasswds[], 00264 ConfURLs& confURLs) throw(runtime_error) { 00265 char temp_filename[300]; 00266 FILE *tmp_file; 00267 char *line, *ret, *tmp = NULL; 00268 bool modifLineFound; 00269 long mypid = getpid(); 00270 char str1[20], str2[20]; 00271 int totalSize, headerSize, contentSize; 00272 00273 #ifndef WIN32 00274 sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid); 00275 #else 00276 char *tmpp = getenv("TEMP"); 00277 if(tmpp == NULL) 00278 tmpp = getenv("TMP"); 00279 if(tmpp == NULL) 
00280 tmpp = "c:"; 00281 sprintf(temp_filename, "%s\\apmon_webconf%ld", tmpp, mypid); 00282 #endif 00283 /* get the configuration file from web and put it in a temporary file */ 00284 totalSize = httpRequest(url, (char*)"GET", temp_filename); 00285 00286 /* read the configuration from the temporary file */ 00287 tmp_file = fopen(temp_filename, "rt"); 00288 if (tmp_file == NULL) 00289 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page"); 00290 00291 line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char)); 00292 00293 //check the HTTP header to see if we got the page correctly 00294 fgets(line, MAX_STRING_LEN, tmp_file); 00295 sscanf(line, "%s %s", str1, str2); 00296 if (atoi(str2) != 200) { 00297 free(line); 00298 fclose(tmp_file); 00299 throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server"); 00300 } 00301 00302 confURLs.vURLs[confURLs.nConfURLs] = strdup(url); 00303 00304 // check the header for the "Last-Modified" and "Content-Length" lines 00305 modifLineFound = false; 00306 contentSize = 0; 00307 do { 00308 if (tmp != NULL) 00309 free(tmp); 00310 ret = fgets(line, MAX_STRING_LEN, tmp_file); 00311 if (ret == NULL) { 00312 free(line); fclose(tmp_file); 00313 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page"); 00314 } 00315 if (strstr(line, "Last-Modified") == line) { 00316 modifLineFound = true; 00317 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(line); 00318 } 00319 00320 if (strstr(line, "Content-Length") == line) { 00321 sscanf(line, "%s %d", str1, &contentSize); 00322 } 00323 00324 tmp = trimString(line); 00325 } while (strlen(tmp) != 0); 00326 free(tmp); free(line); 00327 00328 if (!modifLineFound) 00329 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(""); 00330 confURLs.nConfURLs++; 00331 00332 headerSize = ftell(tmp_file); 00333 if (totalSize - headerSize < contentSize) { 00334 fclose(tmp_file); 00335 throw runtime_error("[ getDestFromWeb() ] Web 
page received incompletely"); 00336 } 00337 00338 try { 00339 parseConf(tmp_file, nDestinations, destAddresses, destPorts, 00340 destPasswds); 00341 } catch (...) { 00342 fclose(tmp_file); 00343 unlink(temp_filename); 00344 throw; 00345 } 00346 00347 fclose(tmp_file); 00348 unlink(temp_filename); 00349 } 00350 00351 00352 ApMon::ApMon(int nDestinations, char **destAddresses, int *destPorts, 00353 char **destPasswds) 00354 throw(runtime_error) { 00355 initMonitoring(); 00356 00357 arrayInit(nDestinations, destAddresses, destPorts, destPasswds); 00358 } 00359 00360 void ApMon::arrayInit(int nDestinations, char **destAddresses, 00361 int *destPorts, char **destPasswds) 00362 throw(runtime_error) { 00363 arrayInit(nDestinations, destAddresses, destPorts, destPasswds, true); 00364 } 00365 00366 00367 void ApMon::arrayInit(int nDestinations, char **destAddresses, int *destPorts, 00368 char **destPasswds, bool firstTime) 00369 throw(runtime_error) { 00370 int i, j; 00371 int ret; 00372 char *ipAddr, logmsg[100]; 00373 bool found, havePublicIP; 00374 int tmpNDestinations; 00375 char **tmpAddresses, **tmpPasswds; 00376 int *tmpPorts; 00377 00378 if (destAddresses == NULL || destPorts == NULL || nDestinations == 0) 00379 throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided"); 00380 00381 /* initializations that we have to do only once */ 00382 if (firstTime) { 00383 //this -> appPID = getpid(); 00384 00385 this -> nMonJobs = 0; 00386 this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS * 00387 sizeof(MonitoredJob)); 00388 00389 try { 00390 this -> numCPUs = ProcUtils::getNumCPUs(); 00391 } catch (procutils_error &err) { 00392 logger(WARNING, err.what()); 00393 this -> numCPUs = 0; 00394 } 00395 00396 /* get the names of the network interfaces */ 00397 this -> nInterfaces = 0; 00398 try { 00399 ProcUtils::getNetworkInterfaces(this -> nInterfaces, 00400 this -> interfaceNames); 00401 } catch (procutils_error &err) { 00402 logger(WARNING, 
err.what()); 00403 this -> nInterfaces = 0; 00404 } 00405 00406 /* get the hostname of the machine */ 00407 ret = gethostname(this -> myHostname, MAX_STRING_LEN -1); 00408 if (ret < 0) { 00409 logger(WARNING, "Could not obtain the local hostname"); 00410 strcpy(myHostname, "unknown"); 00411 } else 00412 myHostname[MAX_STRING_LEN - 1] = 0; 00413 00414 /* get the IPs of the machine */ 00415 this -> numIPs = 0; havePublicIP = false; 00416 strcpy(this -> myIP, "unknown"); 00417 00418 /* default values for cluster name and node name */ 00419 this -> clusterName = strdup("ApMon_UserSend"); 00420 this -> nodeName = strdup(myHostname); 00421 00422 #ifndef WIN32 00423 int sockd = socket(PF_INET, SOCK_STREAM, 0); 00424 if(sockd < 0){ 00425 logger(WARNING, "Could not obtain local IP addresses"); 00426 } else { 00427 for (i = 0; i < this -> nInterfaces; i++) { 00428 struct ifreq ifr; 00429 memset(&ifr, 0, sizeof(ifr)); 00430 strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1); 00431 if(ioctl(sockd, SIOCGIFADDR, &ifr)<0) 00432 continue; //???????? 
00433 char ip[4], tmp_s[20]; 00434 #ifdef __APPLE__ 00435 memcpy(ip, ifr.ifr_addr.sa_data+2, 4); 00436 #else 00437 memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4); 00438 #endif 00439 strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip)); 00440 sprintf(logmsg, "Found local IP address: %s", tmp_s); 00441 logger(FINE, logmsg); 00442 if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) { 00443 strcpy(this -> myIP, tmp_s); 00444 if (!isPrivateAddress(tmp_s)) 00445 havePublicIP = true; 00446 } 00447 strcpy(this -> allMyIPs[this -> numIPs], tmp_s); 00448 this -> numIPs++; 00449 } 00450 } 00451 #else 00452 struct hostent *hptr; 00453 if ((hptr = gethostbyname(myHostname))!= NULL) { 00454 i = 0; 00455 struct in_addr addr; 00456 while ((hptr -> h_addr_list)[i] != NULL) { 00457 memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4); 00458 ipAddr = inet_ntoa(addr); 00459 if (strcmp(ipAddr, "127.0.0.1") != 0) { 00460 strcpy(this -> myIP, ipAddr); 00461 if (!isPrivateAddress(ipAddr)) 00462 break; 00463 } 00464 i++; 00465 } 00466 } 00467 #endif 00468 00469 this -> sysMonCluster = strdup("ApMon_SysMon"); 00470 this -> sysMonNode = strdup(this -> myIP); 00471 00472 this -> prvTime = 0; 00473 this -> prvSent = 0; 00474 this -> prvDrop = 0; 00475 this -> crtTime = 0; 00476 this -> crtSent = 0; 00477 this -> crtDrop = 0; 00478 this -> hWeight = exp(-5.0/60.0); 00479 00480 srand(time(NULL)); 00481 00482 /* initialize buffer for XDR encoding */ 00483 this -> buf = (char *)malloc(MAX_DGRAM_SIZE); 00484 if (this -> buf == NULL) 00485 throw runtime_error("[ arrayInit() ] Error allocating memory"); 00486 this -> dgramSize = 0; 00487 00488 /*create the socket & set options*/ 00489 initSocket(); 00490 00491 /* initialize the sender ID and the sequence number */ 00492 instance_id = rand(); 00493 seq_nr = 0; 00494 } 00495 00496 /* put the destination addresses, ports & passwords in some temporary 00497 buffers (because we don't want to lock mutex while making DNS 00498 requests) 00499 */ 00500 tmpNDestinations 
= 0; 00501 tmpPorts = (int *)malloc(nDestinations * sizeof(int)); 00502 tmpAddresses = (char **)malloc(nDestinations * sizeof(char *)); 00503 tmpPasswds = (char **)malloc(nDestinations * sizeof(char *)); 00504 if (tmpPorts == NULL || tmpAddresses == NULL || 00505 tmpPasswds == NULL) 00506 throw runtime_error("[ arrayInit() ] Error allocating memory"); 00507 00508 for (i = 0; i < nDestinations; i++) { 00509 try { 00510 ipAddr = findIP(destAddresses[i]); 00511 } catch (runtime_error &err) { 00512 logger(FATAL, err.what()); 00513 continue; 00514 } 00515 00516 /* make sure this address is not already in the list */ 00517 found = false; 00518 for (j = 0; j < tmpNDestinations; j++) { 00519 if (!strcmp(ipAddr, tmpAddresses[j])) { 00520 found = true; 00521 break; 00522 } 00523 } 00524 00525 /* add the address to the list */ 00526 if (!found) { 00527 tmpAddresses[tmpNDestinations] = ipAddr; 00528 tmpPorts[tmpNDestinations] = destPorts[i]; 00529 tmpPasswds[tmpNDestinations] = strdup(destPasswds[i]); 00530 00531 sprintf(logmsg, "Adding destination host: %s - port %d", 00532 tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]); 00533 logger(INFO, logmsg); 00534 00535 tmpNDestinations++; 00536 } 00537 } 00538 00539 if (tmpNDestinations == 0) { 00540 freeMat(tmpAddresses, tmpNDestinations); 00541 freeMat(tmpPasswds, tmpNDestinations); 00542 throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!"); 00543 } 00544 00545 pthread_mutex_lock(&mutex); 00546 if (!firstTime) 00547 freeConf(); 00548 this -> nDestinations = tmpNDestinations; 00549 this -> destAddresses = tmpAddresses; 00550 this -> destPorts = tmpPorts; 00551 this -> destPasswds = tmpPasswds; 00552 pthread_mutex_unlock(&mutex); 00553 00554 /* start job/system monitoring according to the settings previously read 00555 from the configuration file */ 00556 setJobMonitoring(jobMonitoring, jobMonitorInterval); 00557 setSysMonitoring(sysMonitoring, sysMonitorInterval); 00558 
setGenMonitoring(genMonitoring, genMonitorIntervals); 00559 setConfRecheck(confCheck, recheckInterval); 00560 } 00561 00562 00563 ApMon::~ApMon() { 00564 int i; 00565 00566 if (bkThreadStarted) { 00567 if (getJobMonitoring()) { 00568 /* send a datagram with job monitoring information which covers 00569 the last time interval */ 00570 sendJobInfo(); 00571 } 00572 } 00573 00574 pthread_mutex_lock(&mutexBack); 00575 setBackgroundThread(false); 00576 pthread_mutex_unlock(&mutexBack); 00577 00578 pthread_mutex_destroy(&mutex); 00579 pthread_mutex_destroy(&mutexBack); 00580 pthread_mutex_destroy(&mutexCond); 00581 pthread_cond_destroy(&confChangedCond); 00582 00583 free(clusterName); 00584 free(nodeName); 00585 free(sysMonCluster); free(sysMonNode); 00586 00587 freeConf(); 00588 00589 free(monJobs); 00590 for (i = 0; i < nInitSources; i++) { 00591 free(initSources[i]); 00592 } 00593 free(initSources); 00594 00595 free(buf); 00596 #ifndef WIN32 00597 close(sockfd); 00598 #else 00599 closesocket(sockfd); 00600 WSACleanup(); 00601 #endif 00602 } 00603 00604 void ApMon::freeConf() { 00605 int i; 00606 freeMat(destAddresses, nDestinations); 00607 freeMat(destPasswds, nDestinations); 00608 free(destPorts); 00609 00610 for (i = 0; i < confURLs.nConfURLs; i++) { 00611 free(confURLs.vURLs[i]); 00612 free(confURLs.lastModifURLs[i]); 00613 } 00614 } 00615 00616 int ApMon::sendParameters(char *clusterName, char *nodeName, 00617 int nParams, char **paramNames, int *valueTypes, 00618 char **paramValues) throw(runtime_error){ 00619 return sendTimedParameters(clusterName, nodeName, nParams, 00620 paramNames, valueTypes, paramValues, -1); 00621 } 00622 00623 int ApMon::sendTimedParameters(char *clusterName, char *nodeName, 00624 int nParams, char **paramNames, int *valueTypes, 00625 char **paramValues, int timestamp) throw(runtime_error){ 00626 int i; 00627 int ret, ret1, ret2; 00628 char msg[100], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE]; 00629 #ifdef WIN32 00630 char 
crtAddr[20]; 00631 #endif 00632 char *headerTmp; 00633 char header[MAX_HEADER_LENGTH] = "v:"; 00634 strcat(header, APMON_VERSION); 00635 strcat(header, "_cpp"); // to indicate this is the C++ version 00636 strcat(header, "p:"); 00637 00638 pthread_mutex_lock(&mutex); 00639 00640 if(!shouldSend()) { 00641 pthread_mutex_unlock(&mutex); 00642 return RET_NOT_SENT; 00643 } 00644 00645 if (clusterName != NULL) { // don't keep the cached values for cluster name 00646 // and node name 00647 free(this -> clusterName); 00648 this -> clusterName = strdup(clusterName); 00649 00650 if (nodeName != NULL) { /* the user provided a name */ 00651 free(this -> nodeName); 00652 this -> nodeName = strdup(nodeName); 00653 } 00654 else { /* set the node name to the node's IP */ 00655 free(this -> nodeName); 00656 this -> nodeName = strdup(this -> myHostname); 00657 } // else 00658 } // if 00659 00660 if (this -> clusterName == NULL || this -> nodeName == NULL) { 00661 pthread_mutex_unlock(&mutex); 00662 throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name"); 00663 } 00664 00665 //sortParams(nParams, paramNames, valueTypes, paramValues); 00666 00667 /* try to encode the parameters */ 00668 try { 00669 encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp); 00670 } catch (runtime_error& err) { 00671 pthread_mutex_unlock(&mutex); 00672 throw err; 00673 } 00674 00675 headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char)); 00676 /* for each destination */ 00677 for (i = 0; i < nDestinations; i++) { 00678 XDR xdrs; 00679 struct sockaddr_in destAddr; 00680 00681 /* initialize the destination address */ 00682 memset(&destAddr, 0, sizeof(destAddr)); 00683 destAddr.sin_family = AF_INET; 00684 destAddr.sin_port = htons(destPorts[i]); 00685 #ifndef WIN32 00686 inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr); 00687 #else 00688 int dummy = sizeof(destAddr); 00689 sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]); 00690 ret = 
WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy); 00691 if(ret){ 00692 ret = WSAGetLastError(); 00693 sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret); 00694 throw runtime_error(msg); 00695 } 00696 #endif 00697 /* add the header (which is different for each destination) */ 00698 00699 strcpy(headerTmp, header); 00700 strcat(headerTmp, destPasswds[i]); 00701 00702 /* initialize the XDR stream to encode the header */ 00703 xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE); 00704 00705 /* encode the header */ 00706 ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1); 00707 /* add the instance ID and the sequence number */ 00708 ret1 = xdr_int(&xdrs, &(instance_id)); 00709 ret2 = xdr_int(&xdrs, &(seq_nr)); 00710 00711 if (!ret || !ret1 || !ret2) { 00712 free(headerTmp); 00713 pthread_mutex_unlock(&mutex); 00714 throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header"); 00715 } 00716 00717 /* concatenate the header and the rest of the datagram */ 00718 int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL); 00719 memcpy(newBuf, buf2, buf2Length); 00720 memcpy(newBuf + buf2Length, buf, dgramSize); 00721 00722 /* send the buffer */ 00723 ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0, 00724 (struct sockaddr *)&destAddr, sizeof(destAddr)); 00725 if (ret == RET_ERROR) { 00726 free(headerTmp); 00727 pthread_mutex_unlock(&mutex); 00728 00729 /*re-initialize the socket */ 00730 #ifndef WIN32 00731 close(sockfd); 00732 #else 00733 closesocket(sockfd); 00734 #endif 00735 initSocket(); 00736 00737 /* throw exception because the datagram was not sent */ 00738 sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ", 00739 destAddresses[i]); 00740 throw runtime_error(msg); 00741 } 00742 else { 00743 sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:", 00744 
ret, instance_id, seq_nr, destAddresses[i]); 00745 logger(FINE, msg); 00746 logParameters(FINE, nParams, paramNames, valueTypes, paramValues); 00747 } 00748 xdr_destroy(&xdrs); 00749 00750 } 00751 00752 seq_nr = (seq_nr + 1) % TWO_BILLION; 00753 free(headerTmp); 00754 pthread_mutex_unlock(&mutex); 00755 return RET_SUCCESS; 00756 } 00757 00758 00759 int ApMon::sendParameter(char *clusterName, char *nodeName, 00760 char *paramName, int valueType, char *paramValue) 00761 throw(runtime_error){ 00762 00763 return sendParameters(clusterName, nodeName, 1, ¶mName, 00764 &valueType, ¶mValue); 00765 } 00766 00767 int ApMon::sendTimedParameter(char *clusterName, char *nodeName, 00768 char *paramName, int valueType, char *paramValue, int timestamp) 00769 throw(runtime_error){ 00770 00771 return sendTimedParameters(clusterName, nodeName, 1, ¶mName, 00772 &valueType, ¶mValue, timestamp); 00773 } 00774 00775 int ApMon::sendParameter(char *clusterName, char *nodeName, 00776 char *paramName, int paramValue) throw(runtime_error) { 00777 00778 return sendParameter(clusterName, nodeName, paramName, XDR_INT32, 00779 (char *)¶mValue); 00780 } 00781 00782 int ApMon::sendParameter(char *clusterName, char *nodeName, 00783 char *paramName, float paramValue) throw(runtime_error) { 00784 00785 return sendParameter(clusterName, nodeName, paramName, XDR_REAL32, 00786 (char *)¶mValue); 00787 } 00788 00789 int ApMon::sendParameter(char *clusterName, char *nodeName, 00790 char *paramName, double paramValue) throw(runtime_error) { 00791 00792 return sendParameter(clusterName, nodeName, paramName, XDR_REAL64, 00793 (char *)¶mValue); 00794 } 00795 00796 int ApMon::sendParameter(char *clusterName, char *nodeName, 00797 char *paramName, char *paramValue) throw(runtime_error) { 00798 00799 return sendParameter(clusterName, nodeName, paramName, XDR_STRING, 00800 paramValue); 00801 } 00802 00803 void ApMon::encodeParams(int nParams, char **paramNames, int *valueTypes, 00804 char **paramValues, int 
timestamp) 00805 throw(runtime_error){ 00806 XDR xdrs; /* XDR handle. */ 00807 int i, effectiveNParams; 00808 00809 /* count the number of parameters actually sent in the datagram 00810 (the parameters with a NULL name and the string parameters 00811 with a NULL value are skipped) 00812 */ 00813 effectiveNParams = nParams; 00814 for (i = 0; i < nParams; i++) { 00815 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING && 00816 paramValues[i] == NULL)) { 00817 effectiveNParams--; 00818 } 00819 } 00820 if (effectiveNParams == 0) 00821 throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted"); 00822 00823 /*** estimate the length of the send buffer ***/ 00824 00825 /* add the length of the cluster name & node name */ 00826 dgramSize = xdrSize(XDR_STRING, clusterName) + 00827 xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL); 00828 /* add the lengths for the parameters (name + size + value) */ 00829 for (i = 0; i < nParams; i++) { 00830 dgramSize += xdrSize(XDR_STRING, paramNames[i]) + xdrSize(XDR_INT32, NULL) + 00831 + xdrSize(valueTypes[i], paramValues[i]); 00832 } 00833 00834 /* check that the maximum datagram size is not exceeded */ 00835 if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE) 00836 throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded"); 00837 00838 /* initialize the XDR stream */ 00839 xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE); 00840 00841 try { 00842 /* encode the cluster name, the node name and the number of parameters */ 00843 if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName) 00844 + 1)) 00845 throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name"); 00846 00847 if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1)) 00848 throw runtime_error("[ encodeParams() ] XDR encoding error for the node name"); 00849 00850 if (!xdr_int(&xdrs, &(effectiveNParams))) 00851 throw runtime_error("[ encodeParams() ] XDR encoding error for the number of 
parameters"); 00852 00853 /* encode the parameters */ 00854 for (i = 0; i < nParams; i++) { 00855 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING && 00856 paramValues[i] == NULL)) { 00857 logger(WARNING, "NULL parameter name or value - skipping parameter..."); 00858 continue; 00859 } 00860 00861 /* parameter name */ 00862 if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1)) 00863 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name"); 00864 00865 /* parameter value type */ 00866 if (!xdr_int(&xdrs, &(valueTypes[i]))) 00867 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type"); 00868 00869 /* parameter value */ 00870 switch (valueTypes[i]) { 00871 case XDR_STRING: 00872 if (!xdr_string(&xdrs, &(paramValues[i]), 00873 strlen(paramValues[i]) + 1)) 00874 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value"); 00875 break; 00876 //INT16 is not supported 00877 /* case XDR_INT16: 00878 if (!xdr_short(&xdrs, (short *)(paramValues[i]))) 00879 return RET_ERROR; 00880 break; 00881 */ case XDR_INT32: 00882 if (!xdr_int(&xdrs, (int *)(paramValues[i]))) 00883 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value"); 00884 break; 00885 case XDR_REAL32: 00886 if (!xdr_float(&xdrs, (float *)(paramValues[i]))) 00887 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value"); 00888 break; 00889 case XDR_REAL64: 00890 if (!xdr_double(&xdrs, (double *)(paramValues[i]))) 00891 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value"); 00892 break; 00893 default: 00894 throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding"); 00895 } 00896 } 00897 00898 /* encode the timestamp if necessary */ 00899 if (timestamp > 0) { 00900 if (!xdr_int(&xdrs, ×tamp)) 00901 throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp"); 00902 dgramSize += xdrSize(XDR_INT32, NULL); 00903 } 00904 } 
catch (runtime_error& err) { 00905 xdr_destroy(&xdrs); 00906 throw err; 00907 } 00908 00909 xdr_destroy(&xdrs); 00910 } 00911 00912 #ifndef WIN32 00913 void *bkTask(void *param) { 00914 #else 00915 DWORD WINAPI bkTask(void *param) { 00916 #endif 00917 struct stat st; 00918 #ifndef WIN32 00919 struct timespec delay; 00920 #else 00921 DWORD delay; 00922 #endif 00923 bool resourceChanged, haveChange; 00924 int nextOp = -1, i, ret; 00925 int generalInfoCount; 00926 time_t crtTime, timeRemained; 00927 time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0; 00928 ApMon *apm = (ApMon *)param; 00929 char logmsg[200]; 00930 00931 logger(INFO, "[Starting background thread...]"); 00932 apm -> bkThreadStarted = true; 00933 00934 crtTime = time(NULL); 00935 00936 pthread_mutex_lock(&(apm -> mutexBack)); 00937 if (apm -> confCheck) { 00938 nextRecheck = crtTime + apm -> crtRecheckInterval; 00939 //sprintf(logmsg, "###1 crt %ld interv %ld recheck %ld ", crtTime, 00940 // apm -> crtRecheckInterval, nextRecheck); 00941 //logger(FINE, logmsg); 00942 //fflush(stdout); 00943 } 00944 if (apm -> jobMonitoring) 00945 nextJobInfoSend = crtTime + apm -> jobMonitorInterval; 00946 if (apm -> sysMonitoring) 00947 nextSysInfoSend = crtTime + apm -> sysMonitorInterval; 00948 pthread_mutex_unlock(&(apm -> mutexBack)); 00949 00950 timeRemained = -1; 00951 generalInfoCount = 0; 00952 00953 while (1) { 00954 pthread_mutex_lock(&apm -> mutexBack); 00955 if (apm -> stopBkThread) { 00956 // printf("### stopBkThread \n"); 00957 pthread_mutex_unlock(&apm -> mutexBack); 00958 break; 00959 } 00960 pthread_mutex_unlock(&apm -> mutexBack); 00961 00962 //sprintf(logmsg, "### 2 recheck %ld sys %ld ", nextRecheck, 00963 // nextSysInfoSend); 00964 //logger(FINE, logmsg); 00965 00966 /* determine the next operation that must be performed */ 00967 if (nextRecheck > 0 && (nextJobInfoSend <= 0 || 00968 nextRecheck <= nextJobInfoSend)) { 00969 if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) { 
00970 nextOp = RECHECK_CONF; 00971 timeRemained = nextRecheck - crtTime; 00972 } else { 00973 nextOp = SYS_INFO_SEND; 00974 timeRemained = nextSysInfoSend - crtTime; 00975 } 00976 } else { 00977 if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 || 00978 nextJobInfoSend <= nextSysInfoSend)) { 00979 nextOp = JOB_INFO_SEND; 00980 timeRemained = nextJobInfoSend - crtTime; 00981 } else if (nextSysInfoSend > 0) { 00982 nextOp = SYS_INFO_SEND; 00983 timeRemained = nextSysInfoSend - crtTime; 00984 } 00985 } 00986 00987 if (timeRemained == -1) 00988 timeRemained = RECHECK_INTERVAL; 00989 00990 #ifndef WIN32 00991 /* the moment when the next operation should be performed */ 00992 delay.tv_sec = crtTime + timeRemained; 00993 delay.tv_nsec = 0; 00994 #else 00995 delay = (/*crtTime +*/ timeRemained) * 1000; // this is in millis 00996 #endif 00997 00998 pthread_mutex_lock(&(apm -> mutexBack)); 00999 01000 pthread_mutex_lock(&(apm -> mutexCond)); 01001 /* check for changes in the settings */ 01002 haveChange = false; 01003 if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged) 01004 haveChange = true; 01005 if (apm -> jobMonChanged) { 01006 if (apm -> jobMonitoring) 01007 nextJobInfoSend = crtTime + apm -> jobMonitorInterval; 01008 else 01009 nextJobInfoSend = -1; 01010 apm -> jobMonChanged = false; 01011 } 01012 if (apm -> sysMonChanged) { 01013 if (apm -> sysMonitoring) 01014 nextSysInfoSend = crtTime + apm -> sysMonitorInterval; 01015 else 01016 nextSysInfoSend = -1; 01017 apm -> sysMonChanged = false; 01018 } 01019 if (apm -> recheckChanged) { 01020 if (apm -> confCheck) { 01021 nextRecheck = crtTime + apm -> crtRecheckInterval; 01022 } 01023 else 01024 nextRecheck = -1; 01025 apm -> recheckChanged = false; 01026 } 01027 pthread_mutex_unlock(&(apm -> mutexBack)); 01028 01029 if (haveChange) { 01030 pthread_mutex_unlock(&(apm -> mutexCond)); 01031 continue; 01032 } 01033 01034 /* wait until the next operation should be performed or until 01035 a change in 
the settings occurs */ 01036 #ifndef WIN32 01037 ret = pthread_cond_timedwait(&(apm -> confChangedCond), 01038 &(apm -> mutexCond), &delay); 01039 pthread_mutex_unlock(&(apm -> mutexCond)); 01040 #else 01041 pthread_mutex_unlock(&(apm -> mutexCond)); 01042 ret = WaitForSingleObject(apm->confChangedCond, delay); 01043 #endif 01044 if (ret == ETIMEDOUT) { 01045 // printf("### ret TIMEDOUT\n"); 01046 /* now perform the operation */ 01047 if (nextOp == JOB_INFO_SEND) { 01048 apm -> sendJobInfo(); 01049 crtTime = time(NULL); 01050 nextJobInfoSend = crtTime + apm -> getJobMonitorInterval(); 01051 } 01052 01053 if (nextOp == SYS_INFO_SEND) { 01054 apm -> sendSysInfo(); 01055 if (apm -> getGenMonitoring()) { 01056 if (generalInfoCount <= 1) 01057 apm -> sendGeneralInfo(); 01058 generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals; 01059 } 01060 crtTime = time(NULL); 01061 nextSysInfoSend = crtTime + apm -> getSysMonitorInterval(); 01062 } 01063 01064 if (nextOp == RECHECK_CONF) { 01065 //logger(FINE, "### recheck conf"); 01066 resourceChanged = false; 01067 try { 01068 if (apm -> initType == FILE_INIT) { 01069 sprintf(logmsg, "Checking for modifications for file %s ", 01070 apm -> initSources[0]); 01071 logger(INFO, logmsg); 01072 stat(apm -> initSources[0], &st); 01073 if (st.st_mtime > apm -> lastModifFile) { 01074 sprintf(logmsg, "File %s modified ", apm -> initSources[0]); 01075 logger(INFO, logmsg); 01076 resourceChanged = true; 01077 } 01078 } 01079 01080 // check the configuration URLs 01081 for (i = 0; i < apm -> confURLs.nConfURLs; i++) { 01082 sprintf(logmsg, "[Checking for modifications for URL %s ] ", 01083 apm -> confURLs.vURLs[i]); 01084 logger(INFO, logmsg); 01085 if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) { 01086 sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]); 01087 logger(INFO, logmsg); 01088 resourceChanged = true; 01089 break; 01090 } 01091 } 01092 01093 if (resourceChanged) { 01094 
logger(INFO, "Reloading configuration..."); 01095 if (apm -> initType == FILE_INIT) 01096 apm -> initialize(apm -> initSources[0], false); 01097 else 01098 apm -> initialize(apm -> nInitSources, apm -> initSources, false); 01099 } 01100 apm -> setCrtRecheckInterval(apm -> getRecheckInterval()); 01101 } catch (runtime_error &err) { 01102 logger(WARNING, err.what()); 01103 logger(WARNING, "Increasing the time interval for reloading the configuration..."); 01104 apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5); 01105 } 01106 crtTime = time(NULL); 01107 nextRecheck = crtTime + apm -> getCrtRecheckInterval(); 01108 //sleep(apm -> getCrtRecheckInterval()); 01109 } 01110 } 01111 01112 } // while 01113 01114 #ifndef WIN32 01115 return NULL; // it doesn't matter what we return here 01116 #else 01117 return 0; 01118 #endif 01119 } 01120 01121 void ApMon::setConfRecheck(bool confCheck, long interval) { 01122 char logmsg[100]; 01123 if (confCheck) { 01124 sprintf(logmsg, "Enabling configuration reloading (interval %ld)", 01125 interval); 01126 logger(INFO, logmsg); 01127 } 01128 01129 pthread_mutex_lock(&mutexBack); 01130 if (initType == DIRECT_INIT) { // no need to reload the configuration 01131 logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload."); 01132 return; 01133 } 01134 01135 this -> confCheck = confCheck; 01136 this -> recheckChanged = true; 01137 if (confCheck) { 01138 if (interval > 0) { 01139 this -> recheckInterval = interval; 01140 this -> crtRecheckInterval = interval; 01141 } else { 01142 this -> recheckInterval = RECHECK_INTERVAL; 01143 this -> crtRecheckInterval = RECHECK_INTERVAL; 01144 } 01145 setBackgroundThread(true); 01146 } 01147 else { 01148 if (jobMonitoring == false && sysMonitoring == false) 01149 setBackgroundThread(false); 01150 } 01151 pthread_mutex_unlock(&mutexBack); 01152 01153 } 01154 01155 void ApMon::setRecheckInterval(long val) { 01156 if (val > 0) { 01157 setConfRecheck(true, val); 01158 } 01159 else { 
01160 setConfRecheck(false, val); 01161 } 01162 } 01163 01164 void ApMon::setCrtRecheckInterval(long val) { 01165 pthread_mutex_lock(&mutexBack); 01166 crtRecheckInterval = val; 01167 pthread_mutex_unlock(&mutexBack); 01168 } 01169 01170 void ApMon::setJobMonitoring(bool jobMonitoring, long interval) { 01171 char logmsg[100]; 01172 if (jobMonitoring) { 01173 sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval); 01174 logger(INFO, logmsg); 01175 } else 01176 logger(INFO, "Disabling job monitoring..."); 01177 01178 pthread_mutex_lock(&mutexBack); 01179 this -> jobMonitoring = jobMonitoring; 01180 this -> jobMonChanged = true; 01181 if (jobMonitoring == true) { 01182 if (interval > 0) 01183 this -> jobMonitorInterval = interval; 01184 else 01185 this -> jobMonitorInterval = JOB_MONITOR_INTERVAL; 01186 setBackgroundThread(true); 01187 } else { 01188 // disable the background thread if it is not needed anymore 01189 if (this -> sysMonitoring == false && this -> confCheck == false) 01190 setBackgroundThread(false); 01191 } 01192 pthread_mutex_unlock(&mutexBack); 01193 } 01194 01195 void ApMon::setSysMonitoring(bool sysMonitoring, long interval) { 01196 char logmsg[100]; 01197 if (sysMonitoring) { 01198 sprintf(logmsg, "Enabling system monitoring, time interval %ld s... 
", interval); 01199 logger(INFO, logmsg); 01200 } else 01201 logger(INFO, "Disabling system monitoring..."); 01202 01203 pthread_mutex_lock(&mutexBack); 01204 this -> sysMonitoring = sysMonitoring; 01205 this -> sysMonChanged = true; 01206 if (sysMonitoring == true) { 01207 if (interval > 0) 01208 this -> sysMonitorInterval = interval; 01209 else 01210 this -> sysMonitorInterval = SYS_MONITOR_INTERVAL; 01211 setBackgroundThread(true); 01212 } else { 01213 // disable the background thread if it is not needed anymore 01214 if (this -> jobMonitoring == false && this -> confCheck == false) 01215 setBackgroundThread(false); 01216 } 01217 pthread_mutex_unlock(&mutexBack); 01218 } 01219 01220 void ApMon::setGenMonitoring(bool genMonitoring, int nIntervals) { 01221 char logmsg[100]; 01222 sprintf(logmsg, "Setting general information monitoring to %s ", 01223 boolStrings[(int)genMonitoring]); 01224 logger(INFO, logmsg); 01225 01226 pthread_mutex_lock(&mutexBack); 01227 this -> genMonitoring = genMonitoring; 01228 this -> sysMonChanged = true; 01229 if (genMonitoring == true) { 01230 if (nIntervals > 0) 01231 this -> genMonitorIntervals = nIntervals; 01232 else 01233 this -> genMonitorIntervals = GEN_MONITOR_INTERVALS; 01234 01235 if (this -> sysMonitoring == false) { 01236 pthread_mutex_unlock(&mutexBack); 01237 setSysMonitoring(true); 01238 pthread_mutex_lock(&mutexBack); 01239 } 01240 } // TODO: else check if we can stop the background thread (if no 01241 // system parameters are enabled for monitoring) 01242 pthread_mutex_unlock(&mutexBack); 01243 } 01244 01245 void ApMon::setBackgroundThread(bool val) { 01246 // mutexBack is locked 01247 if (val == true) { 01248 if (!haveBkThread) { 01249 #ifndef WIN32 01250 pthread_create(&bkThread, NULL, &bkTask, this); 01251 #else 01252 DWORD dummy; 01253 bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy); 01254 #endif 01255 haveBkThread = true; 01256 } else { 01257 pthread_mutex_lock(&mutexCond); 01258 
pthread_cond_signal(&confChangedCond); 01259 pthread_mutex_unlock(&mutexCond); 01260 } 01261 } 01262 if (val == false) { 01263 //if (bkThreadStarted) { 01264 if (haveBkThread) { 01265 stopBkThread = true; 01266 pthread_mutex_unlock(&mutexBack); 01267 #ifndef WIN32 01268 pthread_mutex_lock(&mutexCond); 01269 #endif 01270 pthread_cond_signal(&confChangedCond); 01271 logger(INFO, "[Stopping the background thread...]"); 01272 #ifndef WIN32 01273 pthread_mutex_unlock(&mutexCond); 01274 pthread_join(bkThread, NULL); 01275 #else 01276 WaitForSingleObject(bkThread, INFINITE); 01277 #endif 01278 pthread_mutex_lock(&mutexBack); 01279 // logger(INFO, "bk thread stopped!"); 01280 haveBkThread = false; 01281 bkThreadStarted = false; 01282 stopBkThread = false; 01283 } 01284 } 01285 } 01286 01287 void ApMon::addJobToMonitor(long pid, char *workdir, char *clusterName, 01288 char *nodeName) throw(runtime_error) { 01289 if (nMonJobs >= MAX_MONITORED_JOBS) 01290 throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded."); 01291 MonitoredJob job; 01292 job.pid = pid; 01293 if (workdir == NULL) 01294 strcpy(job.workdir, ""); 01295 else 01296 strcpy(job.workdir, workdir); 01297 01298 if (clusterName == NULL || strlen(clusterName) == 0) 01299 strcpy(job.clusterName, "ApMon_JobMon"); 01300 else 01301 strcpy(job.clusterName, clusterName); 01302 if (nodeName == NULL || strlen(nodeName) == 0) 01303 strcpy(job.nodeName, this -> myIP); 01304 else 01305 strcpy(job.nodeName, nodeName); 01306 01307 monJobs[nMonJobs++] = job; 01308 } 01309 01310 void ApMon::removeJobToMonitor(long pid) throw(runtime_error) { 01311 int i, j; 01312 char msg[100]; 01313 01314 if (nMonJobs <= 0) 01315 throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs."); 01316 01317 for (i = 0; i < nMonJobs; i++) { 01318 if (monJobs[i].pid == pid) { 01319 /* found the job, now remove it */ 01320 for (j = i; j < nMonJobs - 1; j++) 01321 monJobs[j] = monJobs[j + 1]; 01322 
nMonJobs--; 01323 return; 01324 } 01325 } 01326 01327 /* the job was not found */ 01328 sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid); 01329 throw runtime_error(msg); 01330 } 01331 01332 void ApMon::setSysMonClusterNode(char *clusterName, char *nodeName) { 01333 free (sysMonCluster); free(sysMonNode); 01334 sysMonCluster = strdup(clusterName); 01335 sysMonNode = strdup(nodeName); 01336 } 01337 01338 void ApMon::setLogLevel(char *newLevel_s) { 01339 int newLevel; 01340 const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"}; 01341 char logmsg[100]; 01342 01343 for (newLevel = 0; newLevel < 5; newLevel++) 01344 if (strcmp(newLevel_s, levels[newLevel]) == 0) 01345 break; 01346 01347 if (newLevel >= 5) { 01348 sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %s", newLevel_s); 01349 logger(WARNING, logmsg); 01350 } 01351 else 01352 logger(0, NULL, newLevel); 01353 } 01354 01355 01356 void ApMon::setMaxMsgRate(int maxRate) { 01357 if (maxRate > 0) 01358 this -> maxMsgRate = maxRate; 01359 } 01360 01361 void ApMon::initSocket() throw(runtime_error) { 01362 int optval1 = 1; 01363 struct timeval optval2; 01364 int ret1, ret2, ret3; 01365 01366 sockfd = socket(AF_INET, SOCK_DGRAM, 0); 01367 if (sockfd < 0) 01368 throw runtime_error("[ initSocket() ] Error creating socket"); 01369 ret1 = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *) &optval1, 01370 sizeof(optval1)); 01371 01372 /* set connection timeout */ 01373 optval2.tv_sec = 20; 01374 optval2.tv_usec = 0; 01375 ret2 = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *) &optval2, 01376 sizeof(optval2)); 01377 ret3 = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *) &optval2, 01378 sizeof(optval2)); 01379 if (ret1 != 0 || ret2 != 0 || ret3 != 0) 01380 throw runtime_error("[ initSocket() ] Error initializing socket."); 01381 } 01382 01383 01384 void ApMon::parseConf(FILE *fp, int *nDestinations, char **destAddresses, 01385 int *destPorts, char **destPasswds) 01386 
throw(runtime_error) { 01387 int i, ch; 01388 char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char)); 01389 char *tmp = NULL; 01390 char *loglevel_s; 01391 // char sbuf[30]; 01392 // char *pbuf = sbuf; 01393 01394 /* parse the input file */ 01395 while(fgets(line, MAX_STRING_LEN, fp) != NULL) { 01396 01397 if (tmp != NULL) { 01398 free(tmp); 01399 tmp = NULL; 01400 } 01401 01402 line[MAX_STRING_LEN - 1] = 0; 01403 /* check if the line was too long */ 01404 ch = fgetc(fp); // see if we are at the end of the file 01405 ungetc(ch, fp); 01406 if (line[strlen(line) - 1] != 10 && ch != EOF) { 01407 /* if the line doesn't end with a \n and we are not at the end 01408 of file, the line from the file was longer than MAX_STRING_LEN */ 01409 fclose(fp); 01410 throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file"); 01411 } 01412 01413 tmp = trimString(line); 01414 01415 /* skip the blank lines and the comment lines */ 01416 if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp) 01417 continue; 01418 01419 if (strstr(tmp, "xApMon_loglevel") == tmp) { 01420 char *tmp2 = tmp; 01421 strtok/*_r*/(tmp2, "= ");//, &pbuf); 01422 loglevel_s = strtok/*_r*/(NULL, "= ");//, &pbuf); 01423 setLogLevel(loglevel_s); 01424 continue; 01425 } 01426 01427 if (strstr(tmp, "xApMon_") == tmp) { 01428 parseXApMonLine(tmp); 01429 continue; 01430 } 01431 01432 if (*nDestinations >= MAX_N_DESTINATIONS) { 01433 free(line); free(tmp); 01434 for (i = 0; i < *nDestinations; i++) { 01435 free(destAddresses[i]); 01436 free(destPasswds[i]); 01437 } 01438 fclose(fp); 01439 throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded."); 01440 } 01441 01442 addToDestinations(tmp, nDestinations, destAddresses, destPorts, 01443 destPasswds); 01444 } 01445 01446 if (tmp != NULL) 01447 free(tmp); 01448 free(line); 01449 } 01450 01451 bool ApMon::shouldSend() { 01452 01453 long now = time(NULL); 01454 bool doSend; 01455 char msg[200]; 01456 01457 //printf("now %ld 
crtTime %ld\n", now, crtTime); 01458 01459 if (now != crtTime){ 01461 prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime); 01462 prvTime = crtTime; 01463 sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop); 01464 logger(DEBUG, msg); 01466 crtTime = now; 01467 crtSent = 0; 01468 crtDrop = 0; 01469 //printf("\n"); 01470 } 01471 01473 int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight)); 01474 01475 doSend = true; 01477 int level = this -> maxMsgRate - this -> maxMsgRate / 10; 01478 01479 01480 if (valSent > (this -> maxMsgRate - level)) { 01481 //int max10 = this -> maxMsgRate / 10; 01482 int rnd = rand() % (this -> maxMsgRate / 10); 01483 doSend = (rnd < (this -> maxMsgRate - valSent)); 01484 } 01486 if (doSend) { 01487 crtSent++; 01488 //printf("#"); 01489 } else { 01490 crtDrop++; 01491 //printf("."); 01492 } 01493 01494 return doSend; 01495 } 01496 01497 01498 01499 01500