1616 */
1717public class ParallelDownloader {
1818
19- /* the blob reference */
20- private CloudBlob blob ;
19+ /* instance of the object */
20+ private static ParallelDownloader instance = null ;
2121 /* the number of threads */
2222 private int defaultNumOfThreads = 8 ;
2323 /* the minimum chunk size */
2424 private int minChunkSize = 512 * 1024 ; // 512K
2525 /* the downloader threads pool */
26- private ThreadPuddle downloaderThreadsPool ;
26+ private static ThreadPuddle downloaderThreadsPool ;
2727 /* the factory of thread puddle class */
28- private ThreadPuddleFactory threadPuddleFactory ;
29- /* final count of chunks */
30- private int numOfChunks ;
31- /* completed task's ID so far */
32- private int completedTaskID = -1 ;
33- /* the start offset of the blob to be downloaded from */
34- private int blobOffset ;
35- /* the length of bytes needed to be downloaded,
36- * the length should be within a pre-defined range */
37- private long length ;
38- /* the tasks list */
39- private List <DownloadTask > downloadTasksList = new ArrayList <DownloadTask >();
28+ private static ThreadPuddleFactory threadPuddleFactory ;
29+
4030
4131 /**
4232 * the task class of read/write operations
@@ -54,39 +44,55 @@ public DownloadTask(int taskID, int offset, long length) {
5444 }
5545 }
5646
47+ /**
48+ * constructor
49+ */
50+ private ParallelDownloader (){
51+ initTheDownLoaderThreadsPool (this .defaultNumOfThreads );
52+ }
53+
54+ /**
55+ * get the singleton instance
56+ * @return
57+ */
58+ public synchronized static ParallelDownloader getInstance () {
59+ if (instance == null ){
60+ synchronized (ParallelDownloader .class ) {
61+ if (instance == null ){
62+ instance = new ParallelDownloader ();
63+ }
64+ }
65+ }
66+ return instance ;
67+ }
5768
5869 /**
5970 * initialize the threads pool
6071 */
6172 private final void initTheDownLoaderThreadsPool (int numOfthreads ){
6273 threadPuddleFactory = new ThreadPuddleFactory ();
6374 threadPuddleFactory .setThreads (numOfthreads );
64- threadPuddleFactory .setTaskLimit (numOfthreads );
6575 threadPuddleFactory .setFifo (true );
6676 downloaderThreadsPool = threadPuddleFactory .build ();
6777 }
6878
69- public ParallelDownloader (CloudBlob blob , int blobOffset , long length ){
70- this .blob = blob ;
71- this .blobOffset = blobOffset ;
72- this .length = length ;
73- getFinalNumOfChunks ();
74- initTheDownLoaderThreadsPool (numOfChunks );
75- }
7679
77- private void getFinalNumOfChunks (){
80+ private int getFinalNumOfChunks (long length ){
7881 int tmpBlockCount = (int )((float )length / (float )minChunkSize ) + 1 ;
7982 /* the final number of the chunks */
80- numOfChunks = Math .min (defaultNumOfThreads , tmpBlockCount );
81- return ;
83+ int numOfChunks = Math .min (defaultNumOfThreads , tmpBlockCount );
84+ return numOfChunks ;
8285 }
8386
8487 /**
8588 * generate the parallel tasks
8689 */
87- private final void splitJobIntoTheParallelTasks (){
90+ private final List <DownloadTask > splitJobIntoTheParallelTasks (int blobOffset , long length ){
91+ List <DownloadTask > downloadTasksList = new ArrayList <DownloadTask >();
8892 int taskSequenceID = 0 ;
89- int tmpOffset = this .blobOffset ;
93+ int tmpOffset = blobOffset ;
94+ /* get the number of chunks */
95+ int numOfChunks = getFinalNumOfChunks (length );
9096 /* the final size of the chunk */
9197 long numBtsInEachChunk = (long )(float )(length /numOfChunks );
9298 long bytesLeft = length ;
@@ -109,15 +115,19 @@ private final void splitJobIntoTheParallelTasks(){
109115 /* increment/decrement counters */
110116 bytesLeft -= bytesToRead ;
111117 }
112- return ;
118+ return downloadTasksList ;
113119 }
114120
115- public final byte [] downloadBlobWithParallelThreads () throws Exception {
121+ public final byte [] downloadBlobWithParallelThreads (final CloudBlob blob , final int blobOffset , final long length )
122+ throws Exception {
116123 final byte [] dlJobCentralBuffer = new byte [(int )length ];
124+ /* completed task's ID so far */
125+ final int completedTaskID [] = new int [1 ];
126+ completedTaskID [0 ] = -1 ;
117127 final int failedTasks [] = new int [1 ];
118128 final int totalBtsDownloaded [] = new int [1 ];
119- /* get the download tasks */
120- splitJobIntoTheParallelTasks ();
129+ /* generate the download tasks */
130+ List < DownloadTask > downloadTasksList = splitJobIntoTheParallelTasks (blobOffset , length );
121131 /* no tasks, return immediately */
122132 if (downloadTasksList .size () == 0 ){return dlJobCentralBuffer ;}
123133 /* start the parallel reading */
@@ -137,7 +147,7 @@ public void run()
137147 /* check the downloaded result, break the loop if success*/
138148 if (bytesDownloaded == downloadTask .length ) {
139149 /* wait for the pre-task to be completed */
140- while (completedTaskID != downloadTask .taskID -1 ){
150+ while (completedTaskID [ 0 ] != downloadTask .taskID -1 ){
141151 Thread .sleep ((Constants .DEFAULT_BFC_THREAD_SLEEP_MILLS /5 ));
142152 }
143153 /* get the offset in the central buffer */
@@ -148,7 +158,7 @@ public void run()
148158 /* update the total bytes downloaded */
149159 totalBtsDownloaded [0 ] += bytesDownloaded ;
150160 /* update the offset of the completed task */
151- completedTaskID = downloadTask .taskID ;
161+ completedTaskID [ 0 ] = downloadTask .taskID ;
152162 }
153163
154164
@@ -175,11 +185,11 @@ public void run()
175185 while (true ){
176186 /* if there are any error, we should abandon this read operation */
177187 if (failedTasks [0 ] > 0 ){
178- downloaderThreadsPool .die ();
188+ // downloaderThreadsPool.die();
179189 throw new BfsException ("read Failed:" + blob .getName ());
180190 }
181191 /* completedTaskID start form 0 */
182- if (downloadTasksList .size () == completedTaskID + 1 && totalBtsDownloaded [0 ] == length ){
192+ if (downloadTasksList .size () == completedTaskID [ 0 ] + 1 && totalBtsDownloaded [0 ] == length ){
183193 break ;
184194 }
185195 Thread .sleep ((Constants .DEFAULT_BFC_THREAD_SLEEP_MILLS /5 ));
0 commit comments