13 #ifndef __NET_PACKETSOCKET_H__
14 #define __NET_PACKETSOCKET_H__
37 : myTimer(socket.get_executor()), myStream(std::move(socket))
45 myTimer.expires_from_now(hboost::posix_time::minutes(10));
47 [me = weak_from_this()](
48 const hboost::system::error_code& ec)
50 if (ec == hboost::asio::error::operation_aborted)
52 if (
auto s = me.lock();
s)
66 hboost::system::error_code ignored_ec;
67 myStream.shutdown(ASIO_TCPSocket::shutdown_both, ignored_ec);
68 myStream.close(ignored_ec);
77 hboost::posix_time::time_duration myTimeout;
91 hboost::asio::dispatch(
92 myImpl->stream().get_executor(),
93 [me = myImpl->weak_from_this()]()
95 if (
auto s = me.lock();
s)
105 template <
typename DynamicBuffer,
typename Handler>
108 void(hboost::system::error_code, std::size_t))
109 asyncRead(DynamicBuffer&
buffer, Handler&& handler);
110 template <typename Handler>
112 Handler,
void(hboost::system::error_code, std::
size_t))
116 template <
class AsyncStream,
class DynamicBuffer>
118 template <
class AsyncStream>
119 class AsyncWriteImpl;
128 template <typename AsyncStream, typename DynamicBuffer>
135 DynamicBuffer& buffer)
136 : myStream(stream), myImpl(backend), myBuffer(buffer)
140 template <
typename Self>
143 hboost::system::error_code ec = {},
144 std::size_t bytes_transfered = 0)
146 myBytesWritten += bytes_transfered;
151 return self.complete(
152 hboost::asio::error::operation_aborted, myBytesWritten);
157 return self.complete(ec, myBytesWritten);
165 myStream.async_read_some(myBuffer.prepare(512), std::move(
self));
172 myBuffer.commit(bytes_transfered);
176 if (!myIsCompressed && myReadPacketSize < 0)
179 static constexpr
int header_size = 5;
180 if (myBuffer.size() >= header_size)
183 static_assert(
sizeof(len) == 4);
187 hboost::asio::buffer_copy(tmp, read_buffer);
189 myReadPacketSize = len;
190 myFinalPacketSize = len;
192 if (myReadPacketSize > 4096)
194 return self.complete(
195 hboost::asio::error::message_size, myBytesWritten);
200 = *((
const char*)read_buffer.data() +
sizeof(unsigned))
203 myReadPacketSize = -1;
204 myBuffer.consume(header_size);
208 if (myIsCompressed && myReadPacketSize < 0)
210 static constexpr
int header_size = 4;
211 if (myBuffer.size() >= header_size)
214 static_assert(
sizeof(len) == header_size);
218 hboost::asio::buffer_copy(tmp, read_buffer);
220 myReadPacketSize = len;
221 myBuffer.consume(header_size);
225 if (myFinalPacketSize < myReadPacketSize)
227 return self.complete(
228 hboost::asio::error::connection_aborted,
235 if (myReadPacketSize >= 0 && myBuffer.size() >= myReadPacketSize)
239 (
const char*)read_buffer.data(), myReadPacketSize);
244 data, myReadPacketSize, myFinalPacketSize))
246 return self.complete(
248 hboost::system::errc::bad_message),
254 self.complete({}, myBytesWritten);
259 std::size_t read_size = 512;
260 if (myReadPacketSize > 0)
261 read_size =
static_cast<std::size_t
>(myReadPacketSize);
262 const auto size = myBuffer.size();
263 const auto limit = myBuffer.max_size() -
size;
264 std::size_t actual_read_size =
std::min(
269 myStream.async_read_some(
270 myBuffer.prepare(actual_read_size), std::move(
self));
274 DynamicBuffer& myBuffer;
275 bool myIsFirst =
true;
276 bool myIsCompressed =
false;
277 exint myReadPacketSize = -1;
278 exint myFinalPacketSize = -1;
279 std::size_t myBytesWritten = 0;
280 AsyncStream& myStream;
288 template <
typename AsyncStream>
289 class NET_PacketSocket::AsyncWriteImpl
296 : myStream(stream), myImpl(impl), myWriteQueue(write_queue)
300 template <
typename Self>
303 hboost::system::error_code ec = {},
304 std::size_t bytes_transfered = 0)
306 myBytesWritten += bytes_transfered;
311 return self.complete(
312 hboost::asio::error::operation_aborted, myBytesWritten);
317 return self.complete(ec, myBytesWritten);
320 exint total = bytes_transfered;
321 while (total > 0 && myWriteQueue.size() > 0)
323 exint i = myIndex + total;
324 if (i >= myWriteQueue[0].
length())
326 exint remain = myWriteQueue[0].length() - myIndex;
329 myWriteQueue.removeIndex(0);
338 if (myWriteQueue.size() == 0)
340 return self.complete({}, myBytesWritten);
345 if (myWriteQueue.size() == 1)
348 myWriteQueue[0].
buffer() + myIndex,
349 myWriteQueue[0].
length() - myIndex);
350 myStream.async_write_some(
buffer, std::move(
self));
354 std::vector<ASIO_ConstBuffer>
buffers;
355 buffers.reserve(myWriteQueue.size());
356 buffers.emplace_back(
357 myWriteQueue[0].
buffer() + myIndex,
358 myWriteQueue[0].
length() - myIndex);
359 for (
exint i = 1; i < myWriteQueue.size(); i++)
360 buffers.emplace_back(
361 myWriteQueue[i].buffer(), myWriteQueue[i].length());
362 myStream.async_write_some(buffers, std::move(
self));
369 std::size_t myBytesWritten = 0;
370 AsyncStream& myStream;
374 template <
typename DynamicBuffer,
typename Handler>
377 void(hboost::system::error_code, std::size_t))
381 return hboost::asio::async_compose<
383 void(hboost::system::error_code, std::size_t)>(
384 AsyncReadImpl{myImpl->stream(), myImpl, buffer}, handler,
388 template <
typename Handler>
391 void(hboost::system::error_code, std::size_t))
403 uint32 orig_len =
static_cast<uint32>(msg.length() + 1);
412 msg = std::move(wbuf);
415 return hboost::asio::async_compose<
416 Handler,
void(hboost::system::error_code, std::size_t)>(
417 AsyncWriteImpl{myImpl->stream(), myImpl, std::move(queue)},
418 handler, myImpl->stream()
422 #endif // __NET_PACKETSOCKET_H__
hboost::asio::ip::tcp::socket ASIO_TCPSocket
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the and then *wait for them to all complete We provide a helper class
hboost::asio::const_buffer ASIO_ConstBuffer
GLsizei const GLfloat * value
hboost::asio::mutable_buffer ASIO_MutableBuffer
GLuint GLsizei GLsizei * length
NET_API bool net_uncompressBuffer(UT_WorkBuffer &wbuf, exint comp_size, exint orig_size)
ImageBuf OIIO_API min(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
hboost::asio::deadline_timer ASIO_DeadlineTimer
UT_ErrorCode make_error_code(NET::CurlError e)
void UTtomips(int16 *, int64)
std::enable_shared_from_this< T > UTenable_shared_from_this
std::shared_ptr< T > UT_SharedPtr
Wrapper around std::shared_ptr.
#define UT_NON_COPYABLE(CLASS)
Define deleted copy constructor and assignment operator inside a class.
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
HBOOST_ASIO_INITFN_RESULT_TYPE(Handler, void(hboost::system::error_code, std::size_t)) NET_PacketSocket
ASIO_TCPSocket & nextLayer()
SYS_FORCE_INLINE void append(char character)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
std::weak_ptr< T > UT_WeakPtr