 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.xml.bind.DatatypeConverter;
+
 import org.pmw.tinylog.Logger;
 
 import com.microsoft.azure.storage.AccessCondition;
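The new `javax.xml.bind.DatatypeConverter` import is only used to base64-encode block IDs in `uploadBlobChunk` below. Two things worth noting: Azure requires every block ID within a blob to be a base64 string of the same length (which the zero-padded `BlockId%07d` format satisfies), and `javax.xml.bind` was deprecated in Java 9 and removed from the JDK in Java 11, so on newer runtimes `java.util.Base64` (the alternative left commented out below) is the drop-in replacement. A minimal sketch showing the two encodings agree, assuming the `BlockId%07d` naming scheme used in this change:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.xml.bind.DatatypeConverter;

public class BlockIdSketch {
    public static void main(String[] args) {
        byte[] raw = String.format("BlockId%07d", 1).getBytes(StandardCharsets.UTF_8);
        // Works on Java 8; javax.xml.bind is no longer part of the JDK from Java 11 on.
        String viaJaxb = DatatypeConverter.printBase64Binary(raw);
        // JDK-native alternative, available since Java 8.
        String viaJdk = Base64.getEncoder().encodeToString(raw);
        System.out.println(viaJaxb.equals(viaJdk)); // true: both yield "QmxvY2tJZDAwMDAwMDE="
    }
}
```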
@@ -33,7 +36,7 @@ public class BlobBufferedOus extends OutputStream {
     long totalDataUploaded = 0;
     long localFileSize = 0;
     /* the upload chunk size; this should be smaller than the buffer size */
-    int chunkSizeOfBB = Constants.BLOB_BUFFERED_OUTS_BLOCKBLOB_CHUNK_SIZE * 2;
+    int chunkSizeOfBB = Constants.BLOB_BUFFERED_OUTS_BLOCKBLOB_CHUNK_SIZE;
     int chunkNumber = 0;
     int chunkSizeOfAB = Constants.BLOB_BUFFERED_OUTS_APPENDBLOB_CHUNK_SIZE;
     int chunkSizeOfPB = Constants.BLOB_BUFFERED_OUTS_PAGEBLOB_CHUNK_SIZE;
@@ -51,9 +54,6 @@ public class BlobBufferedOus extends OutputStream {
 
     /* list of all block ids we will be uploading - need it for the commit at the end */
     List<BlockEntry> blockList;
-
-    /* get the parallel uploader instance */
-    ParallelUploader parallelUploader = ParallelUploader.getInstance();
 
     @SuppressWarnings("static-access")
     public BlobBufferedOus(BlobReqParams reqParams) throws BfsException, StorageException {
@@ -99,7 +99,6 @@ public synchronized void write(final byte[] data, final int offset, final int le
         /* write data to the buffer */
         writeToBuffer(data, offset, length);
         /* check the buffered data and the chunk size threshold */
-        /* make sure the data is 512 Bytes aligned for page blob */
         if (isBufferedDataReadyToUpload()){
             int numOfdataUploaded = 0;
             if ((numOfdataUploaded = uploadBlobChunk(centralBuffer, 0, centralBufOffset)) > 0){
@@ -123,12 +122,6 @@ public synchronized final void flush() throws IOException {
         try {
             if (centralBufOffset > 0) {
                 int numOfdataUploaded = 0;
-                /* for page blob, we should make sure the length is 512 Bytes aligned */
-                if (BfsBlobType.PAGEBLOB.equals(this.blobType)) {
-                    /* make sure the data is 512 Bytes aligned for page blob */
-                    this.centralBufOffset = (int) BfsUtility.extendTo512BytesAlignedLength(centralBufOffset);
-                    this.centralBuffer = BfsUtility.extendTo512BytesAlignedData(this.centralBuffer);
-                }
                 if ((numOfdataUploaded = uploadBlobChunk(centralBuffer, 0, centralBufOffset)) > 0) {
                     totalDataUploaded += numOfdataUploaded;
                     /* reset the chunk count */
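The removed branch padded the buffered data to a 512-byte boundary before flushing, because Azure page blobs are written in 512-byte pages and `CloudPageBlob.upload` rejects a length that is not a multiple of 512. For reference, a generic padding helper equivalent in spirit to the removed `BfsUtility` calls; the class and method names here are illustrative, not the project's API:

```java
import java.util.Arrays;

public final class PageAlignSketch {
    private static final int PAGE_SIZE = 512; // Azure page blobs are written in 512-byte pages

    /* round a length up to the next 512-byte boundary */
    static int alignedLength(int length) {
        return ((length + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE;
    }

    /* return a copy of the buffer, zero-padded to the aligned length */
    static byte[] alignedData(byte[] data, int length) {
        return Arrays.copyOf(data, alignedLength(length));
    }

    public static void main(String[] args) {
        System.out.println(alignedLength(700));                      // 1024
        System.out.println(alignedData(new byte[700], 700).length);  // 1024
    }
}
```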
@@ -205,27 +198,34 @@ public synchronized final int uploadBlobChunk (byte[] rawData, int offset, int
         try {
             /* renew the lease first, otherwise it may expire and cause an error */
             blob.renewLease(accCondtion);
-            if (BfsBlobType.BLOCKBLOB.equals(this.blobType)){
-                /* is the block blob */
-                int uploadRes[] = parallelUploader.uploadBlobWithParallelThreads(rawData, blob, offset, length,
-                        blockList, accCondtion, leaseID, chunkNumber);
-                /* update the chunk counter */
-                chunkNumber = uploadRes[1];
-            }
-            else
-            {
-                /* is the append blob or page blob */
-                ByteArrayInputStream bInput = new ByteArrayInputStream(rawData, offset, length);
-                if (BfsBlobType.APPENDBLOB.equals(this.blobType)){
-                    ((CloudAppendBlob) blob).appendBlock(bInput, (long) length, accCondtion, null, null);
-                }else if (BfsBlobType.PAGEBLOB.equals(this.blobType)){
-                    /* for page blob, we already make sure the length is 512B aligned */
+
+            ByteArrayInputStream bInput = new ByteArrayInputStream(rawData, offset, length);
+            /* update the chunk counter */
+            chunkNumber++;
+            if (BfsBlobType.BLOCKBLOB.equals(this.blobType)){
+                /* record the chunk id in the block list (block ids must be base64) */
+                String chunkId = DatatypeConverter.printBase64Binary(String.format("BlockId%07d", chunkNumber).getBytes(StandardCharsets.UTF_8));
+                //String chunkId = Base64.getEncoder().encodeToString(String.format("BlockId%07d", chunkNumber).getBytes(StandardCharsets.UTF_8));
+                BlockEntry chunk = new BlockEntry(chunkId);
+                blockList.add(chunk);
+                ((CloudBlockBlob) blob).uploadBlock(chunkId, bInput, (long) length, accCondtion, null, null);
+            } else if (BfsBlobType.APPENDBLOB.equals(this.blobType)){
+                ((CloudAppendBlob) blob).appendBlock(bInput, (long) length, accCondtion, null, null);
+            } else if (BfsBlobType.PAGEBLOB.equals(this.blobType)){
                 ((CloudPageBlob) blob).upload(bInput, length, accCondtion, null, null);
-                }
-                bInput.close();
             }
+            bInput.close();
             dataUploadedThisChunk = length;
-        } catch (Exception ex) {
+            /* save the block ID list in the blob metadata to secure the data */
+            if (BfsBlobType.BLOCKBLOB.equals(this.blobType)){
+                /* set leaseID in the metadata request */
+                BlobReqParams cbParams = new BlobReqParams();
+                cbParams.setBlobInstance(blob);
+                cbParams.setLeaseID(leaseID);
+                BlobService.setBlobMetadata(cbParams, Constants.BLOB_META_DATA_COMMITED_BLOBKS_KEY, BfsUtility.blockIds(blockList));
+            }
+
+        } catch (StorageException | IOException ex) {
             String errMessage = "Unexpected exception occurred when uploading to the blob: "
                     + this.fullBlobPath + ", No. of chunk: " + chunkNumber + "." + ex.getMessage();
             BfsUtility.throwBlobfsException(ex, errMessage);
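With the parallel uploader gone, the block-blob path now uploads each chunk as a single uncommitted block and records its `BlockEntry`; the blocks only become readable blob content once the accumulated block list is committed, which this class presumably does at close time alongside the metadata written above. A self-contained sketch of that upload-then-commit pattern with the legacy `com.microsoft.azure.storage` SDK, using a hypothetical connection string, container, and blob name:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class BlockUploadSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection string, container, and blob name.
        CloudStorageAccount account = CloudStorageAccount.parse(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));
        CloudBlockBlob blob = account.createCloudBlobClient()
                .getContainerReference("mycontainer")
                .getBlockBlobReference("example.bin");

        List<BlockEntry> blockList = new ArrayList<>();
        byte[][] chunks = { "first chunk".getBytes(StandardCharsets.UTF_8),
                            "second chunk".getBytes(StandardCharsets.UTF_8) };

        for (int i = 0; i < chunks.length; i++) {
            // Block IDs must be base64 strings of equal length within one blob.
            String blockId = java.util.Base64.getEncoder()
                    .encodeToString(String.format("BlockId%07d", i + 1).getBytes(StandardCharsets.UTF_8));
            blockList.add(new BlockEntry(blockId));
            // Upload the block; it stays uncommitted until commitBlockList is called.
            blob.uploadBlock(blockId, new ByteArrayInputStream(chunks[i]), chunks[i].length);
        }
        // Committing the ordered block list is what makes the blocks visible as the blob's content.
        blob.commitBlockList(blockList);
    }
}
```

Until `commitBlockList` runs, the uploaded blocks sit in the blob's uncommitted block list and are discarded by the service after about a week if never committed.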
@@ -234,21 +234,14 @@ public synchronized final int uploadBlobChunk (byte[] rawData, int offset, int
         return dataUploadedThisChunk;
     }
 
-    /** make sure the length is valid
-     * @return
-     */
     public synchronized boolean isBufferedDataReadyToUpload() {
         boolean result = false;
         if (BfsBlobType.BLOCKBLOB.equals(this.blobType)){
-            if (centralBufOffset > chunkSizeOfBB){ result = true; }
+            if (centralBufOffset > chunkSizeOfBB){ return result = true; }
         } else if (BfsBlobType.APPENDBLOB.equals(this.blobType)){
-            if (centralBufOffset > chunkSizeOfAB){ result = true; }
-        } else if (BfsBlobType.PAGEBLOB.equals(this.blobType)){
-            if (centralBufOffset > chunkSizeOfPB && BfsUtility.is512BytesAligned(centralBufOffset)){
-                result = true;
-            }
+            if (centralBufOffset > chunkSizeOfAB){ return result = true; }
         }
-        /* page blob */
+        /* page blob: never ready here, its data is uploaded on flush */
         return result;
     }
 
@@ -277,16 +270,14 @@ public synchronized void verifyUploadConditions() throws BfsException {
             String errMessage = "The size of the source file exceeds the size limit: " + blobSizeLimit + ".";
             throw new BfsException(errMessage);
         }
-        /* for block blob and append blob */
-        if (BfsBlobType.BLOCKBLOB.equals(this.blobType) || (BfsBlobType.APPENDBLOB.equals(this.blobType))){
-            if (numOfCommitedBlocks + chunkNumber > Constants.BLOB_BLOCK_NUMBER_LIMIT - 1){ //50000
-                String errMessage = "The block count of target blob: " + fullBlobPath + " exceeds the " + Constants.BLOB_BLOCK_NUMBER_LIMIT + " count limit.";
-                throw new BfsException(errMessage);
-            }
+
+        if (numOfCommitedBlocks + chunkNumber > Constants.BLOB_BLOCK_NUMBER_LIMIT - 1){ //50000
+            String errMessage = "The block count of target blob: " + fullBlobPath + " exceeds the " + Constants.BLOB_BLOCK_NUMBER_LIMIT + " count limit.";
+            throw new BfsException(errMessage);
         }
     } catch (Exception ex) {
         String errMessage = "Unexpected exception occurred when verifying the upload conditions for the blob: "
-                + this.fullBlobPath + ". " + ex.getMessage();
+                + this.fullBlobPath + "." + ex.getMessage();
         BfsUtility.throwBlobfsException(ex, errMessage);
 
     }
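The simplified check above enforces the service-side cap of 50,000 committed blocks per block blob (the `//50000` comment), which together with the chunk size bounds the largest blob this writer can produce. A small illustrative calculation; the 4 MB figure is an assumed chunk size, not the actual value of `BLOB_BUFFERED_OUTS_BLOCKBLOB_CHUNK_SIZE`:

```java
public class BlockLimitSketch {
    public static void main(String[] args) {
        final int blockLimit = 50_000;              // max committed blocks per block blob
        final long chunkSize = 4L * 1024 * 1024;    // assumed chunk size of 4 MB
        long maxBlobSize = blockLimit * chunkSize;  // largest blob this chunking scheme can produce (~209.7 GB here)
        System.out.printf("Max blob size with %d-byte chunks: %d bytes (~%.1f GB)%n",
                chunkSize, maxBlobSize, maxBlobSize / 1e9);
    }
}
```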