Gaudi Framework, version v21r8

Home   Generated: 17 Mar 2010

monitor_utils.cpp

Go to the documentation of this file.
00001 
00007 /*
00008  * ApMon - Application Monitoring Tool
00009  * Version: 2.2.0
00010  *
00011  * Copyright (C) 2006 California Institute of Technology
00012  *
00013  * Permission is hereby granted, free of charge, to use, copy and modify 
00014  * this software and its documentation (the "Software") for any
00015  * purpose, provided that existing copyright notices are retained in 
00016  * all copies and that this notice is included verbatim in any distributions
00017  * or substantial portions of the Software. 
00018  * This software is a part of the MonALISA framework (http://monalisa.cacr.caltech.edu).
00019  * Users of the Software are asked to feed back problems, benefits,
00020  * and/or suggestions about the software to the MonALISA Development Team
00021  * (developers@monalisa.cern.ch). Support for this software - fixing of bugs,
00022  * incorporation of new features - is done on a best effort basis. All bug
00023  * fixes and enhancements will be made available under the same terms and
00024  * conditions as the original software,
00025 
00026  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
00027  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
00028  * OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF,
00029  * EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00030 
00031  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
00032  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
00033  * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS
00034  * PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO
00035  * OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
00036  * MODIFICATIONS.
00037  */
00038 
00039 #include "ApMon.h"
00040 #include "monitor_utils.h"
00041 #include "proc_utils.h"
00042 #include "utils.h"
00043 #include "mon_constants.h"
00044 
00045 using namespace apmon_utils;
00046 using namespace apmon_mon_utils;
00047 
00048 void ApMon::sendJobInfo() {
00049 #ifndef WIN32
00050   int i;
00051   long crtTime;
00052 
00053  /* the apMon_free() function calls sendJobInfo() from another thread and 
00054      we need mutual exclusion */
00055   pthread_mutex_lock(&mutexBack);
00056 
00057   if (nMonJobs == 0) {
00058     logger(WARNING, "There are no jobs to be monitored, not sending job monitoring information.");
00059     pthread_mutex_unlock(&mutexBack);
00060     return;
00061   }
00062 
00063   crtTime = time(NULL);
00064   logger(INFO, "Sending job monitoring information...");
00065   lastJobInfoSend = (time_t)crtTime; 
00066 
00067   /* send monitoring information for all the jobs specified by the user */
00068   for (i = 0; i < nMonJobs; i++) 
00069     sendOneJobInfo(monJobs[i]);
00070 
00071   pthread_mutex_unlock(&mutexBack);
00072 #endif
00073 }
00074 
00075 void ApMon::updateJobInfo(MonitoredJob job) {
00076   bool needJobInfo, needDiskInfo;
00077   bool jobExists = true;
00078   char err_msg[200];
00079 
00080   PsInfo jobInfo;
00081   JobDirInfo dirInfo;
00082 
00083   /**** runtime, CPU & memory usage information ****/ 
00084   needJobInfo = actJobMonitorParams[JOB_RUN_TIME] 
00085     || actJobMonitorParams[JOB_CPU_TIME] 
00086     || actJobMonitorParams[JOB_CPU_USAGE] 
00087     || actJobMonitorParams[JOB_MEM_USAGE] 
00088     || actJobMonitorParams[JOB_VIRTUALMEM] 
00089     || actJobMonitorParams[JOB_RSS] 
00090     || actJobMonitorParams[JOB_OPEN_FILES];
00091   if (needJobInfo) {
00092     try {
00093       readJobInfo(job.pid, jobInfo);
00094       currentJobVals[JOB_RUN_TIME] = jobInfo.etime;
00095       currentJobVals[JOB_CPU_TIME] = jobInfo.cputime; 
00096       currentJobVals[JOB_CPU_USAGE] = jobInfo.pcpu;
00097       currentJobVals[JOB_MEM_USAGE] = jobInfo.pmem; 
00098       currentJobVals[JOB_VIRTUALMEM] = jobInfo.vsz;
00099       currentJobVals[JOB_RSS] = jobInfo.rsz;
00100 
00101       if (jobInfo.open_fd < 0)
00102         jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
00103       currentJobVals[JOB_OPEN_FILES] = jobInfo.open_fd;
00104 
00105     } catch (runtime_error &err) {
00106       logger(WARNING, err.what());
00107       jobRetResults[JOB_RUN_TIME] = jobRetResults[JOB_CPU_TIME] = 
00108         jobRetResults[JOB_CPU_USAGE] = jobRetResults[JOB_MEM_USAGE] =
00109         jobRetResults[JOB_VIRTUALMEM] = jobRetResults[JOB_RSS] =
00110         jobRetResults[JOB_OPEN_FILES] = RET_ERROR;
00111       strcpy(err_msg, err.what());
00112       if (strstr(err_msg, "does not exist") != NULL)
00113         jobExists = false;
00114     } 
00115   }
00116 
00117   /* if the monitored job has terminated, remove it */
00118   if (!jobExists) {
00119     try {
00120       removeJobToMonitor(job.pid);
00121     } catch (runtime_error &err) {
00122       logger(WARNING, err.what());
00123     }
00124     return;
00125   }
00126 
00127   /* disk usage information */
00128   needDiskInfo = actJobMonitorParams[JOB_DISK_TOTAL] 
00129     || actJobMonitorParams[JOB_DISK_USED] 
00130     || actJobMonitorParams[JOB_DISK_FREE] 
00131     || actJobMonitorParams[JOB_DISK_USAGE] 
00132     || actJobMonitorParams[JOB_WORKDIR_SIZE];
00133   if (needDiskInfo) {
00134     try {
00135       readJobDiskUsage(job, dirInfo);
00136       currentJobVals[JOB_WORKDIR_SIZE] = dirInfo.workdir_size;
00137       currentJobVals[JOB_DISK_TOTAL] = dirInfo.disk_total; 
00138       currentJobVals[JOB_DISK_USED] = dirInfo.disk_used;
00139       currentJobVals[JOB_DISK_USAGE] = dirInfo.disk_usage; 
00140       currentJobVals[JOB_DISK_FREE] = dirInfo.disk_free;
00141     } catch (runtime_error& err) {
00142       logger(WARNING, err.what());
00143       jobRetResults[JOB_WORKDIR_SIZE] = jobRetResults[JOB_DISK_TOTAL] 
00144         = jobRetResults[JOB_DISK_USED] 
00145         = jobRetResults[JOB_DISK_USAGE] 
00146         = jobRetResults[JOB_DISK_FREE] 
00147         = RET_ERROR;
00148     }
00149   }
00150 }
00151  
00152 void ApMon::sendOneJobInfo(MonitoredJob job) {
00153   int i;
00154   int nParams = 0;
00155 
00156   char **paramNames, **paramValues;
00157   int *valueTypes;
00158 
00159   valueTypes = (int *)malloc(nJobMonitorParams * sizeof(int));
00160   paramNames = (char **)malloc(nJobMonitorParams * sizeof(char *));
00161   paramValues = (char **)malloc(nJobMonitorParams * sizeof(char *));
00162 
00163   for (i = 0; i < nJobMonitorParams; i++) {
00164     jobRetResults[i] = RET_SUCCESS;
00165     currentJobVals[i] = 0;
00166   }
00167     
00168   updateJobInfo(job);
00169 
00170   for (i = 0; i < nJobMonitorParams; i++) {
00171     if (actJobMonitorParams[i] && jobRetResults[i] != RET_ERROR) {
00172      
00173       paramNames[nParams] = jobMonitorParams[i];
00174       paramValues[nParams] = (char *)&currentJobVals[i];
00175       valueTypes[nParams] = XDR_REAL64;
00176       nParams++;
00177     } 
00178     /* don't disable the parameter (maybe for another job it can be
00179          obtained) */
00180       /*
00181         else
00182         if (autoDisableMonitoring)
00183         actJobMonitorParams[ind] = 0;
00184       */
00185   }
00186 
00187   if (nParams == 0) {
00188     free(paramNames); free(valueTypes);
00189     free(paramValues);
00190     return;
00191   }
00192 
00193   try {
00194     if (nParams > 0)
00195       sendParameters(job.clusterName, job.nodeName, nParams, 
00196                      paramNames, valueTypes, paramValues);
00197   } catch (runtime_error& err) {
00198     logger(WARNING, err.what());
00199   }
00200 
00201   free(paramNames);
00202   free(valueTypes);
00203   free(paramValues);
00204 }
00205 
00206 
00207 void ApMon::updateSysInfo() {
00208   int needCPUInfo, needSwapPagesInfo, needLoadInfo, needMemInfo,
00209     needNetInfo, needUptime, needProcessesInfo, needNetstatInfo; 
00210  
00211   /**** CPU usage information ****/ 
00212   needCPUInfo = actSysMonitorParams[SYS_CPU_USAGE] 
00213     || actSysMonitorParams[SYS_CPU_USR] 
00214     || actSysMonitorParams[SYS_CPU_SYS] 
00215     || actSysMonitorParams[SYS_CPU_NICE] 
00216     || actSysMonitorParams[SYS_CPU_IDLE];
00217   if (needCPUInfo) {
00218     try {
00219       ProcUtils::getCPUUsage(*this, currentSysVals[SYS_CPU_USAGE], 
00220                              currentSysVals[SYS_CPU_USR], 
00221                              currentSysVals[SYS_CPU_SYS],
00222                              currentSysVals[SYS_CPU_NICE], 
00223                              currentSysVals[SYS_CPU_IDLE], numCPUs);
00224     } catch (procutils_error &perr) {
00225       /* "permanent" error (the parameters could not be obtained) */
00226       logger(WARNING, perr.what());
00227       sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS] = 
00228         sysRetResults[SYS_CPU_USR] = sysRetResults[SYS_CPU_NICE] =
00229         sysRetResults[SYS_CPU_IDLE] = sysRetResults[SYS_CPU_USAGE] = PROCUTILS_ERROR;
00230     } catch (runtime_error &err) {
00231       /* temporary error (next time we might be able to get the paramerers) */
00232       logger(WARNING, err.what());
00233       sysRetResults[SYS_CPU_USAGE] = sysRetResults[SYS_CPU_SYS] 
00234         = sysRetResults[SYS_CPU_USR] 
00235         = sysRetResults[SYS_CPU_NICE] 
00236         = sysRetResults[SYS_CPU_IDLE] 
00237         = sysRetResults[SYS_CPU_USAGE] 
00238         = RET_ERROR;
00239     }
00240   }
00241 
00242   needSwapPagesInfo = actSysMonitorParams[SYS_PAGES_IN] 
00243     || actSysMonitorParams[SYS_PAGES_OUT] 
00244     || actSysMonitorParams[SYS_SWAP_IN] 
00245     || actSysMonitorParams[SYS_SWAP_OUT];
00246 
00247   if (needSwapPagesInfo) {
00248     try {
00249       ProcUtils::getSwapPages(*this, currentSysVals[SYS_PAGES_IN], 
00250                               currentSysVals[SYS_PAGES_OUT], 
00251                               currentSysVals[SYS_SWAP_IN],
00252                               currentSysVals[SYS_SWAP_OUT]);
00253     } catch (procutils_error &perr) {
00254       /* "permanent" error (the parameters could not be obtained) */
00255       logger(WARNING, perr.what());
00256       sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT] = 
00257       sysRetResults[SYS_SWAP_OUT] = sysRetResults[SYS_SWAP_IN] = PROCUTILS_ERROR;
00258     } catch (runtime_error &err) {
00259       /* temporary error (next time we might be able to get the paramerers) */
00260       logger(WARNING, err.what());
00261       sysRetResults[SYS_PAGES_IN] = sysRetResults[SYS_PAGES_OUT] 
00262         = sysRetResults[SYS_SWAP_IN] 
00263         = sysRetResults[SYS_SWAP_OUT] 
00264         = RET_ERROR;
00265     }
00266   }
00267 
00268   needLoadInfo = actSysMonitorParams[SYS_LOAD1] 
00269     || actSysMonitorParams[SYS_LOAD5] 
00270     || actSysMonitorParams[SYS_LOAD15];
00271     
00272   if (needLoadInfo) {
00273     double dummyVal;
00274     try {
00275       /* the number of processes is now obtained with the getProcesses()
00276          function, not with getLoad() */
00277       ProcUtils::getLoad(currentSysVals[SYS_LOAD1], currentSysVals[SYS_LOAD5], 
00278                          currentSysVals[SYS_LOAD15],dummyVal);
00279     } catch (procutils_error& perr) {
00280       /* "permanent" error (the parameters could not be obtained) */
00281       logger(WARNING, perr.what());
00282       sysRetResults[SYS_LOAD1] = sysRetResults[SYS_LOAD5] 
00283         = sysRetResults[SYS_LOAD15] 
00284         = PROCUTILS_ERROR;
00285     }
00286   }
00287 
00288   /**** get statistics about the current processes ****/
00289   needProcessesInfo = actSysMonitorParams[SYS_PROCESSES];
00290   if (needProcessesInfo) {
00291     try {
00292       ProcUtils::getProcesses(currentSysVals[SYS_PROCESSES], 
00293                               currentProcessStates);
00294     } catch (runtime_error& err) {
00295       logger(WARNING, err.what());
00296       sysRetResults[SYS_PROCESSES] = RET_ERROR;
00297     }
00298   }
00299 
00300   /**** get the amount of memory currently in use ****/
00301   needMemInfo = actSysMonitorParams[SYS_MEM_USED] 
00302     || actSysMonitorParams[SYS_MEM_FREE] 
00303     || actSysMonitorParams[SYS_SWAP_USED] 
00304     || actSysMonitorParams[SYS_SWAP_FREE] 
00305     || actSysMonitorParams[SYS_MEM_USAGE] 
00306     || actSysMonitorParams[SYS_SWAP_USAGE];
00307 
00308   if (needMemInfo) {
00309     try {
00310       ProcUtils::getMemUsed(currentSysVals[SYS_MEM_USED], 
00311                             currentSysVals[SYS_MEM_FREE], 
00312                             currentSysVals[SYS_SWAP_USED],
00313                             currentSysVals[SYS_SWAP_FREE]);
00314       currentSysVals[SYS_MEM_USAGE] = 100 * currentSysVals[SYS_MEM_USED] /
00315         (currentSysVals[SYS_MEM_USED] +  currentSysVals[SYS_MEM_FREE]); 
00316       currentSysVals[SYS_SWAP_USAGE] = 100 * currentSysVals[SYS_SWAP_USED] /
00317         (currentSysVals[SYS_SWAP_USED] +  currentSysVals[SYS_SWAP_FREE]); 
00318     } catch (procutils_error &perr) {
00319       logger(WARNING, perr.what());
00320       sysRetResults[SYS_MEM_USED] = sysRetResults[SYS_MEM_FREE] = 
00321         sysRetResults[SYS_SWAP_USED] = sysRetResults[SYS_SWAP_FREE] = 
00322         sysRetResults[SYS_MEM_USAGE] = sysRetResults[SYS_SWAP_USAGE] = 
00323         PROCUTILS_ERROR;
00324     }
00325   }
00326 
00327   
00328   /**** network monitoring information ****/
00329   needNetInfo = actSysMonitorParams[SYS_NET_IN] || 
00330     actSysMonitorParams[SYS_NET_OUT] || actSysMonitorParams[SYS_NET_ERRS];
00331   if (needNetInfo && this -> nInterfaces > 0) {
00332     try {
00333       ProcUtils::getNetInfo(*this, &currentNetIn, &currentNetOut, 
00334                             &currentNetErrs);
00335     } catch (procutils_error &perr) {
00336       logger(WARNING, perr.what());
00337       sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] = 
00338         sysRetResults[SYS_NET_ERRS] = PROCUTILS_ERROR;     
00339     } catch (runtime_error &err) {
00340       logger(WARNING, err.what());
00341       sysRetResults[SYS_NET_IN] = sysRetResults[SYS_NET_OUT] = 
00342         sysRetResults[SYS_NET_ERRS] = RET_ERROR; 
00343     }
00344   }
00345 
00346   needNetstatInfo = actSysMonitorParams[SYS_NET_SOCKETS] || 
00347     actSysMonitorParams[SYS_NET_TCP_DETAILS];
00348   if (needNetstatInfo) {
00349     try {
00350       ProcUtils::getNetstatInfo(*this, this -> currentNSockets, 
00351                                 this -> currentSocketsTCP); 
00352     } catch (runtime_error &err) {
00353       logger(WARNING, err.what());
00354       sysRetResults[SYS_NET_SOCKETS] = sysRetResults[SYS_NET_TCP_DETAILS] = 
00355         RET_ERROR; 
00356     }
00357   }
00358 
00359   needUptime = actSysMonitorParams[SYS_UPTIME];
00360   if (needUptime) {
00361     try {
00362       currentSysVals[SYS_UPTIME] = ProcUtils::getUpTime();
00363     } catch (procutils_error &perr) {
00364       logger(WARNING, perr.what());
00365       sysRetResults[SYS_UPTIME] = PROCUTILS_ERROR;
00366     } 
00367   }
00368 
00369 }
00370 
00371 void ApMon::sendSysInfo() {
00372 #ifndef WIN32
00373   int nParams = 0, maxNParams;
00374   int i;
00375   long crtTime;
00376 
00377   int *valueTypes;
00378   char **paramNames, **paramValues;
00379 
00380   crtTime = time(NULL);
00381   logger(INFO, "Sending system monitoring information...");
00382 
00383   /* make some initializations only the first time this
00384      function is called */
00385   if (this -> sysInfo_first) {
00386     for (i = 0; i < this -> nInterfaces; i++) {
00387      this -> lastBytesSent[i] = this -> lastBytesReceived[i] = 0.0;
00388      this -> lastNetErrs[i] = 0;
00389      
00390     }
00391     this -> sysInfo_first = FALSE;
00392   }
00393 
00394   /* the maximum number of parameters that can be included in a datagram */
00395   /* (the last three terms are for: parameters corresponding to each possible
00396      state of the processes, parameters corresponding to the types of open 
00397      sockets, parameters corresponding to each possible state of the TCP
00398      sockets.) */
00399   maxNParams = nSysMonitorParams + (2 * nInterfaces - 1) + 15 + 4 + 
00400     N_TCP_STATES;
00401 
00402   valueTypes = (int *)malloc(maxNParams * sizeof(int));
00403   paramNames = (char **)malloc(maxNParams * sizeof(char *));
00404   paramValues = (char **)malloc(maxNParams * sizeof(char *));
00405 
00406   for (i = 0; i < nSysMonitorParams; i++) {
00407     if (actSysMonitorParams[i] > 0) /* if the parameter is enabled */
00408       sysRetResults[i] = RET_SUCCESS;
00409     else /* mark it with RET_ERROR so that it will be not included in the
00410             datagram */
00411       sysRetResults[i] = RET_ERROR;
00412   }
00413 
00414   updateSysInfo();
00415 
00416   for (i = 0; i < nSysMonitorParams; i++) {
00417     if (i == SYS_NET_IN || i == SYS_NET_OUT || i == SYS_NET_ERRS ||
00418         i == SYS_NET_SOCKETS || i == SYS_NET_TCP_DETAILS || i == SYS_PROCESSES)
00419       continue;
00420 
00421     if (sysRetResults[i] == PROCUTILS_ERROR) {
00422       /* could not read the requested information from /proc, disable this
00423          parameter */
00424       if (autoDisableMonitoring)
00425         actSysMonitorParams[i] = 0;
00426     } else if (sysRetResults[i] != RET_ERROR) {
00427       /* the parameter is enabled and there were no errors obtaining it */
00428       paramNames[nParams] = strdup(sysMonitorParams[i]);
00429       paramValues[nParams] = (char *)&currentSysVals[i];
00430       valueTypes[nParams] = XDR_REAL64;
00431       nParams++;
00432     } 
00433   }
00434 
00435   if (actSysMonitorParams[SYS_NET_IN] == 1) {
00436     if (sysRetResults[SYS_NET_IN] == PROCUTILS_ERROR) {
00437       if (autoDisableMonitoring)
00438         actSysMonitorParams[SYS_NET_IN] = 0;
00439     } else  if (sysRetResults[SYS_NET_IN] != RET_ERROR) {
00440       for (i = 0; i < nInterfaces; i++) { 
00441         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00442         strcpy(paramNames[nParams], interfaceNames[i]);
00443         strcat(paramNames[nParams], "_in");
00444         paramValues[nParams] = (char *)&currentNetIn[i];
00445         valueTypes[nParams] = XDR_REAL64;
00446         nParams++;
00447       }
00448     }
00449   }
00450 
00451   if (actSysMonitorParams[SYS_NET_OUT] == 1) {
00452     if (sysRetResults[SYS_NET_IN] == PROCUTILS_ERROR) {
00453       if (autoDisableMonitoring)
00454         actSysMonitorParams[SYS_NET_OUT] = 0;
00455     } else  if (sysRetResults[SYS_NET_OUT] != RET_ERROR) {
00456       for (i = 0; i < nInterfaces; i++) { 
00457         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00458         strcpy(paramNames[nParams], interfaceNames[i]);
00459         strcat(paramNames[nParams], "_out");
00460         paramValues[nParams] = (char *)&currentNetOut[i];
00461         valueTypes[nParams] = XDR_REAL64;
00462         nParams++;
00463       }
00464     }
00465   }
00466 
00467   if (actSysMonitorParams[SYS_NET_ERRS] == 1) {
00468     if (sysRetResults[SYS_NET_ERRS] == PROCUTILS_ERROR) {
00469       if (autoDisableMonitoring)
00470         actSysMonitorParams[SYS_NET_ERRS] = 0;
00471     } else  if (sysRetResults[SYS_NET_ERRS] != RET_ERROR) {
00472       for (i = 0; i < nInterfaces; i++) { 
00473         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00474         strcpy(paramNames[nParams], interfaceNames[i]);
00475         strcat(paramNames[nParams], "_errs");
00476         paramValues[nParams] = (char *)&currentNetErrs[i];
00477         valueTypes[nParams] = XDR_REAL64;
00478         nParams++;
00479       }
00480     }
00481   }
00482 
00483 
00484   if (actSysMonitorParams[SYS_PROCESSES] == 1) {
00485     if (sysRetResults[SYS_PROCESSES] != RET_ERROR) {
00486       char act_states[] = {'D', 'R', 'S', 'T', 'Z'};
00487       for (i = 0; i < 5; i++) { 
00488         paramNames[nParams] =  (char *)malloc(20 * sizeof(char));
00489         sprintf(paramNames[nParams], "processes_%c", act_states[i]);
00490         paramValues[nParams] = (char *)&currentProcessStates[act_states[i] - 65];
00491         valueTypes[nParams] = XDR_REAL64;
00492         nParams++;
00493       }
00494     }
00495   }
00496 
00497   if (actSysMonitorParams[SYS_NET_SOCKETS] == 1) {
00498     if (sysRetResults[SYS_NET_SOCKETS] != RET_ERROR) {
00499       const char *socket_types[] = {"tcp", "udp", "icm", "unix"};
00500       for (i = 0; i < 4; i++) { 
00501         paramNames[nParams] =  (char *)malloc(30 * sizeof(char));
00502         sprintf(paramNames[nParams], "sockets_%s", socket_types[i]);
00503         paramValues[nParams] = (char *)&currentNSockets[i];
00504         valueTypes[nParams] = XDR_REAL64;
00505         nParams++;
00506       }
00507     }
00508   }
00509 
00510   if (actSysMonitorParams[SYS_NET_TCP_DETAILS] == 1) {
00511     if (sysRetResults[SYS_NET_TCP_DETAILS] != RET_ERROR) {
00512       for (i = 0; i < N_TCP_STATES; i++) { 
00513         paramNames[nParams] =  (char *)malloc(30 * sizeof(char));
00514         sprintf(paramNames[nParams], "sockets_tcp_%s", socketStatesMapTCP[i]);
00515         paramValues[nParams] = (char *)&currentSocketsTCP[i];
00516         valueTypes[nParams] = XDR_REAL64;
00517         nParams++;
00518       }
00519     }
00520   }
00521 
00522   try {
00523     if (nParams > 0)
00524       sendParameters(sysMonCluster, sysMonNode, nParams, 
00525                      paramNames, valueTypes, paramValues);
00526   } catch (runtime_error& err) {
00527     logger(WARNING, err.what());
00528   }
00529 
00530   this -> lastSysInfoSend = crtTime;
00531 
00532   if (sysRetResults[SYS_NET_IN] == RET_SUCCESS) {
00533     free(currentNetIn);
00534     free(currentNetOut);
00535     free(currentNetErrs);
00536   }
00537 
00538   for (i = 0; i < nParams; i++)
00539     free(paramNames[i]);
00540   free(paramNames);
00541   free(valueTypes);
00542   free(paramValues);
00543 #endif
00544 }
00545 
00546 void ApMon::updateGeneralInfo() {
00547 
00548   strcpy(cpuVendor, ""); strcpy(cpuFamily, "");
00549   strcpy(cpuModel, ""); strcpy(cpuModelName, "");
00550 
00551   if (actGenMonitorParams[GEN_CPU_MHZ] == 1 || 
00552       actGenMonitorParams[GEN_BOGOMIPS] == 1 || 
00553       actGenMonitorParams[GEN_CPU_VENDOR_ID] == 1 ||
00554       actGenMonitorParams[GEN_CPU_FAMILY] == 1 || 
00555       actGenMonitorParams[GEN_CPU_MODEL] == 1 ||
00556       actGenMonitorParams[GEN_CPU_MODEL_NAME] == 1) {
00557     try {
00558       ProcUtils::getCPUInfo(*this);
00559     } catch (procutils_error& err) {
00560       logger(WARNING, err.what());
00561       genRetResults[GEN_CPU_MHZ] = genRetResults[GEN_BOGOMIPS] = PROCUTILS_ERROR;
00562     }
00563   }
00564 
00565   if (actGenMonitorParams[GEN_TOTAL_MEM] == 1 || 
00566       actGenMonitorParams[GEN_TOTAL_SWAP] == 1) {
00567     try {
00568       ProcUtils::getSysMem(currentGenVals[GEN_TOTAL_MEM], 
00569                            currentGenVals[GEN_TOTAL_SWAP]);
00570     } catch (procutils_error& perr) {
00571       logger(WARNING, perr.what());
00572       genRetResults[GEN_TOTAL_MEM] = genRetResults[GEN_TOTAL_SWAP] = PROCUTILS_ERROR;
00573     }
00574   }
00575 
00576   if (this -> numCPUs > 0)
00577     currentGenVals[GEN_NO_CPUS] = this -> numCPUs;
00578   else
00579     genRetResults[GEN_NO_CPUS] = PROCUTILS_ERROR;
00580 }
00581 
00582 void ApMon::sendGeneralInfo() {
00583 #ifndef WIN32
00584   int nParams, maxNParams, i;
00585   char tmp_s[50];
00586   
00587   char **paramNames, **paramValues;
00588   int *valueTypes;
00589 
00590   logger(INFO, "Sending general monitoring information...");
00591   
00592   maxNParams = nGenMonitorParams + numIPs;
00593   valueTypes = (int *)malloc(maxNParams * sizeof(int));
00594   paramNames = (char **)malloc(maxNParams * sizeof(char *));
00595   paramValues = (char **)malloc(maxNParams * sizeof(char *));
00596   
00597   nParams = 0;
00598 
00599   updateGeneralInfo();
00600 
00601   if (actGenMonitorParams[GEN_HOSTNAME]) {
00602     paramNames[nParams] = strdup(genMonitorParams[GEN_HOSTNAME]);
00603     valueTypes[nParams] = XDR_STRING;
00604     paramValues[nParams] = myHostname;
00605     nParams++;
00606   }
00607 
00608   if (actGenMonitorParams[GEN_IP]) {
00609     for (i = 0; i < this -> numIPs; i++) {
00610       strcpy(tmp_s, "ip_");
00611       strcat(tmp_s, interfaceNames[i]);
00612       paramNames[nParams] = strdup(tmp_s);
00613       valueTypes[nParams] = XDR_STRING;
00614       paramValues[nParams] = this -> allMyIPs[i];
00615       nParams++;
00616     }
00617   }
00618 
00619   if (actGenMonitorParams[GEN_CPU_VENDOR_ID] && strlen(cpuVendor) != 0) {
00620     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_VENDOR_ID]);
00621     valueTypes[nParams] = XDR_STRING;
00622     paramValues[nParams] = cpuVendor;
00623     nParams++;
00624   }
00625 
00626   if (actGenMonitorParams[GEN_CPU_FAMILY] && strlen(cpuFamily) != 0) {
00627     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_FAMILY]);
00628     valueTypes[nParams] = XDR_STRING;
00629     paramValues[nParams] = cpuFamily;
00630     nParams++;
00631   }
00632 
00633   if (actGenMonitorParams[GEN_CPU_MODEL] && strlen(cpuModel) != 0) {
00634     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_MODEL]);
00635     valueTypes[nParams] = XDR_STRING;
00636     paramValues[nParams] = cpuModel;
00637     nParams++;
00638   }
00639   
00640   if (actGenMonitorParams[GEN_CPU_MODEL_NAME] && strlen(cpuModelName) != 0) {
00641     paramNames[nParams] = strdup(genMonitorParams[GEN_CPU_MODEL_NAME]);
00642     valueTypes[nParams] = XDR_STRING;
00643     paramValues[nParams] = cpuModelName;
00644     nParams++;
00645   }
00646 
00647   for (i = 0; i < nGenMonitorParams; i++) {
00648     if (actGenMonitorParams[i] != 1 || i == GEN_IP || i == GEN_HOSTNAME ||
00649         i == GEN_CPU_VENDOR_ID || i == GEN_CPU_FAMILY || i == GEN_CPU_MODEL
00650         || i == GEN_CPU_MODEL_NAME)
00651       continue;
00652 
00653     if (genRetResults[i] == PROCUTILS_ERROR) {
00654       /* could not read the requested information from /proc, disable this
00655          parameter */
00656       if (autoDisableMonitoring)
00657         actGenMonitorParams[i] = 0;
00658     } else if (genRetResults[i] != RET_ERROR) {
00659       paramNames[nParams] = strdup(genMonitorParams[i]);
00660       paramValues[nParams] = (char *)&currentGenVals[i];
00661       valueTypes[nParams] = XDR_REAL64;
00662       nParams++;
00663     } 
00664   }
00665 
00666   try {
00667     if (nParams > 0)
00668       sendParameters(sysMonCluster, sysMonNode, nParams, 
00669                      paramNames, valueTypes, paramValues);
00670   } catch (runtime_error& err) {
00671     logger(WARNING, err.what());
00672   }
00673 
00674   for (i = 0; i < nParams; i++)
00675     free(paramNames[i]);
00676   free(paramNames);
00677   free(valueTypes);
00678   free(paramValues);
00679 #endif
00680 }
00681 
00682 void ApMon::initMonitoring() {
00683   int i;
00684 
00685   this -> autoDisableMonitoring = true;
00686   this -> sysMonitoring = false;
00687   this -> jobMonitoring = false;
00688   this -> genMonitoring = false;
00689   this -> confCheck = false;
00690 
00691 #ifndef WIN32
00692   pthread_mutex_init(&this -> mutex, NULL);
00693   pthread_mutex_init(&this -> mutexBack, NULL);
00694   pthread_mutex_init(&this -> mutexCond, NULL);
00695   pthread_cond_init(&this -> confChangedCond, NULL);
00696 #else
00697   logger(INFO, "init mutexes...");
00698   this -> mutex     = CreateMutex(NULL, FALSE, NULL);
00699   this -> mutexBack = CreateMutex(NULL, FALSE, NULL);
00700   this -> mutexCond = CreateMutex(NULL, FALSE, NULL);
00701   this -> confChangedCond = CreateEvent(NULL, FALSE, FALSE, NULL);
00702 
00703   // Initialize the Windows Sockets library
00704 
00705   WORD wVersionRequested;
00706   WSADATA wsaData;
00707   int err;
00708   wVersionRequested = MAKEWORD( 2, 0 );
00709   err = WSAStartup( wVersionRequested, &wsaData );
00710   if ( err != 0 ) {
00711     logger(FATAL, "Could not initialize the Windows Sockets library (WS2_32.dll)");
00712   }
00713 
00714 #endif
00715 
00716   this -> haveBkThread = false;
00717   this -> bkThreadStarted = false;
00718   this -> stopBkThread = false;
00719 
00720   this -> recheckChanged = false;
00721   this -> jobMonChanged = false;
00722   this -> sysMonChanged = false;
00723 
00724   this -> recheckInterval = RECHECK_INTERVAL;
00725   this -> crtRecheckInterval = RECHECK_INTERVAL;
00726   this -> jobMonitorInterval = JOB_MONITOR_INTERVAL;
00727   this -> sysMonitorInterval = SYS_MONITOR_INTERVAL;
00728 
00729   this -> nSysMonitorParams = initSysParams(this -> sysMonitorParams);
00730 
00731   this -> nGenMonitorParams = initGenParams(this -> genMonitorParams);
00732 
00733   this -> nJobMonitorParams = initJobParams(this -> jobMonitorParams);
00734 
00735   initSocketStatesMapTCP(this -> socketStatesMapTCP);
00736 
00737   this -> sysInfo_first = true;
00738   
00739   try {
00740     this -> lastSysInfoSend = ProcUtils::getBootTime();
00741   } catch (procutils_error& perr) {
00742     logger(WARNING, perr.what());
00743     logger(WARNING, "The first system monitoring values may be inaccurate");
00744     this -> lastSysInfoSend = 0;
00745   } 
00746 
00747   for (i = 0; i < nSysMonitorParams; i++)
00748     this -> lastSysVals[i] = 0;
00749 
00750   //this -> lastUsrTime = this -> lastSysTime = 0;
00751   //this -> lastNiceTime = this -> lastIdleTime = 0;
00752 
00753   for (i = 0; i < nSysMonitorParams; i++) {
00754     actSysMonitorParams[i] = 1;
00755     sysRetResults[i] = RET_SUCCESS;
00756   }
00757 
00758   for (i = 0; i < nGenMonitorParams; i++) {
00759     actGenMonitorParams[i] = 1;
00760     genRetResults[i] = RET_SUCCESS;
00761   }
00762 
00763   for (i = 0; i < nJobMonitorParams; i++) {
00764     actJobMonitorParams[i] = 1;
00765     jobRetResults[i] = RET_SUCCESS;
00766   }
00767 
00768   this -> maxMsgRate = MAX_MSG_RATE;
00769 }
00770 
00771 void ApMon::parseXApMonLine(char *line) {
00772   bool flag, found;
00773   int ind;
00774   char tmp[MAX_STRING_LEN], logmsg[200];
00775   char *param, *value;
00776 //  char sbuf[MAX_STRING_LEN];
00777 //  char *pbuf = sbuf;
00778   const char *sep = " =";
00779 
00780   strcpy(tmp, line);
00781   char *tmp2 = tmp + strlen("xApMon_");
00782 
00783   param = strtok/*_r*/(tmp2, sep);//, &pbuf);
00784   value = strtok/*_r*/(NULL, sep);//, &pbuf);
00785 
00786   /* if it is an on/off parameter, assign its value to flag */
00787   if (strcmp(value, "on") == 0)
00788     flag = true;
00789   else /* if it is not an on/off paramenter the value of flag doesn't matter */
00790     flag = false;
00791 
00792   pthread_mutex_lock(&mutexBack);
00793 
00794   found = false;
00795   if (strcmp(param, "job_monitoring") == 0) {
00796     this -> jobMonitoring = flag; found = true;
00797   }
00798   if (strcmp(param, "sys_monitoring") == 0) {
00799     this -> sysMonitoring = flag; found = true;
00800   }
00801   if (strcmp(param, "job_interval") == 0) {
00802     this -> jobMonitorInterval = atol(value); found = true;
00803   }
00804   if (strcmp(param, "sys_interval") == 0) {
00805     this -> sysMonitorInterval = atol(value); found = true;
00806   }
00807   if (strcmp(param, "general_info") == 0) {
00808     this -> genMonitoring = flag; found = true;
00809   }
00810   if (strcmp(param, "conf_recheck") == 0) {
00811     this -> confCheck = flag; found = true;
00812   }
00813   if (strcmp(param, "recheck_interval") == 0) {
00814     this -> recheckInterval = this -> crtRecheckInterval = atol(value); 
00815     found = true;
00816   }
00817   if (strcmp(param, "auto_disable") == 0) {
00818     this -> autoDisableMonitoring = flag;
00819     found = true;
00820   }
00821   if (strcmp(param, "maxMsgRate") == 0) {
00822     this -> maxMsgRate = atoi(value);
00823     found = true;
00824   }
00825 
00826   if (found) {
00827     pthread_mutex_unlock(&mutexBack);
00828     return;
00829   }
00830 
00831   if (strstr(param, "sys_") == param) {
00832     ind = getVectIndex(param + strlen("sys_"), sysMonitorParams, 
00833                        nSysMonitorParams);
00834     if (ind < 0) {
00835       pthread_mutex_unlock(&mutexBack);
00836       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00837             param);
00838       logger(WARNING, logmsg);
00839       return;
00840     }
00841     found = true;
00842     this -> actSysMonitorParams[ind] = (int)flag;
00843   }
00844 
00845   if (strstr(param, "job_") == param) {
00846     ind = getVectIndex(param + strlen("job_"), jobMonitorParams, 
00847                        nJobMonitorParams);
00848     
00849     if (ind < 0) {
00850       pthread_mutex_unlock(&mutexBack);
00851       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00852             param);
00853       logger(WARNING, logmsg);
00854       return;
00855     }
00856     found = true;
00857     this -> actJobMonitorParams[ind] = (int)flag;
00858   }
00859 
00860   if (!found) {
00861     ind = getVectIndex(param, genMonitorParams, 
00862                        nGenMonitorParams);
00863     if (ind < 0) {
00864       pthread_mutex_unlock(&mutexBack);
00865       sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00866               param);
00867       logger(WARNING, logmsg);
00868       return;
00869     } else {
00870       found = true;
00871       this -> actGenMonitorParams[ind] = (int)flag;
00872     }
00873   }
00874 
00875   if (!found) {
00876     sprintf(logmsg, "Invalid parameter name in the configuration file: %s",
00877             param);
00878     logger(WARNING, logmsg);
00879   }
00880   pthread_mutex_unlock(&mutexBack);
00881 }
00882   
00883 long *apmon_mon_utils::getChildren(long pid, int& nChildren) 
00884   throw(runtime_error) {
00885 #ifdef WIN32
00886         return 0;
00887 #else
00888   FILE *pf;
00889   long *pids, *ppids, *children;
00890   int nProcesses;
00891   int i, j, status;
00892   pid_t cpid;
00893   char *argv[4], msg[MAX_STRING_LEN], sval[20];
00894   bool processFound;
00895   long mypid = getpid();
00896   char children_f[50], np_f[50], cmd[200];
00897 
00898   /* generate the names of the temporary files in which we have the output
00899      of some commands */
00900   sprintf(children_f, "/tmp/apmon_children%ld", mypid);
00901   sprintf(np_f, "/tmp/apmon_np%ld", mypid);
00902 
00903   switch (cpid = fork()) {
00904   case -1:
00905     throw runtime_error("[ getChildren() ] Unable to fork()");
00906   case 0:
00907     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
00908     sprintf(cmd, "ps --no-headers -A -o ppid,pid > %s && wc -l %s > %s",
00909             children_f, children_f, np_f);
00910     argv[2] = cmd;
00911     /*
00912     argv[2] = "ps --no-headers -eo ppid,pid > /tmp/apmon_children.txt && wc -l /tmp/out_children.txt > /tmp/out_np.txt";
00913     */
00914     argv[3] = 0;
00915     execv("/bin/sh", argv);
00916     exit(RET_ERROR);
00917   default:
00918     if (waitpid(cpid, &status, 0) == -1) {
00919       sprintf(msg, "[ getChildren() ] The number of sub-processes for %ld could not be determined", pid);
00920       unlink(children_f); unlink(np_f);
00921       throw runtime_error(msg); 
00922     }
00923   }
00924 
00925   /* find the number of processes */
00926   pf = fopen(np_f, "rt");
00927   if (pf == NULL) {
00928     unlink(np_f); unlink(children_f);
00929     sprintf(msg, "[ getChildren() ] The number of sub-processes for %ld could not be determined",
00930             pid);
00931     throw runtime_error(msg);
00932   } 
00933   fscanf(pf, "%d", &nProcesses);
00934   fclose(pf);   
00935   unlink(np_f);
00936 
00937   pids = (long *)malloc(nProcesses * sizeof(long)); 
00938   ppids = (long *)malloc(nProcesses * sizeof(long)); 
00939   /* estimated maximum size for the returned vector; it will be realloc'ed */
00940   children = (long *)malloc(nProcesses * sizeof(long));
00941 
00942   pf = fopen(children_f, "rt");
00943   if (pf == NULL) {
00944     free(pids); free(ppids); free(children);
00945     unlink(children_f);
00946     sprintf(msg, "[ getChildren() ] The sub-processes for %ld could not be determined", pid);
00947     throw runtime_error(msg);
00948   } 
00949  
00950   /* scan the output of the ps command and find the children of the process,
00951    and also check if the process is still running */
00952   children[0] = pid; nChildren = 1;
00953   processFound = false;
00954   for (i = 0; i < nProcesses; i++) {
00955     fscanf(pf, "%ld %ld", &ppids[i], &pids[i]);
00956     /* look for the given process */
00957     if (pids[i] == children[0] || ppids[i] == children[0])
00958       processFound = true;
00959     if (ppids[i] == children[0]) {
00960       children[nChildren++] = pids[i];
00961     }
00962   }
00963   fclose(pf);
00964   unlink(children_f);
00965 
00966   if (processFound == false) {
00967     free(pids); free(ppids); free(children);
00968     nChildren = 0;
00969     sprintf(msg, "[ getChildren() ] The process %ld does not exist", pid);
00970     throw runtime_error(msg);
00971   } 
00972 
00973   /* find the PIDs of all the descendant processes */
00974   i = 1;
00975   while (i < nChildren) {
00976     /* find the children of the i-th child */ 
00977     for (j = 0; j < nProcesses; j++) {
00978       if (ppids[j] == children[i]) {
00979         children[nChildren++] = pids[j];
00980       }
00981     }
00982     i++;
00983   }
00984 
00985   sprintf(msg, "Sub-processes for process %ld: ", pid);
00986   for (i = 0; i < nChildren; i++) {
00987     sprintf(sval, "%ld ", children[i]);
00988     if (strlen(msg) + strlen(sval) < MAX_STRING_LEN - 1)
00989       strcat(msg, sval);
00990   }
00991   logger(DEBUG, msg);
00992 
00993   free(pids); free(ppids);
00994   children = (long *)realloc(children, (nChildren) * sizeof(long));
00995   return children;
00996 #endif
00997 }
00998 
00999 void apmon_mon_utils::readJobInfo(long pid, PsInfo& info) throw(runtime_error) {
01000 #ifndef WIN32
01001   long *children;
01002   FILE *fp;
01003   int i, nChildren, status, ch, ret, open_fd;
01004   char *cmd , *mem_cmd_s, *argv[4], *ret_s;
01005   char pid_s[10], msg[100];
01006   char cmdName[MAX_STRING_LEN1], buf[MAX_STRING_LEN1], buf2[MAX_STRING_LEN1];
01007   char etime_s[20], cputime_s[20];
01008   double rsz, vsz;
01009   double etime, cputime;
01010   double pcpu, pmem;
01011   /* this list contains strings of the form "rsz_vsz_command" for every pid;
01012      it is used to avoid adding several times processes that have multiple 
01013      threads and appear in ps as sepparate processes, occupying exactly the 
01014      same amount of memory and having the same command name. For every line 
01015      from the output of the ps command we verify if the rsz_vsz_command 
01016      combination is already in the list.
01017   */
01018   char **mem_cmd_list;
01019   int listSize;
01020   long cpid, crt_pid;
01021   //unsigned int maxCmdLen = 5 * MAX_STRING_LEN;
01022   long mypid = getpid();
01023   char ps_f[50];
01024 
01025   /* get the list of the process' descendants */
01026   children = getChildren(pid, nChildren);
01027 
01028   /* generate a name for the temporary file which holds the output of the 
01029      ps command */
01030   sprintf(ps_f, "/tmp/apmon_ps%ld", mypid);
01031 
01032   unsigned int cmdLen = (150 + 6 * nChildren) * sizeof(char);
01033   cmd = (char *)malloc (cmdLen);
01034 
01035   /* issue the "ps" command to obtain information on all the descendants */
01036   strcpy(cmd, "ps --no-headers --pid ");
01037   for (i = 0; i < nChildren - 1; i++) {
01038     sprintf(pid_s, "%ld,", children[i]);
01039     if (strlen(cmd) + strlen(pid_s) + 1 >= cmdLen) {
01040       free(cmd);
01041       sprintf(msg, "[ readJobInfo() ] Job %ld has too many sub-processes to be monitored",
01042               pid);
01043       throw runtime_error(msg);
01044     }
01045     strcat(cmd, pid_s);
01046     //strcat(cmd, " 2>&1");
01047   }
01048 
01049   /* the last part of the command */
01050   sprintf(pid_s, "%ld", children[nChildren - 1]);
01051   sprintf(cmdName, " -o pid,etime,time,%%cpu,%%mem,rsz,vsz,comm > %s", ps_f);
01052   if (strlen(cmd) + strlen(pid_s) + strlen(cmdName) >= cmdLen) {
01053     free(cmd);
01054     sprintf(msg, "[ readJobInfo() ] Job %ld has too many sub-processes to be monitored",
01055               pid);
01056     throw runtime_error(msg);
01057   }
01058   strcat(cmd, pid_s);
01059   strcat(cmd, cmdName);
01060   //strcat(cmd, " 2>&1");
01061 
01062   switch (cpid = fork()) {
01063   case -1:
01064     free(cmd);
01065     sprintf(msg, "[ readJobInfo() ] Unable to fork(). The job information could not be determined for %ld", pid);
01066     throw runtime_error(msg);
01067   case 0:
01068     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01069     argv[2] = cmd; argv[3] = 0;
01070     execv("/bin/sh", argv);
01071     exit(RET_ERROR);
01072   default:
01073     if (waitpid(cpid, &status, 0) == -1) {
01074       free(cmd);
01075       sprintf(msg, "[ readJobInfo() ] The job information for %ld could not be determined", pid);
01076       throw runtime_error(msg); 
01077     }
01078   }
01079 
01080   free(cmd);
01081   fp = fopen(ps_f, "rt");
01082   if (fp == NULL) {
01083     sprintf(msg, "[ readJobInfo() ] Error opening the ps output file for process %ld", pid);
01084     throw runtime_error(msg);
01085   }
01086 
01087   /* parse the output file */
01088   info.etime = info.cputime = 0;
01089   info.pcpu = info.pmem = 0;
01090   info.rsz = info.vsz = 0;
01091   info.open_fd = 0;
01092   mem_cmd_list = (char **)malloc(nChildren * sizeof(char *));
01093   listSize = 0;
01094   cmdName[0] = 0;
01095   while (1) {
01096     ret_s = fgets(buf, MAX_STRING_LEN, fp);
01097     if (ret_s == NULL) 
01098       break;
01099     buf[MAX_STRING_LEN - 1] = 0;
01100 
01101     /* if the line was too long and fgets hasn't read it entirely, */
01102     /* keep only the first 512 chars from the line */
01103     ch = fgetc(fp); // see if we are at the end of the file
01104     ungetc(ch, fp);
01105     if (buf[strlen(buf) - 1] != 10 && ch != EOF) { 
01106       while (1) {
01107         char *sret = fgets(buf2, MAX_STRING_LEN, fp);
01108         if (sret == NULL || buf[strlen(buf) - 1] == 10)
01109           break;
01110       }
01111     }
01112 
01113     ret = sscanf(buf, "%ld %s %s %lf %lf %lf %lf %s", &crt_pid, etime_s, 
01114                  cputime_s, &pcpu, &pmem, &rsz, &vsz, cmdName);
01115     if (ret != 8) {
01116       fclose(fp);
01117       unlink(ps_f);
01118       free(children);
01119       for (i = 0; i < listSize; i++) {
01120         free(mem_cmd_list[i]);
01121       }
01122       free(mem_cmd_list);
01123       throw runtime_error("[ readJobInfo() ] Error parsing the output of the ps command");
01124     }
01125 
01126     /* etime is the maximum of the elapsed times for the subprocesses */
01127     etime = parsePSTime(etime_s);
01128     info.etime = (info.etime > etime) ? info.etime : etime;
01129 
01130     /* cputime is the sum of the cpu times for the subprocesses */
01131     cputime = parsePSTime(cputime_s);
01132     info.cputime += cputime;
01133     info.pcpu += pcpu;
01134 
01135     /* get the number of opened file descriptors */
01136     try {
01137       open_fd = ProcUtils::countOpenFiles(crt_pid);
01138     } catch (procutils_error& err) {
01139       logger(WARNING, err.what());
01140       /* don't throw an exception if we couldn't read the number of files */
01141       open_fd = PROCUTILS_ERROR;
01142     }
01143 
01144     /* see if this is a process or just a thread */
01145     mem_cmd_s = (char *)malloc(MAX_STRING_LEN * sizeof(char));
01146     sprintf(mem_cmd_s, "%f_%f_%s", rsz, vsz, cmdName);
01147     //printf("### mem_cmd_s: %s\n", mem_cmd_s);
01148     if (getVectIndex(mem_cmd_s, mem_cmd_list, listSize) == -1) {
01149       /* aonther pid with the same command name, rsz and vsz was not found,
01150          so this is a new process and we can add the amount of memory used by 
01151          it */
01152       info.pmem += pmem;
01153       info.vsz += vsz; info.rsz += rsz;
01154 
01155       if (info.open_fd >= 0) // if no error occured so far
01156         info.open_fd += open_fd;
01157       /* add an entry in the list so that next time we see another thread of
01158          this process we don't add the amount of  memory again */
01159       mem_cmd_list[listSize++] = mem_cmd_s;     
01160     } else {
01161       free(mem_cmd_s);
01162     }
01163 
01164     /* if we monitor the current process, we have two extra opened files
01165        that we shouldn't take into account (the output file for ps and
01166        /proc/<pid>/fd/)
01167     */
01168     if (crt_pid == getpid())
01169       info.open_fd -= 2;
01170   } 
01171 
01172   fclose(fp);
01173   unlink(ps_f);
01174   free(children);
01175   for (i = 0; i < listSize; i++) {
01176     free(mem_cmd_list[i]);
01177   }
01178   free(mem_cmd_list);
01179 #endif
01180 }
01181 
01182 long apmon_mon_utils::parsePSTime(char *s) {
01183   long days, hours, mins, secs;
01184 
01185   if (strchr(s, '-') != NULL) {
01186     sscanf(s, "%ld-%ld:%ld:%ld", &days, &hours, &mins, &secs);
01187     return 24 * 3600 * days + 3600 * hours + 60 * mins + secs;
01188   } else {
01189     if (strchr(s, ':') != NULL && strchr(s, ':') !=  strrchr(s, ':')) {
01190        sscanf(s, "%ld:%ld:%ld", &hours, &mins, &secs);
01191        return 3600 * hours + 60 * mins + secs;
01192     } else {
01193       if (strchr(s, ':') != NULL) {
01194         sscanf(s, "%ld:%ld", &mins, &secs);
01195         return 60 * mins + secs;
01196       } else {
01197         return RET_ERROR;
01198       }
01199     }
01200   }
01201 }
01202 
01203 void apmon_mon_utils::readJobDiskUsage(MonitoredJob job, 
01204                                 JobDirInfo& info) throw(runtime_error) {
01205 #ifndef WIN32
01206   int status;
01207   pid_t cpid;
01208   char *cmd, s_tmp[20], *argv[4], msg[100];
01209   FILE *fp;
01210   long mypid = getpid();
01211   char du_f[50], df_f[50]; 
01212 
01213   /* generate names for the temporary files which will hold the output of the
01214      du and df commands */
01215   sprintf(du_f, "/tmp/apmon_du%ld", mypid);
01216   sprintf(df_f, "/tmp/apmon_df%ld", mypid);
01217   
01218   if (strlen(job.workdir) == 0) {
01219     sprintf(msg, "[ readJobDiskUsage() ] The working directory for the job %ld was not specified, not monitoring disk usage", job.pid);
01220     throw runtime_error(msg);
01221   }
01222   
01223   cmd = (char *)malloc((300 + 2 * strlen(job.workdir)) * sizeof(char));
01224   strcpy(cmd, "PRT=`du -Lsk ");
01225   strcat(cmd, job.workdir);
01226   //strcat(cmd, " | tail -1 | cut -f 1 > ");
01227   strcat(cmd, " ` ; if [[ $? -eq 0 ]] ; then OUT=`echo $PRT | cut -f 1` ; echo $OUT ; exit 0 ; else exit -1 ; fi > "); 
01228   strcat(cmd, du_f);
01229 
01230 
01231   switch (cpid = fork()) {
01232   case -1:
01233     sprintf(msg, "[ readJobDiskUsage() ] Unable to fork(). The disk usage information could not be determined for %ld", job.pid);
01234     throw runtime_error(msg);
01235   case 0:
01236     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01237     argv[2] = cmd; argv[3] = 0;
01238     execv("/bin/sh", argv);
01239     exit(RET_ERROR);
01240   default:
01241     if (waitpid(cpid, &status, 0) == -1) {
01242       free(cmd);
01243       sprintf(msg, "[ readJobDiskUsage() ] The disk usage (du) information for %ld could not be determined", job.pid);
01244       unlink(du_f); unlink(df_f);
01245       throw runtime_error(msg); 
01246     }
01247   }
01248 
01249   strcpy(cmd, "PRT=`df -m ");
01250   strcat(cmd, job.workdir);
01251   //strcat(cmd, " | tail -1 > ");
01252   strcat(cmd, " `; if [[ $? -eq 0 ]] ; then OUT=`echo $PRT | cut -d ' ' -f 8-` ; echo $OUT ; exit 0 ; else exit -1 ; fi > ");
01253 
01254   strcat(cmd, df_f);
01255   //printf("### cmd: %s\n", cmd);
01256 
01257   switch (cpid = fork()) {
01258   case -1:
01259     sprintf(msg, "[ readJobDiskUsage() ] Unable to fork(). The disk usage information could not be determined for %ld", job.pid);
01260     throw runtime_error(msg);
01261   case 0:
01262     argv[0] = (char *)"/bin/sh"; argv[1] = (char *)"-c";
01263     argv[2] = cmd; argv[3] = 0;
01264     execv("/bin/sh", argv);
01265     exit(RET_ERROR);
01266   default:
01267     if (waitpid(cpid, &status, 0) == -1) {
01268       free(cmd);
01269       sprintf(msg, "[ readJobDiskUsage() ] The disk usage (df) information for %ld could not be determined", job.pid);
01270       unlink(du_f); unlink(df_f);
01271       throw runtime_error(msg); 
01272     }
01273   }
01274 
01275   free(cmd);
01276   fp = fopen(du_f, "rt");
01277   if (fp == NULL) {
01278     sprintf(msg, "[ readJobDiskUsage() ] Error opening du output file for process %ld", job.pid);
01279     throw runtime_error(msg);
01280   }
01281 
01282   fscanf(fp, "%lf", &(info.workdir_size));
01283   /* keep the directory size in MB */
01284   info.workdir_size /= 1024.0;
01285   fclose(fp);
01286   unlink(du_f);
01287  
01288   fp = fopen(df_f, "rt");
01289   if (fp == NULL) {
01290     sprintf(msg, "[ readJobDiskUsage() ] Error opening df output file for process %ld", job.pid);
01291     throw runtime_error(msg);
01292   }
01293   fscanf(fp, "%s %lf %lf %lf %lf", s_tmp, &(info.disk_total), 
01294          &(info.disk_used), &(info.disk_free), &(info.disk_usage));
01295   fclose(fp);
01296   unlink(df_f);
01297 #endif
01298 }

Generated at Wed Mar 17 18:06:37 2010 for Gaudi Framework, version v21r8 by Doxygen version 1.5.6 written by Dimitri van Heesch, © 1997-2004