HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NET_PacketSocket.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * NAME: NET_PacketSocket.h
7  *
8  * COMMENTS:
9  *
10  *
11  */
12 
13 #ifndef __NET_PACKETSOCKET_H__
14 #define __NET_PACKETSOCKET_H__
15 
16 #include "NET_API.h"
17 
18 #include <UT/UT_BoostAsio.h>
19 #include <UT/UT_Debug.h>
20 #include <UT/UT_NonCopyable.h>
21 #include <UT/UT_SharedPtr.h>
22 #include <UT/UT_StringHolder.h>
23 #include <UT/UT_Endian.h>
24 #include <UT/UT_WorkBuffer.h>
25 #include <UT/UT_StringArray.h>
26 
27 NET_API bool
28 net_uncompressBuffer(UT_WorkBuffer& wbuf, exint comp_size, exint orig_size);
29 
31 {
32 private:
33  class Impl : public UTenable_shared_from_this<Impl>
34  {
35  public:
36  explicit Impl(ASIO_TCPSocket&& socket)
37  : myTimer(socket.get_executor()), myStream(std::move(socket))
38  {
39  }
40 
41  ASIO_TCPSocket& stream() { return myStream; }
42  void resetTimer()
43  {
44  UT_WeakPtr<Impl> me = weak_from_this();
45  myTimer.expires_from_now(hboost::posix_time::minutes(10));
46  myTimer.async_wait(
47  [me = weak_from_this()](
48  const hboost::system::error_code& ec)
49  {
50  if (ec == hboost::asio::error::operation_aborted)
51  return;
52  if (auto s = me.lock(); s)
53  {
54  s->close();
55  }
56  });
57  }
58  void cancelTimer()
59  {
60  myTimer.cancel();
61  }
62 
63  void close()
64  {
65  // Closing the socket will propogate
66  hboost::system::error_code ignored_ec;
67  myStream.shutdown(ASIO_TCPSocket::shutdown_both, ignored_ec);
68  myStream.close(ignored_ec);
69  myTimer.cancel();
70  }
71 
72  private:
73  /// {@
74  /// Used to kill the socket if no activity has occurred within the specified
75  /// amount of time.
76  ASIO_DeadlineTimer myTimer;
77  hboost::posix_time::time_duration myTimeout;
78  /// @}
79  ASIO_TCPSocket myStream;
80  };
81 
82 public:
83  explicit NET_PacketSocket(ASIO_TCPSocket&& socket);
84  virtual ~NET_PacketSocket() = default;
86 
87  /// Shutdown the socket. After this is called the socket becomes dead
88  /// and no other calls should be made.
90  {
91  hboost::asio::dispatch(
92  myImpl->stream().get_executor(),
93  [me = myImpl->weak_from_this()]()
94  {
95  if (auto s = me.lock(); s)
96  s->close();
97  });
98  }
99  /// Access to the underlying socket. This only meant to be used in cases
100  /// where information about the connection is required (ie. remote endpoint)
101  /// NB: this method should only ever be called prior to the first read/write
102  /// of the socket for thread safety.
103  ASIO_TCPSocket& nextLayer() { return myImpl->stream(); }
104 
105  template <typename DynamicBuffer, typename Handler>
107  Handler,
108  void(hboost::system::error_code, std::size_t))
109  asyncRead(DynamicBuffer& buffer, Handler&& handler);
110  template <typename Handler>
112  Handler, void(hboost::system::error_code, std::size_t))
113  asyncWrite(UT_StringArray&& buffers, Handler&& handler);
114 
115 private:
116  template <class AsyncStream, class DynamicBuffer>
117  class AsyncReadImpl;
118  template <class AsyncStream>
119  class AsyncWriteImpl;
120 
121  UT_SharedPtr<Impl> myImpl;
122 };
123 
124 // -----------------------------------------------------------------------------
125 // Packet Read Op
126 // -----------------------------------------------------------------------------
127 
128 template <typename AsyncStream, typename DynamicBuffer>
129 class NET_PacketSocket::AsyncReadImpl
130 {
131 public:
132  AsyncReadImpl(
133  AsyncStream& stream,
135  DynamicBuffer& buffer)
136  : myStream(stream), myImpl(backend), myBuffer(buffer)
137  {
138  }
139 
140  template <typename Self>
141  void operator()(
142  Self &self,
143  hboost::system::error_code ec = {},
144  std::size_t bytes_transfered = 0)
145  {
146  myBytesWritten += bytes_transfered;
147 
148  UT_SharedPtr<NET_PacketSocket::Impl> impl = myImpl.lock();
149  if (!impl)
150  {
151  return self.complete(
152  hboost::asio::error::operation_aborted, myBytesWritten);
153  }
154 
155  if (ec)
156  {
157  return self.complete(ec, myBytesWritten);
158  }
159 
160  if (myIsFirst)
161  {
162  // If this is our first request then kick off a read
163  myIsFirst = false;
164  impl->resetTimer();
165  myStream.async_read_some(myBuffer.prepare(512), std::move(self));
166  return;
167  }
168 
169  // We received more data. Reset the connection dead timer.
170  impl->cancelTimer();
171 
172  myBuffer.commit(bytes_transfered);
173 
174  // If the packet is not compressed and no read size has been read then
175  // check the header
176  if (!myIsCompressed && myReadPacketSize < 0)
177  {
178  // Parse the original payload length and compressed flag.
179  static constexpr int header_size = 5;
180  if (myBuffer.size() >= header_size)
181  {
182  uint32 len = 0;
183  static_assert(sizeof(len) == 4);
184  ASIO_MutableBuffer tmp(&len, sizeof(len));
185  ASIO_ConstBuffer read_buffer = myBuffer.data();
186  // Retrieve the payload length
187  hboost::asio::buffer_copy(tmp, read_buffer);
188  UTtomips(len);
189  myReadPacketSize = len;
190  myFinalPacketSize = len;
191  // We have a max size of 4096
192  if (myReadPacketSize > 4096)
193  {
194  return self.complete(
195  hboost::asio::error::message_size, myBytesWritten);
196  }
197 
198  // Check if the playload is compressed or not.
199  myIsCompressed
200  = *((const char*)read_buffer.data() + sizeof(unsigned))
201  == 1;
202  if (myIsCompressed)
203  myReadPacketSize = -1;
204  myBuffer.consume(header_size);
205  }
206  }
207  // If the packet is compressed read the compressed size
208  if (myIsCompressed && myReadPacketSize < 0)
209  {
210  static constexpr int header_size = 4;
211  if (myBuffer.size() >= header_size)
212  {
213  uint32 len = 0;
214  static_assert(sizeof(len) == header_size);
215  ASIO_MutableBuffer tmp(&len, sizeof(len));
216  ASIO_ConstBuffer read_buffer = myBuffer.data();
217  // Retrieve the payload length
218  hboost::asio::buffer_copy(tmp, read_buffer);
219  UTtomips(len);
220  myReadPacketSize = len;
221  myBuffer.consume(header_size);
222 
223  /// Something went wrong. The uncompressed size should never
224  /// be smaller then the compressed size.
225  if (myFinalPacketSize < myReadPacketSize)
226  {
227  return self.complete(
228  hboost::asio::error::connection_aborted,
229  myBytesWritten);
230  }
231  }
232  }
233  // If there is at least enough read data for this packet then handle the
234  // request
235  if (myReadPacketSize >= 0 && myBuffer.size() >= myReadPacketSize)
236  {
237  ASIO_ConstBuffer read_buffer = myBuffer.data();
239  (const char*)read_buffer.data(), myReadPacketSize);
240 
241  if (myIsCompressed)
242  {
244  data, myReadPacketSize, myFinalPacketSize))
245  {
246  return self.complete(
248  hboost::system::errc::bad_message),
249  myBytesWritten);
250  }
251  }
252 
253  // Reset the buffer information so that we can read again
254  self.complete({}, myBytesWritten);
255  return;
256  }
257 
258  // Guess our next read size
259  std::size_t read_size = 512;
260  if (myReadPacketSize > 0)
261  read_size = static_cast<std::size_t>(myReadPacketSize);
262  const auto size = myBuffer.size();
263  const auto limit = myBuffer.max_size() - size;
264  std::size_t actual_read_size = std::min(
265  std::max(std::size_t(512), myBuffer.capacity() - size),
266  std::min(read_size, limit));
267  impl->resetTimer();
268  // We need more data so read more and then continue parsing.
269  myStream.async_read_some(
270  myBuffer.prepare(actual_read_size), std::move(self));
271  }
272 
273 private:
274  DynamicBuffer& myBuffer;
275  bool myIsFirst = true;
276  bool myIsCompressed = false;
277  exint myReadPacketSize = -1;
278  exint myFinalPacketSize = -1;
279  std::size_t myBytesWritten = 0;
280  AsyncStream& myStream;
282 };
283 
284 // -----------------------------------------------------------------------------
285 // NET_IPacketSocket::AsyncWriteImpl
286 // -----------------------------------------------------------------------------
287 
288 template <typename AsyncStream>
289 class NET_PacketSocket::AsyncWriteImpl
290 {
291 public:
292  AsyncWriteImpl(
293  AsyncStream& stream,
295  UT_StringArray&& write_queue)
296  : myStream(stream), myImpl(impl), myWriteQueue(write_queue)
297  {
298  }
299 
300  template <typename Self>
301  void operator()(
302  Self& self,
303  hboost::system::error_code ec = {},
304  std::size_t bytes_transfered = 0)
305  {
306  myBytesWritten += bytes_transfered;
307 
308  UT_SharedPtr<NET_PacketSocket::Impl> impl = myImpl.lock();
309  if (!impl)
310  {
311  return self.complete(
312  hboost::asio::error::operation_aborted, myBytesWritten);
313  }
314 
315  if (ec)
316  {
317  return self.complete(ec, myBytesWritten);
318  }
319 
320  exint total = bytes_transfered;
321  while (total > 0 && myWriteQueue.size() > 0)
322  {
323  exint i = myIndex + total;
324  if (i >= myWriteQueue[0].length())
325  {
326  exint remain = myWriteQueue[0].length() - myIndex;
327  total -= remain;
328  myIndex = 0;
329  myWriteQueue.removeIndex(0);
330  }
331  else
332  {
333  myIndex += total;
334  break;
335  }
336  }
337 
338  if (myWriteQueue.size() == 0)
339  {
340  return self.complete({}, myBytesWritten);
341  }
342 
343  // To improve performance dont use scatter/gather if the messages sent
344  // are only 1.
345  if (myWriteQueue.size() == 1)
346  {
348  myWriteQueue[0].buffer() + myIndex,
349  myWriteQueue[0].length() - myIndex);
350  myStream.async_write_some(buffer, std::move(self));
351  }
352  else
353  {
354  std::vector<ASIO_ConstBuffer> buffers;
355  buffers.reserve(myWriteQueue.size());
356  buffers.emplace_back(
357  myWriteQueue[0].buffer() + myIndex,
358  myWriteQueue[0].length() - myIndex);
359  for (exint i = 1; i < myWriteQueue.size(); i++)
360  buffers.emplace_back(
361  myWriteQueue[i].buffer(), myWriteQueue[i].length());
362  myStream.async_write_some(buffers, std::move(self));
363  }
364  }
365 
366 private:
367  UT_StringArray myWriteQueue;
368  exint myIndex = 0;
369  std::size_t myBytesWritten = 0;
370  AsyncStream& myStream;
372 };
373 
374 template <typename DynamicBuffer, typename Handler>
376  Handler,
377  void(hboost::system::error_code, std::size_t))
378 NET_PacketSocket::asyncRead(DynamicBuffer& buffer, Handler&& handler)
379 {
381  return hboost::asio::async_compose<
382  Handler,
383  void(hboost::system::error_code, std::size_t)>(
384  AsyncReadImpl{myImpl->stream(), myImpl, buffer}, handler,
385  myImpl->stream());
386 }
387 
388 template <typename Handler>
390  Handler,
391  void(hboost::system::error_code, std::size_t))
392 NET_PacketSocket::asyncWrite(UT_StringArray&& queue, Handler&& handler)
393 {
394  // Convert each of the messages to the packet format so that we can
395  // send all of them in one go instead of having to send each packet
396  // one at a time.
397  for (UT_StringHolder& msg : queue)
398  {
399  UT_WorkBuffer wbuf;
400  // Always include the null terminator. I have no clue why this is needed
401  // but there are some checks that specifically include the null
402  // terminator.
403  uint32 orig_len = static_cast<uint32>(msg.length() + 1);
404  uint32 len = orig_len;
405  UTtomips(len);
406  // TODO: add support for compressed responses
407  wbuf.append((char*)&len, sizeof(uint32));
408  wbuf.append('\0');
409  wbuf.append(msg);
410  // Make sure to always include the null character.
411  wbuf.append('\0');
412  msg = std::move(wbuf);
413  }
414 
415  return hboost::asio::async_compose<
416  Handler, void(hboost::system::error_code, std::size_t)>(
417  AsyncWriteImpl{myImpl->stream(), myImpl, std::move(queue)},
418  handler, myImpl->stream()
419  );
420 }
421 
422 #endif // __NET_PACKETSOCKET_H__
GLuint GLuint stream
Definition: glcorearb.h:1832
hboost::asio::ip::tcp::socket ASIO_TCPSocket
Definition: UT_BoostAsio.h:44
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the and then *wait for them to all complete We provide a helper class
Definition: thread.h:623
void
Definition: png.h:1083
hboost::asio::const_buffer ASIO_ConstBuffer
Definition: UT_BoostAsio.h:62
GLboolean * data
Definition: glcorearb.h:131
GLsizei const GLfloat * value
Definition: glcorearb.h:824
hboost::asio::mutable_buffer ASIO_MutableBuffer
Definition: UT_BoostAsio.h:63
int64 exint
Definition: SYS_Types.h:125
GLdouble s
Definition: glad.h:3009
GLuint GLsizei GLsizei * length
Definition: glcorearb.h:795
NET_API bool net_uncompressBuffer(UT_WorkBuffer &wbuf, exint comp_size, exint orig_size)
ImageBuf OIIO_API min(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
hboost::asio::deadline_timer ASIO_DeadlineTimer
Definition: UT_BoostAsio.h:69
void close() override
GLuint buffer
Definition: glcorearb.h:660
UT_ErrorCode make_error_code(NET::CurlError e)
#define NET_API
Definition: NET_API.h:9
void UTtomips(int16 *, int64)
Definition: UT_Endian.h:145
std::enable_shared_from_this< T > UTenable_shared_from_this
Definition: UT_SharedPtr.h:39
Definition: core.h:760
std::shared_ptr< T > UT_SharedPtr
Wrapper around std::shared_ptr.
Definition: UT_SharedPtr.h:36
#define UT_NON_COPYABLE(CLASS)
Define deleted copy constructor and assignment operator inside a class.
const GLuint * buffers
Definition: glcorearb.h:661
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
Definition: thread.h:623
HBOOST_ASIO_INITFN_RESULT_TYPE(Handler, void(hboost::system::error_code, std::size_t)) NET_PacketSocket
GLsizeiptr size
Definition: glcorearb.h:664
ASIO_TCPSocket & nextLayer()
SYS_FORCE_INLINE void append(char character)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
unsigned int uint32
Definition: SYS_Types.h:40
std::weak_ptr< T > UT_WeakPtr
Definition: UT_SharedPtr.h:49
Definition: format.h:895