00001
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038 #include "ApMon.h"
00039 #include "utils.h"
00040 #include "proc_utils.h"
00041 #include "monitor_utils.h"
00042
00043 #include <math.h>
00044
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047
// Operation codes stored in bkTask()'s nextOp to record which periodic action is due next.
00048 #define RECHECK_CONF 0
00049 #define SYS_INFO_SEND 1
00050 #define JOB_INFO_SEND 2
00051
// Textual names of the two boolean values: index 0 is "false", index 1 is "true".
00052 char boolStrings[][10] = {"false", "true"};
00053
00054
00055
00056 ApMon::ApMon(char *initsource)
00057 throw(runtime_error) {
00058
00059 if (initsource == NULL)
00060 throw runtime_error("[ ApMon() ] No conf file/URL provided");
00061
00062 if (strstr(initsource, "http://") == initsource) {
00063 char *destList[1];
00064 destList[0] = initsource;
00065 constructFromList(1, destList);
00066 } else {
00067 nInitSources = 1;
00068 initType = FILE_INIT;
00069 initSources = (char **)malloc(nInitSources * sizeof(char *));
00070 if (initSources == NULL)
00071 throw runtime_error("[ ApMon() ] Error allocating memory.");
00072
00073 initSources[0] = strdup(initsource);
00074 initMonitoring();
00075
00076 initialize(initsource, true);
00077 }
00078 }
00079
00080 void ApMon::initialize(char *filename, bool firstTime)
00081 throw(runtime_error) {
00082
00083 char *destAddresses[MAX_N_DESTINATIONS];
00084 int destPorts[MAX_N_DESTINATIONS];
00085 char *destPasswds[MAX_N_DESTINATIONS];
00086 int nDest = 0, i;
00087 ConfURLs confURLs;
00088
00089 confURLs.nConfURLs = 0;
00090
00091 try {
00092 loadFile(filename, &nDest, destAddresses, destPorts, destPasswds);
00093
00094 arrayInit(nDest, destAddresses, destPorts, destPasswds, firstTime);
00095 } catch (runtime_error& err) {
00096 if (firstTime)
00097 throw err;
00098 else {
00099 logger(WARNING, err.what());
00100 logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00101 return;
00102 }
00103 }
00104
00105 for (i = 0; i < nDest; i++) {
00106 free(destAddresses[i]);
00107 free(destPasswds[i]);
00108 }
00109
00110 pthread_mutex_lock(&mutex);
00111 this -> confURLs = confURLs;
00112 pthread_mutex_unlock(&mutex);
00113 }
00114
00115 void ApMon::loadFile(char *filename, int *nDestinations, char **destAddresses,
00116 int *destPorts, char **destPasswds)
00117 throw(runtime_error) {
00118 FILE *f;
00119 char msg[100];
00120
00121
00122 f = fopen(filename, "rt");
00123 if (f == NULL) {
00124 throw runtime_error("[ loadFile() ] Error opening configuration file");
00125 }
00126
00127 sprintf(msg, "Loading file %s ...", filename);
00128 logger(INFO, msg);
00129
00130 lastModifFile = time(NULL);
00131
00132 parseConf(f, nDestinations, destAddresses, destPorts,
00133 destPasswds);
00134 fclose(f);
00135 }
00136
00137 ApMon::ApMon(int nDestinations, char **destinationsList) throw(runtime_error){
00138 constructFromList(nDestinations, destinationsList);
00139 }
00140
00141 void ApMon::constructFromList(int nDestinations, char **destinationsList)
00142 throw(runtime_error) {
00143 int i;
00144
00145 if (destinationsList == NULL)
00146 throw runtime_error("[ constructFromList() ] Null destination list");
00147
00148 #ifdef __APPLE__
00149 initType = OLIST_INIT;
00150 #else
00151 initType = LIST_INIT;
00152 #endif
00153
00154 initMonitoring();
00155
00156
00157 nInitSources = nDestinations;
00158 initSources = (char **)malloc(nInitSources * sizeof(char*));
00159 if (initSources == NULL)
00160 throw runtime_error("[ ApMon() ] Error allocating memory.");
00161
00162 for (i = 0; i < nInitSources; i++)
00163 initSources[i] = strdup(destinationsList[i]);
00164
00165 initialize(nDestinations, destinationsList, true);
00166 }
00167
00168 void ApMon::initialize(int nDestinations, char **destinationsList,
00169 bool firstTime) throw(runtime_error) {
00170 char *destAddresses[MAX_N_DESTINATIONS];
00171 int destPorts[MAX_N_DESTINATIONS];
00172 char *destPasswds[MAX_N_DESTINATIONS];
00173 char errmsg[200];
00174 int i;
00175 int cnt = 0;
00176 ConfURLs confURLs;
00177
00178 logger(INFO, "Initializing destination addresses & ports:");
00179
00180 if (nDestinations > MAX_N_DESTINATIONS)
00181 throw runtime_error("[ initialize() ] Maximum number of destinations exceeded");
00182
00183
00184 confURLs.nConfURLs = 0;
00185
00186 for (i = 0; i < nDestinations; i++) {
00187 try {
00188 if (strstr(destinationsList[i], "http") == destinationsList[i])
00189 getDestFromWeb(destinationsList[i], &cnt,
00190 destAddresses, destPorts, destPasswds, confURLs);
00191 else
00192 addToDestinations(destinationsList[i], &cnt,
00193 destAddresses, destPorts, destPasswds);
00194
00195 } catch (runtime_error &e) {
00196 sprintf(errmsg, "[ initialize() ] Error while loading the configuration: %s", e.what());
00197 logger(WARNING, errmsg);
00198 if (!firstTime) {
00199 for (i = 0; i < cnt; i++) {
00200 free(destAddresses[i]);
00201 free(destPasswds[i]);
00202 }
00203 logger(WARNING, "Configuration not reloaded successfully. Keeping the previous one.");
00204 return;
00205 }
00206 }
00207 }
00208
00209 try {
00210 arrayInit(cnt, destAddresses, destPorts, destPasswds, firstTime);
00211 } catch (runtime_error& err) {
00212 if (firstTime)
00213 throw err;
00214 else {
00215 logger(WARNING, "Error reloading the configuration. Keeping the previous one.");
00216 return;
00217 }
00218 }
00219
00220 for (i = 0; i < cnt; i++) {
00221 free(destAddresses[i]);
00222 free(destPasswds[i]);
00223 }
00224
00225 pthread_mutex_lock(&mutex);
00226 this -> confURLs = confURLs;
00227 pthread_mutex_unlock(&mutex);
00228 }
00229
00230 void ApMon::addToDestinations(char *line, int *nDestinations,
00231 char *destAddresses[], int destPorts[], char *destPasswds[]) {
00232 char *addr, *port, *passwd;
00233 const char *sep1 = " \t";
00234 const char *sep2 = ":";
00235
00236 char *tmp = strdup(line);
00237 char *firstToken;
00238
00239
00240
00241
00242 firstToken = strtok(tmp, sep1);
00243 passwd = strtok(NULL, sep1);
00244
00245
00246 addr = strtok(firstToken, sep2);
00247 port = strtok(NULL, sep2);
00248 destAddresses[*nDestinations] = strdup(addr);
00249 if (port == NULL)
00250 destPorts[*nDestinations] = DEFAULT_PORT;
00251 else
00252 destPorts[*nDestinations] = atoi(port);
00253 if (passwd == NULL)
00254 destPasswds[*nDestinations] = strdup("");
00255 else
00256 destPasswds[*nDestinations] = strdup(passwd);
00257 (*nDestinations)++;
00258
00259 free(tmp);
00260 }
00261
00262 void ApMon::getDestFromWeb(char *url, int *nDestinations,
00263 char *destAddresses[], int destPorts[], char *destPasswds[],
00264 ConfURLs& confURLs) throw(runtime_error) {
00265 char temp_filename[300];
00266 FILE *tmp_file;
00267 char *line, *ret, *tmp = NULL;
00268 bool modifLineFound;
00269 long mypid = getpid();
00270 char str1[20], str2[20];
00271 int totalSize, headerSize, contentSize;
00272
00273 #ifndef WIN32
00274 sprintf(temp_filename, "/tmp/apmon_webconf%ld", mypid);
00275 #else
00276 char *tmpp = getenv("TEMP");
00277 if(tmpp == NULL)
00278 tmpp = getenv("TMP");
00279 if(tmpp == NULL)
00280 tmpp = "c:";
00281 sprintf(temp_filename, "%s\\apmon_webconf%ld", tmpp, mypid);
00282 #endif
00283
00284 totalSize = httpRequest(url, (char*)"GET", temp_filename);
00285
00286
00287 tmp_file = fopen(temp_filename, "rt");
00288 if (tmp_file == NULL)
00289 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00290
00291 line = (char*)malloc((MAX_STRING_LEN + 1) * sizeof(char));
00292
00293
00294 fgets(line, MAX_STRING_LEN, tmp_file);
00295 sscanf(line, "%s %s", str1, str2);
00296 if (atoi(str2) != 200) {
00297 free(line);
00298 fclose(tmp_file);
00299 throw runtime_error("[ getDestFromWeb() ] The web page does not exist on the server");
00300 }
00301
00302 confURLs.vURLs[confURLs.nConfURLs] = strdup(url);
00303
00304
00305 modifLineFound = false;
00306 contentSize = 0;
00307 do {
00308 if (tmp != NULL)
00309 free(tmp);
00310 ret = fgets(line, MAX_STRING_LEN, tmp_file);
00311 if (ret == NULL) {
00312 free(line); fclose(tmp_file);
00313 throw runtime_error("[ getDestFromWeb() ] Error getting the configuration web page");
00314 }
00315 if (strstr(line, "Last-Modified") == line) {
00316 modifLineFound = true;
00317 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup(line);
00318 }
00319
00320 if (strstr(line, "Content-Length") == line) {
00321 sscanf(line, "%s %d", str1, &contentSize);
00322 }
00323
00324 tmp = trimString(line);
00325 } while (strlen(tmp) != 0);
00326 free(tmp); free(line);
00327
00328 if (!modifLineFound)
00329 confURLs.lastModifURLs[confURLs.nConfURLs] = strdup("");
00330 confURLs.nConfURLs++;
00331
00332 headerSize = ftell(tmp_file);
00333 if (totalSize - headerSize < contentSize) {
00334 fclose(tmp_file);
00335 throw runtime_error("[ getDestFromWeb() ] Web page received incompletely");
00336 }
00337
00338 try {
00339 parseConf(tmp_file, nDestinations, destAddresses, destPorts,
00340 destPasswds);
00341 } catch (...) {
00342 fclose(tmp_file);
00343 unlink(temp_filename);
00344 throw;
00345 }
00346
00347 fclose(tmp_file);
00348 unlink(temp_filename);
00349 }
00350
00351
00352 ApMon::ApMon(int nDestinations, char **destAddresses, int *destPorts,
00353 char **destPasswds)
00354 throw(runtime_error) {
00355 initMonitoring();
00356
00357 arrayInit(nDestinations, destAddresses, destPorts, destPasswds);
00358 }
00359
00360 void ApMon::arrayInit(int nDestinations, char **destAddresses,
00361 int *destPorts, char **destPasswds)
00362 throw(runtime_error) {
00363 arrayInit(nDestinations, destAddresses, destPorts, destPasswds, true);
00364 }
00365
00366
00367 void ApMon::arrayInit(int nDestinations, char **destAddresses, int *destPorts,
00368 char **destPasswds, bool firstTime)
00369 throw(runtime_error) {
00370 int i, j;
00371 int ret;
00372 char *ipAddr, logmsg[100];
00373 bool found, havePublicIP;
00374 int tmpNDestinations;
00375 char **tmpAddresses, **tmpPasswds;
00376 int *tmpPorts;
00377
00378 if (destAddresses == NULL || destPorts == NULL || nDestinations == 0)
00379 throw runtime_error("[ arrayInit() ] Destination addresses or ports not provided");
00380
00381
00382 if (firstTime) {
00383
00384
00385 this -> nMonJobs = 0;
00386 this -> monJobs = (MonitoredJob *)malloc(MAX_MONITORED_JOBS *
00387 sizeof(MonitoredJob));
00388
00389 try {
00390 this -> numCPUs = ProcUtils::getNumCPUs();
00391 } catch (procutils_error &err) {
00392 logger(WARNING, err.what());
00393 this -> numCPUs = 0;
00394 }
00395
00396
00397 this -> nInterfaces = 0;
00398 try {
00399 ProcUtils::getNetworkInterfaces(this -> nInterfaces,
00400 this -> interfaceNames);
00401 } catch (procutils_error &err) {
00402 logger(WARNING, err.what());
00403 this -> nInterfaces = 0;
00404 }
00405
00406
00407 ret = gethostname(this -> myHostname, MAX_STRING_LEN -1);
00408 if (ret < 0) {
00409 logger(WARNING, "Could not obtain the local hostname");
00410 strcpy(myHostname, "unknown");
00411 } else
00412 myHostname[MAX_STRING_LEN - 1] = 0;
00413
00414
00415 this -> numIPs = 0; havePublicIP = false;
00416 strcpy(this -> myIP, "unknown");
00417
00418
00419 this -> clusterName = strdup("ApMon_UserSend");
00420 this -> nodeName = strdup(myHostname);
00421
00422 #ifndef WIN32
00423 int sockd = socket(PF_INET, SOCK_STREAM, 0);
00424 if(sockd < 0){
00425 logger(WARNING, "Could not obtain local IP addresses");
00426 } else {
00427 for (i = 0; i < this -> nInterfaces; i++) {
00428 struct ifreq ifr;
00429 memset(&ifr, 0, sizeof(ifr));
00430 strncpy(ifr.ifr_name, this -> interfaceNames[i], sizeof(ifr.ifr_name) - 1);
00431 if(ioctl(sockd, SIOCGIFADDR, &ifr)<0)
00432 continue;
00433 char ip[4], tmp_s[20];
00434 #ifdef __APPLE__
00435 memcpy(ip, ifr.ifr_addr.sa_data+2, 4);
00436 #else
00437 memcpy(ip, ifr.ifr_hwaddr.sa_data+2, 4);
00438 #endif
00439 strcpy(tmp_s, inet_ntoa(*(struct in_addr *)ip));
00440 sprintf(logmsg, "Found local IP address: %s", tmp_s);
00441 logger(FINE, logmsg);
00442 if (strcmp(tmp_s, "127.0.0.1") != 0 && !havePublicIP) {
00443 strcpy(this -> myIP, tmp_s);
00444 if (!isPrivateAddress(tmp_s))
00445 havePublicIP = true;
00446 }
00447 strcpy(this -> allMyIPs[this -> numIPs], tmp_s);
00448 this -> numIPs++;
00449 }
00450 }
00451 #else
00452 struct hostent *hptr;
00453 if ((hptr = gethostbyname(myHostname))!= NULL) {
00454 i = 0;
00455 struct in_addr addr;
00456 while ((hptr -> h_addr_list)[i] != NULL) {
00457 memcpy(&(addr.s_addr), (hptr -> h_addr_list)[i], 4);
00458 ipAddr = inet_ntoa(addr);
00459 if (strcmp(ipAddr, "127.0.0.1") != 0) {
00460 strcpy(this -> myIP, ipAddr);
00461 if (!isPrivateAddress(ipAddr))
00462 break;
00463 }
00464 i++;
00465 }
00466 }
00467 #endif
00468
00469 this -> sysMonCluster = strdup("ApMon_SysMon");
00470 this -> sysMonNode = strdup(this -> myIP);
00471
00472 this -> prvTime = 0;
00473 this -> prvSent = 0;
00474 this -> prvDrop = 0;
00475 this -> crtTime = 0;
00476 this -> crtSent = 0;
00477 this -> crtDrop = 0;
00478 this -> hWeight = exp(-5.0/60.0);
00479
00480 srand(time(NULL));
00481
00482
00483 this -> buf = (char *)malloc(MAX_DGRAM_SIZE);
00484 if (this -> buf == NULL)
00485 throw runtime_error("[ arrayInit() ] Error allocating memory");
00486 this -> dgramSize = 0;
00487
00488
00489 initSocket();
00490
00491
00492 instance_id = rand();
00493 seq_nr = 0;
00494 }
00495
00496
00497
00498
00499
00500 tmpNDestinations = 0;
00501 tmpPorts = (int *)malloc(nDestinations * sizeof(int));
00502 tmpAddresses = (char **)malloc(nDestinations * sizeof(char *));
00503 tmpPasswds = (char **)malloc(nDestinations * sizeof(char *));
00504 if (tmpPorts == NULL || tmpAddresses == NULL ||
00505 tmpPasswds == NULL)
00506 throw runtime_error("[ arrayInit() ] Error allocating memory");
00507
00508 for (i = 0; i < nDestinations; i++) {
00509 try {
00510 ipAddr = findIP(destAddresses[i]);
00511 } catch (runtime_error &err) {
00512 logger(FATAL, err.what());
00513 continue;
00514 }
00515
00516
00517 found = false;
00518 for (j = 0; j < tmpNDestinations; j++) {
00519 if (!strcmp(ipAddr, tmpAddresses[j])) {
00520 found = true;
00521 break;
00522 }
00523 }
00524
00525
00526 if (!found) {
00527 tmpAddresses[tmpNDestinations] = ipAddr;
00528 tmpPorts[tmpNDestinations] = destPorts[i];
00529 tmpPasswds[tmpNDestinations] = strdup(destPasswds[i]);
00530
00531 sprintf(logmsg, "Adding destination host: %s - port %d",
00532 tmpAddresses[tmpNDestinations], tmpPorts[tmpNDestinations]);
00533 logger(INFO, logmsg);
00534
00535 tmpNDestinations++;
00536 }
00537 }
00538
00539 if (tmpNDestinations == 0) {
00540 freeMat(tmpAddresses, tmpNDestinations);
00541 freeMat(tmpPasswds, tmpNDestinations);
00542 throw runtime_error("[ arrayInit() ] There is no destination host specified correctly!");
00543 }
00544
00545 pthread_mutex_lock(&mutex);
00546 if (!firstTime)
00547 freeConf();
00548 this -> nDestinations = tmpNDestinations;
00549 this -> destAddresses = tmpAddresses;
00550 this -> destPorts = tmpPorts;
00551 this -> destPasswds = tmpPasswds;
00552 pthread_mutex_unlock(&mutex);
00553
00554
00555
00556 setJobMonitoring(jobMonitoring, jobMonitorInterval);
00557 setSysMonitoring(sysMonitoring, sysMonitorInterval);
00558 setGenMonitoring(genMonitoring, genMonitorIntervals);
00559 setConfRecheck(confCheck, recheckInterval);
00560 }
00561
00562
00563 ApMon::~ApMon() {
00564 int i;
00565
00566 if (bkThreadStarted) {
00567 if (getJobMonitoring()) {
00568
00569
00570 sendJobInfo();
00571 }
00572 }
00573
00574 pthread_mutex_lock(&mutexBack);
00575 setBackgroundThread(false);
00576 pthread_mutex_unlock(&mutexBack);
00577
00578 pthread_mutex_destroy(&mutex);
00579 pthread_mutex_destroy(&mutexBack);
00580 pthread_mutex_destroy(&mutexCond);
00581 pthread_cond_destroy(&confChangedCond);
00582
00583 free(clusterName);
00584 free(nodeName);
00585 free(sysMonCluster); free(sysMonNode);
00586
00587 freeConf();
00588
00589 free(monJobs);
00590 for (i = 0; i < nInitSources; i++) {
00591 free(initSources[i]);
00592 }
00593 free(initSources);
00594
00595 free(buf);
00596 #ifndef WIN32
00597 close(sockfd);
00598 #else
00599 closesocket(sockfd);
00600 WSACleanup();
00601 #endif
00602 }
00603
00604 void ApMon::freeConf() {
00605 int i;
00606 freeMat(destAddresses, nDestinations);
00607 freeMat(destPasswds, nDestinations);
00608 free(destPorts);
00609
00610 for (i = 0; i < confURLs.nConfURLs; i++) {
00611 free(confURLs.vURLs[i]);
00612 free(confURLs.lastModifURLs[i]);
00613 }
00614 }
00615
00616 int ApMon::sendParameters(char *clusterName, char *nodeName,
00617 int nParams, char **paramNames, int *valueTypes,
00618 char **paramValues) throw(runtime_error){
00619 return sendTimedParameters(clusterName, nodeName, nParams,
00620 paramNames, valueTypes, paramValues, -1);
00621 }
00622
00623 int ApMon::sendTimedParameters(char *clusterName, char *nodeName,
00624 int nParams, char **paramNames, int *valueTypes,
00625 char **paramValues, int timestamp) throw(runtime_error){
00626 int i;
00627 int ret, ret1, ret2;
00628 char msg[100], buf2[MAX_HEADER_LENGTH+4], newBuf[MAX_DGRAM_SIZE];
00629 #ifdef WIN32
00630 char crtAddr[20];
00631 #endif
00632 char *headerTmp;
00633 char header[MAX_HEADER_LENGTH] = "v:";
00634 strcat(header, APMON_VERSION);
00635 strcat(header, "_cpp");
00636 strcat(header, "p:");
00637
00638 pthread_mutex_lock(&mutex);
00639
00640 if(!shouldSend()) {
00641 pthread_mutex_unlock(&mutex);
00642 return RET_NOT_SENT;
00643 }
00644
00645 if (clusterName != NULL) {
00646
00647 free(this -> clusterName);
00648 this -> clusterName = strdup(clusterName);
00649
00650 if (nodeName != NULL) {
00651 free(this -> nodeName);
00652 this -> nodeName = strdup(nodeName);
00653 }
00654 else {
00655 free(this -> nodeName);
00656 this -> nodeName = strdup(this -> myHostname);
00657 }
00658 }
00659
00660 if (this -> clusterName == NULL || this -> nodeName == NULL) {
00661 pthread_mutex_unlock(&mutex);
00662 throw runtime_error("[ sendTimedParameters() ] Null cluster name or node name");
00663 }
00664
00665
00666
00667
00668 try {
00669 encodeParams(nParams, paramNames, valueTypes, paramValues, timestamp);
00670 } catch (runtime_error& err) {
00671 pthread_mutex_unlock(&mutex);
00672 throw err;
00673 }
00674
00675 headerTmp = (char *)malloc(MAX_HEADER_LENGTH * sizeof(char));
00676
00677 for (i = 0; i < nDestinations; i++) {
00678 XDR xdrs;
00679 struct sockaddr_in destAddr;
00680
00681
00682 memset(&destAddr, 0, sizeof(destAddr));
00683 destAddr.sin_family = AF_INET;
00684 destAddr.sin_port = htons(destPorts[i]);
00685 #ifndef WIN32
00686 inet_pton(AF_INET, destAddresses[i], &destAddr.sin_addr);
00687 #else
00688 int dummy = sizeof(destAddr);
00689 sprintf(crtAddr, "%s:%d", destAddresses[i], destPorts[i]);
00690 ret = WSAStringToAddress(crtAddr, AF_INET, NULL, (struct sockaddr *) &destAddr, &dummy);
00691 if(ret){
00692 ret = WSAGetLastError();
00693 sprintf(msg, "[ sendTimedParameters() ] Error packing address %s, code %d ", crtAddr, ret);
00694 throw runtime_error(msg);
00695 }
00696 #endif
00697
00698
00699 strcpy(headerTmp, header);
00700 strcat(headerTmp, destPasswds[i]);
00701
00702
00703 xdrmem_create(&xdrs, buf2, MAX_HEADER_LENGTH, XDR_ENCODE);
00704
00705
00706 ret = xdr_string(&xdrs, &(headerTmp), strlen(headerTmp) + 1);
00707
00708 ret1 = xdr_int(&xdrs, &(instance_id));
00709 ret2 = xdr_int(&xdrs, &(seq_nr));
00710
00711 if (!ret || !ret1 || !ret2) {
00712 free(headerTmp);
00713 pthread_mutex_unlock(&mutex);
00714 throw runtime_error("[ sendTimedParameters() ] XDR encoding error for the header");
00715 }
00716
00717
00718 int buf2Length = xdrSize(XDR_STRING, headerTmp) + 2 * xdrSize(XDR_INT32, NULL);
00719 memcpy(newBuf, buf2, buf2Length);
00720 memcpy(newBuf + buf2Length, buf, dgramSize);
00721
00722
00723 ret = sendto(sockfd, newBuf, dgramSize + buf2Length, 0,
00724 (struct sockaddr *)&destAddr, sizeof(destAddr));
00725 if (ret == RET_ERROR) {
00726 free(headerTmp);
00727 pthread_mutex_unlock(&mutex);
00728
00729
00730 #ifndef WIN32
00731 close(sockfd);
00732 #else
00733 closesocket(sockfd);
00734 #endif
00735 initSocket();
00736
00737
00738 sprintf(msg, "[ sendTimedParameters() ] Error sending data to destination %s ",
00739 destAddresses[i]);
00740 throw runtime_error(msg);
00741 }
00742 else {
00743 sprintf(msg, "Datagram with size %d, instance id %d, sequence number %d, sent to %s, containing parameters:",
00744 ret, instance_id, seq_nr, destAddresses[i]);
00745 logger(FINE, msg);
00746 logParameters(FINE, nParams, paramNames, valueTypes, paramValues);
00747 }
00748 xdr_destroy(&xdrs);
00749
00750 }
00751
00752 seq_nr = (seq_nr + 1) % TWO_BILLION;
00753 free(headerTmp);
00754 pthread_mutex_unlock(&mutex);
00755 return RET_SUCCESS;
00756 }
00757
00758
00759 int ApMon::sendParameter(char *clusterName, char *nodeName,
00760 char *paramName, int valueType, char *paramValue)
00761 throw(runtime_error){
00762
00763 return sendParameters(clusterName, nodeName, 1, ¶mName,
00764 &valueType, ¶mValue);
00765 }
00766
00767 int ApMon::sendTimedParameter(char *clusterName, char *nodeName,
00768 char *paramName, int valueType, char *paramValue, int timestamp)
00769 throw(runtime_error){
00770
00771 return sendTimedParameters(clusterName, nodeName, 1, ¶mName,
00772 &valueType, ¶mValue, timestamp);
00773 }
00774
00775 int ApMon::sendParameter(char *clusterName, char *nodeName,
00776 char *paramName, int paramValue) throw(runtime_error) {
00777
00778 return sendParameter(clusterName, nodeName, paramName, XDR_INT32,
00779 (char *)¶mValue);
00780 }
00781
00782 int ApMon::sendParameter(char *clusterName, char *nodeName,
00783 char *paramName, float paramValue) throw(runtime_error) {
00784
00785 return sendParameter(clusterName, nodeName, paramName, XDR_REAL32,
00786 (char *)¶mValue);
00787 }
00788
00789 int ApMon::sendParameter(char *clusterName, char *nodeName,
00790 char *paramName, double paramValue) throw(runtime_error) {
00791
00792 return sendParameter(clusterName, nodeName, paramName, XDR_REAL64,
00793 (char *)¶mValue);
00794 }
00795
00796 int ApMon::sendParameter(char *clusterName, char *nodeName,
00797 char *paramName, char *paramValue) throw(runtime_error) {
00798
00799 return sendParameter(clusterName, nodeName, paramName, XDR_STRING,
00800 paramValue);
00801 }
00802
00803 void ApMon::encodeParams(int nParams, char **paramNames, int *valueTypes,
00804 char **paramValues, int timestamp)
00805 throw(runtime_error){
00806 XDR xdrs;
00807 int i, effectiveNParams;
00808
00809
00810
00811
00812
00813 effectiveNParams = nParams;
00814 for (i = 0; i < nParams; i++) {
00815 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00816 paramValues[i] == NULL)) {
00817 effectiveNParams--;
00818 }
00819 }
00820 if (effectiveNParams == 0)
00821 throw runtime_error("[ encodeParams() ] No valid parameters in datagram, sending aborted");
00822
00823
00824
00825
00826 dgramSize = xdrSize(XDR_STRING, clusterName) +
00827 xdrSize(XDR_STRING, nodeName) + xdrSize(XDR_INT32, NULL);
00828
00829 for (i = 0; i < nParams; i++) {
00830 dgramSize += xdrSize(XDR_STRING, paramNames[i]) + xdrSize(XDR_INT32, NULL) +
00831 + xdrSize(valueTypes[i], paramValues[i]);
00832 }
00833
00834
00835 if (dgramSize + MAX_HEADER_LENGTH > MAX_DGRAM_SIZE)
00836 throw runtime_error("[ encodeParams() ] Maximum datagram size exceeded");
00837
00838
00839 xdrmem_create(&xdrs, buf, MAX_DGRAM_SIZE, XDR_ENCODE);
00840
00841 try {
00842
00843 if (!xdr_string(&xdrs, &(clusterName), strlen(clusterName)
00844 + 1))
00845 throw runtime_error("[ encodeParams() ] XDR encoding error for the cluster name");
00846
00847 if (!xdr_string(&xdrs, &(nodeName), strlen(nodeName) + 1))
00848 throw runtime_error("[ encodeParams() ] XDR encoding error for the node name");
00849
00850 if (!xdr_int(&xdrs, &(effectiveNParams)))
00851 throw runtime_error("[ encodeParams() ] XDR encoding error for the number of parameters");
00852
00853
00854 for (i = 0; i < nParams; i++) {
00855 if (paramNames[i] == NULL || (valueTypes[i] == XDR_STRING &&
00856 paramValues[i] == NULL)) {
00857 logger(WARNING, "NULL parameter name or value - skipping parameter...");
00858 continue;
00859 }
00860
00861
00862 if (!xdr_string(&xdrs, &(paramNames[i]), strlen(paramNames[i]) + 1))
00863 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter name");
00864
00865
00866 if (!xdr_int(&xdrs, &(valueTypes[i])))
00867 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value type");
00868
00869
00870 switch (valueTypes[i]) {
00871 case XDR_STRING:
00872 if (!xdr_string(&xdrs, &(paramValues[i]),
00873 strlen(paramValues[i]) + 1))
00874 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00875 break;
00876
00877
00878
00879
00880
00881 case XDR_INT32:
00882 if (!xdr_int(&xdrs, (int *)(paramValues[i])))
00883 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00884 break;
00885 case XDR_REAL32:
00886 if (!xdr_float(&xdrs, (float *)(paramValues[i])))
00887 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00888 break;
00889 case XDR_REAL64:
00890 if (!xdr_double(&xdrs, (double *)(paramValues[i])))
00891 throw runtime_error("[ encodeParams() ] XDR encoding error for parameter value");
00892 break;
00893 default:
00894 throw runtime_error("[ encodeParams() ] Unknown type for XDR encoding");
00895 }
00896 }
00897
00898
00899 if (timestamp > 0) {
00900 if (!xdr_int(&xdrs, ×tamp))
00901 throw runtime_error("[ encodeParams() ] XDR encoding error for the timestamp");
00902 dgramSize += xdrSize(XDR_INT32, NULL);
00903 }
00904 } catch (runtime_error& err) {
00905 xdr_destroy(&xdrs);
00906 throw err;
00907 }
00908
00909 xdr_destroy(&xdrs);
00910 }
00911
00912 #ifndef WIN32
00913 void *bkTask(void *param) {
00914 #else
00915 DWORD WINAPI bkTask(void *param) {
00916 #endif
00917 struct stat st;
00918 #ifndef WIN32
00919 struct timespec delay;
00920 #else
00921 DWORD delay;
00922 #endif
00923 bool resourceChanged, haveChange;
00924 int nextOp = -1, i, ret;
00925 int generalInfoCount;
00926 time_t crtTime, timeRemained;
00927 time_t nextRecheck = 0, nextJobInfoSend = 0, nextSysInfoSend = 0;
00928 ApMon *apm = (ApMon *)param;
00929 char logmsg[200];
00930
00931 logger(INFO, "[Starting background thread...]");
00932 apm -> bkThreadStarted = true;
00933
00934 crtTime = time(NULL);
00935
00936 pthread_mutex_lock(&(apm -> mutexBack));
00937 if (apm -> confCheck) {
00938 nextRecheck = crtTime + apm -> crtRecheckInterval;
00939
00940
00941
00942
00943 }
00944 if (apm -> jobMonitoring)
00945 nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
00946 if (apm -> sysMonitoring)
00947 nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
00948 pthread_mutex_unlock(&(apm -> mutexBack));
00949
00950 timeRemained = -1;
00951 generalInfoCount = 0;
00952
00953 while (1) {
00954 pthread_mutex_lock(&apm -> mutexBack);
00955 if (apm -> stopBkThread) {
00956
00957 pthread_mutex_unlock(&apm -> mutexBack);
00958 break;
00959 }
00960 pthread_mutex_unlock(&apm -> mutexBack);
00961
00962
00963
00964
00965
00966
00967 if (nextRecheck > 0 && (nextJobInfoSend <= 0 ||
00968 nextRecheck <= nextJobInfoSend)) {
00969 if (nextSysInfoSend <= 0 || nextRecheck <= nextSysInfoSend) {
00970 nextOp = RECHECK_CONF;
00971 timeRemained = nextRecheck - crtTime;
00972 } else {
00973 nextOp = SYS_INFO_SEND;
00974 timeRemained = nextSysInfoSend - crtTime;
00975 }
00976 } else {
00977 if (nextJobInfoSend > 0 && (nextSysInfoSend <= 0 ||
00978 nextJobInfoSend <= nextSysInfoSend)) {
00979 nextOp = JOB_INFO_SEND;
00980 timeRemained = nextJobInfoSend - crtTime;
00981 } else if (nextSysInfoSend > 0) {
00982 nextOp = SYS_INFO_SEND;
00983 timeRemained = nextSysInfoSend - crtTime;
00984 }
00985 }
00986
00987 if (timeRemained == -1)
00988 timeRemained = RECHECK_INTERVAL;
00989
00990 #ifndef WIN32
00991
00992 delay.tv_sec = crtTime + timeRemained;
00993 delay.tv_nsec = 0;
00994 #else
00995 delay = ( timeRemained) * 1000;
00996 #endif
00997
00998 pthread_mutex_lock(&(apm -> mutexBack));
00999
01000 pthread_mutex_lock(&(apm -> mutexCond));
01001
01002 haveChange = false;
01003 if (apm -> jobMonChanged || apm -> sysMonChanged || apm -> recheckChanged)
01004 haveChange = true;
01005 if (apm -> jobMonChanged) {
01006 if (apm -> jobMonitoring)
01007 nextJobInfoSend = crtTime + apm -> jobMonitorInterval;
01008 else
01009 nextJobInfoSend = -1;
01010 apm -> jobMonChanged = false;
01011 }
01012 if (apm -> sysMonChanged) {
01013 if (apm -> sysMonitoring)
01014 nextSysInfoSend = crtTime + apm -> sysMonitorInterval;
01015 else
01016 nextSysInfoSend = -1;
01017 apm -> sysMonChanged = false;
01018 }
01019 if (apm -> recheckChanged) {
01020 if (apm -> confCheck) {
01021 nextRecheck = crtTime + apm -> crtRecheckInterval;
01022 }
01023 else
01024 nextRecheck = -1;
01025 apm -> recheckChanged = false;
01026 }
01027 pthread_mutex_unlock(&(apm -> mutexBack));
01028
01029 if (haveChange) {
01030 pthread_mutex_unlock(&(apm -> mutexCond));
01031 continue;
01032 }
01033
01034
01035
01036 #ifndef WIN32
01037 ret = pthread_cond_timedwait(&(apm -> confChangedCond),
01038 &(apm -> mutexCond), &delay);
01039 pthread_mutex_unlock(&(apm -> mutexCond));
01040 #else
01041 pthread_mutex_unlock(&(apm -> mutexCond));
01042 ret = WaitForSingleObject(apm->confChangedCond, delay);
01043 #endif
01044 if (ret == ETIMEDOUT) {
01045
01046
01047 if (nextOp == JOB_INFO_SEND) {
01048 apm -> sendJobInfo();
01049 crtTime = time(NULL);
01050 nextJobInfoSend = crtTime + apm -> getJobMonitorInterval();
01051 }
01052
01053 if (nextOp == SYS_INFO_SEND) {
01054 apm -> sendSysInfo();
01055 if (apm -> getGenMonitoring()) {
01056 if (generalInfoCount <= 1)
01057 apm -> sendGeneralInfo();
01058 generalInfoCount = (generalInfoCount + 1) % apm -> genMonitorIntervals;
01059 }
01060 crtTime = time(NULL);
01061 nextSysInfoSend = crtTime + apm -> getSysMonitorInterval();
01062 }
01063
01064 if (nextOp == RECHECK_CONF) {
01065
01066 resourceChanged = false;
01067 try {
01068 if (apm -> initType == FILE_INIT) {
01069 sprintf(logmsg, "Checking for modifications for file %s ",
01070 apm -> initSources[0]);
01071 logger(INFO, logmsg);
01072 stat(apm -> initSources[0], &st);
01073 if (st.st_mtime > apm -> lastModifFile) {
01074 sprintf(logmsg, "File %s modified ", apm -> initSources[0]);
01075 logger(INFO, logmsg);
01076 resourceChanged = true;
01077 }
01078 }
01079
01080
01081 for (i = 0; i < apm -> confURLs.nConfURLs; i++) {
01082 sprintf(logmsg, "[Checking for modifications for URL %s ] ",
01083 apm -> confURLs.vURLs[i]);
01084 logger(INFO, logmsg);
01085 if (urlModified(apm -> confURLs.vURLs[i], apm -> confURLs.lastModifURLs[i])) {
01086 sprintf(logmsg, "URL %s modified ", apm -> confURLs.vURLs[i]);
01087 logger(INFO, logmsg);
01088 resourceChanged = true;
01089 break;
01090 }
01091 }
01092
01093 if (resourceChanged) {
01094 logger(INFO, "Reloading configuration...");
01095 if (apm -> initType == FILE_INIT)
01096 apm -> initialize(apm -> initSources[0], false);
01097 else
01098 apm -> initialize(apm -> nInitSources, apm -> initSources, false);
01099 }
01100 apm -> setCrtRecheckInterval(apm -> getRecheckInterval());
01101 } catch (runtime_error &err) {
01102 logger(WARNING, err.what());
01103 logger(WARNING, "Increasing the time interval for reloading the configuration...");
01104 apm -> setCrtRecheckInterval(apm -> getRecheckInterval() * 5);
01105 }
01106 crtTime = time(NULL);
01107 nextRecheck = crtTime + apm -> getCrtRecheckInterval();
01108
01109 }
01110 }
01111
01112 }
01113
01114 #ifndef WIN32
01115 return NULL;
01116 #else
01117 return 0;
01118 #endif
01119 }
01120
01121 void ApMon::setConfRecheck(bool confCheck, long interval) {
01122 char logmsg[100];
01123 if (confCheck) {
01124 sprintf(logmsg, "Enabling configuration reloading (interval %ld)",
01125 interval);
01126 logger(INFO, logmsg);
01127 }
01128
01129 pthread_mutex_lock(&mutexBack);
01130 if (initType == DIRECT_INIT) {
01131 logger(WARNING, "[ setConfRecheck() } No configuration file/URL to reload.");
01132 return;
01133 }
01134
01135 this -> confCheck = confCheck;
01136 this -> recheckChanged = true;
01137 if (confCheck) {
01138 if (interval > 0) {
01139 this -> recheckInterval = interval;
01140 this -> crtRecheckInterval = interval;
01141 } else {
01142 this -> recheckInterval = RECHECK_INTERVAL;
01143 this -> crtRecheckInterval = RECHECK_INTERVAL;
01144 }
01145 setBackgroundThread(true);
01146 }
01147 else {
01148 if (jobMonitoring == false && sysMonitoring == false)
01149 setBackgroundThread(false);
01150 }
01151 pthread_mutex_unlock(&mutexBack);
01152
01153 }
01154
01155 void ApMon::setRecheckInterval(long val) {
01156 if (val > 0) {
01157 setConfRecheck(true, val);
01158 }
01159 else {
01160 setConfRecheck(false, val);
01161 }
01162 }
01163
01164 void ApMon::setCrtRecheckInterval(long val) {
01165 pthread_mutex_lock(&mutexBack);
01166 crtRecheckInterval = val;
01167 pthread_mutex_unlock(&mutexBack);
01168 }
01169
01170 void ApMon::setJobMonitoring(bool jobMonitoring, long interval) {
01171 char logmsg[100];
01172 if (jobMonitoring) {
01173 sprintf(logmsg, "Enabling job monitoring, time interval %ld s... ", interval);
01174 logger(INFO, logmsg);
01175 } else
01176 logger(INFO, "Disabling job monitoring...");
01177
01178 pthread_mutex_lock(&mutexBack);
01179 this -> jobMonitoring = jobMonitoring;
01180 this -> jobMonChanged = true;
01181 if (jobMonitoring == true) {
01182 if (interval > 0)
01183 this -> jobMonitorInterval = interval;
01184 else
01185 this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
01186 setBackgroundThread(true);
01187 } else {
01188
01189 if (this -> sysMonitoring == false && this -> confCheck == false)
01190 setBackgroundThread(false);
01191 }
01192 pthread_mutex_unlock(&mutexBack);
01193 }
01194
01195 void ApMon::setSysMonitoring(bool sysMonitoring, long interval) {
01196 char logmsg[100];
01197 if (sysMonitoring) {
01198 sprintf(logmsg, "Enabling system monitoring, time interval %ld s... ", interval);
01199 logger(INFO, logmsg);
01200 } else
01201 logger(INFO, "Disabling system monitoring...");
01202
01203 pthread_mutex_lock(&mutexBack);
01204 this -> sysMonitoring = sysMonitoring;
01205 this -> sysMonChanged = true;
01206 if (sysMonitoring == true) {
01207 if (interval > 0)
01208 this -> sysMonitorInterval = interval;
01209 else
01210 this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
01211 setBackgroundThread(true);
01212 } else {
01213
01214 if (this -> jobMonitoring == false && this -> confCheck == false)
01215 setBackgroundThread(false);
01216 }
01217 pthread_mutex_unlock(&mutexBack);
01218 }
01219
01220 void ApMon::setGenMonitoring(bool genMonitoring, int nIntervals) {
01221 char logmsg[100];
01222 sprintf(logmsg, "Setting general information monitoring to %s ",
01223 boolStrings[(int)genMonitoring]);
01224 logger(INFO, logmsg);
01225
01226 pthread_mutex_lock(&mutexBack);
01227 this -> genMonitoring = genMonitoring;
01228 this -> sysMonChanged = true;
01229 if (genMonitoring == true) {
01230 if (nIntervals > 0)
01231 this -> genMonitorIntervals = nIntervals;
01232 else
01233 this -> genMonitorIntervals = GEN_MONITOR_INTERVALS;
01234
01235 if (this -> sysMonitoring == false) {
01236 pthread_mutex_unlock(&mutexBack);
01237 setSysMonitoring(true);
01238 pthread_mutex_lock(&mutexBack);
01239 }
01240 }
01241
01242 pthread_mutex_unlock(&mutexBack);
01243 }
01244
01245 void ApMon::setBackgroundThread(bool val) {
01246
01247 if (val == true) {
01248 if (!haveBkThread) {
01249 #ifndef WIN32
01250 pthread_create(&bkThread, NULL, &bkTask, this);
01251 #else
01252 DWORD dummy;
01253 bkThread = CreateThread(NULL, 65536, &bkTask, this, 0, &dummy);
01254 #endif
01255 haveBkThread = true;
01256 } else {
01257 pthread_mutex_lock(&mutexCond);
01258 pthread_cond_signal(&confChangedCond);
01259 pthread_mutex_unlock(&mutexCond);
01260 }
01261 }
01262 if (val == false) {
01263
01264 if (haveBkThread) {
01265 stopBkThread = true;
01266 pthread_mutex_unlock(&mutexBack);
01267 #ifndef WIN32
01268 pthread_mutex_lock(&mutexCond);
01269 #endif
01270 pthread_cond_signal(&confChangedCond);
01271 logger(INFO, "[Stopping the background thread...]");
01272 #ifndef WIN32
01273 pthread_mutex_unlock(&mutexCond);
01274 pthread_join(bkThread, NULL);
01275 #else
01276 WaitForSingleObject(bkThread, INFINITE);
01277 #endif
01278 pthread_mutex_lock(&mutexBack);
01279
01280 haveBkThread = false;
01281 bkThreadStarted = false;
01282 stopBkThread = false;
01283 }
01284 }
01285 }
01286
01287 void ApMon::addJobToMonitor(long pid, char *workdir, char *clusterName,
01288 char *nodeName) throw(runtime_error) {
01289 if (nMonJobs >= MAX_MONITORED_JOBS)
01290 throw runtime_error("[ addJobToMonitor() ] Maximum number of jobs that can be monitored exceeded.");
01291 MonitoredJob job;
01292 job.pid = pid;
01293 if (workdir == NULL)
01294 strcpy(job.workdir, "");
01295 else
01296 strcpy(job.workdir, workdir);
01297
01298 if (clusterName == NULL || strlen(clusterName) == 0)
01299 strcpy(job.clusterName, "ApMon_JobMon");
01300 else
01301 strcpy(job.clusterName, clusterName);
01302 if (nodeName == NULL || strlen(nodeName) == 0)
01303 strcpy(job.nodeName, this -> myIP);
01304 else
01305 strcpy(job.nodeName, nodeName);
01306
01307 monJobs[nMonJobs++] = job;
01308 }
01309
01310 void ApMon::removeJobToMonitor(long pid) throw(runtime_error) {
01311 int i, j;
01312 char msg[100];
01313
01314 if (nMonJobs <= 0)
01315 throw runtime_error("[ removeJobToMonitor() ] There are no monitored jobs.");
01316
01317 for (i = 0; i < nMonJobs; i++) {
01318 if (monJobs[i].pid == pid) {
01319
01320 for (j = i; j < nMonJobs - 1; j++)
01321 monJobs[j] = monJobs[j + 1];
01322 nMonJobs--;
01323 return;
01324 }
01325 }
01326
01327
01328 sprintf(msg, "removeJobToMonitor(): Job %ld not found.", pid);
01329 throw runtime_error(msg);
01330 }
01331
01332 void ApMon::setSysMonClusterNode(char *clusterName, char *nodeName) {
01333 free (sysMonCluster); free(sysMonNode);
01334 sysMonCluster = strdup(clusterName);
01335 sysMonNode = strdup(nodeName);
01336 }
01337
01338 void ApMon::setLogLevel(char *newLevel_s) {
01339 int newLevel;
01340 const char *levels[5] = {"FATAL", "WARNING", "INFO", "FINE", "DEBUG"};
01341 char logmsg[100];
01342
01343 for (newLevel = 0; newLevel < 5; newLevel++)
01344 if (strcmp(newLevel_s, levels[newLevel]) == 0)
01345 break;
01346
01347 if (newLevel >= 5) {
01348 sprintf(logmsg, "[ setLogLevel() ] Invalid level value: %s", newLevel_s);
01349 logger(WARNING, logmsg);
01350 }
01351 else
01352 logger(0, NULL, newLevel);
01353 }
01354
01355
01356 void ApMon::setMaxMsgRate(int maxRate) {
01357 if (maxRate > 0)
01358 this -> maxMsgRate = maxRate;
01359 }
01360
01361 void ApMon::initSocket() throw(runtime_error) {
01362 int optval1 = 1;
01363 struct timeval optval2;
01364 int ret1, ret2, ret3;
01365
01366 sockfd = socket(AF_INET, SOCK_DGRAM, 0);
01367 if (sockfd < 0)
01368 throw runtime_error("[ initSocket() ] Error creating socket");
01369 ret1 = setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *) &optval1,
01370 sizeof(optval1));
01371
01372
01373 optval2.tv_sec = 20;
01374 optval2.tv_usec = 0;
01375 ret2 = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *) &optval2,
01376 sizeof(optval2));
01377 ret3 = setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *) &optval2,
01378 sizeof(optval2));
01379 if (ret1 != 0 || ret2 != 0 || ret3 != 0)
01380 throw runtime_error("[ initSocket() ] Error initializing socket.");
01381 }
01382
01383
01384 void ApMon::parseConf(FILE *fp, int *nDestinations, char **destAddresses,
01385 int *destPorts, char **destPasswds)
01386 throw(runtime_error) {
01387 int i, ch;
01388 char *line = (char *)malloc ((MAX_STRING_LEN1) * sizeof(char));
01389 char *tmp = NULL;
01390 char *loglevel_s;
01391
01392
01393
01394
01395 while(fgets(line, MAX_STRING_LEN, fp) != NULL) {
01396
01397 if (tmp != NULL) {
01398 free(tmp);
01399 tmp = NULL;
01400 }
01401
01402 line[MAX_STRING_LEN - 1] = 0;
01403
01404 ch = fgetc(fp);
01405 ungetc(ch, fp);
01406 if (line[strlen(line) - 1] != 10 && ch != EOF) {
01407
01408
01409 fclose(fp);
01410 throw runtime_error ("[ parseConf() ] Maximum line length exceeded in the conf file");
01411 }
01412
01413 tmp = trimString(line);
01414
01415
01416 if (strlen(tmp) == 0 || strchr(tmp, '#') == tmp)
01417 continue;
01418
01419 if (strstr(tmp, "xApMon_loglevel") == tmp) {
01420 char *tmp2 = tmp;
01421 strtok(tmp2, "= ");
01422 loglevel_s = strtok(NULL, "= ");
01423 setLogLevel(loglevel_s);
01424 continue;
01425 }
01426
01427 if (strstr(tmp, "xApMon_") == tmp) {
01428 parseXApMonLine(tmp);
01429 continue;
01430 }
01431
01432 if (*nDestinations >= MAX_N_DESTINATIONS) {
01433 free(line); free(tmp);
01434 for (i = 0; i < *nDestinations; i++) {
01435 free(destAddresses[i]);
01436 free(destPasswds[i]);
01437 }
01438 fclose(fp);
01439 throw runtime_error("[ parseConf() ] Maximum number of destinations exceeded.");
01440 }
01441
01442 addToDestinations(tmp, nDestinations, destAddresses, destPorts,
01443 destPasswds);
01444 }
01445
01446 if (tmp != NULL)
01447 free(tmp);
01448 free(line);
01449 }
01450
01451 bool ApMon::shouldSend() {
01452
01453 long now = time(NULL);
01454 bool doSend;
01455 char msg[200];
01456
01457
01458
01459 if (now != crtTime){
01461 prvSent = hWeight * prvSent + (1.0 - hWeight) * crtSent / (now - crtTime);
01462 prvTime = crtTime;
01463 sprintf(msg, "previously sent: %ld dropped: %ld", crtSent, crtDrop);
01464 logger(DEBUG, msg);
01466 crtTime = now;
01467 crtSent = 0;
01468 crtDrop = 0;
01469
01470 }
01471
01473 int valSent = (int)(prvSent * hWeight + crtSent * (1.0 - hWeight));
01474
01475 doSend = true;
01477 int level = this -> maxMsgRate - this -> maxMsgRate / 10;
01478
01479
01480 if (valSent > (this -> maxMsgRate - level)) {
01481
01482 int rnd = rand() % (this -> maxMsgRate / 10);
01483 doSend = (rnd < (this -> maxMsgRate - valSent));
01484 }
01486 if (doSend) {
01487 crtSent++;
01488
01489 } else {
01490 crtDrop++;
01491
01492 }
01493
01494 return doSend;
01495 }
01496
01497
01498
01499
01500