HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
msgpipe.C
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018
3  * Side Effects Software Inc. All rights reserved.
4  *
5  * Redistribution and use of Houdini Development Kit samples in source and
6  * binary forms, with or without modification, are permitted provided that the
7  * following conditions are met:
8  * 1. Redistributions of source code must retain the above copyright notice,
9  * this list of conditions and the following disclaimer.
10  * 2. The name of Side Effects Software may not be used to endorse or
11  * promote products derived from this software without specific prior
12  * written permission.
13  *
14  * THIS SOFTWARE IS PROVIDED BY SIDE EFFECTS SOFTWARE `AS IS' AND ANY EXPRESS
15  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
16  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
17  * NO EVENT SHALL SIDE EFFECTS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
18  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
19  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
20  * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
21  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
22  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
23  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  *----------------------------------------------------------------------------
26  */
27 
28 
29 #include <CMD/CMD_Args.h>
30 #include <UT/UT_Assert.h>
31 #include <UT/UT_IStream.h>
32 #include <UT/UT_OFStream.h>
33 #include <UT/UT_NetMessage.h>
34 
35 #include <ostream>
36 #include <iostream>
37 #include <stdio.h>
38 
39 
// Print the expected command line to stderr.
//
// program: name shown as the executable. A null pointer (argv[0] is not
//          guaranteed to be set) falls back to "msgpipe" instead of being
//          streamed, which would be undefined behaviour.
static void
usage(const char *program)
{
    const char *shown = program ? program : "msgpipe";

    std::cerr << "Usage: " << shown << " tracker port thisslicenumber numslice\n";
}
45 
// Number of int32 values written into (and verified in) every test message.
static const int DATA_LEN = 80000;
// Number of fill/transfer/verify passes run over the message pipes.
static const int PIPE_ATTEMPTS = 10;
48 
49 
50 // Transfer data between peers using a UT_NetMessagePipe
51 //
52 // Build using:
53 // hcustom -s msgpipe.C
54 //
55 // Example usage:
56 // python simtracker.py 8000 9000
57 // msgpipe localhost 8000 0 2
58 // msgpipe localhost 8000 1 2
59 //
60 int
61 main(int argc, char *argv[])
62 {
63  CMD_Args args;
64 
65  args.initialize(argc, argv);
66 
67  if (args.argc() != 5)
68  {
69  usage(argv[0]);
70  return 1;
71  }
72 
73  const char *tracker = args.argv(1);
74  int port = SYSatoi(args.argv(2));
75  int thisslice = SYSatoi(args.argv(3));
76  int numslice = SYSatoi(args.argv(4));
77 
78  std::cerr << "Building connection to tracker " << tracker << ":" << port << std::endl;
79  std::cerr << "I am slice " << thisslice << " out of " << numslice << std::endl;
80 
81  // Use a net exchange to send some data between the slices.
82  {
83  std::cerr << "Simple Exchange Test" << std::endl;
84  UT_NetExchange netxchg(tracker, port, thisslice, numslice, "normalexchange");
85 
86  for (int peer = 0; peer < numslice; peer++)
87  {
88  UT_NetMessage *msg = new UT_NetMessage();
89  msg->setWriteDataLength(8 + DATA_LEN * sizeof(int32));
90 
91  for (int i = 0; i < DATA_LEN; i++)
92  {
93  msg->overwriteInt32(8 + i * sizeof(int32), i + peer);
94  }
95  netxchg.sendData(peer, msg);
96  }
97 
99  if (!netxchg.receiveDataLoop(completed, numslice))
100  {
101  std::cerr << "Failure to do traditional data exchange, got " << completed.entries() << std::endl;
102  return -1;
103  }
104 
105  // Now verify our messages are legit. They should all be encoded
106  // with our peer infor.
107  for (int i = 0; i < completed.entries(); i++)
108  {
109  UT_NetMessage *msg = completed(i);
110 
111  for (int i = 0; i < DATA_LEN; i++)
112  {
113  int32 val;
114  int32 goalval;
115  goalval = i + thisslice;
116 
117  val = msg->extractInt32(8 + i * sizeof(int32));
118 
119  if (val != goalval)
120  {
121  std::cerr << "Error, got " << val << " at integer index " << i << " rather than expected " << goalval << ", source slice was " << msg->extractInt16(6) << std::endl;
122  return -1;
123  }
124  }
125 
126  delete msg;
127  }
128  }
129 
130  // Use a net message pipe to repeatedly pump data along the same
131  // socket.
132  {
133  std::cerr << "Message Pipe Tests" << std::endl;
134  UT_NetMessagePipe pipe(tracker, port, thisslice, numslice, "messagepipe");
135 
136  // pipe.setCompressionThreshold(-1);
137 
138  if (!pipe.openPipes())
139  {
140  std::cerr << "Failed to open message pipes" << std::endl;
141  return -1;
142  }
143 
144  for (int j = 0; j < PIPE_ATTEMPTS; j++)
145  {
146  exint datalen = DATA_LEN;
147  std::cerr << "Starting pass " << j << std::endl;
148  for (int peer = 0; peer < numslice; peer++)
149  {
150  UT_NetMessage *msg = pipe.readPipe(peer);
151 
152  msg->resetLength(UT_NetMessage::STATE_READPIPE, datalen * sizeof(int32));
153 
154  msg = pipe.writePipe(peer);
155  msg->resetLength(UT_NetMessage::STATE_WRITEPIPE, datalen * sizeof(int32));
156 
157  for (int i = 0; i < datalen; i++)
158  {
159  // NOTE: No header here!
160  msg->overwriteInt32(i * sizeof(int32), i + peer + j);
161  }
162  }
163 
164  // Now do the transfer.
165  if (!pipe.transferData())
166  {
167  std::cerr << "Failed to push data across pipes!" << std::endl;
168  return -1;
169  }
170 
171  // Now see if it came through alright!
172  // Again, all the data sent to ourselves should be our own
173  // peer number.
174  for (int peer = 0; peer < numslice; peer++)
175  {
176  UT_NetMessage *msg = pipe.readPipe(peer);
177 
178  for (int i = 0; i < datalen; i++)
179  {
180  int32 goalval = i + thisslice + j;
181  int32 val;
182  val = msg->extractInt32(i * sizeof(int32));
183  if (val != goalval)
184  {
185  std::cerr << "Error, got " << val << " at integer index " << i << " rather than expected " << goalval << ", source slice was " << peer << " and this was pipe pass " << j << std::endl;
186  return -1;
187  }
188  }
189  }
190  }
191 
192  for (int peer = 0; peer < numslice; peer++)
193  {
194  UT_NetMessage *msg = pipe.readPipe(peer);
196 
197  msg = pipe.writePipe(peer);
199  }
200 
201  if (!pipe.closePipes())
202  {
203  std::cerr << "Failed to close message pipes." << std::endl;
204  return -1;
205  }
206 
207  }
208 
209  std::cerr << "All transferred successfully!" << std::endl;
210 
211  return 0;
212 }
#define PIPE_ATTEMPTS
Definition: msgpipe.C:47
int main(int argc, char *argv[])
Definition: msgpipe.C:61
const char * argv(unsigned i) const
Definition: UT_Args.h:44
int32 extractInt32(exint offset)
void overwriteInt32(exint offset, int32 val)
bool closePipes(int timeoutms=100)
Shuts down the pipes, returns true if successful.
void setWriteDataLength(exint bufsize)
png_uint_32 i
Definition: png.h:2877
UT_NetMessage * readPipe(int peer)
bool receiveDataLoop(UT_Array< UT_NetMessage * > &completed, int expectedmessages, int timeoutms=100)
int64 exint
Definition: SYS_Types.h:115
int argc() const
Definition: UT_Args.h:43
int int32
Definition: SYS_Types.h:34
exint entries() const
Alias of size(). size() is preferred.
Definition: UT_Array.h:446
GLsizeiptr const void GLenum usage
Definition: glcorearb.h:663
bool transferData(int timeoutms=100)
UT_NetMessage * writePipe(int peer)
GLuint GLfloat * val
Definition: glcorearb.h:1607
void resetLength(TransmitState state, exint newlen)
Resizes, useful for pipe messages.
Wait for connection ack.
Definition: UT_NetMessage.h:83
#define DATA_LEN
Definition: msgpipe.C:46
void initialize(int argc, const char *const argv[])
void sendData(int destpeer, const char *data, exint len)
GLbitfield GLuint program
Definition: glcorearb.h:1930
int16 extractInt16(exint offset)
bool openPipes(int timeoutms=100)
Prepares the pipes, returns true if successful.