HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
UT_BloscCompressionFilter.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * NAME: UT_BloscCompressionFilter.h (C++)
7  *
8  * COMMENTS: .
9  */
10 
11 #ifndef UT_BLOSCCOMPRESSIONFILTER_H_
12 #define UT_BLOSCCOMPRESSIONFILTER_H_
13 
14 #include "UT_Assert.h"
15 #include "UT_SCFCommon.h"
16 #include "UT_UniquePtr.h"
17 #include <SYS/SYS_BoostStreams.h>
18 
19 #include <iostream>
20 #include <cstdio>
21 
22 #include <blosc.h>
23 
24 
25 ///
26 /// The blosc compression filter is an implementation of a boost multi-character
27 /// output filter. It can be used in boost output streams to compress data using
28 /// blosc on write.
29 class UT_BloscCompressionFilter : public bios::multichar_output_filter
30 {
31  /// The number of bytes for the atomic type in the binary input stream
32  size_t myTypeSize;
33 
34  /// Block size for compression
35  size_t myBlockSize;
36 
37  /// The BLOSC compression level (0-9)
38  int myCompressionLevel;
39 
40  /// Whether or not to do the shuffling
41  bool myDoShuffle;
42 
43  /// Buffers for holding input and output (allocated on the first write())
44  UT_UniquePtr<char[]> myInputBuffer; // Size = blockSize
45  UT_UniquePtr<char[]> myOutputBuffer; // Size = blockSize + BLOSC_MAX_OVERHEAD
46  size_t myInputBufferUsage; // Number of bytes currently held in myInputBuffer
47 
48  /// Current location in the compressed output (compressed bytes written so far)
49  int64 myCurrentLocation;
50 
51  /// Locations of the compressed blocks
52  UT_CompressedBlockIndex myBlockIndex;
53 
54 public:
55  typedef char char_type;
56 
57  /// Blosc Compression Filter
58  /// Construct and push onto a boost stream.
59  /// @param[in] typeSize The byte size of the atomic type in the binary stream
60  /// @param[in] blockSize The block size to force blosc to use.
61  /// @param[in] doShuffle True to run the shuffler. Default = true
62  /// @param[in] compressionLevel The compression level to give blosc.
63  /// Possible values are [1-9], with 9 being the most compression. Default = 9
64  explicit UT_BloscCompressionFilter(size_t typeSize, size_t blockSize,
65  bool doShuffle = true, int compressionLevel = 9);
66  ~UT_BloscCompressionFilter() = default;
67 
68  // DANGER: Boost will copy this class around a few times before
69  // myInputBuffer/myOutputBuffer are allocated. We assume that if we're
70  // copied again, then its use will be restarted.
73 
74  /// Retrieves the list of locations in the file where the blocks are.
75  /// WARNING: This method intentionally returns a copy of the index since it is
76  /// liable to change while filtering.
78 
79  /// Retrieves the current location in the file
80  exint getCurrentLocation() const;
81 
82  /// Compresses all data available in the input buffer, writing
83  /// it to the output buffer. Returns the compressed size of the
84  /// data put in the output buffer (-1 signals a compression failure).
85  std::streamsize doCompression();
86 
87  template<typename Sink>
88  std::streamsize write(Sink& dest, const char* sourceBuffer,
89  std::streamsize sourceBufferSize);
90 
91  template<typename Sink>
92  void close(Sink& dest);
93 };
94 
94 
96  size_t typeSize,
97  size_t blockSize,
98  bool doShuffle,
99  int compressionLevel)
100  : myTypeSize(typeSize)
101  , myBlockSize(blockSize)
102  , myCompressionLevel(compressionLevel)
103  , myDoShuffle(doShuffle)
104  , myInputBuffer(nullptr)
105  , myOutputBuffer(nullptr)
106  , myInputBufferUsage(0)
107  , myCurrentLocation(0)
108  , myBlockIndex()
109 {
110 }
111 
112 // DANGER: Boost will copy this class around a few times before
113 // myInputBuffer/myOutputBuffer are allocated. A copy takes only the settings;
114 // its buffers, buffer usage, location and block index all start out fresh.
117  : myTypeSize(copy.myTypeSize)
118  , myBlockSize(copy.myBlockSize)
119  , myCompressionLevel(copy.myCompressionLevel)
120  , myDoShuffle(copy.myDoShuffle)
121  , myInputBuffer(nullptr)
122  , myOutputBuffer(nullptr)
123  , myInputBufferUsage(0)
124  , myCurrentLocation(0)
125  , myBlockIndex()
126 {
127 }
128 
131 {
132  if (this != &copy)
133  {
134  myTypeSize = copy.myTypeSize;
135  myBlockSize = copy.myBlockSize;
136  myCompressionLevel = copy.myCompressionLevel;
137  myDoShuffle = copy.myDoShuffle;
138 
139  myInputBuffer = nullptr;
140  myOutputBuffer = nullptr;
141  myInputBufferUsage = 0;
142  myCurrentLocation = 0;
143  myBlockIndex = UT_CompressedBlockIndex();
144  }
145  return *this;
146 }
147 
150 {
151  return myBlockIndex;
152 }
153 
154 inline exint
156 {
157  return myCurrentLocation;
158 }
159 
160 inline std::streamsize
162 {
163  // Compress the used part of the input buffer into the output buffer
164  std::streamsize compressedSize = blosc_compress_ctx(
165  myCompressionLevel, myDoShuffle ? 1 : 0, myTypeSize,
166  myInputBufferUsage, myInputBuffer.get(), myOutputBuffer.get(),
167  myBlockSize + BLOSC_MAX_OVERHEAD, "lz4", myBlockSize, 1);
168  // Clear the input buffer
169  myInputBufferUsage = 0;
170  // Blosc reports internal errors with any negative value (not only -1) and
171  // returns 0 if the result did not fit; neither is a usable block size.
172  if (compressedSize <= 0)
173  {
174  std::cerr << "Problem compressing, blosc returned: " << compressedSize
175  << std::endl;
176  return -1; // Give up (EOF)
177  }
178  return compressedSize;
179 }
180 
181 template <typename Sink>
182 inline std::streamsize
184  Sink& dest,
185  const char* sourceBuffer,
186  std::streamsize sourceBufferSize)
187 {
188  // Allocate input and output buffer if not allocated yet
189  // This is done here, on first use, because BOOST copies us around a bit
190  // before using the filter.
191  if (!myInputBuffer) {
192  UT_ASSERT(myOutputBuffer == nullptr);
193  myInputBuffer = UTmakeUnique<char[]>(myBlockSize);
194  myOutputBuffer = UTmakeUnique<char[]>(myBlockSize + BLOSC_MAX_OVERHEAD);
195  }
196 
197  // Keep original buffer size as we need to return it
198  std::streamsize totalRead = sourceBufferSize;
199  // While we have stuff to read
200  while (sourceBufferSize > 0)
201  {
202  // Read all available characters until input buffer is full
203  // or we run out of characters
204  std::streamsize amountToCopy = std::min(sourceBufferSize,
205  static_cast<std::streamsize>(myBlockSize - myInputBufferUsage));
206  memcpy(myInputBuffer.get() + myInputBufferUsage, sourceBuffer,
207  amountToCopy);
208  myInputBufferUsage += amountToCopy;
209  // Modify size available to read and increment source buffer
210  sourceBufferSize -= amountToCopy;
211  sourceBuffer += amountToCopy;
212  // If our buffer is full
213  if (myInputBufferUsage == myBlockSize)
214  {
215  // Run compression
216  std::streamsize compressedSize = doCompression();
217  // A non-positive size means compression failed, so return -1 (signal eof)
218  if (compressedSize <= 0)
219  {
220  return -1;
221  }
222  // Write the compressed block to the stream
223  bios::write<Sink>(dest, myOutputBuffer.get(), compressedSize);
224 
225  // Increment our counter of file location
226  myCurrentLocation += compressedSize;
227 
228  // Write out location of next block
229  myBlockIndex.append(myCurrentLocation);
230  }
231  }
232  UT_ASSERT(sourceBufferSize == 0);
233  return totalRead;
234 }
235 
236 template<typename Sink>
237 inline void
239 {
240  // Compress the rest of the data we have
241  if (myInputBufferUsage > 0)
242  {
243  // Remember the uncompressed size so we can give to index
244  std::streamsize uncompressedSize = myInputBufferUsage;
245 
246  // Run compression; a non-positive size means it failed, so stop here
247  std::streamsize compressedSize = doCompression();
248  if (compressedSize <= 0) return; // never hand a bogus byte count to the sink
249  // Write the final block to the sink
250  bios::write<Sink>(dest, myOutputBuffer.get(), compressedSize);
251 
252  // Increment our current location so we can return an
253  // accurate final size
254  myCurrentLocation += compressedSize;
255 
256  // The index needs to know how big the final block is
257  myBlockIndex.setBlockSize(myBlockSize, uncompressedSize);
258 
259  // NOTE: We don't write out the final index here because
260  // we write out the index in advance!
261  } else
262  {
263  // In this case (where we have a perfect multiple of blocksize)
264  // we write an index location that was not actually present.
265  // So we remove it.
266  myBlockIndex.removeLast();
267 
268  // and we call the last block to be normal sized
269  myBlockIndex.setBlockSize(myBlockSize, myBlockSize);
270  }
271 }
272 
273 #endif /* UT_BLOSCCOMPRESSIONFILTER_H_ */
void setBlockSize(exint blockSize, exint lastBlockSize)
void removeLast()
Removes the last entry.
UT_BloscCompressionFilter & operator=(const UT_BloscCompressionFilter &copy)
OIIO_UTIL_API bool copy(string_view from, string_view to, std::string &err)
BLOSC_EXPORT int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, size_t nbytes, const void *src, void *dest, size_t destsize, const char *compressor, size_t blocksize, int numinternalthreads)
int64 exint
Definition: SYS_Types.h:125
ImageBuf OIIO_API min(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
std::unique_ptr< T, Deleter > UT_UniquePtr
A smart pointer for unique ownership of dynamically allocated objects.
Definition: UT_UniquePtr.h:39
long long int64
Definition: SYS_Types.h:116
std::streamsize write(Sink &dest, const char *sourceBuffer, std::streamsize sourceBufferSize)
~UT_BloscCompressionFilter()=default
void append(BlockLocationType entry)
Adds entry to the end of the index.
#define BLOSC_MAX_OVERHEAD
Definition: blosc.h:39
UT_CompressedBlockIndex getIndex() const
VULKAN_HPP_INLINE VULKAN_HPP_CONSTEXPR_14 uint8_t blockSize(VULKAN_HPP_NAMESPACE::Format format)
#define UT_ASSERT(ZZ)
Definition: UT_Assert.h:156
exint getCurrentLocation() const
Retrieves the current location in the file.
UT_BloscCompressionFilter(size_t typeSize, size_t blockSize, bool doShuffle=true, int compressionLevel=9)