PCMNetServer.cpp

Go to the documentation of this file.
00001 
00002 //
00003 // This source file is a part of ParCompMark
00004 // Parallel Compositing Benchmark Framework
00005 //
00006 // for latest info see http://parcompmark.sourceforge.net
00007 
00008 //
00009 // Copyright (C) 2006 IT2 ParCompMark Dev. Team
00010 // 
00011 // This program is free software; you can redistribute it and/or
00012 // modify it under the terms of the GNU General Public License
00013 // as published by the Free Software Foundation; either version 2
00014 // of the License, or (at your option) any later version.
00015 // 
00016 // This program is distributed in the hope that it will be useful,
00017 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00018 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00019 // GNU General Public License for more details.
00020 // 
00021 // You should have received a copy of the GNU General Public License
00022 // along with this program; if not, write to the Free Software
00023 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00024 
00025 //
00026 // Inner includes
00027 //
00028 
00029 #include "../include/PCMNetServer.h"
00030 
00031 namespace ParCompMark
00032 {
00033 
00034   //
00035   // Constructors & destructor
00036   //
00037 
00038   NetServer::NetServer(const std::string & name):
00039         // Parent initializer 
00040   Network(name          /* Name of the class */
00041         )
00042         // You have to initialize the following attributes:
00043         // - mBroadcastAddress
00044         // - mMaxConnection
00045         // - mHostNumber
00046         // - mRecievedNumber
00047         // - mStopFrameID
00048   {
00049         //mBroadcastAddress
00050         mMaxConnection = 30;
00051         //TODO: hack
00052         mHostNumber = 100000;
00053 
00054         mStopFrameID = IntPointer(new int);
00055          *mStopFrameID.getPtr() = 0;
00056 
00057         //TODO: initialize() should call this function ??
00058         getIP();
00059 
00060         initialize();
00061   }
00062 
00063  /*----------------------------------------------------------------------*/
00064 
00065   NetServer::~NetServer()
00066   {
00067         if(mInitialized)
00068         {
00069          finalize();
00070         }
00071   }
00072 
00073  /*----------------------------------------------------------------------*/
00074 
00075   //
00076   // Methods
00077   //
00078 
00079   void NetServer::initialize()
00080   {
00081 
00082         Assert(!mInitialized, INVALID_OPERATION_ERROR, "NetServer::initialize");
00083 
00084         //server accept client connection initialize
00085 
00086         struct sockaddr_in addr;
00087 
00088         if((mStreamSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00089         {
00090          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to create socket");
00091         }
00092 
00093         addr.sin_family = AF_INET;
00094         addr.sin_port = htons(mCommunicationPort);
00095         addr.sin_addr.s_addr = htonl(INADDR_ANY);
00096         memset(&(addr.sin_zero), '\0', 8);
00097 
00098         int yes = 1;
00099 
00100         if(setsockopt(mStreamSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
00101         {
00102          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to change socket options");
00103         }
00104 
00105         if((bind(mStreamSocket, (struct sockaddr *) &addr, sizeof(struct sockaddr))) < 0)
00106         {
00107          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to bind socket");
00108         }
00109 
00110         if(listen(mStreamSocket, mMaxConnection) < 0)
00111         {
00112          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to listen socket");
00113         }
00114         //broadcast send initialize
00115 
00116         int broadcastPermission = 1;
00117 
00118         if((mBroadcastSocket = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
00119         {
00120          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to create socket");
00121         }
00122 
00123         if(setsockopt
00124          (mBroadcastSocket, SOL_SOCKET, SO_BROADCAST, (void *) &broadcastPermission, sizeof(broadcastPermission)) < 0)
00125         {
00126          Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to change socket options");
00127         }
00128 
00129         mBroadcastAddress.sin_family = AF_INET;
00130         mBroadcastAddress.sin_addr.s_addr = inet_addr(mIfConfs->get("eth0")->BroadcastIP.c_str());
00131         mBroadcastAddress.sin_port = htons(mBroadcastPort);
00132         memset(&(mBroadcastAddress.sin_zero), '\0', 8);
00133 
00134         mRecievedNumber = new int;
00135 
00136         mRecievedNumber.lock();
00137         *mRecievedNumber.getPtr() = 0;
00138         mRecievedNumber.unlock();
00139 
00140         initThread(0, 0, true);
00141 
00142         mInitialized = true;
00143 
00144         Logger::getInstance()->log(Logger::NOTICE, "NetServer initalized.");
00145   }
00146 
00147  /*----------------------------------------------------------------------*/
00148 
00149   void NetServer::finalize()
00150   {
00151         Assert(mInitialized, INVALID_OPERATION_ERROR, "NetServer::finalize()");
00152 
00153         if(mBroadcastSocket != -1)
00154         {
00155          close(mBroadcastSocket);
00156          mBroadcastSocket = -1;
00157         }
00158 
00159         if(mStreamSocket != -1)
00160         {
00161          close(mStreamSocket);
00162          mStreamSocket = -1;
00163         }
00164 
00165         mInitialized = false;
00166   }
00167 
00168  /*----------------------------------------------------------------------*/
00169 
00170   void NetServer::sendBroadcastMessage(const std::string & type, const std::string & message)
00171   {
00172 
00173         mRecievedNumber.lock();
00174 
00175         *mRecievedNumber.getPtr() = 0;
00176 
00177         mRecievedNumber.unlock();
00178 
00179         std::string full;
00180 
00181         full = type + "|" + message;
00182 
00183         if(sendto
00184          (mBroadcastSocket, full.c_str(), full.length(), 0, (struct sockaddr *) &mBroadcastAddress,
00185          sizeof(struct sockaddr)) != full.length())
00186         {
00187          Except(INTERNAL_ERROR, "NetServer::sendBroadcastMessage", "Unable to send message");
00188         }
00189 
00190         Logger::getInstance()->log(Logger::NOTICE, "A broadcast message is sent: " + full);
00191 
00192         if(mHostNumber != 0)
00193         {
00194          //fprintf(stderr, "host num,ber: %d", mHostNumber);
00195          wait();
00196         }
00197 
00198   }
00199 
00200  /*----------------------------------------------------------------------*/
00201 
00202   void NetServer::buildCluster()
00203   {
00204 
00205         bool ok = false;
00206 
00207         int probeNumber,
00208         nodeNumber,
00209         maxProbe = 4;
00210 
00211         probeNumber = 0;
00212         nodeNumber = 0;
00213 
00214         mHostNumber = 0;
00215 
00216         //do while all host response
00217         while(probeNumber < maxProbe)
00218         {
00219          sendBroadcastMessage("LOGIN", "");
00220 
00221          Timer::sleep(0.01);
00222 
00223          Cluster::getInstance()->getHosts().lock();
00224          int n = Cluster::getInstance()->getHosts()->getSize();
00225 
00226          Cluster::getInstance()->getHosts().unlock();
00227 
00228          if(nodeNumber < n)
00229          {
00230                 probeNumber = 0;
00231                 nodeNumber = n;
00232          } else
00233          {
00234                 probeNumber++;
00235          }
00236 
00237         }
00238 
00239         mHostNumber = Cluster::getInstance()->getHosts()->getSize();
00240 
00241   }
00242 
00243  /*----------------------------------------------------------------------*/
00244 
00245   void NetServer::task()
00246   {
00247 
00248         struct sockaddr_in addr;
00249 
00250         int clientSock = -1;
00251         unsigned int addrLen = sizeof(struct sockaddr);
00252 
00253         if((clientSock = accept(mStreamSocket, (struct sockaddr *) &addr, &addrLen)) < 0)
00254         {
00255          Except(INTERNAL_ERROR, "NetServer::task()", "Unable to accept");
00256         }
00257 
00258         Logger::getInstance()->log(Logger::DEBUG, "Client handling started.");
00259 
00260         HandleClient *handle;
00261 
00262         handle = new HandleClient(this);
00263 
00264         handle->setStreamSocket(clientSock);
00265         handle->setServerIP(mOwnIP);
00266         handle->initThread(1, 0);       //run one times
00267         //wait for ready ??
00268         handle->startThread();
00269 
00270   }
00271 
00272  /*----------------------------------------------------------------------*/
00273 
00274 }