00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "../include/PCMClient.h"
00030
00031 #include "../include/PCMApplication.h"
00032
00033 namespace ParCompMark
00034 {
00035
00036
00037
00038
00039
00040 Client::Client(const std::string & name):
00041
00042 Network(name
00043 )
00044
00045
00046
00047
00048
00049 {
00050 mParent = 0;
00051 }
00052
00053
00054
00055 Client::~Client()
00056 {
00057 }
00058
00059
00060
00061
00062
00063
00064
00065 void Client::openConnection()
00066 {
00067 Assert(mStreamSocket == -1, INVALID_OPERATION_ERROR, "Client::openConnection()");
00068 Assert(mServerIP != "", INVALID_OPERATION_ERROR, "Client::openConnection()");
00069
00070 struct sockaddr_in addr;
00071
00072 if(mStreamSocket != -1)
00073 {
00074 return;
00075 }
00076
00077 if((mStreamSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0)
00078 {
00079 Except(INTERNAL_ERROR, "Client::openConnection()", "Unable to create socket");
00080 }
00081
00082 addr.sin_family = AF_INET;
00083 addr.sin_port = htons(mCommunicationPort);
00084 addr.sin_addr.s_addr = inet_addr(mServerIP.c_str());
00085 memset(&(addr.sin_zero), '\0', 8);
00086
00087 if((connect(mStreamSocket, (struct sockaddr *) &addr, sizeof(struct sockaddr))) < 0)
00088 {
00089 Except(INTERNAL_ERROR, "Client::openConnection()", "Unable to connect socket");
00090 }
00091
00092 Logger::getInstance()->log(Logger::NOTICE, "TCP/IP connaction is opened.");
00093 }
00094
00095
00096
00097 void Client::closeConnection()
00098 {
00099 Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::closeConnection()");
00100
00101 close(mStreamSocket);
00102
00103 mStreamSocket = -1;
00104 }
00105
00106
00107
00108 void Client::sendMessage(const std::string & type, const std::string & message)
00109 {
00110 Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::sendMessage()");
00111
00112 u32 typeLength,
00113 messageLength;
00114
00115 std::string m = message;
00116
00117 typeLength = (u32) type.length();
00118 messageLength = (u32) m.length();
00119
00120 if(messageLength == 0)
00121 {
00122 m = "NONE";
00123 messageLength = m.length();
00124 }
00125
00126 u8 buff[4];
00127
00128 buff[0] = (u8) ((typeLength & 0xFF000000) >> 24);
00129 buff[1] = (u8) ((typeLength & 0x00FF0000) >> 16);
00130 buff[2] = (u8) ((typeLength & 0x0000FF00) >> 8);
00131 buff[3] = (u8) ((typeLength & 0x000000FF) >> 0);
00132
00133 if(send(mStreamSocket, buff, 4, 0) != 4)
00134 {
00135 Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00136 }
00137
00138 if(send(mStreamSocket, type.c_str(), typeLength, 0) != typeLength)
00139 {
00140 Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00141 }
00142
00143 buff[0] = (u8) ((messageLength & 0xFF000000) >> 24);
00144 buff[1] = (u8) ((messageLength & 0x00FF0000) >> 16);
00145 buff[2] = (u8) ((messageLength & 0x0000FF00) >> 8);
00146 buff[3] = (u8) ((messageLength & 0x000000FF) >> 0);
00147
00148 if(send(mStreamSocket, buff, 4, 0) != 4)
00149 {
00150 Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00151 }
00152
00153 if(send(mStreamSocket, m.c_str(), messageLength, 0) != messageLength)
00154 {
00155 Except(INTERNAL_ERROR, "Client::sendMessage()", "Unable to send message");
00156 }
00157
00158 Logger::getInstance()->log(Logger::NOTICE, "TCP message sent: " + type + "|" + message);
00159
00160 }
00161
00162
00163
00164 void Client::recieveMessage()
00165 {
00166 Assert(mStreamSocket != -1, INVALID_OPERATION_ERROR, "Client::recieveMessage()");
00167
00168 std::string type, message, temp, str;
00169
00170 bool end = false;
00171
00172 int recvLength,
00173 index;
00174
00175 u32 length = 0;
00176
00177 recvLength = -1;
00178
00179 u32 sumRecieved;
00180
00181 char *buff;
00182 u8 *lengthBuff = new u8[4];
00183
00184
00185
00186 sumRecieved = 0;
00187 while(sumRecieved != 4)
00188 {
00189
00190 if((recvLength = recv(mStreamSocket, lengthBuff + (sumRecieved), 4 - sumRecieved, 0)) < 0)
00191 {
00192 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00193 }
00194 sumRecieved += recvLength;
00195 }
00196
00197 length = length | (lengthBuff[0] << 24);
00198 length = length | (lengthBuff[1] << 16);
00199 length = length | (lengthBuff[2] << 8);
00200 length = length | (lengthBuff[3] << 0);
00201
00202 buff = new char[length + 1];
00203
00204 sumRecieved = 0;
00205 while(sumRecieved != length)
00206 {
00207
00208 if((recvLength = recv(mStreamSocket, buff + (sumRecieved), length - sumRecieved, 0)) < 0)
00209 {
00210 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00211 }
00212 sumRecieved += recvLength;
00213 }
00214
00215 if(length != sumRecieved)
00216 {
00217 Except(INTERNAL_ERROR, "Client::recieveMessage()",
00218 "Bad message length. Sended: " + StringConverter::toString(length) + ", Recieved: " +
00219 StringConverter::toString(sumRecieved));
00220 }
00221
00222 buff[length] = '\0';
00223
00224 type = buff;
00225
00226
00227
00228 sumRecieved = 0;
00229 while(sumRecieved != 4)
00230 {
00231
00232 if((recvLength = recv(mStreamSocket, lengthBuff + (sumRecieved), 4 - sumRecieved, 0)) < 0)
00233 {
00234 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00235 }
00236 sumRecieved += recvLength;
00237 }
00238
00239 length = 0;
00240
00241 length = length | (lengthBuff[0] << 24);
00242 length = length | (lengthBuff[1] << 16);
00243 length = length | (lengthBuff[2] << 8);
00244 length = length | (lengthBuff[3] << 0);
00245
00246 delete buff;
00247 buff = new char[length + 1];
00248
00249 sumRecieved = 0;
00250 while(sumRecieved != length)
00251 {
00252
00253 if((recvLength = recv(mStreamSocket, buff + (sumRecieved), length - sumRecieved, 0)) < 0)
00254 {
00255 Except(INTERNAL_ERROR, "Client::recieveMessage()", "Unable to recieve message");
00256 }
00257 sumRecieved += recvLength;
00258 }
00259
00260 if(length != sumRecieved)
00261 {
00262 Except(INTERNAL_ERROR, "Client::recieveMessage()",
00263 "Bad message length. Sended: " + StringConverter::toString(length) + ", Recieved: " +
00264 StringConverter::toString(sumRecieved));
00265 }
00266
00267 buff[length] = '\0';
00268
00269 message = buff;
00270
00271 delete buff;
00272
00273 mMessage = message;
00274 mType = type;
00275
00276 Logger::getInstance()->log(Logger::DEBUG, "message recieved: " + mType + "|" + mMessage);
00277
00278 }
00279
00280
00281
00282 void Client::handleMessage()
00283 {
00284
00285
00286
00287 Logger::getInstance()->log(Logger::DEBUG, "message handle start: " + mType + "|" + mMessage);
00288
00289 if((Host::getInstance()->getProcessCount() == 0)
00290 && (mType != "LOGIN" && mType != "LOGINOK" && mType != "INIT" && mType != "INITOK" && mType != "QUIT"
00291 && mType != "QUITOK" && mType != "LOWLEVELSCRIPT" && mType != "LOWLEVELSCRIPTOK" && mType != "TIME"
00292 && mType != "TIMEOK"))
00293 {
00294
00295
00296
00297 openConnection();
00298 sendMessage("NOACTIVE", "NONE");
00299 closeConnection();
00300
00301 } else
00302 {
00303
00304 if(mType == "LOGIN")
00305 {
00306 openConnection();
00307 Network::getIP();
00308 sendMessage("LOGINOK", Network::getHostName());
00309 recieveMessage();
00310 handleMessage();
00311 closeConnection();
00312
00313 } else if(mType == "LOGINOK")
00314 {
00315 struct sockaddr_in addr;
00316 unsigned int addrLen = sizeof(struct sockaddr);
00317
00318 if(getpeername(mStreamSocket, (struct sockaddr *) &addr, &addrLen) < 0)
00319 {
00320 Except(INTERNAL_ERROR, "Client::handleMessage()", "Unable to get peer name.");
00321 }
00322
00323 std::string ip = inet_ntoa(addr.sin_addr);
00324
00325 unsigned int id = 100000;
00326
00327 Cluster::getInstance()->getHosts().lock();
00328 if(!Cluster::getInstance()->getHosts()->has(mMessage))
00329 {
00330 HostInfo::Pointer h(new HostInfo(ip, OutputNode::INFORMATION));
00331
00332 Cluster::getInstance()->getHosts()->add(mMessage, h);
00333
00334 id = Cluster::getInstance()->getHosts()->getSize();
00335 }
00336 Cluster::getInstance()->getHosts().unlock();
00337
00338 sendMessage("ID", StringConverter::toString(id));
00339
00340 } else if(mType == "ID")
00341 {
00342
00343 unsigned int id = StringConverter::toU32(mMessage);
00344
00345 if(id < 100000)
00346 Host::getInstance()->setID(id);
00347
00348 }
00349
00350 else if(mType == "QUIT")
00351 {
00352 openConnection();
00353 sendMessage("QUITOK", "NONE");
00354 closeConnection();
00355 stopThread();
00356 }
00357
00358 else if(mType == "QUITOK")
00359 {
00360
00361 }
00362
00363 else if(mType == "INIT")
00364 {
00365
00371 openConnection();
00372 sendMessage("INITOK", Application::getInstance()->getHostInfo()->serialize2XML());
00373 recieveMessage();
00374 handleMessage();
00375 closeConnection();
00376 }
00377
00378 else if(mType == "INITOK")
00379 {
00380
00381 OutputNode::Pointer hostinfo(Application::getInstance()->getCollectClusterDescription());
00382 hostinfo.lock();
00383 hostinfo->createChildNode(mMessage.c_str());
00384 hostinfo.unlock();
00385
00386 sendMessage("LOWLEVELSCRIPT", Application::getInstance()->getLowLevelScript());
00387 recieveMessage();
00388 handleMessage();
00389 }
00390
00391 else if(mType == "LOWLEVELSCRIPT")
00392 {
00393
00394 Network::getIP();
00395
00396 Host::getInstance()->setName(Network::getHostName());
00397 Host::getInstance()->setLowLevelScript(mMessage);
00398 Host::getInstance()->initialize();
00399 Host::getInstance()->start();
00400
00401 sendMessage("LOWLEVELSCRIPTOK", "NONE");
00402 }
00403
00404 else if(mType == "LOWLEVELSCRIPTOK")
00405 {
00406 ;
00407 }
00408
00409 else if(mType == "STOP")
00410 {
00411 u32 frameID = Host::getInstance()->stop();
00412
00413 openConnection();
00414 sendMessage("STOPOK", StringConverter::toString(frameID));
00415 closeConnection();
00416
00417 }
00418
00419 else if(mType == "STOPOK")
00420 {
00421 u32 frameID = StringConverter::toU32(mMessage);
00422
00423 NetServer::IntPointer stopID = ((NetServer *) mParent)->getStopFrameID();
00424
00425 stopID.lock();
00426 if(*stopID.getPtr() < frameID)
00427 {
00428 *stopID.getPtr() = frameID;
00429 }
00430 stopID.unlock();
00431
00432 }
00433
00434 else if(mType == "FRAMEID")
00435 {
00436 u32 stopID = StringConverter::toU32(mMessage);
00437
00438 Host::getInstance()->setFrameID(stopID);
00439
00440 Host::getInstance()->collectData();
00441 std::string statistic = Host::getInstance()->getOutputDocument()->serialize2XML();
00442 openConnection();
00443 sendMessage("FRAMEIDOK", statistic);
00444 closeConnection();
00445 }
00446
00447 else if(mType == "FRAMEIDOK")
00448 {
00449
00450 OutputNode::Pointer outputNode(Application::getInstance()->getCurrentExecutionOutputDocument());
00451 outputNode.lock();
00452 outputNode->createChildNode(mMessage.c_str());
00453 outputNode.unlock();
00454
00455 }
00456
00457 else if(mType == "COLLECTDATA")
00458 {
00459
00460 Host::getInstance()->collectData();
00461 openConnection();
00462 sendMessage("COLLECTDATAOK", Host::getInstance()->getOutputDocument()->serialize2XML());
00463 closeConnection();
00464
00465 } else if(mType == "COLLECTDATAOK")
00466 {
00467
00468 OutputNode::Pointer outputNode(Application::getInstance()->getCurrentExecutionOutputDocument());
00469 outputNode.lock();
00470 outputNode->createChildNode(mMessage.c_str());
00471 outputNode.unlock();
00472 } else if(mType == "TIME")
00473 {
00474
00475 Timer:sleep(Host::getInstance()->getID());
00476 openConnection();
00477 Real t1 = Timer::getUncorrectedSystemTime();
00478
00479 sendMessage("TIMEOK", "NONE");
00480 recieveMessage();
00481
00482 Host::getInstance()->setMessageSendingTime((Timer::getUncorrectedSystemTime() - t1) / 2);
00483
00484 closeConnection();
00485 } else if(mType == "TIMEOK")
00486 {
00487 sendMessage("TT", "NONE");
00488 }
00489 }
00490 }
00491
00492
00493
00494 }