HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
UT_BloscCompressionFilter.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * NAME: UT_BloscCompressionFilter.h (C++)
7  *
8  * COMMENTS: .
9  */
10 
11 #ifndef UT_BLOSCCOMPRESSIONFILTER_H_
12 #define UT_BLOSCCOMPRESSIONFILTER_H_
13 
14 #include <SYS/SYS_BoostStreams.h>
15 #include <cstdio>
16 #include <iostream>
17 #include <blosc.h>
18 #include "UT_SCFCommon.h"
19 
20 #include "UT_Assert.h"
21 
22 
23 ///
24 /// The blosc compression filter is an implementation of boost multicharacter output filter
25 /// It can be used in boost output streams to compress data using blosc on write.
26 ///
27 class UT_BloscCompressionFilter : public bios::multichar_output_filter
28 {
29  /// The number of bytes for the atomic type in the binary input stream
30  size_t myTypeSize;
31 
32  /// Block size for compression
33  size_t myBlockSize;
34 
35  /// The BLOSC compression level (0-9)
36  int myCompressionLevel;
37 
38  /// Whether or not to do the shuffling
39  bool myDoShuffle;
40 
41  /// Buffer for holding input and output
42  char* myInputBuffer; // Size = blockSize
43  char* myOutputBuffer; // size = blockSize+BLOSC_MAX_OVERHEAD
44  size_t myInputBufferUsage;
45 
46  /// Current location in compressed file
47  int64 currentLocation;
48 
49  /// Location of compressed blocks
51 
52 public:
53  typedef char char_type;
54 
55  /// Blosc Compression Filter
56  /// Construct and push onto boost stream.
57  /// @param[in] typeSize The byte size of the atomic type in the binary stream
58  /// @param[in] blockSize The block size to force blosc to use.
59  /// @param[in] doShuffle True to run the shuffler. Default = true
60  /// @param[in] compressionLevel The compression level to give blosc
61  /// possible values are [1-9] with 9 being the most compression.
62  explicit UT_BloscCompressionFilter(size_t typeSize, size_t blockSize,
63  bool doShuffle = true, int compressionLevel = 9);
65 
66  /// Retrieves the list of locations in the file where the blocks are
68 
69  /// Retrieves the current location in the file
71 
72  /// Compresses all data available in the input buffer. Writing
73  /// it to the output buffer. Returns the compressed size if the
74  /// data put in the output buffer.
75  std::streamsize doCompression();
76 
77  template<typename Sink>
78  std::streamsize write(Sink& dest, const char* sourceBuffer,
79  std::streamsize sourceBufferSize);
80 
81  template<typename Sink>
82  void close(Sink& dest);
83 };
84 
86  size_t typeSize, size_t blockSize, bool doShuffle, int compressionLevel) :
87  myTypeSize(typeSize), myBlockSize(blockSize), myCompressionLevel(
88  compressionLevel), myDoShuffle(doShuffle), myInputBuffer(NULL), myOutputBuffer(
89  NULL), myInputBufferUsage(0), currentLocation(0), index()
90 {
91 }
92 
94 {
95  delete[] myInputBuffer;
96  delete[] myOutputBuffer;
97 }
98 
100 {
101  return index;
102 }
103 
105 {
106  return currentLocation;
107 }
108 
110 {
111  // Compress buffer and write to output buffer
112  std::streamsize compressedSize = blosc_compress_ctx(myCompressionLevel,
113  myDoShuffle ? 1 : 0, myTypeSize, myInputBufferUsage, myInputBuffer,
114  myOutputBuffer, myBlockSize + BLOSC_MAX_OVERHEAD, "lz4", myBlockSize, 1);
115  // Clear the input buffer
116  myInputBufferUsage = 0;
117  // Blosc returns -1 on failure to compress
118  // Returns 0 if not enough buffer to compress
119  if (compressedSize == -1)
120  {
121  std::cerr << "Problem compressing, blosc returned: " << compressedSize
122  << std::endl;
123  return -1; // Give up (EOF)
124  }
125  return compressedSize;
126 }
127 
128 template<typename Sink>
129 inline std::streamsize UT_BloscCompressionFilter::write(Sink& dest,
130  const char* sourceBuffer, std::streamsize sourceBufferSize)
131 {
132  // Allocate input and output buffer if not allocated yet
133  // This is done because BOOST copies us around a bit before
134  // using the filter.
135  if (!myInputBuffer) {
136  UT_ASSERT(myOutputBuffer == NULL);
137  myInputBuffer = new char[myBlockSize];
138  myOutputBuffer = new char[myBlockSize + BLOSC_MAX_OVERHEAD];
139  }
140 
141  // Keep original buffer size as we need to return it
142  std::streamsize totalRead = sourceBufferSize;
143  // While we have stuff to read
144  while (sourceBufferSize > 0)
145  {
146  // Read all available characters until input buffer is full
147  // or we run out of characters
148  std::streamsize ammountToCopy = std::min(sourceBufferSize,
149  (std::streamsize) ((myBlockSize - myInputBufferUsage)));
150  memcpy(myInputBuffer + myInputBufferUsage, sourceBuffer, ammountToCopy);
151  myInputBufferUsage += ammountToCopy;
152  // Modify size available to read and increment source buffer
153  sourceBufferSize -= ammountToCopy;
154  sourceBuffer += ammountToCopy;
155  // If our buffer is full
156  if (myInputBufferUsage == myBlockSize)
157  {
158  // Run compression
159  std::streamsize compressedSize = doCompression();
160  // -1 indicates an error occoured so return -1 (signal eof)
161  if (compressedSize == -1)
162  {
163  return -1;
164  }
165  // Write the compressed block to the stream
166  bios::write<Sink>(dest, myOutputBuffer, compressedSize);
167 
168  // Increment our counter of file location
169  currentLocation += compressedSize;
170 
171  // Write out location of next block
172  index.append(currentLocation);
173  }
174  }
175  UT_ASSERT(sourceBufferSize == 0);
176  return totalRead;
177 }
178 
179 template<typename Sink>
180 inline void UT_BloscCompressionFilter::close(Sink& dest)
181 {
182  // Compress the rest of the data we have
183  if (myInputBufferUsage > 0)
184  {
185  // Remember the uncompressed size so we can give to index
186  std::streamsize uncompressedSize = myInputBufferUsage;
187 
188  // Run compression
189  std::streamsize compressedSize = doCompression();
190 
191  // Write the final block to the sink
192  bios::write<Sink>(dest, myOutputBuffer, compressedSize);
193 
194  // Increment our current location so we can return an
195  // accurate final size
196  currentLocation += compressedSize;
197 
198  // The index needs to know how big the final block is
199  index.setBlockSize(myBlockSize, uncompressedSize);
200 
201  // NOTE: We don't write out the final index here because
202  // we write out the index in advance!
203  } else
204  {
205  // In this case (where we have a perfect multiple of blocksize)
206  // we write an index location that was not actually present.
207  // So we remove it.
208  index.removeLast();
209 
210  // and we call the last block to be normal sized
211  index.setBlockSize(myBlockSize, myBlockSize);
212  }
213 }
214 
215 #endif /* UT_BLOSCCOMPRESSIONFILTER_H_ */
exint getCurrentLocation()
Retrieves the current location in the file.
long long int64
Definition: SYS_Types.h:107
int64 exint
Definition: SYS_Types.h:116
DLL_EXPORT int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, size_t nbytes, const void *src, void *dest, size_t destsize, const char *compressor, size_t blocksize, int numinternalthreads)
std::streamsize write(Sink &dest, const char *sourceBuffer, std::streamsize sourceBufferSize)
GLuint index
Definition: glcorearb.h:785
#define BLOSC_MAX_OVERHEAD
Definition: blosc.h:45
#define UT_ASSERT(ZZ)
Definition: UT_Assert.h:126
UT_CompressedBlockIndex getIndex()
Retrieves the list of locations in the file where the blocks are.
const std::enable_if<!VecTraits< T >::IsVec, T >::type & min(const T &a, const T &b)
Definition: Composite.h:129
UT_BloscCompressionFilter(size_t typeSize, size_t blockSize, bool doShuffle=true, int compressionLevel=9)