00001 /* 00002 * PROPRIETARY INFORMATION. This software is proprietary to 00003 * Side Effects Software Inc., and is not to be reproduced, 00004 * transmitted, or disclosed in any way without written permission. 00005 * 00006 * Produced by: 00007 * Jeff Lait 00008 * Side Effects Software Inc 00009 * 123 Front Street West, Suite 1401 00010 * Toronto, Ontario 00011 * Canada M5J 2M2 00012 * 416-504-9876 00013 * 00014 * NAME: UT_NetMessage.h ( UT Library, C++) 00015 * 00016 * COMMENTS: 00017 * Very similar to UT_NetStream, but rather than using threads 00018 * to interleave packets successfully, it is designed 00019 * around using ::select for the same purpose. 00020 * 00021 * This allows the user to only have to worry about dealing with complete 00022 * messages. 00023 * 00024 * The key tools for end users is the UT_NetMessage and the 00025 * UT_NetExchange. 00026 */ 00027 00028 #ifndef __UT_NetMessage__ 00029 #define __UT_NetMessage__ 00030 00031 #include "UT_API.h" 00032 00033 #include "UT_NetSocket.h" 00034 #include "UT_PtrArray.h" 00035 #include "UT_Interrupt.h" 00036 00037 class UT_StopWatch; 00038 00039 /// 00040 /// UT_NetMessage bears similarity to a UT_NetPacket in that 00041 /// it tries to abstract the necessitiy of retrying TCP/IP streams 00042 /// until the message is completely received. However, it is built 00043 /// around the idea of having a datapump, pumpData, to handle this, 00044 /// allowing one to interleave many messages in one thread using 00045 /// select() 00046 /// 00047 /// Write Protocol: 00048 /// STATE_WRITE: 00049 /// send 32bit length in network byte order 00050 /// send myLength from myData 00051 /// STATE_WAITACK: 00052 /// receive 'j' 00053 /// shutdown the connection 00054 /// 00055 /// Read Protocol: 00056 /// STATE_READ: 00057 /// receive 32bit length in network byte order 00058 /// set myLength and allocate myData 00059 /// receive myLength into myData 00060 /// STATE_SENDACK: 00061 /// send 'j' 00062 /// STATE_READFIN: 00063 /// receive '' (ie, wait till safe to close) 00064 /// 00065 class UT_API UT_NetMessage 00066 { 00067 public: 00068 UT_NetMessage(); 00069 00070 /// Frees the allocated data buffer. 00071 ~UT_NetMessage(); 00072 00073 /// What state we are in. 00074 /// Thanks to the vagaries of TCP/IP we need an application 00075 /// level ACK to verify the data made it across the wire 00076 /// before we do our shutdown. 00077 enum TransmitState 00078 { 00079 STATE_INVALID, 00080 00081 STATE_READ, /// Read message 00082 STATE_SENDACK, /// Send ack 00083 STATE_READFIN, /// Connection closed after ack sent. 00084 00085 STATE_WRITE, /// Send message 00086 STATE_WAITACK /// Wait for connection ack. 00087 }; 00088 00089 /// Sends or receives data from the socket until the message 00090 /// is complete. When it is complete or errored, returns true. Returns 00091 /// false if you should put it back in the select loop. 00092 bool pumpData(); 00093 00094 /// Trys to connect the socket, if already connected a no-op 00095 /// Returns false if still not connected, true if connected or error. 00096 /// failed is set to true if the connectionf ailed due to a 00097 /// UT_CONNECT_FAILED as opposed to being delayed by a UT_WOULD_BLOCK 00098 bool tryConnection(bool *failed=0); 00099 00100 /// The socket we are attached to. 00101 UT_NetSocket *getSocket() const { return mySocket; } 00102 /// File descriptor of socket. 00103 int getSocketFD() const { return mySocket->getSocket(); } 00104 00105 /// Adds our file descriptor to the select set and update 00106 /// maxfd. 00107 void addToFDSet(fd_set *set, int &maxfd) const; 00108 00109 TransmitState state() const { return myState; } 00110 bool isErrored() const { return myErrorState; } 00111 void flagErrored() { myErrorState = true; } 00112 00113 const char *data() const { return myData; } 00114 char *data() { return myData; } 00115 int length() const { return myLength; } 00116 00117 /// Extracts data from the message, applying proper byteswaps. 00118 /// Offset is in bytes. Does not need to be aligned. 00119 int64 extractInt64(int offset); 00120 int32 extractInt32(int offset); 00121 fpreal32 extractFloat32(int offset); 00122 fpreal64 extractFloat64(int offset); 00123 int16 extractInt16(int offset); 00124 int8 extractInt8(int offset); 00125 00126 /// Inserts data into the message, applying proper byte swaps. 00127 /// Offset is in bytes. Does not need to be aligned. 00128 void overwriteInt64(int offset, int64 val); 00129 void overwriteInt32(int offset, int32 val); 00130 void overwriteFloat32(int offset, fpreal32 val); 00131 void overwriteFloat64(int offset, fpreal64 val); 00132 void overwriteInt16(int offset, int16 val); 00133 void overwriteInt8(int offset, int8 val); 00134 00135 /// Determine source/sink of this message. 00136 /// Because we shutdown the socket on completion of the 00137 /// protocol, we always gain ownership of the socket and 00138 /// will handle deleting it ourselves. 00139 void setReadSocket(UT_NetSocket *socket); 00140 void setWriteSocket(UT_NetSocket *socket); 00141 void setWriteSocket(const char *addr, int port); 00142 00143 /// The headersize, in bytes, is prepended as nulls to the message. 00144 /// The headersize *is* included in the myLength, the receiver 00145 /// doesn't know about the distinction between header and data! 00146 void setWriteData(const char *data, int len) 00147 { setWriteDataWithHeader(0, data, len); } 00148 void setWriteData(const char *data) 00149 { setWriteData(data, strlen(data)); } 00150 void setWriteDataWithHeader(int headersize, const char *data, int len); 00151 void setWriteDataWithHeader(int headersize, const char *data) 00152 { setWriteDataWithHeader(headersize, data, strlen(data)); } 00153 00154 /// Allocates a blank write buffer of the given size. 00155 /// It is assumed the caller will fill it out by writing to data() 00156 void setWriteDataLength(int bufsize) 00157 { setWriteDataWithHeader(bufsize, 0, 0); } 00158 00159 static int64 totalBytesSent(); 00160 static int64 totalBytesReceived(); 00161 static void clearByteCounters(); 00162 00163 private: 00164 00165 /// Tracking variables. 00166 static int64 ourTotalBytesSent; 00167 static int64 ourTotalBytesReceived; 00168 00169 UT_NetSocket *mySocket; 00170 int myLength; 00171 char *myData; 00172 TransmitState myState; 00173 bool myErrorState; 00174 00175 // Internal state of where we are in terms of pumping... 00176 // -4 to -1 are for sending the length. 00177 int myDataPos; 00178 int myNetLength; // htonl version of length 00179 }; 00180 00181 /// 00182 /// UT_NetMessagePump 00183 /// 00184 /// Interleaves processing from possibly many different message 00185 /// sources. 00186 /// 00187 /// postMessage() can be used to queue messages to send after 00188 /// their setWriteSocket and setWriteData has been invoked. 00189 /// 00190 /// listenSocket() will invoke accept() and add to the completed() 00191 /// list messages as they are done. 00192 /// 00193 /// 00194 class UT_API UT_NetMessagePump 00195 { 00196 public: 00197 UT_NetMessagePump(); 00198 /// On destruction all messages on our internal lists will be destroyed. 00199 virtual ~UT_NetMessagePump(); 00200 00201 /// Adds the message to our queue to process. 00202 /// We take ownership of the UT_NetMessage until it is removed 00203 /// from one of our lists. 00204 void postMessage(UT_NetMessage *msg); 00205 00206 /// List of all completed incoming messages. 00207 /// 00208 /// To process a message from the completed list, 00209 /// UT_NetMessage *msg = pump.completed()(i); 00210 /// pump.completed()(i) = 0; 00211 /// ... use msg ... 00212 /// delete msg; 00213 UT_PtrArray<UT_NetMessage *> &completed() { return myCompleteList; } 00214 00215 /// List of messages that have entered the error status. 00216 /// This includes failed outgoing messages as well as incomplete 00217 /// incoming messages. 00218 UT_PtrArray<UT_NetMessage *> &errored() { return myErrorList; } 00219 00220 /// Starts listening to the given socket for messages 00221 /// We do not own this socket, caller should delete it (but not 00222 /// before stopping listening or deleting the pump!) 00223 /// Connections accepted from this socket will be read 00224 /// via UT_NetMessage's protocol. 00225 void listenSocket(UT_NetSocket *socket); 00226 /// Stops listening from a given socket. 00227 void stopListening(UT_NetSocket *socket); 00228 00229 /// Processes active messages and listens for incoming messages. 00230 /// Returns true if there is the *potential* of more work - if 00231 /// listening to a socket this is always true! 00232 bool pumpData(int timeoutms); 00233 00234 /// The number of messages still pending. 00235 int pendingMessages() const { return myWorkList.entries(); } 00236 protected: 00237 UT_PtrArray<UT_NetMessage *> myWorkList; 00238 UT_PtrArray<UT_NetMessage *> myErrorList; 00239 UT_PtrArray<UT_NetMessage *> myCompleteList; 00240 UT_PtrArray<UT_NetSocket *> myServerList; 00241 }; 00242 00243 class ut_NetPeerData 00244 { 00245 public: 00246 ut_NetPeerData() {} 00247 virtual ~ut_NetPeerData() {} 00248 ut_NetPeerData(const ut_NetPeerData &src) 00249 { 00250 *this = src; 00251 } 00252 00253 const ut_NetPeerData &operator=(const ut_NetPeerData &src) 00254 { 00255 myAddress.harden(src.myAddress); 00256 myPort = src.myPort; 00257 myPeer = src.myPeer; 00258 return *this; 00259 } 00260 00261 UT_String myAddress; 00262 int myPort; 00263 int myPeer; 00264 }; 00265 00266 /// 00267 /// UT_NetExchange 00268 /// 00269 /// Uses a tracker to exchange data between peers. 00270 /// The tracker handles peer discovery and synchronization. A unique 00271 /// jobname is used to allow multiple net exchanges to use the same 00272 /// tracker. 00273 /// 00274 /// EXTERNAL PROTOCOL: 00275 /// 00276 /// UT_NetMessages accepted or returned from this have an 8 byte 00277 /// header. The fields are filled out for you on a sendData() 00278 /// and can be queried to determine the message source when 00279 /// pulling completed messages from the result list. 00280 /// 00281 /// The first byte is the message type. If it is 'p', it is 00282 /// peer data. Users of UT_NetExchange should only see these 00283 /// messages as the other are internal tracker messages. 00284 /// 00285 /// extractInt16(4): Destination peer # 00286 /// extractInt16(6): Source peer # 00287 /// 00288 /// Anything 8 and after is user data. 00289 /// 00290 /// INTERNAL PROTOCOL: 00291 /// 00292 /// From the tracker: 00293 /// 00294 /// Message of type 'b' is a barrier value. The 32 bit network 00295 /// byte order barrier value starts at offset 1. 00296 /// 00297 /// Message type of 'c' is a peer list. A space delimited list 00298 /// of address port peer triples is present, one for each peer. 00299 /// 00300 /// Message type of 'd' is a done flag. It informs that all 00301 /// the peers have reported done to the tracker. 00302 /// 00303 /// Message type of 'e' is an error. One of the peers reported 00304 /// an error to the tracker that got broadcast back. 00305 /// 00306 /// To the tracker: 00307 /// 00308 /// Messages to the tracker all have the format 00309 /// command port peer npeer jobname 00310 /// 00311 /// port is the listen port used by this peer. 00312 /// peer is the peer number 00313 /// npeer is the number of peers 00314 /// jobname is the name of the current job 00315 /// 00316 /// command is one of: 00317 /// acquire: A peer is logging on to this jobname. 00318 /// done: A peer has all the data it cares about from the netexchange. 00319 /// error: A peer has encountered a fatal network error and wants a clean 00320 /// shutdown. 00321 /// barrierset: Establish a barrier, the current value is peer. 00322 /// barrierwait: Will recevie a done message when the peer value is reached or 00323 /// exceeded by the named barrier. 00324 /// 00325 class UT_API UT_NetExchange 00326 { 00327 public: 00328 UT_NetExchange(const char *trackeraddr, int trackerport, 00329 int peer, int npeer, const char *jobname); 00330 virtual ~UT_NetExchange(); 00331 00332 /// Posts the data to be sent to the specific destination 00333 /// machine. 00334 /// The data will be copied into an internal buffer. 00335 /// Note that destpeer could be this, in which case we'll just 00336 /// have effected a very slow copy. 00337 void sendData(int destpeer, const char *data, int len); 00338 00339 /// Gains ownership of the message. The message must have 00340 /// an 8 byte header reserved for the net exchange protocol. 00341 /// The rest of the data in the net message is unaffected. 00342 void sendData(int destpeer, UT_NetMessage *msg); 00343 00344 /// Flags this peer as done. You should still process the pump until 00345 /// interrupted or completed, however, as the other peers 00346 /// may not yet be done. Likewise, you may still continue 00347 /// to get new requests from other peers. 00348 void sendDone(); 00349 00350 /// Returns true until the tracker notifies us that all of the 00351 /// peers have invoked sendDone() or an error occurs. 00352 bool pumpData(int timeoutms); 00353 00354 /// Pumps the message pump until we have received the expected 00355 /// number of messages and the tracker reports us as done. The 00356 /// messages in the completed array must be deleted by the caller. 00357 /// UT_Interrupt is used to interrupt this. 00358 /// true if everything finished properly, false if there was 00359 /// an interrupt or error. 00360 bool receiveDataLoop(UT_PtrArray<UT_NetMessage *> &completed, 00361 int expectedmessages, int timeoutms = 100); 00362 00363 /// Pumps the message pump until we have received the expected 00364 /// number of data messages and the tracker reports us as done. The 00365 /// messages in the completed array must be deleted by the caller. 00366 /// UT_Interrupt is used to interrupt this. 00367 /// true if everything finished properly, false if there was 00368 /// an interrupt or error. 00369 /// 00370 /// The callback processes any message with a 'r' as the 8th byte. 00371 /// Final messages (which do not get further processed) should have 00372 /// 'd' as the 8th byte - they will 00373 /// be added to the completed list and count to the expectedmessages. 00374 /// It should be a function object taking two parameters, 00375 /// UT_NetExchange * and UT_NetMessage *. 00376 /// 00377 /// UT_Interrupt is used to interrupt this. 00378 /// true if everything finished properly, false if there was 00379 /// an interrupt or error. 00380 template <class CallbackType> 00381 bool processDataLoop(UT_PtrArray<UT_NetMessage *> &complist, 00382 int expectedmessages, 00383 CallbackType callback, 00384 int timeoutms = 100) 00385 { 00386 // Because we append to complist, allow for the case where 00387 // we are accumulating... 00388 int goal = complist.entries() + expectedmessages; 00389 bool done = false; 00390 int i; 00391 UT_NetMessage *msg; 00392 UT_Interrupt *boss = UTgetInterrupt(); 00393 00394 while (1) 00395 { 00396 if (boss->opInterrupt()) 00397 return false; 00398 00399 if (!pumpData(timeoutms)) 00400 break; 00401 00402 for (i = 0; i < completed().entries(); i++) 00403 { 00404 msg = completed()(i); 00405 completed()(i) = 0; 00406 00407 if (msg->extractInt8(8) == 'r') 00408 { 00409 callback(this, msg); 00410 delete msg; 00411 } 00412 else 00413 complist.append(msg); 00414 } 00415 00416 // See if we got our done package. 00417 if (!done && (complist.entries() >= goal)) 00418 { 00419 sendDone(); 00420 done = true; 00421 } 00422 } 00423 00424 return !isErrored(); 00425 } 00426 00427 /// The net exchange goes into an error state when the connection 00428 /// the tracker sends an error message. 00429 /// peer-to-peer errors are dealt with by sending the tracker 00430 /// an error message which is supposed to broadcast the error 00431 /// back to all the peers. Error recovery can thus be effected 00432 /// provided it is the tracker that stays up. 00433 bool isErrored() const { return myErrorFromTracker; } 00434 00435 /// Same semantics as UT_NetMessagePump. Stores all the messages 00436 /// that have successfully arrived. 00437 /// This first 8 bytes of theses messages is the header with 00438 /// which you can extract the source peer of the message. 00439 UT_PtrArray<UT_NetMessage *> &completed() { return myCompleted; } 00440 00441 /// Returns if we have received a peer list from the tracker. 00442 bool gotPeerList() const { return myGotPeerList; } 00443 00444 protected: 00445 ut_NetPeerData *findPeer(int peer); 00446 void processTrackerMessage(UT_NetMessage *msg); 00447 /// Assembles a message to the tracker. 00448 void sendTrackerMessage(const char *msg); 00449 00450 00451 UT_NetMessagePump myPump; 00452 int myNPeer, myPeer; 00453 UT_NetSocket *myToSelf; 00454 00455 UT_String myTrackerAddr; 00456 int myTrackerPort; 00457 00458 bool myGotPeerList; 00459 bool myError; 00460 bool myErrorFromTracker; 00461 UT_RefArray<ut_NetPeerData> myPeerList; 00462 00463 UT_PtrArray<UT_NetMessage *> myWaitingForPeers; 00464 UT_PtrArray<UT_NetMessage *> myCompleted; 00465 00466 UT_String myJobName; 00467 00468 /// We track our elapsed time for printing status 00469 UT_StopWatch *myTimer; 00470 int myTimerDelay, myHeartbeat; 00471 fpreal64 myLastTimerVal; 00472 }; 00473 00474 /// 00475 /// UT_NetBarrier 00476 /// 00477 /// Very similar to UT_NetExchange, using the same tracker. However, 00478 /// it is meant for creating producer/consumer queues where the data 00479 /// is transmitted OOB (ie, on shared disk resource) The producer 00480 /// can call setValue() to indicate how far it has completed. The consumer 00481 /// can invoke waitValue() to idle until the setValue is equal or 00482 /// greater than the waitValue. 00483 /// 00484 /// These calls will all block until success, error, or interrupt. 00485 /// 00486 class UT_API UT_NetBarrier 00487 { 00488 public: 00489 UT_NetBarrier(const char *trackeraddr, int trackerport, 00490 const char *jobname); 00491 virtual ~UT_NetBarrier(); 00492 00493 /// Tells the tracker to update the barrier value to the given 00494 /// value. Does not listen for a tracker response. 00495 void setValue(int val); 00496 00497 /// Requests the tracker to alert us when the given value is 00498 /// reached on the barrier. 00499 void waitValue(int val); 00500 00501 /// Returns current value of the barrier and then increments 00502 /// the barrier's value. 00503 /// Blocks until the tracker responds. 00504 int incrementValue(int defaultval); 00505 00506 protected: 00507 /// Creates and posts a message for the tracker. 00508 void sendTrackerMessage(const char *msg, int value); 00509 00510 /// A simplified method for running the message pump until 00511 /// the system errors or is interrupted. 00512 void blockUntilComplete(); 00513 00514 /// Returns true until the barrier has been resolved or error 00515 /// occurs 00516 bool pumpData(int timeoutms); 00517 00518 UT_NetMessagePump myPump; 00519 UT_NetSocket *myToSelf; 00520 bool myError; 00521 00522 int myBarrierValue; 00523 00524 UT_String myTrackerAddr; 00525 int myTrackerPort; 00526 UT_String myJobName; 00527 }; 00528 00529 00530 #endif 00531
1.5.9