Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/sdk/object_io.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class ObjectIO {
params.bucket_master_key_id = obj_upload.bucket_master_key_id;

try {
dbg.log0('upload_object_range: start upload stream', upload_params);
dbg.log1('upload_object_range: start upload stream', upload_params);
return this._upload_stream(params, complete_params);
} catch (err) {
dbg.error('upload_object_range: object part upload failed', upload_params, err);
Expand Down Expand Up @@ -213,7 +213,7 @@ class ObjectIO {
'last_modified_time',
);
try {
dbg.log0('upload_object: start upload', create_params);
dbg.log1('upload_object: start upload', create_params);
const create_reply = await params.client.object.create_object_upload(create_params);
params.obj_id = create_reply.obj_id;
params.tier_id = create_reply.tier_id;
Expand All @@ -228,7 +228,7 @@ class ObjectIO {
await this._upload_stream(params, complete_params);
}

dbg.log0('upload_object: complete upload', complete_params);
dbg.log1('upload_object: complete upload', complete_params);

if (params.async_get_last_modified_time) {
complete_params.last_modified_time = await params.async_get_last_modified_time();
Expand Down Expand Up @@ -275,7 +275,7 @@ class ObjectIO {
'num',
);
try {
dbg.log0('upload_multipart: start upload', complete_params);
dbg.log1('upload_multipart: start upload', complete_params);
const multipart_reply = await params.client.object.create_multipart(create_params);
params.tier_id = multipart_reply.tier_id;
params.bucket_id = multipart_reply.bucket_id;
Expand All @@ -289,7 +289,7 @@ class ObjectIO {
} else {
await this._upload_stream(params, complete_params);
}
dbg.log0('upload_multipart: complete upload', complete_params);
dbg.log1('upload_multipart: complete upload', complete_params);
const multipart_params = await params.client.object.complete_multipart(complete_params);
multipart_params.multipart_id = complete_params.multipart_id;
return multipart_params;
Expand Down Expand Up @@ -377,7 +377,7 @@ class ObjectIO {
async _upload_stream_internal(params, complete_params) {

params.desc = _.pick(params, 'obj_id', 'num', 'bucket', 'key');
dbg.log0('UPLOAD:', params.desc, 'streaming to', params.bucket, params.key);
dbg.log1('UPLOAD:', params.desc, 'streaming to', params.bucket, params.key);

// start and seq are set to zero even for multiparts and will be fixed
// when multiparts are combined to object in complete_object_upload
Expand Down Expand Up @@ -490,7 +490,7 @@ class ObjectIO {
params.range.end = params.start;
complete_params.size += chunk.size;
complete_params.num_parts += 1;
dbg.log0('UPLOAD: part', { ...params.desc, start: part.start, end: part.end, seq: part.seq });
dbg.log1('UPLOAD: part', { ...params.desc, start: part.start, end: part.end, seq: part.seq });

if (chunk.size > config.MAX_OBJECT_PART_SIZE) {
throw new Error(`Chunk size=${chunk.size} exceeds ` +
Expand All @@ -500,7 +500,7 @@ class ObjectIO {
return chunk;
});

/**
/**
* passing partial object info we have in this context which will be sent to block_stores
* as block_md.mapping_info so it can be used for recovery in case the db is not available.
* @type {Partial<nb.ObjectInfo>}
Expand Down Expand Up @@ -592,13 +592,19 @@ class ObjectIO {
reader.push(reader.pending.shift());
return;
}
const io_sem_size = _get_io_semaphore_size(requested_size);

// TODO we dont want to use requested_size as end, because we read entire chunks
// and we are better off return the data to the stream buffer
// instead of getting multiple calls from the stream with small slices to return.

const requested_end = Math.min(params.end, reader.pos + requested_size);
if (requested_end <= reader.pos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking here, maybe the logic will be clearer:
if (requested_size === 0 || reader.pos >= params.end) {
As this will give us more understanding that reading is finished

dbg.log1(`READ reader finished. requested end is less than reader pos. requested_end=${requested_end} reader.pos=${reader.pos}`);
reader.push(null);
return;
}

const io_sem_size = _get_io_semaphore_size(requested_end - reader.pos);
this._io_buffers_sem.surround_count(io_sem_size, async () => {
try {
const buffers = await this.read_object({
Expand Down Expand Up @@ -627,7 +633,7 @@ class ObjectIO {
reader.pending.push(missing_buf);
}
}
dbg.log0('READ reader pos', reader.pos);
dbg.log1('READ reader pos', reader.pos);
reader.push(reader.pending.shift());
} else {
reader.push(null);
Expand Down
Loading