PCMClient.cpp

Go to the documentation of this file.
00001 
00002 //
00003 // This source file is a part of ParCompMark
00004 // Parallel Compositing Benchmark Framework
00005 //
00006 // for latest info see http://parcompmark.sourceforge.net
00007 
00008 //
00009 // Copyright (C) 2006 IT2 ParCompMark Dev. Team
00010 // 
00011 // This program is free software; you can redistribute it and/or
00012 // modify it under the terms of the GNU General Public License
00013 // as published by the Free Software Foundation; either version 2
00014 // of the License, or (at your option) any later version.
00015 // 
00016 // This program is distributed in the hope that it will be useful,
00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00019 // GNU General Public License for more details.
00020 // 
00021 // You should have received a copy of the GNU General Public License
00022 // along with this program; if not, write to the Free Software
00023 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00024 
00025 //
00026 // Inner includes
00027 //
00028 
00029 #include "../include/PCMClient.h"
00030 
00031 #include "../include/PCMApplication.h"
00032 
00033 namespace ParCompMark
00034 {
00035 
00036   //
00037   // Constructors & destructor
00038   //
00039 
00040   Client::Client(const std::string & name):
00041         // Parent initializer 
00042   Network(name          /* Name of the class */
00043         )
00044         // You have to initialize the following attributes:
00045         // - mServerIP
00046         // - mType
00047         // - mParent
00048         // - mMessage
00049   {
00050         mParent = 0;
00051   }
00052 
00053  /*----------------------------------------------------------------------*/
00054 
00055   Client::~Client()
00056   {
00057   }
00058 
00059  /*----------------------------------------------------------------------*/
00060 
00061   //
00062   // Methods
00063   //
00064 
00065   void Client::openConnection()
00066   {
00067         Assert(mStreamSocket == -1, INVALID_OPERATION_ERROR, "Client::openConnection()");
00068         Assert(mServerIP != "", INVALID_OPERATION_ERROR, "Client::openConnection()");
00069 
00070         struct sockaddr_in addr;
00071 
00072         if(mStreamSocket != -1)
00073         {
00074          return;
00075         }
00076 
00077         if((mStreamSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0)
00078         {
00079          Except(INTERNAL_ERROR, "Client::openConnection()", "Unable to create socket");
00080         }
00081 
00082         addr.sin_family = AF_INET;
00083         addr.sin_port = htons(mCommunicationPort);
00084         addr.sin_addr.s_addr = inet_addr(mServerIP.c_str());
00085         memset(&(addr.sin_zero), '\0', 8);
00086 
00087         if((connect(mStreamSocket, (struct sockaddr *) &addr, sizeof(struct sockaddr))) < 0)
00088         {
00089          Except(INTERNAL_ERROR, "Client::openConnection()", "Unable to connect socket");
00090         }
00091 
00092         Logger::getInstance()->log(Logger::NOTICE, "TCP/IP connaction is opened.");
00093   }
00094 
00095  /*----------------------------------------------------------------------*/
00096 
00097   void Client::closeConnection()
00098   {
00099         Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::closeConnection()");
00100 
00101         close(mStreamSocket);
00102 
00103         mStreamSocket = -1;
00104   }
00105 
00106  /*----------------------------------------------------------------------*/
00107 
00108   void Client::sendMessage(const std::string & type, const std::string & message)
00109   {
00110         Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::sendMessage()");
00111 
00112         u32 typeLength,
00113         messageLength;
00114 
00115         std::string m = message;
00116 
00117         typeLength = (u32) type.length();
00118         messageLength = (u32) m.length();
00119 
00120         if(messageLength == 0)
00121         {
00122          m = "NONE";
00123          messageLength = m.length();
00124         }
00125 
00126         u8 buff[4];
00127 
00128         buff[0] = (u8) ((typeLength & 0xFF000000) >> 24);
00129         buff[1] = (u8) ((typeLength & 0x00FF0000) >> 16);
00130         buff[2] = (u8) ((typeLength & 0x0000FF00) >> 8);
00131         buff[3] = (u8) ((typeLength & 0x000000FF) >> 0);
00132 
00133         if(send(mStreamSocket, buff, 4, 0) != 4)
00134         {
00135          Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00136         }
00137 
00138         if(send(mStreamSocket, type.c_str(), typeLength, 0) != typeLength)
00139         {
00140          Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00141         }
00142 
00143         buff[0] = (u8) ((messageLength & 0xFF000000) >> 24);
00144         buff[1] = (u8) ((messageLength & 0x00FF0000) >> 16);
00145         buff[2] = (u8) ((messageLength & 0x0000FF00) >> 8);
00146         buff[3] = (u8) ((messageLength & 0x000000FF) >> 0);
00147 
00148         if(send(mStreamSocket, buff, 4, 0) != 4)
00149         {
00150          Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00151         }
00152 
00153         if(send(mStreamSocket, m.c_str(), messageLength, 0) != messageLength)
00154         {
00155          Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00156         }
00157 
00158         Logger::getInstance()->log(Logger::NOTICE, "TCP message sent: " + type + "|" + message);
00159 
00160   }
00161 
00162  /*----------------------------------------------------------------------*/
00163 
00164   void Client::recieveMessage()
00165   {
00166         Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::recieveMessage()");
00167 
00168         std::string type, message, temp, str;
00169 
00170         bool end = false;
00171 
00172         int recvLength,
00173         index;
00174 
00175         u32 length = 0;
00176 
00177         recvLength = -1;
00178 
00179         u32 sumRecieved;
00180 
00181         char *buff;
00182         u8 *lengthBuff = new u8[4];
00183 
00184         //type length and type
00185 
00186         sumRecieved = 0;
00187         while(sumRecieved != 4)
00188         {
00189 
00190          if((recvLength = recv(mStreamSocket, lengthBuff + (sumRecieved), 4 - sumRecieved, 0)) < 0)
00191          {
00192                 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00193          }
00194          sumRecieved += recvLength;
00195         }
00196 
00197         length = length | (lengthBuff[0] << 24);
00198         length = length | (lengthBuff[1] << 16);
00199         length = length | (lengthBuff[2] << 8);
00200         length = length | (lengthBuff[3] << 0);
00201 
00202         buff = new char[length + 1];
00203 
00204         sumRecieved = 0;
00205         while(sumRecieved != length)
00206         {
00207 
00208          if((recvLength = recv(mStreamSocket, buff + (sumRecieved), length - sumRecieved, 0)) < 0)
00209          {
00210                 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00211          }
00212          sumRecieved += recvLength;
00213         }
00214 
00215         if(length != sumRecieved)
00216         {
00217          Except(INTERNAL_ERROR, "Client::recieveMessage()",
00218                  "Bad message length. Sended: " + StringConverter::toString(length) + ", Recieved: " +
00219                  StringConverter::toString(sumRecieved));
00220         }
00221 
00222         buff[length] = '\0';
00223 
00224         type = buff;
00225 
00226         //message length and message
00227 
00228         sumRecieved = 0;
00229         while(sumRecieved != 4)
00230         {
00231 
00232          if((recvLength = recv(mStreamSocket, lengthBuff + (sumRecieved), 4 - sumRecieved, 0)) < 0)
00233          {
00234                 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00235          }
00236          sumRecieved += recvLength;
00237         }
00238 
00239         length = 0;
00240 
00241         length = length | (lengthBuff[0] << 24);
00242         length = length | (lengthBuff[1] << 16);
00243         length = length | (lengthBuff[2] << 8);
00244         length = length | (lengthBuff[3] << 0);
00245 
00246         delete buff;
00247         buff = new char[length + 1];
00248 
00249         sumRecieved = 0;
00250         while(sumRecieved != length)
00251         {
00252 
00253          if((recvLength = recv(mStreamSocket, buff + (sumRecieved), length - sumRecieved, 0)) < 0)
00254          {
00255                 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00256          }
00257          sumRecieved += recvLength;
00258         }
00259 
00260         if(length != sumRecieved)
00261         {
00262          Except(INTERNAL_ERROR, "Client::recieveMessage()",
00263                  "Bad message length. Sended: " + StringConverter::toString(length) + ", Recieved: " +
00264                  StringConverter::toString(sumRecieved));
00265         }
00266 
00267         buff[length] = '\0';
00268 
00269         message = buff;
00270 
00271         delete buff;
00272 
00273         mMessage = message;
00274         mType = type;
00275 
00276         Logger::getInstance()->log(Logger::DEBUG, "message recieved: " + mType + "|" + mMessage);
00277 
00278   }
00279 
00280  /*----------------------------------------------------------------------*/
00281 
00282   void Client::handleMessage()
00283   {
00284 
00285         //printf("\n%s - %s\n", mType.c_str(), mMessage.c_str());
00286 
00287         Logger::getInstance()->log(Logger::DEBUG, "message handle start: " + mType + "|" + mMessage);
00288 
00289         if((Host::getInstance()->getProcessCount() == 0)
00290          && (mType != "LOGIN" && mType != "LOGINOK" && mType != "INIT" && mType != "INITOK" && mType != "QUIT"
00291            && mType != "QUITOK" && mType != "LOWLEVELSCRIPT" && mType != "LOWLEVELSCRIPTOK" && mType != "TIME"
00292            && mType != "TIMEOK"))
00293         {
00294 
00295          //fprintf(stderr, "NOACTIVE\n");
00296 
00297          openConnection();
00298          sendMessage("NOACTIVE", "NONE");
00299          closeConnection();
00300 
00301         } else
00302         {
00303          //LOGIN (source: server(broadcast); answer: LOGINOK; handle: NetClient)
00304          if(mType == "LOGIN")
00305          {
00306                 openConnection();
00307                 Network::getIP();
00308                 sendMessage("LOGINOK", Network::getHostName());
00309                 recieveMessage();
00310                 handleMessage();
00311                 closeConnection();
00312                 //LOGINOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00313          } else if(mType == "LOGINOK")
00314          {
00315                 struct sockaddr_in addr;
00316                 unsigned int addrLen = sizeof(struct sockaddr);
00317 
00318                 if(getpeername(mStreamSocket, (struct sockaddr *) &addr, &addrLen) < 0)
00319                 {
00320                  Except(INTERNAL_ERROR, "Client::handleMessage()", "Unable to get peer name.");
00321                 }
00322 
00323                 std::string ip = inet_ntoa(addr.sin_addr);
00324 
00325                 unsigned int id = 100000;
00326 
00327                 Cluster::getInstance()->getHosts().lock();
00328                 if(!Cluster::getInstance()->getHosts()->has(mMessage))
00329                 {
00330                  HostInfo::Pointer h(new HostInfo(ip, OutputNode::INFORMATION));
00331 
00332                  Cluster::getInstance()->getHosts()->add(mMessage, h);
00333 
00334                  id = Cluster::getInstance()->getHosts()->getSize();
00335                 }
00336                 Cluster::getInstance()->getHosts().unlock();
00337 
00338                 sendMessage("ID", StringConverter::toString(id));
00339 
00340          } else if(mType == "ID")
00341          {
00342 
00343                 unsigned int id = StringConverter::toU32(mMessage);
00344 
00345                 if(id < 100000)
00346                  Host::getInstance()->setID(id);
00347 
00348          }
00349          //QUIT (source: server(broadcast); answer: QUITOK; handle: NetClient)
00350          else if(mType == "QUIT")
00351          {
00352                 openConnection();
00353                 sendMessage("QUITOK", "NONE");
00354                 closeConnection();
00355                 stopThread();
00356          }
00357          //QUITOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00358          else if(mType == "QUITOK")
00359          {
00360 
00361          }
00362          //INIT (source: server(broadcast); answer: INITOK; handle: NetClient)
00363          else if(mType == "INIT")
00364          {
00365 
00371                 openConnection();
00372                 sendMessage("INITOK", Application::getInstance()->getHostInfo()->serialize2XML());
00373                 recieveMessage();       //LOWLEVELSCRIPT
00374                 handleMessage();
00375                 closeConnection();
00376          }
00377          //INITOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00378          else if(mType == "INITOK")
00379          {
00380                 //colect cluster information
00381                 OutputNode::Pointer hostinfo(Application::getInstance()->getCollectClusterDescription());
00382                 hostinfo.lock();
00383                 hostinfo->createChildNode(mMessage.c_str());
00384                 hostinfo.unlock();
00385 
00386                 sendMessage("LOWLEVELSCRIPT", Application::getInstance()->getLowLevelScript());
00387                 recieveMessage();       //LOWLEVELSCRIPTOK
00388                 handleMessage();
00389          }
00390          //STOP (source: handleclient(TCP/IP); answer: LOWLEVELSCRIPTOK; handle: NetClient)
00391          else if(mType == "LOWLEVELSCRIPT")
00392          {
00393 
00394                 Network::getIP();
00395 
00396                 Host::getInstance()->setName(Network::getHostName());
00397                 Host::getInstance()->setLowLevelScript(mMessage);
00398                 Host::getInstance()->initialize();
00399                 Host::getInstance()->start();
00400 
00401                 sendMessage("LOWLEVELSCRIPTOK", "NONE");
00402          }
00403          //LOWLEVELSCRIPTOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00404          else if(mType == "LOWLEVELSCRIPTOK")
00405          {
00406                 ;
00407          }
00408          //STOP (source: server(broadcast); answer: STOPOK; handle: NetClient)
00409          else if(mType == "STOP")
00410          {
00411                 u32 frameID = Host::getInstance()->stop();
00412 
00413                 openConnection();
00414                 sendMessage("STOPOK", StringConverter::toString(frameID));
00415                 closeConnection();
00416 
00417          }
00418          //STOPOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00419          else if(mType == "STOPOK")
00420          {
00421                 u32 frameID = StringConverter::toU32(mMessage);
00422 
00423                 NetServer::IntPointer stopID = ((NetServer *) mParent)->getStopFrameID();
00424 
00425                 stopID.lock();
00426                 if(*stopID.getPtr() < frameID)
00427                 {
00428                  *stopID.getPtr() = frameID;
00429                 }
00430                 stopID.unlock();
00431 
00432          }
00433          //FRAMEID (source: server(broadcast); answer: FRAMEIDOK; handle: NetClient)
00434          else if(mType == "FRAMEID")
00435          {
00436                 u32 stopID = StringConverter::toU32(mMessage);
00437 
00438                 Host::getInstance()->setFrameID(stopID);
00439 
00440                 Host::getInstance()->collectData();
00441                 std::string statistic = Host::getInstance()->getOutputDocument()->serialize2XML();
00442                 openConnection();
00443                 sendMessage("FRAMEIDOK", statistic);
00444                 closeConnection();
00445          }
00446          //FRAMEIDPOK (source: client(TCP/IP); answer: ; handle: HandleClient)
00447          else if(mType == "FRAMEIDOK")
00448          {
00449                 // Append statistics to the gathered data
00450                 OutputNode::Pointer outputNode(Application::getInstance()->getCurrentExecutionOutputDocument());
00451                 outputNode.lock();
00452                 outputNode->createChildNode(mMessage.c_str());
00453                 outputNode.unlock();
00454 
00455          }
00456          //COLLECTDATA (source: server(broadcast); answer: COLLECTDATAOK; handle: NetClient)
00457          else if(mType == "COLLECTDATA")
00458          {
00459 
00460                 Host::getInstance()->collectData();
00461                 openConnection();
00462                 sendMessage("COLLECTDATAOK", Host::getInstance()->getOutputDocument()->serialize2XML());
00463                 closeConnection();
00464 
00465          } else if(mType == "COLLECTDATAOK")
00466          {
00467                 // Append statistics to the gathered data
00468                 OutputNode::Pointer outputNode(Application::getInstance()->getCurrentExecutionOutputDocument());
00469                 outputNode.lock();
00470                 outputNode->createChildNode(mMessage.c_str());
00471                 outputNode.unlock();
00472          } else if(mType == "TIME")
00473          {
00474 
00475           Timer:sleep(Host::getInstance()->getID());
00476                 openConnection();
00477                 Real t1 = Timer::getUncorrectedSystemTime();
00478 
00479                 sendMessage("TIMEOK", "NONE");
00480                 recieveMessage();
00481 
00482                 Host::getInstance()->setMessageSendingTime((Timer::getUncorrectedSystemTime() - t1) / 2);
00483 
00484                 closeConnection();
00485          } else if(mType == "TIMEOK")
00486          {
00487                 sendMessage("TT", "NONE");
00488          }
00489         }
00490   }
00491 
00492  /*----------------------------------------------------------------------*/
00493 
00494 }