Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions sql/sql_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10145,11 +10145,15 @@ int TABLE::unlock_hlindexes()

int TABLE::hlindexes_on_insert()
{
DBUG_ASSERT(s->hlindexes() == (hlindex != NULL));
if (hlindex && hlindex->in_use)
if (int err= mhnsw_insert(this, key_info + s->keys))
return err;
return 0;
DBUG_ASSERT(s->hlindexes() == (hlindex != NULL));
if (hlindex && hlindex->in_use)
{
if (hlindex->bulk_insert_active)
return mhnsw_bulk_insert_row(this, key_info + s->keys);
else
return mhnsw_insert(this, key_info + s->keys);
}
return 0;
Comment on lines +10148 to +10156
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The member bulk_insert_active was added to the TABLE struct in sql/table.h, but here it is accessed via hlindex->bulk_insert_active. Since hlindex is not of type TABLE*, this will cause a compilation error. It should be accessed directly as bulk_insert_active (or this->bulk_insert_active).

Additionally, please update the indentation to 2 spaces to match the repository's implicit style guide.

  DBUG_ASSERT(s->hlindexes() == (hlindex != NULL));
  if (hlindex && hlindex->in_use)
  {
    if (bulk_insert_active)
      return mhnsw_bulk_insert_row(this, key_info + s->keys);
    else
      return mhnsw_insert(this, key_info + s->keys);
  }
  return 0;

}

int TABLE::hlindexes_on_update()
Expand Down Expand Up @@ -10208,3 +10212,30 @@ int TABLE::hlindex_read_end()
{
return mhnsw_read_end(this);
}

int TABLE::hlindexes_bulk_insert_begin(ha_rows rows)
{
if (s->hlindexes())
{
if (!hlindex || !hlindex->in_use)
if (int err= open_hlindexes_for_write())
return err;

if (hlindex && hlindex->in_use)
{
hlindex->bulk_insert_active= true;
return mhnsw_bulk_insert_begin(this, key_info + s->keys, rows);
}
}
return 0;
}

int TABLE::hlindexes_bulk_insert_end()
{
if (hlindex && hlindex->in_use)
{
hlindex->bulk_insert_active= false;
return mhnsw_bulk_insert_end(this, key_info + s->keys);
}
return 0;
}
Comment on lines +10216 to +10241
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are three issues here:

  1. bulk_insert_active is a member of TABLE, not hlindex, so accessing it via hlindex->bulk_insert_active will cause a compilation error.
  2. If mhnsw_bulk_insert_begin fails (returns a non-zero error code), bulk_insert_active should be reset to false so that subsequent inserts do not incorrectly attempt bulk insertion.
  3. The indentation should be updated to 2 spaces to match the repository's implicit style guide.
int TABLE::hlindexes_bulk_insert_begin(ha_rows rows)
{
  if (hlindex && hlindex->in_use)
  {
    bulk_insert_active= true;
    int err= mhnsw_bulk_insert_begin(this, key_info + s->keys, rows);
    if (err)
      bulk_insert_active= false;
    return err;
  }
  return 0;
}

int TABLE::hlindexes_bulk_insert_end()
{
  if (hlindex && hlindex->in_use)
  {
    bulk_insert_active= false;
    return mhnsw_bulk_insert_end(this, key_info + s->keys);
  }
  return 0;
}

23 changes: 19 additions & 4 deletions sql/sql_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12616,6 +12616,7 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
bool make_unversioned= from->versioned() && !to->versioned();
bool keep_versioned= from->versioned() && to->versioned();
bool bulk_insert_started= 0;
bool hlindex_bulk_started= 0;
Field *to_row_start= NULL, *to_row_end= NULL, *from_row_end= NULL;
MYSQL_TIME query_start;
DBUG_ENTER("copy_data_between_tables");
Expand Down Expand Up @@ -12662,11 +12663,17 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,

from->file->info(HA_STATUS_VARIABLE);
to->file->extra(HA_EXTRA_PREPARE_FOR_ALTER_TABLE);
if (!to->s->long_unique_table && !to->s->hlindexes())
if (!to->s->long_unique_table)
{
to->file->ha_start_bulk_insert(from->file->stats.records,
ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT);
bulk_insert_started= 1;
to->file->ha_start_bulk_insert(from->file->stats.records,
ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT);
bulk_insert_started= 1;

if (to->s->hlindexes())
{
to->hlindexes_bulk_insert_begin(from->file->stats.records);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think error handling is needed here.

hlindex_bulk_started= 1;
}
}
Comment on lines +12666 to 12677
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The return value of to->hlindexes_bulk_insert_begin is currently ignored. If the bulk insert initialization fails, we should handle the error, print it, and avoid setting hlindex_bulk_started to 1 so that we do not proceed with the bulk insert or attempt to end it later.

Additionally, please update the indentation to 2 spaces to match the repository's implicit style guide.

  if (!to->s->long_unique_table)
  {
    to->file->ha_start_bulk_insert(from->file->stats.records,
                                   ignore ? 0 : HA_CREATE_UNIQUE_INDEX_BY_SORT);
    bulk_insert_started= 1;

    if (to->s->hlindexes())
    {
      if ((error= to->hlindexes_bulk_insert_begin(from->file->stats.records)))
      {
        to->file->print_error(error, MYF(0));
      }
      else
      {
        hlindex_bulk_started= 1;
      }
    }
  }

mysql_stage_set_work_estimated(thd->m_stage_progress_psi, from->file->stats.records);
List_iterator<Create_field> it(alter_info->create_list);
Expand Down Expand Up @@ -12999,6 +13006,14 @@ copy_data_between_tables(THD *thd, TABLE *from, TABLE *to,
}

bulk_insert_started= 0;
if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0)
{
if (!thd->is_error())
to->file->print_error(my_errno, MYF(0));
error= 1;
}
hlindex_bulk_started=0;
Comment on lines +13009 to +13015
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Please update the indentation to 2 spaces and add proper spacing around operators (e.g., hlindex_bulk_started = 0;) to match the repository's implicit style guide.

  if (hlindex_bulk_started && to->hlindexes_bulk_insert_end() && error <= 0)
  {
    if (!thd->is_error())
      to->file->print_error(my_errno, MYF(0));
    error= 1;
  }
  hlindex_bulk_started= 0;


if (error <= 0 && !to->s->hlindexes())
{
Abort_on_warning_instant_set save_abort_on_warning(thd, false);
Expand Down
3 changes: 3 additions & 0 deletions sql/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,7 @@ struct TABLE
*/
bool alias_name_used; /* true if table_name is alias */
bool get_fields_in_item_tree; /* Signal to fix_field */
bool bulk_insert_active; /* mhnsw bulk_insert_started flag */
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The new member variable bulk_insert_active is not initialized. To prevent potential uninitialized memory bugs, it is highly recommended to initialize it to false directly in the struct definition using C++ member initialization.

  bool bulk_insert_active= false;    /* mhnsw bulk_insert_started flag */

private:
bool m_needs_reopen;
bool created; /* For tmp tables. TRUE <=> tmp table was actually created.*/
Expand Down Expand Up @@ -1875,6 +1876,8 @@ struct TABLE
int hlindexes_on_update();
int hlindexes_on_delete(const uchar *buf);
int hlindexes_on_delete_all(bool truncate);
int hlindexes_bulk_insert_begin(ha_rows rows);
int hlindexes_bulk_insert_end();
int unlock_hlindexes();

void prepare_triggers_for_insert_stmt_or_event();
Expand Down
196 changes: 193 additions & 3 deletions sql/vector_mhnsw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ class MHNSW_Share : public Sql_alloc
const uint M;
metric_type metric;
bool use_subdist;

bool bulk_active;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explicitely initialize this to false?

MHNSW_Share(TABLE *t)
: tref_len(t->file->ref_length), gref_len(t->hlindex->file->ref_length),
M(static_cast<uint>(t->s->key_info[t->s->keys].option_struct->M)),
Expand Down Expand Up @@ -1012,6 +1012,8 @@ int FVectorNode::load_from_record(TABLE *graph)
FVector *vec_ptr= FVector::align_ptr(tref() + tref_len());
memcpy(vec_ptr->data(), v->ptr(), v->length());
vec_ptr->postprocess(ctx->use_subdist, ctx->vec_len);
if (ctx->metric == COSINE)
vec_ptr->abs2= 0.5f;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you already prepared #5184 to fix this separately, good.


longlong layer= graph->field[FIELD_LAYER]->val_int();
if (layer > 100) // 10e30 nodes at M=2, more at larger M's
Expand Down Expand Up @@ -1266,8 +1268,9 @@ static int update_second_degree_neighbors(MHNSW_param *p, FVectorNode *node)
if (int err= select_neighbors(p, neigh, neighneighbors, node,
max_neighbors))
return err;
if (int err= neigh->save(p->graph))
return err;
if (!p->ctx->bulk_active)
if (int err= neigh->save(p->graph))
return err;
}
return 0;
}
Expand Down Expand Up @@ -1504,6 +1507,193 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
}


struct MHNSW_Bulk_context : public Sql_alloc {
MHNSW_Share *ctx;
DYNAMIC_ARRAY nodes;
ha_rows estimated_rows;
uint8_t current_max_layer;
};

int mhnsw_bulk_insert_begin(TABLE *table, KEY *keyinfo, ha_rows rows)
{
TABLE *graph= table->hlindex;
DBUG_ASSERT(graph);
DBUG_ASSERT(keyinfo->algorithm == HA_KEY_ALG_VECTOR);
DBUG_ASSERT(keyinfo->usable_key_parts == 1);

MHNSW_Bulk_context *bulk= new (table->in_use->mem_root) MHNSW_Bulk_context();
if (!bulk)
return HA_ERR_OUT_OF_MEM;

bulk->estimated_rows= rows;
if (my_init_dynamic_array(PSI_INSTRUMENT_MEM, &bulk->nodes, sizeof(FVectorNode*),
rows + rows * 0.1, rows, MYF(0)))
{
delete bulk;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this delete bulk statements and the one few lines below can be removed.

return HA_ERR_OUT_OF_MEM;
}

int err= MHNSW_Share::acquire(&bulk->ctx, table, true);
if (err && err != HA_ERR_END_OF_FILE && err != HA_ERR_KEY_NOT_FOUND)
{
delete_dynamic(&bulk->nodes);
delete bulk;
return err;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you may add DBUG_ASSERT(!bulk->ctx->start); to document that the bulk build assumes an empty target graph

bulk->ctx->bulk_active= 1;
bulk->current_max_layer= 0;
table->hlindex->context= bulk;
return 0;
}

int mhnsw_bulk_insert_row(TABLE *table, KEY *keyinfo)
{
TABLE *graph= table->hlindex;
MHNSW_Bulk_context *bulk= (MHNSW_Bulk_context*)graph->context;
MHNSW_Share *ctx= bulk->ctx;
MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->read_set);

DBUG_ASSERT(graph);
DBUG_ASSERT(bulk);
DBUG_ASSERT(keyinfo->algorithm == HA_KEY_ALG_VECTOR);
DBUG_ASSERT(keyinfo->usable_key_parts == 1);

Field *vec_field= keyinfo->key_part->field;
String buf, *res= vec_field->val_str(&buf);

DBUG_ASSERT(vec_field->binary());
DBUG_ASSERT(vec_field->cmp_type() == STRING_RESULT);
DBUG_ASSERT(res); // ER_INDEX_CANNOT_HAVE_NULL
DBUG_ASSERT(res->length() > 0 && res->length() % 4 == 0);
DBUG_ASSERT(table->file->ref_length <= graph->field[FIELD_TREF]->field_length);

table->file->position(table->record[0]);

if (ctx->byte_len == 0)
ctx->set_lengths(res->length());

if (ctx->byte_len != res->length())
return my_errno= HA_ERR_CRASHED;

const double NORMALIZATION_FACTOR= 1 / std::log(ctx->M);
double log= -std::log(my_rnd(&table->in_use->rand)) * NORMALIZATION_FACTOR;
uint8_t max_layer= bulk->current_max_layer;
uint8_t target_layer= std::min<uint8_t>(static_cast<uint8_t>(std::floor(log)), max_layer + 1);

if (bulk->nodes.elements == 0)
target_layer= 0;

if (target_layer > bulk->current_max_layer)
bulk->current_max_layer= target_layer;

FVectorNode *node= new (ctx->alloc_node())
FVectorNode(ctx, table->file->ref, target_layer, res->ptr());

if (insert_dynamic(&bulk->nodes, (uchar*)&node))
return HA_ERR_OUT_OF_MEM;

dbug_tmp_restore_column_map(&table->read_set, old_map);
return 0;
}

int mhnsw_bulk_insert_end(TABLE *table, KEY *keyinfo)
{
THD *thd= table->in_use;
TABLE *graph= table->hlindex;
MHNSW_Bulk_context *bulk= (MHNSW_Bulk_context*)graph->context;

DBUG_ASSERT(graph);
DBUG_ASSERT(bulk);

MHNSW_Share *ctx= bulk->ctx;
SCOPE_EXIT([ctx, bulk, table](){
delete_dynamic(&bulk->nodes);
ctx->bulk_active= 0;
ctx->release(table);
table->hlindex->context= nullptr;
});

for (uint i= 0; i < bulk->nodes.elements; i++)
{
FVectorNode *target= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);

if (!ctx->start)
{
ctx->start= target;
continue;
}

MEM_ROOT_SAVEPOINT memroot_sv;
root_make_savepoint(thd->mem_root, &memroot_sv);
SCOPE_EXIT([memroot_sv](){ root_free_to_savepoint(&memroot_sv); });

const uint8_t max_layer= ctx->start->max_layer;
uint8_t target_layer= target->max_layer;

MHNSW_param p(ctx, graph, max_layer);
p.acc.graph_size= 1;

const size_t max_found= ctx->max_neighbors(0);
Neighborhood candidates;
candidates.init(thd->alloc<FVectorNode*>(max_found + 7), max_found);
candidates.links[candidates.num++]= ctx->start;

for (; p.layer > target_layer; p.layer--)
{
if (int err= search_layer(&p, target->vec, NEAREST, 1, &candidates, false))
return err;
}

for (; p.layer >= 0; p.layer--)
{
uint max_neighbors= ctx->max_neighbors(p.layer);
if (int err= search_layer(&p, target->vec, NEAREST, max_neighbors,
&candidates, true))
return err;
if (int err= select_neighbors(&p, target, candidates, 0, max_neighbors))
return err;
}

ctx->add_to_stats(p.acc);

if (target_layer > max_layer)
ctx->start= target;

for (p.layer= target_layer; p.layer >= 0; p.layer--)
{
if (int err= update_second_degree_neighbors(&p, target))
return err;
}
}

graph->file->ha_start_bulk_insert(bulk->nodes.elements, 0);

for (uint i= 0; i < bulk->nodes.elements; i++)
{
FVectorNode *node= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);
if (int err= node->save(graph))
return err;
}

if (int err= graph->file->ha_end_bulk_insert())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be placed inside a SCOPE_EXIT or similar, to avoid premature exit on error if node->save() fails in the loop above.

return err;

if (int err= graph->file->ha_rnd_init(0))
return err;
SCOPE_EXIT([graph](){ graph->file->ha_rnd_end(); });

// fix neighbors grefs
for (uint i= 0; i < bulk->nodes.elements; i++)
{
FVectorNode *node= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);
if (int err= node->save(graph))
return err;
}

return 0;
}

struct Search_context: public Sql_alloc
{
Neighborhood found;
Expand Down
3 changes: 3 additions & 0 deletions sql/vector_mhnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ int mhnsw_invalidate(TABLE *table, const uchar *rec, KEY *keyinfo);
int mhnsw_delete_all(TABLE *table, KEY *keyinfo, bool truncate);
void mhnsw_free(TABLE_SHARE *share);
Item_func_vec_distance::distance_kind mhnsw_uses_distance(const TABLE *table, KEY *keyinfo);
int mhnsw_bulk_insert_begin(TABLE *table, KEY *keyinfo, ha_rows rows);
int mhnsw_bulk_insert_end(TABLE *table, KEY *keyinfo);
int mhnsw_bulk_insert_row(TABLE *table, KEY *keyinfo);

extern ha_create_table_option mhnsw_index_options[];
extern st_plugin_int *mhnsw_plugin;
Loading