00001
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "ApMon.h"
00039 #include "utils.h"
00040 #include "proc_utils.h"
00041 #include "monitor_utils.h"
00042
00043 #include <math.h>
00044
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047
00048 #define RECHECK_CONF 0
00049 #define SYS_INFO_SEND 1
00050 #define JOB_INFO_SEND 2
00051
00052 char boolStrings[][10] = {"false", "true"};
00053
00054 #ifdef __ICC
00055
00056 #pragma warning(disable:2259)
00057 #endif
00058
00059
00060
00061 ApMon::ApMon(char *initsource)
00062 throw(runtime_error) {
00063
00064 if (initsource == NULL)
00065 throw runtime_error("[ ApMon() ] No conf file/URL provided");
00066
00067 if (strstr(initsource, "http://") == initsource) {
00068 char *destList[1];
00069 destList[0] = initsource;
00070 constructFromList(1, destList);
00071 } else {
00072 nInitSources = 1;
00073 initType = FILE_INIT;
00074 initSources = (char **)malloc(nInitSources * sizeof(char *));
00075 if (initSources == NULL)
00076 throw runtime_error("[ ApMon() ] Error allocating memory.");
00077
00078 initSources[0] = strdup(initsource);
00079 initMonitoring();
00080
00081 initialize(initsource, true);
00082 }
00083 }
00084
00085 void ApMon::initialize(char *filename, bool firstTime)
00086 throw(runtime_error) {
00087
00088 char *destAddresses[MAX_N_DESTINATIONS];
00089 int destPorts[MAX_N_DESTINATIONS];
00090 char *destPasswds[MAX_N_DESTINATIONS];
00091 int nDest = 0, i;
00092 ConfURLs confURLs;
00093
00094 confURLs.nConfURLs = 0;
00095
00096 try {
00097 loadFile(filename, &nDest, destAddresses, destPorts, destPasswds);
00098
00099 arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime);
00100 } catch (runtime_error& err) {
00101 if (firstTime)
00102 throw err;
00103 else {
00104 logger(WARNING, err.what());
00105 logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00106 return;
00107 }
00108 }
00109
00110 for (i = 0; i < nDest; i++) {
00111 free(destAddresses[i]);
00112 free(destPasswds[i]);
00113 }
00114
00115 pthread_mutex_lock(&mutex);
00116 this -> confURLs = confURLs;
00117 pthread_mutex_unlock(&mutex);
00118 }
00119
00120 void ApMon::loadFile(char *filename, int *nDestinations, char **destAddresses,
00121 int *destPorts, char **destPasswds)
00122 throw(runtime_error) {
00123 FILE *f;
00124 char msg[100];
00125
00126
00127 f = fopen(filename, "rt");
00128 if (f == NULL) {
00129 throw runtime_error("[ loadFile() ] Error opening configuration file");
00130 }
00131
00132 sprintf(msg, "Loading file %s ...", filename);
00133 logger(INFO, msg);
00134
00135 lastModifFile = time(NULL);
00136
00137 parseConf(f, nDestinations, destAddresses, destPorts,
00138 destPasswds);
00139 fclose(f);
00140 }
00141
00142 ApMon::ApMon(int nDestinations, char **destinationsList) throw(runtime_error){
00143 constructFromList(nDestinations, destinationsList);
00144 }
00145
00146 void ApMon::constructFromList(int nDestinations, char **destinationsList)
00147 throw(runtime_error) {
00148 int i;
00149
00150 if (destinationsList == NULL)
00151 throw runtime_error("[ constructFromList() ] Null destination list");
00152
00153 #ifdef __APPLE__
00154 initType = OLIST_INIT;
00155 #else
00156 initType = LIST_INIT;
00157 #endif
00158
00159 initMonitoring();
00160
00161
00162 nInitSources = nDestinations;
00163 initSources = (char **)malloc(nInitSources * sizeof(char*));
00164 if (initSources == NULL)
00165 throw runtime_error("[ ApMon() ] Error allocating memory.");
00166
00167 for (i = 0; i < nInitSources; i++)
00168 initSources[i] = strdup(destinationsList[i]);
00169
00170 initialize(nDestinations, destinationsList, true);
00171 }
00172
00173 void ApMon::initialize(int nDestinations, char **destinationsList,
00174 bool firstTime) throw(runtime_error) {
00175 char *destAddresses[MAX_N_DESTINATIONS];
00176 int destPorts[MAX_N_DESTINATIONS];
00177 char *destPasswds[MAX_N_DESTINATIONS];
00178 char errmsg[200];
00179 int i;
00180 int cnt = 0;
00181 ConfURLs confURLs;
00182
00183 logger(INFO, "Initializing destination addresses & ports:");
00184
00185 if (nDestinations > MAX_N_DESTINATIONS)
00186 throw runtime_error("[ initialize() ] Maximum number of destinations exceeded");
00187
00188
00189 confURLs.nConfURLs = 0;
00190
00191 for (i = 0; i < nDestinations; i++) {
00192 try {
00193 if (strstr(destinationsList[i], "http") == destinationsList[i])
00194 getDestFromWeb(destinationsList[i], &cnt,
00195 destAddresses, destPorts, destPasswds, confURLs);
00196 else
00197 addToDestinations(destinationsList[i], &cnt,
00198 destAddresses, destPorts, destPasswds);
00199
00200 } catch (runtime_error &e) {
00201 sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %s", e.what());
00202 logger(WARNING, errmsg);
00203 if (!firstTime) {
00204 for (i = 0; i < cnt; i++) {
00205 free(destAddresses[i]);
00206 free(destPasswds[i]);
00207 }
00208 logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one.");
00209 return;
00210 }
00211 }
00212 }
00213
00214 try {
00215 arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime);
00216 } catch (runtime_error& err) {
00217 if (firstTime)
00218 throw err;
00219 else {
00220 logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00221 return;
00222 }
00223 }
00224
00225 for (i = 0; i < cnt; i++) {
00226 free(destAddresses[i]);
00227 free(destPasswds[i]);
00228 }
00229
00230 pthread_mutex_lock(&mutex);
00231 this -> confURLs = confURLs;
00232 pthread_mutex_unlock(&mutex);
00233 }
00234
00235 void ApMon::addToDestinations(char *line, int *nDestinations,
00236 char *destAddresses[], int destPorts[], char *destPasswds[]) {
00237 char *addr, *port, *passwd;
00238 const char *sep1 = " \t";
00239 const char *sep2 = ":";
00240
00241 char *tmp = strdup(line);
00242 char *firstToken;
00243
00244
00245
00246
00247 firstToken = strtok(tmp, sep1);
00248 passwd = strtok(NULL, sep1);
00249
00250
00251 addr = strtok(firstToken, sep2);
00252 port = strtok(NULL, sep2);
00253 destAddresses[*nDestinations] = strdup(addr);
00254 if (port == NULL)
00255 destPorts[*nDestinations] = DEFAULT_PORT;
00256 else
00257 destPorts[*nDestinations] = atoi(port);
00258 if (passwd == NULL)
00259 destPasswds[*nDestinations] = strdup("");
00260 else
00261 destPasswds[*nDestinations] = strdup(passwd);
00262 (*nDestinations)++;
00263
00264 free(tmp);
00265 }
00266
00267 void ApMon::getDestFromWeb(char *url, int *nDestinations,
00268 char *destAddresses[], int destPorts[], char *destPasswds[],
00269 ConfURLs& confURLs) throw(runtime_error) {
00270 char temp_filename[300];
00271 FILE *tmp_file;
00272 char *line, *ret, *tmp = NULL;
00273 bool modifLineFound;
00274 long mypid = getpid();
00275 char str1[20], str2[20];
00276 int totalSize, headerSize, contentSize;
00277
00278 #ifndef WIN32
00279 sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid);
00280 #else
00281 char *tmpp = getenv("TEMP");
00282 if(tmpp == NULL)
00283 tmpp = getenv("TMP");
00284 if(tmpp == NULL)
00285 tmpp = "c:";
00286 sprintf(temp_filename, "%s\\apmon_webconf%ld", tmpp, mypid);
00287 #endif
00288
00289 totalSize = httpRequest(url, (char*)"GET", temp_filename);
00290
00291
00292 tmp_file = fopen(temp_filename, "rt");
00293 if (tmp_file == NULL)
00294 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00295
00296 line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char));
00297
00298
00299 fgets(line, MAX_STRING_LEN, tmp_file);
00300 sscanf(line, "%s %s", str1, str2);
00301 if (atoi(str2) != 200) {
00302 free(line);
00303 fclose(tmp_file);
00304 throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server");
00305 }
00306
00307 confURLs.vURLs[confURLs.nConfURLs] = strdup(url);
00308
00309
00310 modifLineFound = false;
00311 contentSize = 0;
00312 do {
00313 if (tmp != NULL)
00314 free(tmp);
00315 ret = fgets(line, MAX_STRING_LEN, tmp_file);
00316 if (ret == NULL) {
00317 free(line); fclose(tmp_file);
00318 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00319 }
00320 if (strstr(line, "Last-Modified") == line) {
00321 modifLineFound = true;
00322 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(line);
00323 }
00324
00325 if (strstr(line, "Content-Length") == line) {
00326 sscanf(line, "%s %d", str1, &contentSize);
00327 }
00328
00329 tmp = trimString(line);
00330 } while (strlen(tmp) != 0);
00331 free(tmp); free(line);
00332
00333 if (!modifLineFound)
00334 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup("");
00335 confURLs.nConfURLs++;
00336
00337 headerSize = ftell(tmp_file);
00338 if (totalSize - headerSize < contentSize) {
00339 fclose(tmp_file);
00340 throw runtime_error("[ getDestFromWeb() ] Web page received incompletely");
00341 }
00342
00343 try {
00344 parseConf(tmp_file, nDestinations, destAddresses, destPorts,
00345 destPasswds);
00346 } catch (...) {
00347 fclose(tmp_file);
00348 unlink(temp_filename);
00349 throw;
00350 }
00351
00352 fclose(tmp_file);
00353 unlink(temp_filename);
00354 }
00355
00356
00357 ApMon::ApMon(int nDestinations, char **destAddresses, int *destPorts,
00358 char **destPasswds)
00359 throw(runtime_error) {
00360 initMonitoring();
00361
00362 arrayInit(nDestinations, destAddresses, destPorts, destPasswds);
00363 }
00364
00365 void ApMon::arrayInit(int nDestinations, char **destAddresses,
00366 int *destPorts, char **destPasswds)
00367 throw(runtime_error) {
00368 arrayInit(nDestinations, destAddresses, destPorts, destPasswds, true);
00369 }
00370
00371
00372 void ApMon::arrayInit(int nDestinations, char **destAddresses, int *destPorts,
00373 char **destPasswds, bool firstTime)
00374 throw(runtime_error) {
00375 int i, j;
00376 int ret;
00377 char *ipAddr, logmsg[100];
00378 bool found, havePublicIP;
00379 int tmpNDestinations;
00380 char **tmpAddresses, **tmpPasswds;
00381 int *tmpPorts;
00382
00383 if (destAddresses == NULL || destPorts == NULL || nDestinations == 0)
00384 throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided");
00385
00386
00387 if (firstTime) {
00388
00389
00390 this -> nMonJobs = 0;
00391 this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS *
00392 sizeof(MonitoredJob));
00393
00394 try {
00395 this -> numCPUs = ProcUtils::getNumCPUs();
00396 } catch (procutils_error &err) {
00397 logger(WARNING, err.what());
00398 this -> numCPUs = 0;
00399 }
00400
00401
00402 this -> nInterfaces = 0;
00403 try {
00404 ProcUtils::getNetworkInterfaces(this -> nInterfaces,
00405 this -> interfaceNames);
00406 } catch (procutils_error &err) {
00407 logger(WARNING, err.what());
00408 this -> nInterfaces = 0;
00409 }
00410
00411
00412 ret = gethostname(this -> myHostname, MAX_STRING_LEN -1);
00413 if (ret < 0) {
00414 logger(WARNING, "Could not obtain the local hostname");
00415 strcpy(myHostname, "unknown");
00416 } else
00417 myHostname[MAX_STRING_LEN - 1] = 0;
00418
00419
00420 this -> numIPs = 0; havePublicIP = false;
00421 strcpy(this -> myIP, "unknown");
00422
00423
00424 this -> clusterName = strdup("ApMon_UserSend");
00425 this -> nodeName = strdup(myHostname);
00426
00427 #ifndef WIN32
00428 int sockd = socket(PF_INET, SOCK_STREAM, 0);
00429 if(sockd < 0){
00430 logger(WARNING, "Could not obtain local IP addresses");
00431 } else {
00432 for (i = 0; i < this -> nInterfaces; i++) {
00433 struct ifreq ifr;
00434 memset(&ifr, 0, sizeof(ifr));
00435 strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1);
00436 if(ioctl(sockd, SIOCGIFADDR, &ifr)<0)
00437 continue;
00438 char ip[4], tmp_s[20];
00439 #ifdef __APPLE__
00440 memcpy(ip, ifr.ifr_addr.sa_data+2, 4);
00441 #else
00442 memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4);
00443 #endif
00444 strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip));
00445 sprintf(logmsg, "Found local IP address: %s", tmp_s);
00446 logger(FINE, logmsg);
00447 if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) {
00448 strcpy(this -> myIP, tmp_s);
00449 if (!isPrivateAddress(tmp_s))
00450 havePublicIP = true;
00451 }
00452 strcpy(this -> allMyIPs[this -> numIPs], tmp_s);
00453 this -> numIPs++;
00454 }
00455 }
00456 #else
00457 struct hostent *hptr;
00458 if ((hptr = gethostbyname(myHostname))!= NULL) {
00459 i = 0;
00460 struct in_addr addr;
00461 while ((hptr -> h_addr_list)[i] != NULL) {
00462 memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4);
00463 ipAddr = inet_ntoa(addr);
00464 if (strcmp(ipAddr, "127.0.0.1") != 0) {
00465 strcpy(this -> myIP, ipAddr);
00466 if (!isPrivateAddress(ipAddr))
00467 break;
00468 }
00469 i++;
00470 }
00471 }
00472 #endif
00473
00474 this -> sysMonCluster = strdup("ApMon_SysMon");
00475 this -> sysMonNode = strdup(this -> myIP);
00476
00477 this -> prvTime = 0;
00478 this -> prvSent = 0;
00479 this -> prvDrop = 0;
00480 this -> crtTime = 0;
00481 this -> crtSent = 0;
00482 this -> crtDrop = 0;
00483 this -> hWeight = exp(-5.0/60.0);
00484
00485 srand(time(NULL));
00486
00487
00488 this -> buf = (char *)malloc(MAX_DGRAM_SIZE);
00489 if (this -> buf == NULL)
00490 throw runtime_error("[ arrayInit() ] Error allocating memory");
00491 this -> dgramSize = 0;
00492
00493
00494 initSocket();
00495
00496
00497 instance_id = rand();
00498 seq_nr = 0;
00499 }
00500
00501
00502
00503
00504
00505 tmpNDestinations = 0;
00506 tmpPorts = (int *)malloc(nDestinations * sizeof(int));
00507 tmpAddresses = (char **)malloc(nDestinations * sizeof(char *));
00508 tmpPasswds = (char **)malloc(nDestinations * sizeof(char *));
00509 if (tmpPorts == NULL || tmpAddresses == NULL ||
00510 tmpPasswds == NULL)
00511 throw runtime_error("[ arrayInit() ] Error allocating memory");
00512
00513 for (i = 0; i < nDestinations; i++) {
00514 try {
00515 ipAddr = findIP(destAddresses[i]);
00516 } catch (runtime_error &err) {
00517 logger(FATAL, err.what());
00518 continue;
00519 }
00520
00521
00522 found = false;
00523 for (j = 0; j < tmpNDestinations; j++) {
00524 if (!strcmp(ipAddr, tmpAddresses[j])) {
00525 found = true;
00526 break;
00527 }
00528 }
00529
00530
00531 if (!found) {
00532 tmpAddresses[tmpNDestinations] = ipAddr;
00533 tmpPorts[tmpNDestinations] = destPorts[i];
00534 tmpPasswds[tmpNDestinations] = strdup(destPasswds[i]);
00535
00536 sprintf(logmsg, "Adding destination host: %s - port %d",
00537 tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]);
00538 logger(INFO, logmsg);
00539
00540 tmpNDestinations++;
00541 }
00542 }
00543
00544 if (tmpNDestinations == 0) {
00545 freeMat(tmpAddresses, tmpNDestinations);
00546 freeMat(tmpPasswds, tmpNDestinations);
00547 throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!");
00548 }
00549
00550 pthread_mutex_lock(&mutex);
00551 if (!firstTime)
00552 freeConf();
00553 this -> nDestinations = tmpNDestinations;
00554 this -> destAddresses = tmpAddresses;
00555 this -> destPorts = tmpPorts;
00556 this -> destPasswds = tmpPasswds;
00557 pthread_mutex_unlock(&mutex);
00558
00559
00560
00561 setJobMonitoring(jobMonitoring, jobMonitorInterval);
00562 setSysMonitoring(sysMonitoring, sysMonitorInterval);
00563 setGenMonitoring(genMonitoring, genMonitorIntervals);
00564 setConfRecheck(confCheck, recheckInterval);
00565 }
00566
00567
00568 ApMon::~ApMon() {
00569 int i;
00570
00571 if (bkThreadStarted) {
00572 if (getJobMonitoring()) {
00573
00574
00575 sendJobInfo();
00576 }
00577 }
00578
00579 pthread_mutex_lock(&mutexBack);
00580 setBackgroundThread(false);
00581 pthread_mutex_unlock(&mutexBack);
00582
00583 pthread_mutex_destroy(&mutex);
00584 pthread_mutex_destroy(&mutexBack);
00585 pthread_mutex_destroy(&mutexCond);
00586 pthread_cond_destroy(&confChangedCond);
00587
00588 free(clusterName);
00589 free(nodeName);
00590 free(sysMonCluster); free(sysMonNode);
00591
00592 freeConf();
00593
00594 free(monJobs);
00595 for (i = 0; i < nInitSources; i++) {
00596 free(initSources[i]);
00597 }
00598 free(initSources);
00599
00600 free(buf);
00601 #ifndef WIN32
00602 close(sockfd);
00603 #else
00604 closesocket(sockfd);
00605 WSACleanup();
00606 #endif
00607 }
00608
00609 void ApMon::freeConf() {
00610 int i;
00611 freeMat(destAddresses, nDestinations);
00612 freeMat(destPasswds, nDestinations);
00613 free(destPorts);
00614
00615 for (i = 0; i < confURLs.nConfURLs; i++) {
00616 free(confURLs.vURLs[i]);
00617 free(confURLs.lastModifURLs[i]);
00618 }
00619 }
00620
00621 int ApMon::sendParameters(char *clusterName, char *nodeName,
00622 int nParams, char **paramNames, int *valueTypes,
00623 char **paramValues) throw(runtime_error){
00624 return sendTimedParameters(clusterName, nodeName, nParams,
00625 paramNames, valueTypes, paramValues, -1);
00626 }
00627
00628 int ApMon::sendTimedParameters(char *clusterName, char *nodeName,
00629 int nParams, char **paramNames, int *valueTypes,
00630 char **paramValues, int timestamp) throw(runtime_error){
00631 int i;
00632 int ret, ret1, ret2;
00633 char msg[100], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE];
00634 #ifdef WIN32
00635 char crtAddr[20];
00636 #endif
00637 char *headerTmp;
00638 char header[MAX_HEADER_LENGTH] = "v:";
00639 strcat(header, APMON_VERSION);
00640 strcat(header, "_cpp");
00641 strcat(header, "p:");
00642
00643 pthread_mutex_lock(&mutex);
00644
00645 if(!shouldSend()) {
00646 pthread_mutex_unlock(&mutex);
00647 return RET_NOT_SENT;
00648 }
00649
00650 if (clusterName != NULL) {
00651
00652 free(this -> clusterName);
00653 this -> clusterName = strdup(clusterName);
00654
00655 if (nodeName != NULL) {
00656 free(this -> nodeName);
00657 this -> nodeName = strdup(nodeName);
00658 }
00659 else {
00660 free(this -> nodeName);
00661 this -> nodeName = strdup(this -> myHostname);
00662 }
00663 }
00664
00665 if (this -> clusterName == NULL || this -> nodeName == NULL) {
00666 pthread_mutex_unlock(&mutex);
00667 throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name");
00668 }
00669
00670
00671
00672
00673 try {
00674 encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp);
00675 } catch (runtime_error& err) {
00676 pthread_mutex_unlock(&mutex);
00677 throw err;
00678 }
00679
00680 headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char));
00681
00682 for (i = 0; i < nDestinations; i++) {
00683 XDR xdrs;
00684 struct sockaddr_in destAddr;
00685
00686
00687 memset(&destAddr, 0, sizeof(destAddr));
00688 destAddr.sin_family = AF_INET;
00689 destAddr.sin_port = htons(destPorts[i]);
00690 #ifndef WIN32
00691 inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr);
00692 #else
00693 int dummy = sizeof(destAddr);
00694 sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]);
00695 ret = WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy);
00696 if(ret){
00697 ret = WSAGetLastError();
00698 sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret);
00699 throw runtime_error(msg);
00700 }
00701 #endif
00702
00703
00704 strcpy(headerTmp, header);
00705 strcat(headerTmp, destPasswds[i]);
00706
00707
00708 xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE);
00709
00710
00711 ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1);
00712
00713 ret1 = xdr_int(&xdrs, &(instance_id));
00714 ret2 = xdr_int(&xdrs, &(seq_nr));
00715
00716 if (!ret || !ret1 || !ret2) {
00717 free(headerTmp);
00718 pthread_mutex_unlock(&mutex);
00719 throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header");
00720 }
00721
00722
00723 int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL);
00724 memcpy(newBuf, buf2, buf2Length);
00725 memcpy(newBuf + buf2Length, buf, dgramSize);
00726
00727
00728 ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0,
00729 (struct sockaddr *)&destAddr, sizeof(destAddr));
00730 if (ret == RET_ERROR) {
00731 free(headerTmp);
00732 pthread_mutex_unlock(&mutex);
00733
00734
00735 #ifndef WIN32
00736 close(sockfd);
00737 #else
00738 closesocket(sockfd);
00739 #endif
00740 initSocket();
00741
00742
00743 sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ",
00744 destAddresses[i]);
00745 throw runtime_error(msg);
00746 }
00747 else {
00748 sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:",
00749 ret, instance_id, seq_nr, destAddresses[i]);
00750 logger(FINE, msg);
00751 logParameters(FINE, nParams, paramNames, valueTypes, paramValues);
00752 }
00753 xdr_destroy(&xdrs);
00754
00755 }
00756
00757 seq_nr = (seq_nr + 1) % TWO_BILLION;
00758 free(headerTmp);
00759 pthread_mutex_unlock(&mutex);
00760 return RET_SUCCESS;
00761 }
00762
00763
00764 int ApMon::sendParameter(char *clusterName, char *nodeName,
00765 char *paramName, int valueType, char *paramValue)
00766 throw(runtime_error){
00767
00768 return sendParameters(clusterName, nodeName, 1, ¶mName,
00769 &valueType, ¶mValue);
00770 }
00771
00772 int ApMon::sendTimedParameter(char *clusterName, char *nodeName,
00773 char *paramName, int valueType, char *paramValue, int timestamp)
00774 throw(runtime_error){
00775
00776 return sendTimedParameters(clusterName, nodeName, 1, ¶mName,
00777 &valueType, ¶mValue, timestamp);
00778 }
00779
00780 int ApMon::sendParameter(char *clusterName, char *nodeName,
00781 char *paramName, int paramValue) throw(runtime_error) {
00782
00783 return sendParameter(clusterName, nodeName, paramName, XDR_INT32,
00784 (char *)¶mValue);
00785 }
00786
00787 int ApMon::sendParameter(char *clusterName, char *nodeName,
00788 char *paramName, float paramValue) throw(runtime_error) {
00789
00790 return sendParameter(clusterName, nodeName, paramName, XDR_REAL32,
00791 (char *)¶mValue);
00792 }
00793
00794 int ApMon::sendParameter(char *clusterName, char *nodeName,
00795 char *paramName, double paramValue) throw(runtime_error) {
00796
00797 return sendParameter(clusterName, nodeName, paramName, XDR_REAL64,
00798 (char *)¶mValue);
00799 }
00800
00801 int ApMon::sendParameter(char *clusterName, char *nodeName,
00802 char *paramName, char *paramValue) throw(runtime_error) {
00803
00804 return sendParameter(clusterName, nodeName, paramName, XDR_STRING,
00805 paramValue);
00806 }
00807
00808 void ApMon::encodeParams(int nParams, char **paramNames, int *valueTypes,
00809 char **paramValues, int timestamp)
00810 throw(runtime_error){
00811 XDR xdrs;
00812 int i, effectiveNParams;
00813
00814
00815
00816
00817
00818 effectiveNParams = nParams;
00819 for (i = 0; i < nParams; i++) {
00820 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00821 paramValues[i] == NULL)) {
00822 effectiveNParams--;
00823 }
00824 }
00825 if (effectiveNParams == 0)
00826 throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");
00827
00828
00829
00830
00831 dgramSize = xdrSize(XDR_STRING, clusterName) +
00832 xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL);
00833
00834 for (i = 0; i < nParams; i++) {
00835 dgramSize += xdrSize(XDR_STRING, paramNames[i]) + xdrSize(XDR_INT32, NULL) +
00836 + xdrSize(valueTypes[i], paramValues[i]);
00837 }
00838
00839
00840 if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE)
00841 throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded");
00842
00843
00844 xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE);
00845
00846 try {
00847
00848 if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName)
00849 + 1))
00850 throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name");
00851
00852 if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1))
00853 throw runtime_error("[ encodeParams() ] XDR encoding error for the node name");
00854
00855 if (!xdr_int(&xdrs, &(effectiveNParams)))
00856 throw runtime_error("[ encodeParams() ] XDR encoding error for the number of parameters");
00857
00858
00859 for (i = 0; i < nParams; i++) {
00860 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00861 paramValues[i] == NULL)) {
00862 logger(WARNING, "NULL parameter name or value - skipping parameter...");
00863 continue;
00864 }
00865
00866
00867 if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1))
00868 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name");
00869
00870
00871 if (!xdr_int(&xdrs, &(valueTypes[i])))
00872 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type");
00873
00874
00875 switch (valueTypes[i]) {
00876 case XDR_STRING:
00877 if (!xdr_string(&xdrs, &(paramValues[i]),
00878 strlen(paramValues[i]) + 1))
00879 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00880 break;
00881
00882
00883
00884
00885
00886 case XDR_INT32:
00887 if (!xdr_int(&xdrs, (int *)(paramValues[i])))
00888 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00889 break;
00890 case XDR_REAL32:
00891 if (!xdr_float(&xdrs, (float *)(paramValues[i])))
00892 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00893 break;
00894 case XDR_REAL64:
00895 if (!xdr_double(&xdrs, (double *)(paramValues[i])))
00896 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00897 break;
00898 default:
00899 throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding");
00900 }
00901 }
00902
00903
00904 if (timestamp > 0) {
00905 if (!xdr_int(&xdrs, ×tamp))
00906 throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp");
00907 dgramSize += xdrSize(XDR_INT32, NULL);
00908 }
00909 } catch (runtime_error& err) {
00910 xdr_destroy(&xdrs);
00911 throw err;
00912 }
00913
00914 xdr_destroy(&xdrs);
00915 }
00916
00917 #ifndef WIN32
00918 void *bkTask(void *param) {
00919 #else
00920 DWORD WINAPI bkTask(void *param) {
00921 #endif
00922 struct stat st;
00923 #ifndef WIN32
00924 struct timespec delay;
00925 #else
00926 DWORD delay;
00927 #endif
00928 bool resourceChanged, haveChange;
00929 int nextOp = -1, i, ret;
00930 int generalInfoCount;
00931 time_t crtTime, timeRemained;
00932 time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0;
00933 ApMon *apm = (ApMon *)param;
00934 char logmsg[200];
00935
00936 logger(INFO, "[Starting background thread...]");
00937 apm -> bkThreadStarted = true;
00938
00939 crtTime = time(NULL);
00940
00941 pthread_mutex_lock(&(apm -> mutexBack));
00942 if (apm -> confCheck) {
00943 nextRecheck = crtTime + apm -> crtRecheckInterval;
00944
00945
00946
00947
00948 }
00949 if (apm -> jobMonitoring)
00950 nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
00951 if (apm -> sysMonitoring)
00952 nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
00953 pthread_mutex_unlock(&(apm -> mutexBack));
00954
00955 timeRemained = -1;
00956 generalInfoCount = 0;
00957
00958 while (1) {
00959 pthread_mutex_lock(&apm -> mutexBack);
00960 if (apm -> stopBkThread) {
00961
00962 pthread_mutex_unlock(&apm -> mutexBack);
00963 break;
00964 }
00965 pthread_mutex_unlock(&apm -> mutexBack);
00966
00967
00968
00969
00970
00971
00972 if (nextRecheck > 0 && (nextJobInfoSend <= 0 ||
00973 nextRecheck <= nextJobInfoSend)) {
00974 if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) {
00975 nextOp = RECHECK_CONF;
00976 timeRemained = nextRecheck - crtTime;
00977 } else {
00978 nextOp = SYS_INFO_SEND;
00979 timeRemained = nextSysInfoSend - crtTime;
00980 }
00981 } else {
00982 if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 ||
00983 nextJobInfoSend <= nextSysInfoSend)) {
00984 nextOp = JOB_INFO_SEND;
00985 timeRemained = nextJobInfoSend - crtTime;
00986 } else if (nextSysInfoSend > 0) {
00987 nextOp = SYS_INFO_SEND;
00988 timeRemained = nextSysInfoSend - crtTime;
00989 }
00990 }
00991
00992 if (timeRemained == -1)
00993 timeRemained = RECHECK_INTERVAL;
00994
00995 #ifndef WIN32
00996
00997 delay.tv_sec = crtTime + timeRemained;
00998 delay.tv_nsec = 0;
00999 #else
01000 delay = ( timeRemained) * 1000;
01001 #endif
01002
01003 pthread_mutex_lock(&(apm -> mutexBack));
01004
01005 pthread_mutex_lock(&(apm -> mutexCond));
01006
01007 haveChange = false;
01008 if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged)
01009 haveChange = true;
01010 if (apm -> jobMonChanged) {
01011 if (apm -> jobMonitoring)
01012 nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
01013 else
01014 nextJobInfoSend = -1;
01015 apm -> jobMonChanged = false;
01016 }
01017 if (apm -> sysMonChanged) {
01018 if (apm -> sysMonitoring)
01019 nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
01020 else
01021 nextSysInfoSend = -1;
01022 apm -> sysMonChanged = false;
01023 }
01024 if (apm -> recheckChanged) {
01025 if (apm -> confCheck) {
01026 nextRecheck = crtTime + apm -> crtRecheckInterval;
01027 }
01028 else
01029 nextRecheck = -1;
01030 apm -> recheckChanged = false;
01031 }
01032 pthread_mutex_unlock(&(apm -> mutexBack));
01033
01034 if (haveChange) {
01035 pthread_mutex_unlock(&(apm -> mutexCond));
01036 continue;
01037 }
01038
01039
01040
01041 #ifndef WIN32
01042 ret = pthread_cond_timedwait(&(apm -> confChangedCond),
01043 &(apm -> mutexCond), &delay);
01044 pthread_mutex_unlock(&(apm -> mutexCond));
01045 #else
01046 pthread_mutex_unlock(&(apm -> mutexCond));
01047 ret = WaitForSingleObject(apm->confChangedCond, delay);
01048 #endif
01049 if (ret == ETIMEDOUT) {
01050
01051
01052 if (nextOp == JOB_INFO_SEND) {
01053 apm -> sendJobInfo();
01054 crtTime = time(NULL);
01055 nextJobInfoSend = crtTime + apm -> getJobMonitorInterval();
01056 }
01057
01058 if (nextOp == SYS_INFO_SEND) {
01059 apm -> sendSysInfo();
01060 if (apm -> getGenMonitoring()) {
01061 if (generalInfoCount <= 1)
01062 apm -> sendGeneralInfo();
01063 generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals;
01064 }
01065 crtTime = time(NULL);
01066 nextSysInfoSend = crtTime + apm -> getSysMonitorInterval();
01067 }
01068
01069 if (nextOp == RECHECK_CONF) {
01070
01071 resourceChanged = false;
01072 try {
01073 if (apm -> initType == FILE_INIT) {
01074 sprintf(logmsg, "Checking for modifications for file %s ",
01075 apm -> initSources[0]);
01076 logger(INFO, logmsg);
01077 stat(apm -> initSources[0], &st);
01078 if (st.st_mtime > apm -> lastModifFile) {
01079 sprintf(logmsg, "File %s modified ", apm -> initSources[0]);
01080 logger(INFO, logmsg);
01081 resourceChanged = true;
01082 }
01083 }
01084
01085
01086 for (i = 0; i < apm -> confURLs.nConfURLs; i++) {
01087 sprintf(logmsg, "[Checking for modifications for URL %s ] ",
01088 apm -> confURLs.vURLs[i]);
01089 logger(INFO, logmsg);
01090 if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) {
01091 sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]);
01092 logger(INFO, logmsg);
01093 resourceChanged = true;
01094 break;
01095 }
01096 }
01097
01098 if (resourceChanged) {
01099 logger(INFO, "Reloading configuration...");
01100 if (apm -> initType == FILE_INIT)
01101 apm -> initialize(apm -> initSources[0], false);
01102 else
01103 apm -> initialize(apm -> nInitSources, apm -> initSources, false);
01104 }
01105 apm -> setCrtRecheckInterval(apm -> getRecheckInterval());
01106 } catch (runtime_error &err) {
01107 logger(WARNING, err.what());
01108 logger(WARNING, "Increasing the time interval for reloading the configuration...");
01109 apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5);
01110 }
01111 crtTime = time(NULL);
01112 nextRecheck = crtTime + apm -> getCrtRecheckInterval();
01113
01114 }
01115 }
01116
01117 }
01118
01119 #ifndef WIN32
01120 return NULL;
01121 #else
01122 return 0;
01123 #endif
01124 }
01125
01126 void ApMon::setConfRecheck(bool confCheck, long interval) {
01127 char logmsg[100];
01128 if (confCheck) {
01129 sprintf(logmsg, "Enabling configuration reloading (interval %ld)",
01130 interval);
01131 logger(INFO, logmsg);
01132 }
01133
01134 pthread_mutex_lock(&mutexBack);
01135 if (initType == DIRECT_INIT) {
01136 logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload.");
01137 return;
01138 }
01139
01140 this -> confCheck = confCheck;
01141 this -> recheckChanged = true;
01142 if (confCheck) {
01143 if (interval > 0) {
01144 this -> recheckInterval = interval;
01145 this -> crtRecheckInterval = interval;
01146 } else {
01147 this -> recheckInterval = RECHECK_INTERVAL;
01148 this -> crtRecheckInterval = RECHECK_INTERVAL;
01149 }
01150 setBackgroundThread(true);
01151 }
01152 else {
01153 if (jobMonitoring == false && sysMonitoring == false)
01154 setBackgroundThread(false);
01155 }
01156 pthread_mutex_unlock(&mutexBack);
01157
01158 }
01159
01160 void ApMon::setRecheckInterval(long val) {
01161 if (val > 0) {
01162 setConfRecheck(true, val);
01163 }
01164 else {
01165 setConfRecheck(false, val);
01166 }
01167 }
01168
01169 void ApMon::setCrtRecheckInterval(long val) {
01170 pthread_mutex_lock(&mutexBack);
01171 crtRecheckInterval = val;
01172 pthread_mutex_unlock(&mutexBack);
01173 }
01174
01175 void ApMon::setJobMonitoring(bool jobMonitoring, long interval) {
01176 char logmsg[100];
01177 if (jobMonitoring) {
01178 sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval);
01179 logger(INFO, logmsg);
01180 } else
01181 logger(INFO, "Disabling job monitoring...");
01182
01183 pthread_mutex_lock(&mutexBack);
01184 this -> jobMonitoring = jobMonitoring;
01185 this -> jobMonChanged = true;
01186 if (jobMonitoring == true) {
01187 if (interval > 0)
01188 this -> jobMonitorInterval = interval;
01189 else
01190 this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
01191 setBackgroundThread(true);
01192 } else {
01193
01194 if (this -> sysMonitoring == false && this -> confCheck == false)
01195 setBackgroundThread(false);
01196 }
01197 pthread_mutex_unlock(&mutexBack);
01198 }
01199
01200 void ApMon::setSysMonitoring(bool sysMonitoring, long interval) {
01201 char logmsg[100];
01202 if (sysMonitoring) {
01203 sprintf(logmsg, "Enabling system monitoring, time interval %ld s... ", interval);
01204 logger(INFO, logmsg);
01205 } else
01206 logger(INFO, "Disabling system monitoring...");
01207
01208 pthread_mutex_lock(&mutexBack);
01209 this -> sysMonitoring = sysMonitoring;
01210 this -> sysMonChanged = true;
01211 if (sysMonitoring == true) {
01212 if (interval > 0)
01213 this -> sysMonitorInterval = interval;
01214 else
01215 this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
01216 setBackgroundThread(true);
01217 } else {
01218
01219 if (this -> jobMonitoring == false && this -> confCheck == false)
01220 setBackgroundThread(false);
01221 }
01222 pthread_mutex_unlock(&mutexBack);
01223 }
01224
01225 void ApMon::setGenMonitoring(bool genMonitoring, int nIntervals) {
01226 char logmsg[100];
01227 sprintf(logmsg, "Setting general information monitoring to %s ",
01228 boolStrings[(int)genMonitoring]);
01229 logger(INFO, logmsg);
01230
01231 pthread_mutex_lock(&mutexBack);
01232 this -> genMonitoring = genMonitoring;
01233 this -> sysMonChanged = true;
01234 if (genMonitoring == true) {
01235 if (nIntervals > 0)
01236 this -> genMonitorIntervals = nIntervals;
01237 else
01238 this -> genMonitorIntervals = GEN_MONITOR_INTERVALS;
01239
01240 if (this -> sysMonitoring == false) {
01241 pthread_mutex_unlock(&mutexBack);
01242 setSysMonitoring(true);
01243 pthread_mutex_lock(&mutexBack);
01244 }
01245 }
01246
01247 pthread_mutex_unlock(&mutexBack);
01248 }
01249
01250 void ApMon::setBackgroundThread(bool val) {
01251
01252 if (val == true) {
01253 if (!haveBkThread) {
01254 #ifndef WIN32
01255 pthread_create(&bkThread, NULL, &bkTask, this);
01256 #else
01257 DWORD dummy;
01258 bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy);
01259 #endif
01260 haveBkThread = true;
01261 } else {
01262 pthread_mutex_lock(&mutexCond);
01263 pthread_cond_signal(&confChangedCond);
01264 pthread_mutex_unlock(&mutexCond);
01265 }
01266 }
01267 if (val == false) {
01268
01269 if (haveBkThread) {
01270 stopBkThread = true;
01271 pthread_mutex_unlock(&mutexBack);
01272 #ifndef WIN32
01273 pthread_mutex_lock(&mutexCond);
01274 #endif
01275 pthread_cond_signal(&confChangedCond);
01276 logger(INFO, "[Stopping the background thread...]");
01277 #ifndef WIN32
01278 pthread_mutex_unlock(&mutexCond);
01279 pthread_join(bkThread, NULL);
01280 #else
01281 WaitForSingleObject(bkThread, INFINITE);
01282 #endif
01283 pthread_mutex_lock(&mutexBack);
01284
01285 haveBkThread = false;
01286 bkThreadStarted = false;
01287 stopBkThread = false;
01288 }
01289 }
01290 }
01291
01292 void ApMon::addJobToMonitor(long pid, char *workdir, char *clusterName,
01293 char *nodeName) throw(runtime_error) {
01294 if (nMonJobs >= MAX_MONITORED_JOBS)
01295 throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded.");
01296 MonitoredJob job;
01297 job.pid = pid;
01298 if (workdir == NULL)
01299 strcpy(job.workdir, "");
01300 else
01301 strcpy(job.workdir, workdir);
01302
01303 if (clusterName == NULL || strlen(clusterName) == 0)
01304 strcpy(job.clusterName, "ApMon_JobMon");
01305 else
01306 strcpy(job.clusterName, clusterName);
01307 if (nodeName == NULL || strlen(nodeName) == 0)
01308 strcpy(job.nodeName, this -> myIP);
01309 else
01310 strcpy(job.nodeName, nodeName);
01311
01312 monJobs[nMonJobs++] = job;
01313 }
01314
01315 void ApMon::removeJobToMonitor(long pid) throw(runtime_error) {
01316 int i, j;
01317 char msg[100];
01318
01319 if (nMonJobs <= 0)
01320 throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs.");
01321
01322 for (i = 0; i < nMonJobs; i++) {
01323 if (monJobs[i].pid == pid) {
01324
01325 for (j = i; j < nMonJobs - 1; j++)
01326 monJobs[j] = monJobs[j + 1];
01327 nMonJobs--;
01328 return;
01329 }
01330 }
01331
01332
01333 sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid);
01334 throw runtime_error(msg);
01335 }
01336
01337 void ApMon::setSysMonClusterNode(char *clusterName, char *nodeName) {
01338 free (sysMonCluster); free(sysMonNode);
01339 sysMonCluster = strdup(clusterName);
01340 sysMonNode = strdup(nodeName);
01341 }
01342
01343 void ApMon::setLogLevel(char *newLevel_s) {
01344 int newLevel;
01345 const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"};
01346 char logmsg[100];
01347
01348 for (newLevel = 0; newLevel < 5; newLevel++)
01349 if (strcmp(newLevel_s, levels[newLevel]) == 0)
01350 break;
01351
01352 if (newLevel >= 5) {
01353 sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %s", newLevel_s);
01354 logger(WARNING, logmsg);
01355 }
01356 else
01357 logger(0, NULL, newLevel);
01358 }
01359
01360
01361 void ApMon::setMaxMsgRate(int maxRate) {
01362 if (maxRate > 0)
01363 this -> maxMsgRate = maxRate;
01364 }
01365
01366 void ApMon::initSocket() throw(runtime_error) {
01367 int optval1 = 1;
01368 struct timeval optval2;
01369 int ret1, ret2, ret3;
01370
01371 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
01372 if (sockfd < 0)
01373 throw runtime_error("[ initSocket() ] Error creating socket");
01374 ret1 = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *) &optval1,
01375 sizeof(optval1));
01376
01377
01378 optval2.tv_sec = 20;
01379 optval2.tv_usec = 0;
01380 ret2 = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *) &optval2,
01381 sizeof(optval2));
01382 ret3 = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *) &optval2,
01383 sizeof(optval2));
01384 if (ret1 != 0 || ret2 != 0 || ret3 != 0)
01385 throw runtime_error("[ initSocket() ] Error initializing socket.");
01386 }
01387
01388
01389 void ApMon::parseConf(FILE *fp, int *nDestinations, char **destAddresses,
01390 int *destPorts, char **destPasswds)
01391 throw(runtime_error) {
01392 int i, ch;
01393 char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char));
01394 char *tmp = NULL;
01395 char *loglevel_s;
01396
01397
01398
01399
01400 while(fgets(line, MAX_STRING_LEN, fp) != NULL) {
01401
01402 if (tmp != NULL) {
01403 free(tmp);
01404 tmp = NULL;
01405 }
01406
01407 line[MAX_STRING_LEN - 1] = 0;
01408
01409 ch = fgetc(fp);
01410 ungetc(ch, fp);
01411 if (line[strlen(line) - 1] != 10 && ch != EOF) {
01412
01413
01414 fclose(fp);
01415 throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file");
01416 }
01417
01418 tmp = trimString(line);
01419
01420
01421 if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp)
01422 continue;
01423
01424 if (strstr(tmp, "xApMon_loglevel") == tmp) {
01425 char *tmp2 = tmp;
01426 strtok(tmp2, "= ");
01427 loglevel_s = strtok(NULL, "= ");
01428 setLogLevel(loglevel_s);
01429 continue;
01430 }
01431
01432 if (strstr(tmp, "xApMon_") == tmp) {
01433 parseXApMonLine(tmp);
01434 continue;
01435 }
01436
01437 if (*nDestinations >= MAX_N_DESTINATIONS) {
01438 free(line); free(tmp);
01439 for (i = 0; i < *nDestinations; i++) {
01440 free(destAddresses[i]);
01441 free(destPasswds[i]);
01442 }
01443 fclose(fp);
01444 throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded.");
01445 }
01446
01447 addToDestinations(tmp, nDestinations, destAddresses, destPorts,
01448 destPasswds);
01449 }
01450
01451 if (tmp != NULL)
01452 free(tmp);
01453 free(line);
01454 }
01455
01456 bool ApMon::shouldSend() {
01457
01458 long now = time(NULL);
01459 bool doSend;
01460 char msg[200];
01461
01462
01463
01464 if (now != crtTime){
01466 prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime);
01467 prvTime = crtTime;
01468 sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop);
01469 logger(DEBUG, msg);
01471 crtTime = now;
01472 crtSent = 0;
01473 crtDrop = 0;
01474
01475 }
01476
01478 int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight));
01479
01480 doSend = true;
01482 int level = this -> maxMsgRate - this -> maxMsgRate / 10;
01483
01484
01485 if (valSent > (this -> maxMsgRate - level)) {
01486
01487 int rnd = rand() % (this -> maxMsgRate / 10);
01488 doSend = (rnd < (this -> maxMsgRate - valSent));
01489 }
01491 if (doSend) {
01492 crtSent++;
01493
01494 } else {
01495 crtDrop++;
01496
01497 }
01498
01499 return doSend;
01500 }
01501
01502
01503
01504
01505