Gaudi Framework, version v22r4

Home   Generated: Fri Sep 2 2011

monitor_utils.cpp

Go to the documentation of this file.
00001 
00007 /*
00008  * ApMon - Application Monitoring Tool
00009  * Version: 2.2.0
00010  *
00011  * Copyright (C) 2006 California Institute of Technology
00012  *
00013  * Permission is hereby granted, free of charge, to use, copy and modify
00014  * this software and its documentation (the "Software") for any
00015  * purpose, provided that existing copyright notices are retained in
00016  * all copies and that this notice is included verbatim in any distributions
00017  * or substantial portions of the Software.
00018  * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
00019  * Users of the Software are asked to feed back problems, benefits,
00020  * and/or suggestions about the software to the MonALISA Development Team
00021  * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
00022  * incorporation of new features - is done on a best effort basis. All bug
00023  * fixes and enhancements will be made available under the same terms and
00024  * conditions as the original software,
00025 
00026  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00027  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
00028  * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
00029  * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 
00031  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
00032  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
00033  * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
00034  * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
00035  * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
00036  * MODIFICATIONS.
00037  */
00038 
00039 #include "ApMon.h"
00040 #include "monitor_utils.h"
00041 #include "proc_utils.h"
00042 #include "utils.h"
00043 #include "mon_constants.h"
00044 
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047 
00048 void ApMon::sendJobInfo() {
00049 #ifndef WIN32
00050   int i;
00051   long crtTime;
00052 
00053  /* the apMon_free() function calls sendJobInfo() from another thread and
00054      we need mutual exclusion */
00055   pthread_mutex_lock(&mutexBack);
00056 
00057   if (nMonJobs == 0) {
00058     logger(WARNING, "There are no jobs to be monitored, not sending job monitoring information.");
00059     pthread_mutex_unlock(&mutexBack);
00060     return;
00061   }
00062 
00063   crtTime = time(NULL);
00064   logger(INFO, "Sending job monitoring information...");
00065   lastJobInfoSend = (time_t)crtTime;
00066 
00067   /* send monitoring information for all the jobs specified by the user */
00068   for (i = 0; i < nMonJobs; i++)
00069     sendOneJobInfo(monJobs[i]);
00070 
00071   pthread_mutex_unlock(&mutexBack);
00072 #endif
00073 }
00074 
00075 void ApMon::updateJobInfo(MonitoredJob job) {
00076   bool needJobInfo, needDiskInfo;
00077   bool jobExists = true;
00078   char err_msg[200];
00079 
00080   PsInfo jobInfo;
00081   JobDirInfo dirInfo;
00082 
00083   /**** runtime, CPU & memory usage information ****/
00084   needJobInfo = actJobMonitorParams[JOB_RUN_TIME]
00085     || actJobMonitorParams[JOB_CPU_TIME]
00086     || actJobMonitorParams[JOB_CPU_USAGE]
00087     || actJobMonitorParams[JOB_MEM_USAGE]
00088     || actJobMonitorParams[JOB_VIRTUALMEM]
00089     || actJobMonitorParams[JOB_RSS]
00090     || actJobMonitorParams[JOB_OPEN_FILES];
00091   if (needJobInfo) {
00092     try {
00093       readJobInfo(job.pid, jobInfo);
00094       currentJobVals[JOB_RUN_TIME] = jobInfo.etime;
00095       currentJobVals[JOB_CPU_TIME] = jobInfo.cputime;
00096       currentJobVals[JOB_CPU_USAGE] = jobInfo.pcpu;
00097       currentJobVals[JOB_MEM_USAGE] = jobInfo.pmem;
00098       currentJobVals[JOB_VIRTUALMEM] = jobInfo.vsz;
00099       currentJobVals[JOB_RSS] = jobInfo.rsz;
00100 
00101       if (jobInfo.open_fd < 0)
00102         jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
00103       currentJobVals[JOB_OPEN_FILES] = jobInfo.open_fd;
00104 
00105     } catch (runtime_error &err) {
00106       logger(WARNING, err.what());
00107       jobRetResults[JOB_RUN_TIME] = jobRetResults[JOB_CPU_TIME] =
00108         jobRetResults[JOB_CPU_USAGE] = jobRetResults[JOB_MEM_USAGE] =
00109         jobRetResults[JOB_VIRTUALMEM] = jobRetResults[JOB_RSS] =
00110         jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
00111       strcpy(err_msg, err.what());
00112       if (strstr(err_msg, "does not exist") != NULL)
00113         jobExists = false;
00114     }
00115   }
00116 
00117   /* if the monitored job has terminated, remove it */
00118   if (!jobExists) {
00119     try {
00120       removeJobToMonitor(job.pid);
00121     } catch (runtime_error &err) {
00122       logger(WARNING, err.what());
00123     }
00124     return;
00125   }
00126 
00127   /* disk usage information */
00128   needDiskInfo = actJobMonitorParams[JOB_DISK_TOTAL]
00129     || actJobMonitorParams[JOB_DISK_USED]
00130     || actJobMonitorParams[JOB_DISK_FREE]
00131     || actJobMonitorParams[JOB_DISK_USAGE]
00132     || actJobMonitorParams[JOB_WORKDIR_SIZE];
00133   if (needDiskInfo) {
00134     try {
00135       readJobDiskUsage(job, dirInfo);
00136       currentJobVals[JOB_WORKDIR_SIZE] = dirInfo.workdir_size;
00137       currentJobVals[JOB_DISK_TOTAL] = dirInfo.disk_total;
00138       currentJobVals[JOB_DISK_USED] = dirInfo.disk_used;
00139       currentJobVals[JOB_DISK_USAGE] = dirInfo.disk_usage;
00140       currentJobVals[JOB_DISK_FREE] = dirInfo.disk_free;
00141     } catch (runtime_error& err) {
00142       logger(WARNING, err.what());
00143       jobRetResults[JOB_WORKDIR_SIZE] = jobRetResults[JOB_DISK_TOTAL]
00144         = jobRetResults[JOB_DISK_USED]
00145         = jobRetResults[JOB_DISK_USAGE]
00146         = jobRetResults[JOB_DISK_FREE]
00147         = RET_ERROR;
00148     }
00149   }
00150 }
00151 
00152 void ApMon::sendOneJobInfo(MonitoredJob job) {
00153   int i;
00154   int nParams = 0;
00155 
00156   char **paramNames, **paramValues;
00157   int *valueTypes;
00158 
00159   valueTypes = (int *)malloc(nJobMonitorParams * sizeof(int));
00160   paramNames = (char **)malloc(nJobMonitorParams * sizeof(char *));
00161   paramValues = (char **)malloc(nJobMonitorParams * sizeof(char *));
00162 
00163   for (i = 0; i < nJobMonitorParams; i++) {
00164     jobRetResults[i] = RET_SUCCESS;
00165     currentJobVals[i] = 0;
00166   }
00167 
00168   updateJobInfo(job);
00169 
00170   for (i = 0; i < nJobMonitorParams; i++) {
00171     if (actJobMonitorParams[i] && jobRetResults[i] != RET_ERROR) {
00172 
00173       paramNames[nParams] = jobMonitorParams[i];
00174       paramValues[nParams] = (char *)&currentJobVals[i];
00175       valueTypes[nParams] = XDR_REAL64;
00176       nParams++;
00177     }
00178     /* don't disable the parameter (maybe for another job it can be
00179          obtained) */
00180       /*
00181         else
00182         if (autoDisableMonitoring)
00183         actJobMonitorParams[ind] = 0;
00184       */
00185   }
00186 
00187   if (nParams == 0) {
00188     free(paramNames); free(valueTypes);
00189     free(paramValues);
00190     return;
00191   }
00192 
00193   try {
00194     if (nParams > 0)
00195       sendParameters(job.clusterName, job.nodeName, nParams,
00196                      paramNames, valueTypes, paramValues);
00197   } catch (runtime_error& err) {
00198     logger(WARNING, err.what());
00199   }
00200 
00201   free(paramNames);
00202   free(valueTypes);
00203   free(paramValues);
00204 }
00205 
00206 
00207 void ApMon::updateSysInfo() {
00208   int needCPUInfo, needSwapPagesInfo, needLoadInfo, needMemInfo,
00209     needNetInfo, needUptime, needProcessesInfo, needNetstatInfo;
00210 
00211   /**** CPU usage information ****/
00212   needCPUInfo = actSysMonitorParams[SYS_CPU_USAGE]
00213     || actSysMonitorParams[SYS_CPU_USR]
00214     || actSysMonitorParams[SYS_CPU_SYS]
00215     || actSysMonitorParams[SYS_CPU_NICE]
00216     || actSysMonitorParams[SYS_CPU_IDLE];
00217   if (needCPUInfo) {
00218     try {
00219       ProcUtils::getCPUUsage(*this, currentSysVals[SYS_CPU_USAGE],
00220                              currentSysVals[SYS_CPU_USR],
00221                              currentSysVals[SYS_CPU_SYS],
00222                              currentSysVals[SYS_CPU_NICE],
00223                              currentSysVals[SYS_CPU_IDLE], numCPUs);
00224     } catch (procutils_error &perr) {
00225       /* "permanent" error (the parameters could not be obtained) */
00226       logger(WARNING, perr.what());
00227       sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS] =
00228         sysRetResults[SYS_CPU_USR] = sysRetResults[SYS_CPU_NICE] =
00229         sysRetResults[SYS_CPU_IDLE] = sysRetResults[SYS_CPU_USAGE] = PROCUTILS_ERROR;
00230     } catch (runtime_error &err) {
00231       /* temporary error (next time we might be able to get the paramerers) */
00232       logger(WARNING, err.what());
00233       sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS]
00234         = sysRetResults[SYS_CPU_USR]
00235         = sysRetResults[SYS_CPU_NICE]
00236         = sysRetResults[SYS_CPU_IDLE]
00237         = sysRetResults[SYS_CPU_USAGE]
00238         = RET_ERROR;
00239     }
00240   }
00241 
00242   needSwapPagesInfo = actSysMonitorParams[SYS_PAGES_IN]
00243     || actSysMonitorParams[SYS_PAGES_OUT]
00244     || actSysMonitorParams[SYS_SWAP_IN]
00245     || actSysMonitorParams[SYS_SWAP_OUT];
00246 
00247   if (needSwapPagesInfo) {
00248     try {
00249       ProcUtils::getSwapPages(*this, currentSysVals[SYS_PAGES_IN],
00250                               currentSysVals[SYS_PAGES_OUT],
00251                               currentSysVals[SYS_SWAP_IN],
00252                               currentSysVals[SYS_SWAP_OUT]);
00253     } catch (procutils_error &perr) {
00254       /* "permanent" error (the parameters could not be obtained) */
00255       logger(WARNING, perr.what());
00256       sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT] =
00257       sysRetResults[SYS_SWAP_OUT] = sysRetResults[SYS_SWAP_IN] = PROCUTILS_ERROR;
00258     } catch (runtime_error &err) {
00259       /* temporary error (next time we might be able to get the paramerers) */
00260       logger(WARNING, err.what());
00261       sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT]
00262         = sysRetResults[SYS_SWAP_IN]
00263         = sysRetResults[SYS_SWAP_OUT]
00264         = RET_ERROR;
00265     }
00266   }
00267 
00268   needLoadInfo = actSysMonitorParams[SYS_LOAD1]
00269     || actSysMonitorParams[SYS_LOAD5]
00270     || actSysMonitorParams[SYS_LOAD15];
00271 
00272   if (needLoadInfo) {
00273     double dummyVal;
00274     try {
00275       /* the number of processes is now obtained with the getProcesses()
00276          function, not with getLoad() */
00277       ProcUtils::getLoad(currentSysVals[SYS_LOAD1], currentSysVals[SYS_LOAD5],
00278                          currentSysVals[SYS_LOAD15],dummyVal);
00279     } catch (procutils_error& perr) {
00280       /* "permanent" error (the parameters could not be obtained) */
00281       logger(WARNING, perr.what());
00282       sysRetResults[SYS_LOAD1] = sysRetResults[SYS_LOAD5]
00283         = sysRetResults[SYS_LOAD15]
00284         = PROCUTILS_ERROR;
00285     }
00286   }
00287 
00288   /**** get statistics about the current processes ****/
00289   needProcessesInfo = actSysMonitorParams[SYS_PROCESSES];
00290   if (needProcessesInfo) {
00291     try {
00292       ProcUtils::getProcesses(currentSysVals[SYS_PROCESSES],
00293                               currentProcessStates);
00294     } catch (runtime_error& err) {
00295       logger(WARNING, err.what());
00296       sysRetResults[SYS_PROCESSES] = RET_ERROR;
00297     }
00298   }
00299 
00300   /**** get the amount of memory currently in use ****/
00301   needMemInfo = actSysMonitorParams[SYS_MEM_USED]
00302     || actSysMonitorParams[SYS_MEM_FREE]
00303     || actSysMonitorParams[SYS_SWAP_USED]
00304     || actSysMonitorParams[SYS_SWAP_FREE]
00305     || actSysMonitorParams[SYS_MEM_USAGE]
00306     || actSysMonitorParams[SYS_SWAP_USAGE];
00307 
00308   if (needMemInfo) {
00309     try {
00310       ProcUtils::getMemUsed(currentSysVals[SYS_MEM_USED],
00311                             currentSysVals[SYS_MEM_FREE],
00312                             currentSysVals[SYS_SWAP_USED],
00313                             currentSysVals[SYS_SWAP_FREE]);
00314       currentSysVals[SYS_MEM_USAGE] = 100 * currentSysVals[SYS_MEM_USED] /
00315         (currentSysVals[SYS_MEM_USED] +  currentSysVals[SYS_MEM_FREE]);
00316       currentSysVals[SYS_SWAP_USAGE] = 100 * currentSysVals[SYS_SWAP_USED] /
00317         (currentSysVals[SYS_SWAP_USED] +  currentSysVals[SYS_SWAP_FREE]);
00318     } catch (procutils_error &perr) {
00319       logger(WARNING, perr.what());
00320       sysRetResults[SYS_MEM_USED] = sysRetResults[SYS_MEM_FREE] =
00321         sysRetResults[SYS_SWAP_USED] = sysRetResults[SYS_SWAP_FREE] =
00322         sysRetResults[SYS_MEM_USAGE] = sysRetResults[SYS_SWAP_USAGE] =
00323         PROCUTILS_ERROR;
00324     }
00325   }
00326 
00327 
00328   /**** network monitoring information ****/
00329   needNetInfo = actSysMonitorParams[SYS_NET_IN] ||
00330     actSysMonitorParams[SYS_NET_OUT] || actSysMonitorParams[SYS_NET_ERRS];
00331   if (needNetInfo && this -> nInterfaces > 0) {
00332     try {
00333       ProcUtils::getNetInfo(*this, &currentNetIn, &currentNetOut,
00334                             &currentNetErrs);
00335     } catch (procutils_error &perr) {
00336       logger(WARNING, perr.what());
00337       sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] =
00338         sysRetResults[SYS_NET_ERRS] = PROCUTILS_ERROR;
00339     } catch (runtime_error &err) {
00340       logger(WARNING, err.what());
00341       sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] =
00342         sysRetResults[SYS_NET_ERRS] = RET_ERROR;
00343     }
00344   }
00345 
00346   needNetstatInfo = actSysMonitorParams[SYS_NET_SOCKETS] ||
00347     actSysMonitorParams[SYS_NET_TCP_DETAILS];
00348   if (needNetstatInfo) {
00349     try {
00350       ProcUtils::getNetstatInfo(*this, this -> currentNSockets,
00351                                 this -> currentSocketsTCP);
00352     } catch (runtime_error &err) {
00353       logger(WARNING, err.what());
00354       sysRetResults[SYS_NET_SOCKETS] = sysRetResults[SYS_NET_TCP_DETAILS] =
00355         RET_ERROR;
00356     }
00357   }
00358 
00359   needUptime = actSysMonitorParams[SYS_UPTIME];
00360   if (needUptime) {
00361     try {
00362       currentSysVals[SYS_UPTIME] = ProcUtils::getUpTime();
00363     } catch (procutils_error &perr) {
00364       logger(WARNING, perr.what());
00365       sysRetResults[SYS_UPTIME] = PROCUTILS_ERROR;
00366     }
00367   }
00368 
00369 }
00370 
00371 void ApMon::sendSysInfo() {
00372 #ifndef WIN32
00373   int nParams = 0, maxNParams;
00374   int i;
00375   long crtTime;
00376 
00377   int *valueTypes;
00378   char **paramNames, **paramValues;
00379 
00380   crtTime = time(NULL);
00381   logger(INFO, "Sending system monitoring information...");
00382 
00383   /* make some initializations only the first time this
00384      function is called */
00385   if (this -> sysInfo_first) {
00386     for (i = 0; i < this -> nInterfaces; i++) {
00387      this -> lastBytesSent[i] = this -> lastBytesReceived[i] = 0.0;
00388      this -> lastNetErrs[i] = 0;
00389 
00390     }
00391     this -> sysInfo_first = FALSE;
00392   }
00393 
00394   /* the maximum number of parameters that can be included in a datagram */
00395   /* (the last three terms are for: parameters corresponding to each possible
00396      state of the processes, parameters corresponding to the types of open
00397      sockets, parameters corresponding to each possible state of the TCP
00398      sockets.) */
00399   maxNParams = nSysMonitorParams + (2 * nInterfaces - 1) + 15 + 4 +
00400     N_TCP_STATES;
00401 
00402   valueTypes = (int *)malloc(maxNParams * sizeof(int));
00403   paramNames = (char **)malloc(maxNParams * sizeof(char *));
00404   paramValues = (char **)malloc(maxNParams * sizeof(char *));
00405 
00406   for (i = 0; i < nSysMonitorParams; i++) {
00407     if (actSysMonitorParams[i] > 0) /* if the parameter is enabled */
00408       sysRetResults[i] = RET_SUCCESS;
00409     else /* mark it with RET_ERROR so that it will be not included in the
00410             datagram */
00411       sysRetResults[i] = RET_ERROR;
00412   }
00413 
00414   updateSysInfo();
00415 
00416   for (i = 0; i < nSysMonitorParams; i++) {
00417     if (i == SYS_NET_IN || i == SYS_NET_OUT || i == SYS_NET_ERRS ||
00418         i == SYS_NET_SOCKETS || i == SYS_NET_TCP_DETAILS || i == SYS_PROCESSES)
00419       continue;
00420 
00421     if (sysRetResults[i] == PROCUTILS_ERROR) {
00422       /* could not read the requested information from /proc, disable this
00423          parameter */
00424       if (autoDisableMonitoring)
00425         actSysMonitorParams[i] = 0;
00426     } else if (sysRetResults[i] != RET_ERROR) {
00427       /* the parameter is enabled and there were no errors obtaining it */
00428       paramNames[nParams] = strdup(sysMonitorParams[i]);
00429       paramValues[nParams] = (char *)&currentSysVals[i];
00430       valueTypes[nParams] = XDR_REAL64;
00431       nParams++;
00432     }
00433   }
00434 
00435   if (actSysMonitorParams[SYS_NET_IN] == 1) {
00436     if (sysRetResults[SYS_NET_IN] == PROCUTILS_ERROR) {
00437       if (autoDisableMonitoring)
00438         actSysMonitorParams[SYS_NET_IN] = 0;
00439     } else  if (sysRetResults[SYS_NET_IN] != RET_ERROR) {
00440       for (i = 0; i < nInterfaces; i++) {
00441         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00442         strcpy(paramNames[nParams], interfaceNames[i]);
00443         strcat(paramNames[nParams], "_in");
00444         paramValues[nParams] = (char *)&currentNetIn[i];
00445         valueTypes[nParams] = XDR_REAL64;
00446         nParams++;
00447       }
00448     }
00449   }
00450 
00451   if (actSysMonitorParams[SYS_NET_OUT] == 1) {
00452     if (sysRetResults[SYS_NET_IN] == PROCUTILS_ERROR) {
00453       if (autoDisableMonitoring)
00454         actSysMonitorParams[SYS_NET_OUT] = 0;
00455     } else  if (sysRetResults[SYS_NET_OUT] != RET_ERROR) {
00456       for (i = 0; i < nInterfaces; i++) {
00457         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00458         strcpy(paramNames[nParams], interfaceNames[i]);
00459         strcat(paramNames[nParams], "_out");
00460         paramValues[nParams] = (char *)&currentNetOut[i];
00461         valueTypes[nParams] = XDR_REAL64;
00462         nParams++;
00463       }
00464     }
00465   }
00466 
00467   if (actSysMonitorParams[SYS_NET_ERRS] == 1) {
00468     if (sysRetResults[SYS_NET_ERRS] == PROCUTILS_ERROR) {
00469       if (autoDisableMonitoring)
00470         actSysMonitorParams[SYS_NET_ERRS] = 0;
00471     } else  if (sysRetResults[SYS_NET_ERRS] != RET_ERROR) {
00472       for (i = 0; i < nInterfaces; i++) {
00473         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00474         strcpy(paramNames[nParams], interfaceNames[i]);
00475         strcat(paramNames[nParams], "_errs");
00476         paramValues[nParams] = (char *)&currentNetErrs[i];
00477         valueTypes[nParams] = XDR_REAL64;
00478         nParams++;
00479       }
00480     }
00481   }
00482 
00483 
00484   if (actSysMonitorParams[SYS_PROCESSES] == 1) {
00485     if (sysRetResults[SYS_PROCESSES] != RET_ERROR) {
00486       char act_states[] = {'D', 'R', 'S', 'T', 'Z'};
00487       for (i = 0; i < 5; i++) {
00488         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00489         sprintf(paramNames[nParams], "processes_%c", act_states[i]);
00490         paramValues[nParams] = (char *)&currentProcessStates[act_states[i] - 65];
00491         valueTypes[nParams] = XDR_REAL64;
00492         nParams++;
00493       }
00494     }
00495   }
00496 
00497   if (actSysMonitorParams[SYS_NET_SOCKETS] == 1) {
00498     if (sysRetResults[SYS_NET_SOCKETS] != RET_ERROR) {
00499       const char *socket_types[] = {"tcp", "udp", "icm", "unix"};
00500       for (i = 0; i < 4; i++) {
00501         paramNames[nParams] =  (char *)malloc(30 * sizeof(char));
00502         sprintf(paramNames[nParams], "sockets_%s", socket_types[i]);
00503         paramValues[nParams] = (char *)&currentNSockets[i];
00504         valueTypes[nParams] = XDR_REAL64;
00505         nParams++;
00506       }
00507     }
00508   }
00509 
00510   if (actSysMonitorParams[SYS_NET_TCP_DETAILS] == 1) {
00511     if (sysRetResults[SYS_NET_TCP_DETAILS] != RET_ERROR) {
00512       for (i = 0; i < N_TCP_STATES; i++) {
00513         paramNames[nParams] =  (char *)malloc(30 * sizeof(char));
00514         sprintf(paramNames[nParams], "sockets_tcp_%s", socketStatesMapTCP[i]);
00515         paramValues[nParams] = (char *)&currentSocketsTCP[i];
00516         valueTypes[nParams] = XDR_REAL64;
00517         nParams++;
00518       }
00519     }
00520   }
00521 
00522   try {
00523     if (nParams > 0)
00524       sendParameters(sysMonCluster, sysMonNode, nParams,
00525                      paramNames, valueTypes, paramValues);
00526   } catch (runtime_error& err) {
00527     logger(WARNING, err.what());
00528   }
00529 
00530   this -> lastSysInfoSend = crtTime;
00531 
00532   if (sysRetResults[SYS_NET_IN] == RET_SUCCESS) {
00533     free(currentNetIn);
00534     free(currentNetOut);
00535     free(currentNetErrs);
00536   }
00537 
00538   for (i = 0; i < nParams; i++)
00539     free(paramNames[i]);
00540   free(paramNames);
00541   free(valueTypes);
00542   free(paramValues);
00543 #endif
00544 }
00545 
00546 void ApMon::updateGeneralInfo() {
00547 
00548   strcpy(cpuVendor, ""); strcpy(cpuFamily, "");
00549   strcpy(cpuModel, ""); strcpy(cpuModelName, "");
00550 
00551   if (actGenMonitorParams[GEN_CPU_MHZ] == 1 ||
00552       actGenMonitorParams[GEN_BOGOMIPS] == 1 ||
00553       actGenMonitorParams[GEN_CPU_VENDOR_ID] == 1 ||
00554       actGenMonitorParams[GEN_CPU_FAMILY] == 1 ||
00555       actGenMonitorParams[GEN_CPU_MODEL] == 1 ||
00556       actGenMonitorParams[GEN_CPU_MODEL_NAME] == 1) {
00557     try {
00558       ProcUtils::getCPUInfo(*this);
00559     } catch (procutils_error& err) {
00560       logger(WARNING, err.what());
00561       genRetResults[GEN_CPU_MHZ] = genRetResults[GEN_BOGOMIPS] = PROCUTILS_ERROR;
00562     }
00563   }
00564 
00565   if (actGenMonitorParams[GEN_TOTAL_MEM] == 1 ||
00566       actGenMonitorParams[GEN_TOTAL_SWAP] == 1) {
00567     try {
00568       ProcUtils::getSysMem(currentGenVals[GEN_TOTAL_MEM],
00569                            currentGenVals[GEN_TOTAL_SWAP]);
00570     } catch (procutils_error& perr) {
00571       logger(WARNING, perr.what());
00572       genRetResults[GEN_TOTAL_MEM] = genRetResults[GEN_TOTAL_SWAP] = PROCUTILS_ERROR;
00573     }
00574   }
00575 
00576   if (this -> numCPUs > 0)
00577     currentGenVals[GEN_NO_CPUS] = this -> numCPUs;
00578   else
00579     genRetResults[GEN_NO_CPUS] = PROCUTILS_ERROR;
00580 }
00581 
00582 void ApMon::sendGeneralInfo() {
00583 #ifndef WIN32
00584   int nParams, maxNParams, i;
00585   char tmp_s[50];
00586 
00587   char **paramNames, **paramValues;
00588   int *valueTypes;
00589 
00590   logger(INFO, "Sending general monitoring information...");
00591 
00592   maxNParams = nGenMonitorParams + numIPs;
00593   valueTypes = (int *)malloc(maxNParams * sizeof(int));
00594   paramNames = (char **)malloc(maxNParams * sizeof(char *));
00595   paramValues = (char **)malloc(maxNParams * sizeof(char *));
00596 
00597   nParams = 0;
00598 
00599   updateGeneralInfo();
00600 
00601   if (actGenMonitorParams[GEN_HOSTNAME]) {
00602     paramNames[nParams] = strdup(genMonitorParams[GEN_HOSTNAME]);
00603     valueTypes[nParams] = XDR_STRING;
00604     paramValues[nParams] = myHostname;
00605     nParams++;
00606   }
00607 
00608   if (actGenMonitorParams[GEN_IP]) {
00609     for (i = 0; i < this -> numIPs; i++) {
00610       strcpy(tmp_s, "ip_");
00611       strcat(tmp_s, interfaceNames[i]);
00612       paramNames[nParams] = strdup(tmp_s);
00613       valueTypes[nParams] = XDR_STRING;
00614       paramValues[nParams] = this -> allMyIPs[i];
00615       nParams++;
00616     }
00617   }
00618 
00619   if (actGenMonitorParams[GEN_CPU_VENDOR_ID] && strlen(cpuVendor) != 0) {
00620     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_VENDOR_ID]);
00621     valueTypes[nParams] = XDR_STRING;
00622     paramValues[nParams] = cpuVendor;
00623     nParams++;
00624   }
00625 
00626   if (actGenMonitorParams[GEN_CPU_FAMILY] && strlen(cpuFamily) != 0) {
00627     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_FAMILY]);
00628     valueTypes[nParams] = XDR_STRING;
00629     paramValues[nParams] = cpuFamily;
00630     nParams++;
00631   }
00632 
00633   if (actGenMonitorParams[GEN_CPU_MODEL] && strlen(cpuModel) != 0) {
00634     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_MODEL]);
00635     valueTypes[nParams] = XDR_STRING;
00636     paramValues[nParams] = cpuModel;
00637     nParams++;
00638   }
00639 
00640   if (actGenMonitorParams[GEN_CPU_MODEL_NAME] && strlen(cpuModelName) != 0) {
00641     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_MODEL_NAME]);
00642     valueTypes[nParams] = XDR_STRING;
00643     paramValues[nParams] = cpuModelName;
00644     nParams++;
00645   }
00646 
00647   for (i = 0; i < nGenMonitorParams; i++) {
00648     if (actGenMonitorParams[i] != 1 || i == GEN_IP || i == GEN_HOSTNAME ||
00649         i == GEN_CPU_VENDOR_ID || i == GEN_CPU_FAMILY || i == GEN_CPU_MODEL
00650         || i == GEN_CPU_MODEL_NAME)
00651       continue;
00652 
00653     if (genRetResults[i] == PROCUTILS_ERROR) {
00654       /* could not read the requested information from /proc, disable this
00655          parameter */
00656       if (autoDisableMonitoring)
00657         actGenMonitorParams[i] = 0;
00658     } else if (genRetResults[i] != RET_ERROR) {
00659       paramNames[nParams] = strdup(genMonitorParams[i]);
00660       paramValues[nParams] = (char *)&currentGenVals[i];
00661       valueTypes[nParams] = XDR_REAL64;
00662       nParams++;
00663     }
00664   }
00665 
00666   try {
00667     if (nParams > 0)
00668       sendParameters(sysMonCluster, sysMonNode, nParams,
00669                      paramNames, valueTypes, paramValues);
00670   } catch (runtime_error& err) {
00671     logger(WARNING, err.what());
00672   }
00673 
00674   for (i = 0; i < nParams; i++)
00675     free(paramNames[i]);
00676   free(paramNames);
00677   free(valueTypes);
00678   free(paramValues);
00679 #endif
00680 }
00681 
00682 void ApMon::initMonitoring() {
00683   int i;
00684 
00685   this -> autoDisableMonitoring = true;
00686   this -> sysMonitoring = false;
00687   this -> jobMonitoring = false;
00688   this -> genMonitoring = false;
00689   this -> confCheck = false;
00690 
00691 #ifndef WIN32
00692   pthread_mutex_init(&this -> mutex, NULL);
00693   pthread_mutex_init(&this -> mutexBack, NULL);
00694   pthread_mutex_init(&this -> mutexCond, NULL);
00695   pthread_cond_init(&this -> confChangedCond, NULL);
00696 #else
00697   logger(INFO, "init mutexes...");
00698   this -> mutex     = CreateMutex(NULL, FALSE, NULL);
00699   this -> mutexBack = CreateMutex(NULL, FALSE, NULL);
00700   this -> mutexCond = CreateMutex(NULL, FALSE, NULL);
00701   this -> confChangedCond = CreateEvent(NULL, FALSE, FALSE, NULL);
00702 
00703   // Initialize the Windows Sockets library
00704 
00705   WORD wVersionRequested;
00706   WSADATA wsaData;
00707   int err;
00708   wVersionRequested = MAKEWORD( 2, 0 );
00709   err = WSAStartup( wVersionRequested, &wsaData );
00710   if ( err != 0 ) {
00711     logger(FATAL, "Could not initialize the Windows Sockets library (WS2_32.dll)");
00712   }
00713 
00714 #endif
00715 
00716   this -> haveBkThread = false;
00717   this -> bkThreadStarted = false;
00718   this -> stopBkThread = false;
00719 
00720   this -> recheckChanged = false;
00721   this -> jobMonChanged = false;
00722   this -> sysMonChanged = false;
00723 
00724   this -> recheckInterval = RECHECK_INTERVAL;
00725   this -> crtRecheckInterval = RECHECK_INTERVAL;
00726   this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
00727   this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
00728 
00729   this -> nSysMonitorParams = initSysParams(this -> sysMonitorParams);
00730 
00731   this -> nGenMonitorParams = initGenParams(this -> genMonitorParams);
00732 
00733   this -> nJobMonitorParams = initJobParams(this -> jobMonitorParams);
00734 
00735   initSocketStatesMapTCP(this -> socketStatesMapTCP);
00736 
00737   this -> sysInfo_first = true;
00738 
00739   try {
00740     this -> lastSysInfoSend = ProcUtils::getBootTime();
00741   } catch (procutils_error& perr) {
00742     logger(WARNING, perr.what());
00743     logger(WARNING, "The first system monitoring values may be inaccurate");
00744     this -> lastSysInfoSend = 0;
00745   }
00746 
00747   for (i = 0; i < nSysMonitorParams; i++)
00748     this -> lastSysVals[i] = 0;
00749 
00750   //this -> lastUsrTime = this -> lastSysTime = 0;
00751   //this -> lastNiceTime = this -> lastIdleTime = 0;
00752 
00753   for (i = 0; i < nSysMonitorParams; i++) {
00754     actSysMonitorParams[i] = 1;
00755     sysRetResults[i] = RET_SUCCESS;
00756   }
00757 
00758   for (i = 0; i < nGenMonitorParams; i++) {
00759     actGenMonitorParams[i] = 1;
00760     genRetResults[i] = RET_SUCCESS;
00761   }
00762 
00763   for (i = 0; i < nJobMonitorParams; i++) {
00764     actJobMonitorParams[i] = 1;
00765     jobRetResults[i] = RET_SUCCESS;
00766   }
00767 
00768   this -> maxMsgRate = MAX_MSG_RATE;
00769 }
00770 
00771 void ApMon::parseXApMonLine(char *line) {
00772   bool flag, found;
00773   int ind;
00774   char tmp[MAX_STRING_LEN], logmsg[200];
00775   char *param, *value;
00776 //  char sbuf[MAX_STRING_LEN];
00777 //  char *pbuf = sbuf;
00778   const char *sep = " =";
00779 
00780   strcpy(tmp, line);
00781   char *tmp2 = tmp + strlen("xApMon_");
00782 
00783   param = strtok/*_r*/(tmp2, sep);//, &pbuf);
00784   value = strtok/*_r*/(NULL, sep);//, &pbuf);
00785 
00786   /* if it is an on/off parameter, assign its value to flag */
00787   if (strcmp(value, "on") == 0)
00788     flag = true;
00789   else /* if it is not an on/off paramenter the value of flag doesn't matter */
00790     flag = false;
00791 
00792   pthread_mutex_lock(&mutexBack);
00793 
00794   found = false;
00795   if (strcmp(param, "job_monitoring") == 0) {
00796     this -> jobMonitoring = flag; found = true;
00797   }
00798   if (strcmp(param, "sys_monitoring") == 0) {
00799     this -> sysMonitoring = flag; found = true;
00800   }
00801   if (strcmp(param, "job_interval") == 0) {
00802     this -> jobMonitorInterval = atol(value); found = true;
00803   }
00804   if (strcmp(param, "sys_interval") == 0) {
00805     this -> sysMonitorInterval = atol(value); found = true;
00806   }
00807   if (strcmp(param, "general_info") == 0) {
00808     this -> genMonitoring = flag; found = true;
00809   }
00810   if (strcmp(param, "conf_recheck") == 0) {
00811     this -> confCheck = flag; found = true;
00812   }
00813   if (strcmp(param, "recheck_interval") == 0) {
00814     this -> recheckInterval = this -> crtRecheckInterval = atol(value);
00815     found = true;
00816   }
00817   if (strcmp(param, "auto_disable") == 0) {
00818     this -> autoDisableMonitoring = flag;
00819     found = true;
00820   }
00821   if (strcmp(param, "maxMsgRate") == 0) {
00822     this -> maxMsgRate = atoi(value);
00823     found = true;
00824   }
00825 
00826   if (found) {
00827     pthread_mutex_unlock(&mutexBack);
00828     return;
00829   }
00830 
00831   if (strstr(param, "sys_") == param) {
00832     ind = getVectIndex(param + strlen("sys_"), sysMonitorParams,
00833                        nSysMonitorParams);
00834     if (ind < 0) {
00835       pthread_mutex_unlock(&mutexBack);
00836       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00837             param);
00838       logger(WARNING, logmsg);
00839       return;
00840     }
00841     found = true;
00842     this -> actSysMonitorParams[ind] = (int)flag;
00843   }
00844 
00845   if (strstr(param, "job_") == param) {
00846     ind = getVectIndex(param + strlen("job_"), jobMonitorParams,
00847                        nJobMonitorParams);
00848 
00849     if (ind < 0) {
00850       pthread_mutex_unlock(&mutexBack);
00851       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00852             param);
00853       logger(WARNING, logmsg);
00854       return;
00855     }
00856     found = true;
00857     this -> actJobMonitorParams[ind] = (int)flag;
00858   }
00859 
00860   if (!found) {
00861     ind = getVectIndex(param, genMonitorParams,
00862                        nGenMonitorParams);
00863     if (ind < 0) {
00864       pthread_mutex_unlock(&mutexBack);
00865       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00866               param);
00867       logger(WARNING, logmsg);
00868       return;
00869     } else {
00870       found = true;
00871       this -> actGenMonitorParams[ind] = (int)flag;
00872     }
00873   }
00874 
00875   if (!found) {
00876     sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00877             param);
00878     logger(WARNING, logmsg);
00879   }
00880   pthread_mutex_unlock(&mutexBack);
00881 }
00882 
00883 long *apmon_mon_utils::getChildren(long pid, int& nChildren)
00884   throw(runtime_error) {
00885 #ifdef WIN32
00886         return 0;
00887 #else
00888   FILE *pf;
00889   long *pids, *ppids, *children;
00890   int nProcesses;
00891   int i, j, status;
00892   pid_t cpid;
00893   char *argv[4], msg[MAX_STRING_LEN], sval[20];
00894   bool processFound;
00895   long mypid = getpid();
00896   char children_f[50], np_f[50], cmd[200];
00897 
00898   /* generate the names of the temporary files in which we have the output
00899      of some commands */
00900   sprintf(children_f, "/tmp/apmon_children%ld", mypid);
00901   sprintf(np_f, "/tmp/apmon_np%ld", mypid);
00902 
00903   switch (cpid = fork()) {
00904   case -1:
00905     throw runtime_error("[ getChildren() ] Unable to fork()");
00906   case 0:
00907     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
00908     sprintf(cmd, "ps --no-headers -A -o ppid,pid > %s && wc -l %s > %s",
00909             children_f, children_f, np_f);
00910     argv[2] = cmd;
00911     /*
00912     argv[2] = "ps --no-headers -eo ppid,pid > /tmp/apmon_children.txt && wc -l /tmp/out_children.txt > /tmp/out_np.txt";
00913     */
00914     argv[3] = 0;
00915     execv("/bin/sh", argv);
00916     exit(RET_ERROR);
00917   default:
00918     if (waitpid(cpid, &status, 0) == -1) {
00919       sprintf(msg, "[ getChildren() ] The number of sub-processes for %ld could not be determined", pid);
00920       unlink(children_f); unlink(np_f);
00921       throw runtime_error(msg);
00922     }
00923     break ;
00924   }
00925 
00926   /* find the number of processes */
00927   pf = fopen(np_f, "rt");
00928   if (pf == NULL) {
00929     unlink(np_f); unlink(children_f);
00930     sprintf(msg, "[ getChildren() ] The number of sub-processes for %ld could not be determined",
00931             pid);
00932     throw runtime_error(msg);
00933   }
00934   fscanf(pf, "%d", &nProcesses);
00935   fclose(pf);
00936   unlink(np_f);
00937 
00938   pids = (long *)malloc(nProcesses * sizeof(long));
00939   ppids = (long *)malloc(nProcesses * sizeof(long));
00940   /* estimated maximum size for the returned vector; it will be realloc'ed */
00941   children = (long *)malloc(nProcesses * sizeof(long));
00942 
00943   pf = fopen(children_f, "rt");
00944   if (pf == NULL) {
00945     free(pids); free(ppids); free(children);
00946     unlink(children_f);
00947     sprintf(msg, "[ getChildren() ] The sub-processes for %ld could not be determined", pid);
00948     throw runtime_error(msg);
00949   }
00950 
00951   /* scan the output of the ps command and find the children of the process,
00952    and also check if the process is still running */
00953   children[0] = pid; nChildren = 1;
00954   processFound = false;
00955   for (i = 0; i < nProcesses; i++) {
00956     fscanf(pf, "%ld %ld", &ppids[i], &pids[i]);
00957     /* look for the given process */
00958     if (pids[i] == children[0] || ppids[i] == children[0])
00959       processFound = true;
00960     if (ppids[i] == children[0]) {
00961       children[nChildren++] = pids[i];
00962     }
00963   }
00964   fclose(pf);
00965   unlink(children_f);
00966 
00967   if (processFound == false) {
00968     free(pids); free(ppids); free(children);
00969     nChildren = 0;
00970     sprintf(msg, "[ getChildren() ] The process %ld does not exist", pid);
00971     throw runtime_error(msg);
00972   }
00973 
00974   /* find the PIDs of all the descendant processes */
00975   i = 1;
00976   while (i < nChildren) {
00977     /* find the children of the i-th child */
00978     for (j = 0; j < nProcesses; j++) {
00979       if (ppids[j] == children[i]) {
00980         children[nChildren++] = pids[j];
00981       }
00982     }
00983     i++;
00984   }
00985 
00986   sprintf(msg, "Sub-processes for process %ld: ", pid);
00987   for (i = 0; i < nChildren; i++) {
00988     sprintf(sval, "%ld ", children[i]);
00989     if (strlen(msg) + strlen(sval) < MAX_STRING_LEN - 1)
00990       strcat(msg, sval);
00991   }
00992   logger(DEBUG, msg);
00993 
00994   free(pids); free(ppids);
00995   children = (long *)realloc(children, (nChildren) * sizeof(long));
00996   return children;
00997 #endif
00998 }
00999 
01000 void apmon_mon_utils::readJobInfo(long pid, PsInfo& info) throw(runtime_error) {
01001 #ifndef WIN32
01002   long *children;
01003   FILE *fp;
01004   int i, nChildren, status, ch, ret, open_fd;
01005   char *cmd , *mem_cmd_s, *argv[4], *ret_s;
01006   char pid_s[10], msg[100];
01007   char cmdName[MAX_STRING_LEN1], buf[MAX_STRING_LEN1], buf2[MAX_STRING_LEN1];
01008   char etime_s[20], cputime_s[20];
01009   double rsz, vsz;
01010   double etime, cputime;
01011   double pcpu, pmem;
01012   /* this list contains strings of the form "rsz_vsz_command" for every pid;
01013      it is used to avoid adding several times processes that have multiple
01014      threads and appear in ps as sepparate processes, occupying exactly the
01015      same amount of memory and having the same command name. For every line
01016      from the output of the ps command we verify if the rsz_vsz_command
01017      combination is already in the list.
01018   */
01019   char **mem_cmd_list;
01020   int listSize;
01021   long cpid, crt_pid;
01022   //unsigned int maxCmdLen = 5 * MAX_STRING_LEN;
01023   long mypid = getpid();
01024   char ps_f[50];
01025 
01026   /* get the list of the process' descendants */
01027   children = getChildren(pid, nChildren);
01028 
01029   /* generate a name for the temporary file which holds the output of the
01030      ps command */
01031   sprintf(ps_f, "/tmp/apmon_ps%ld", mypid);
01032 
01033   unsigned int cmdLen = (150 + 6 * nChildren) * sizeof(char);
01034   cmd = (char *)malloc (cmdLen);
01035 
01036   /* issue the "ps" command to obtain information on all the descendants */
01037   strcpy(cmd, "ps --no-headers --pid ");
01038   for (i = 0; i < nChildren - 1; i++) {
01039     sprintf(pid_s, "%ld,", children[i]);
01040     if (strlen(cmd) + strlen(pid_s) + 1 >= cmdLen) {
01041       free(cmd);
01042       sprintf(msg, "[ readJobInfo() ] Job %ld has too many sub-processes to be monitored",
01043               pid);
01044       throw runtime_error(msg);
01045     }
01046     strcat(cmd, pid_s);
01047     //strcat(cmd, " 2>&1");
01048   }
01049 
01050   /* the last part of the command */
01051   sprintf(pid_s, "%ld", children[nChildren - 1]);
01052   sprintf(cmdName, " -o pid,etime,time,%%cpu,%%mem,rsz,vsz,comm > %s", ps_f);
01053   if (strlen(cmd) + strlen(pid_s) + strlen(cmdName) >= cmdLen) {
01054     free(cmd);
01055     sprintf(msg, "[ readJobInfo() ] Job %ld has too many sub-processes to be monitored",
01056               pid);
01057     throw runtime_error(msg);
01058   }
01059   strcat(cmd, pid_s);
01060   strcat(cmd, cmdName);
01061   //strcat(cmd, " 2>&1");
01062 
01063   switch (cpid = fork()) {
01064   case -1:
01065     free(cmd);
01066     sprintf(msg, "[ readJobInfo() ] Unable to fork(). The job information could not be determined for %ld", pid);
01067     throw runtime_error(msg);
01068   case 0:
01069     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01070     argv[2] = cmd; argv[3] = 0;
01071     execv("/bin/sh", argv);
01072     exit(RET_ERROR);
01073   default:
01074     if (waitpid(cpid, &status, 0) == -1) {
01075       free(cmd);
01076       sprintf(msg, "[ readJobInfo() ] The job information for %ld could not be determined", pid);
01077       throw runtime_error(msg);
01078     }
01079     break ;
01080   }
01081 
01082   free(cmd);
01083   fp = fopen(ps_f, "rt");
01084   if (fp == NULL) {
01085     sprintf(msg, "[ readJobInfo() ] Error opening the ps output file for process %ld", pid);
01086     throw runtime_error(msg);
01087   }
01088 
01089   /* parse the output file */
01090   info.etime = info.cputime = 0;
01091   info.pcpu = info.pmem = 0;
01092   info.rsz = info.vsz = 0;
01093   info.open_fd = 0;
01094   mem_cmd_list = (char **)malloc(nChildren * sizeof(char *));
01095   listSize = 0;
01096   cmdName[0] = 0;
01097   while (1) {
01098     ret_s = fgets(buf, MAX_STRING_LEN, fp);
01099     if (ret_s == NULL)
01100       break;
01101     buf[MAX_STRING_LEN - 1] = 0;
01102 
01103     /* if the line was too long and fgets hasn't read it entirely, */
01104     /* keep only the first 512 chars from the line */
01105     ch = fgetc(fp); // see if we are at the end of the file
01106     ungetc(ch, fp);
01107     if (buf[strlen(buf) - 1] != 10 && ch != EOF) {
01108       while (1) {
01109         char *sret = fgets(buf2, MAX_STRING_LEN, fp);
01110         if (sret == NULL || buf[strlen(buf) - 1] == 10)
01111           break;
01112       }
01113     }
01114 
01115     ret = sscanf(buf, "%ld %s %s %lf %lf %lf %lf %s", &crt_pid, etime_s,
01116                  cputime_s, &pcpu, &pmem, &rsz, &vsz, cmdName);
01117     if (ret != 8) {
01118       fclose(fp);
01119       unlink(ps_f);
01120       free(children);
01121       for (i = 0; i < listSize; i++) {
01122         free(mem_cmd_list[i]);
01123       }
01124       free(mem_cmd_list);
01125       throw runtime_error("[ readJobInfo() ] Error parsing the output of the ps command");
01126     }
01127 
01128     /* etime is the maximum of the elapsed times for the subprocesses */
01129     etime = parsePSTime(etime_s);
01130 
01131     info.etime = (info.etime > etime) ? info.etime : etime;
01132 
01133     /* cputime is the sum of the cpu times for the subprocesses */
01134     cputime = parsePSTime(cputime_s);
01135     info.cputime += cputime;
01136     info.pcpu += pcpu;
01137 
01138     /* get the number of opened file descriptors */
01139     try {
01140       open_fd = ProcUtils::countOpenFiles(crt_pid);
01141     } catch (procutils_error& err) {
01142       logger(WARNING, err.what());
01143       /* don't throw an exception if we couldn't read the number of files */
01144       open_fd = PROCUTILS_ERROR;
01145     }
01146 
01147     /* see if this is a process or just a thread */
01148     mem_cmd_s = (char *)malloc(MAX_STRING_LEN * sizeof(char));
01149     sprintf(mem_cmd_s, "%f_%f_%s", rsz, vsz, cmdName);
01150     //printf("### mem_cmd_s: %s\n", mem_cmd_s);
01151     if (getVectIndex(mem_cmd_s, mem_cmd_list, listSize) == -1) {
01152       /* another pid with the same command name, rsz and vsz was not found,
01153          so this is a new process and we can add the amount of memory used by
01154          it */
01155       info.pmem += pmem;
01156       info.vsz += vsz; info.rsz += rsz;
01157 
01158       if (info.open_fd >= 0) // if no error occured so far
01159         info.open_fd += open_fd;
01160       /* add an entry in the list so that next time we see another thread of
01161          this process we don't add the amount of  memory again */
01162       mem_cmd_list[listSize++] = mem_cmd_s;
01163     } else {
01164       free(mem_cmd_s);
01165     }
01166 
01167     /* if we monitor the current process, we have two extra opened files
01168        that we shouldn't take into account (the output file for ps and
01169        /proc/<pid>/fd/)
01170     */
01171     if (crt_pid == getpid())
01172       info.open_fd -= 2;
01173   }
01174 
01175   fclose(fp);
01176   unlink(ps_f);
01177   free(children);
01178   for (i = 0; i < listSize; i++) {
01179     free(mem_cmd_list[i]);
01180   }
01181   free(mem_cmd_list);
01182 #endif
01183 }
01184 
01185 double apmon_mon_utils::parsePSTime(char *s) {
01186   long days, hours, mins, secs;
01187 
01188   if (strchr(s, '-') != NULL) {
01189     sscanf(s, "%ld-%ld:%ld:%ld", &days, &hours, &mins, &secs);
01190     return 24. * 3600 * days + 3600 * hours + 60 * mins + secs;
01191   } else {
01192     if (strchr(s, ':') != NULL && strchr(s, ':') !=  strrchr(s, ':')) {
01193        sscanf(s, "%ld:%ld:%ld", &hours, &mins, &secs);
01194        return 3600. * hours + 60 * mins + secs;
01195     } else {
01196       if (strchr(s, ':') != NULL) {
01197         sscanf(s, "%ld:%ld", &mins, &secs);
01198         return 60. * mins + secs;
01199       } else {
01200         return RET_ERROR;
01201       }
01202     }
01203   }
01204 }
01205 
01206 void apmon_mon_utils::readJobDiskUsage(MonitoredJob job,
01207                                 JobDirInfo& info) throw(runtime_error) {
01208 #ifndef WIN32
01209   int status;
01210   pid_t cpid;
01211   char *cmd, s_tmp[20], *argv[4], msg[100];
01212   FILE *fp;
01213   long mypid = getpid();
01214   char du_f[50], df_f[50];
01215 
01216   /* generate names for the temporary files which will hold the output of the
01217      du and df commands */
01218   sprintf(du_f, "/tmp/apmon_du%ld", mypid);
01219   sprintf(df_f, "/tmp/apmon_df%ld", mypid);
01220 
01221   if (strlen(job.workdir) == 0) {
01222     sprintf(msg, "[ readJobDiskUsage() ] The working directory for the job %ld was not specified, not monitoring disk usage", job.pid);
01223     throw runtime_error(msg);
01224   }
01225 
01226   cmd = (char *)malloc((300 + 2 * strlen(job.workdir)) * sizeof(char));
01227   strcpy(cmd, "PRT=`du -Lsk ");
01228   strcat(cmd, job.workdir);
01229   //strcat(cmd, " | tail -1 | cut -f 1 > ");
01230   strcat(cmd, " ` ; if [[ $? -eq 0 ]] ; then OUT=`echo $PRT | cut -f 1` ; echo $OUT ; exit 0 ; else exit -1 ; fi > ");
01231   strcat(cmd, du_f);
01232 
01233 
01234   switch (cpid = fork()) {
01235   case -1:
01236     sprintf(msg, "[ readJobDiskUsage() ] Unable to fork(). The disk usage information could not be determined for %ld", job.pid);
01237     throw runtime_error(msg);
01238   case 0:
01239     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01240     argv[2] = cmd; argv[3] = 0;
01241     execv("/bin/sh", argv);
01242     exit(RET_ERROR);
01243   default:
01244     if (waitpid(cpid, &status, 0) == -1) {
01245       free(cmd);
01246       sprintf(msg, "[ readJobDiskUsage() ] The disk usage (du) information for %ld could not be determined", job.pid);
01247       unlink(du_f); unlink(df_f);
01248       throw runtime_error(msg);
01249     }
01250     break ;
01251   }
01252 
01253   strcpy(cmd, "PRT=`df -m ");
01254   strcat(cmd, job.workdir);
01255   //strcat(cmd, " | tail -1 > ");
01256   strcat(cmd, " `; if [[ $? -eq 0 ]] ; then OUT=`echo $PRT | cut -d ' ' -f 8-` ; echo $OUT ; exit 0 ; else exit -1 ; fi > ");
01257 
01258   strcat(cmd, df_f);
01259   //printf("### cmd: %s\n", cmd);
01260 
01261   switch (cpid = fork()) {
01262   case -1:
01263     sprintf(msg, "[ readJobDiskUsage() ] Unable to fork(). The disk usage information could not be determined for %ld", job.pid);
01264     throw runtime_error(msg);
01265   case 0:
01266     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01267     argv[2] = cmd; argv[3] = 0;
01268     execv("/bin/sh", argv);
01269     exit(RET_ERROR);
01270   default:
01271     if (waitpid(cpid, &status, 0) == -1) {
01272       free(cmd);
01273       sprintf(msg, "[ readJobDiskUsage() ] The disk usage (df) information for %ld could not be determined", job.pid);
01274       unlink(du_f); unlink(df_f);
01275       throw runtime_error(msg);
01276     }
01277     break ;
01278   }
01279 
01280   free(cmd);
01281   fp = fopen(du_f, "rt");
01282   if (fp == NULL) {
01283     sprintf(msg, "[ readJobDiskUsage() ] Error opening du output file for process %ld", job.pid);
01284     throw runtime_error(msg);
01285   }
01286 
01287   fscanf(fp, "%lf", &(info.workdir_size));
01288   /* keep the directory size in MB */
01289   info.workdir_size /= 1024.0;
01290   fclose(fp);
01291   unlink(du_f);
01292 
01293   fp = fopen(df_f, "rt");
01294   if (fp == NULL) {
01295     sprintf(msg, "[ readJobDiskUsage() ] Error opening df output file for process %ld", job.pid);
01296     throw runtime_error(msg);
01297   }
01298   fscanf(fp, "%s %lf %lf %lf %lf", s_tmp, &(info.disk_total),
01299          &(info.disk_used), &(info.disk_free), &(info.disk_usage));
01300   fclose(fp);
01301   unlink(df_f);
01302 #endif
01303 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Fri Sep 2 2011 16:24:46 for Gaudi Framework, version v22r4 by Doxygen version 1.7.2 written by Dimitri van Heesch, © 1997-2004