00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "../include/PCMNetServer.h"
00030
00031 namespace ParCompMark
00032 {
00033
00034
00035
00036
00037
00038 NetServer::NetServer(const std::string & name):
00039
00040 Network(name
00041 )
00042
00043
00044
00045
00046
00047
00048 {
00049
00050 mMaxConnection = 30;
00051
00052 mHostNumber = 100000;
00053
00054 mStopFrameID = IntPointer(new int);
00055 *mStopFrameID.getPtr() = 0;
00056
00057
00058 getIP();
00059
00060 initialize();
00061 }
00062
00063
00064
00065 NetServer::~NetServer()
00066 {
00067 if(mInitialized)
00068 {
00069 finalize();
00070 }
00071 }
00072
00073
00074
00075
00076
00077
00078
00079 void NetServer::initialize()
00080 {
00081
00082 Assert(!mInitialized, INVALID_OPERATION_ERROR, "NetServer::initialize");
00083
00084
00085
00086 struct sockaddr_in addr;
00087
00088 if((mStreamSocket = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00089 {
00090 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to create socket");
00091 }
00092
00093 addr.sin_family = AF_INET;
00094 addr.sin_port = htons(mCommunicationPort);
00095 addr.sin_addr.s_addr = htonl(INADDR_ANY);
00096 memset(&(addr.sin_zero), '\0', 8);
00097
00098 int yes = 1;
00099
00100 if(setsockopt(mStreamSocket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
00101 {
00102 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to change socket options");
00103 }
00104
00105 if((bind(mStreamSocket, (struct sockaddr *) &addr, sizeof(struct sockaddr))) < 0)
00106 {
00107 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to bind socket");
00108 }
00109
00110 if(listen(mStreamSocket, mMaxConnection) < 0)
00111 {
00112 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to listen socket");
00113 }
00114
00115
00116 int broadcastPermission = 1;
00117
00118 if((mBroadcastSocket = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
00119 {
00120 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to create socket");
00121 }
00122
00123 if(setsockopt
00124 (mBroadcastSocket, SOL_SOCKET, SO_BROADCAST, (void *) &broadcastPermission, sizeof(broadcastPermission)) < 0)
00125 {
00126 Except(INTERNAL_ERROR, "NetServer::initialize", "Unable to change socket options");
00127 }
00128
00129 mBroadcastAddress.sin_family = AF_INET;
00130 mBroadcastAddress.sin_addr.s_addr = inet_addr(mIfConfs->get("eth0")->BroadcastIP.c_str());
00131 mBroadcastAddress.sin_port = htons(mBroadcastPort);
00132 memset(&(mBroadcastAddress.sin_zero), '\0', 8);
00133
00134 mRecievedNumber = new int;
00135
00136 mRecievedNumber.lock();
00137 *mRecievedNumber.getPtr() = 0;
00138 mRecievedNumber.unlock();
00139
00140 initThread(0, 0, true);
00141
00142 mInitialized = true;
00143
00144 Logger::getInstance()->log(Logger::NOTICE, "NetServer initalized.");
00145 }
00146
00147
00148
00149 void NetServer::finalize()
00150 {
00151 Assert(mInitialized, INVALID_OPERATION_ERROR, "NetServer::finalize()");
00152
00153 if(mBroadcastSocket != -1)
00154 {
00155 close(mBroadcastSocket);
00156 mBroadcastSocket = -1;
00157 }
00158
00159 if(mStreamSocket != -1)
00160 {
00161 close(mStreamSocket);
00162 mStreamSocket = -1;
00163 }
00164
00165 mInitialized = false;
00166 }
00167
00168
00169
00170 void NetServer::sendBroadcastMessage(const std::string & type, const std::string & message)
00171 {
00172
00173 mRecievedNumber.lock();
00174
00175 *mRecievedNumber.getPtr() = 0;
00176
00177 mRecievedNumber.unlock();
00178
00179 std::string full;
00180
00181 full = type + "|" + message;
00182
00183 if(sendto
00184 (mBroadcastSocket, full.c_str(), full.length(), 0, (struct sockaddr *) &mBroadcastAddress,
00185 sizeof(struct sockaddr)) != full.length())
00186 {
00187 Except(INTERNAL_ERROR, "NetServer::sendBroadcastMessage", "Unable to send message");
00188 }
00189
00190 Logger::getInstance()->log(Logger::NOTICE, "A broadcast message is sent: " + full);
00191
00192 if(mHostNumber != 0)
00193 {
00194
00195 wait();
00196 }
00197
00198 }
00199
00200
00201
00202 void NetServer::buildCluster()
00203 {
00204
00205 bool ok = false;
00206
00207 int probeNumber,
00208 nodeNumber,
00209 maxProbe = 4;
00210
00211 probeNumber = 0;
00212 nodeNumber = 0;
00213
00214 mHostNumber = 0;
00215
00216
00217 while(probeNumber < maxProbe)
00218 {
00219 sendBroadcastMessage("LOGIN", "");
00220
00221 Timer::sleep(0.01);
00222
00223 Cluster::getInstance()->getHosts().lock();
00224 int n = Cluster::getInstance()->getHosts()->getSize();
00225
00226 Cluster::getInstance()->getHosts().unlock();
00227
00228 if(nodeNumber < n)
00229 {
00230 probeNumber = 0;
00231 nodeNumber = n;
00232 } else
00233 {
00234 probeNumber++;
00235 }
00236
00237 }
00238
00239 mHostNumber = Cluster::getInstance()->getHosts()->getSize();
00240
00241 }
00242
00243
00244
00245 void NetServer::task()
00246 {
00247
00248 struct sockaddr_in addr;
00249
00250 int clientSock = -1;
00251 unsigned int addrLen = sizeof(struct sockaddr);
00252
00253 if((clientSock = accept(mStreamSocket, (struct sockaddr *) &addr, &addrLen)) < 0)
00254 {
00255 Except(INTERNAL_ERROR, "NetServer::task()", "Unable to accept");
00256 }
00257
00258 Logger::getInstance()->log(Logger::DEBUG, "Client handling started.");
00259
00260 HandleClient *handle;
00261
00262 handle = new HandleClient(this);
00263
00264 handle->setStreamSocket(clientSock);
00265 handle->setServerIP(mOwnIP);
00266 handle->initThread(1, 0);
00267
00268 handle->startThread();
00269
00270 }
00271
00272
00273
00274 }