Skip to content

Commit 341c145

Browse files
authored
fix: Start reprovisioning search index if a migration requires it (#779)
1 parent f8a00c0 commit 341c145

File tree

7 files changed

+195
-88
lines changed

7 files changed

+195
-88
lines changed

bases/renku_data_services/data_api/app.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from renku_data_services.project.blueprints import ProjectsBP, ProjectSessionSecretBP
2626
from renku_data_services.repositories.blueprints import RepositoriesBP
2727
from renku_data_services.search.blueprints import SearchBP
28+
from renku_data_services.search.reprovision import SearchReprovision
2829
from renku_data_services.session.blueprints import BuildsBP, EnvironmentsBP, SessionLaunchersBP
2930
from renku_data_services.storage.blueprints import StorageBP, StorageSchemaBP
3031
from renku_data_services.users.blueprints import KCUsersBP, UserPreferencesBP, UserSecretsBP
@@ -207,12 +208,15 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
207208
name="search2",
208209
url_prefix=url_prefix,
209210
authenticator=config.authenticator,
210-
reprovisioning_repo=config.reprovisioning_repo,
211-
user_repo=config.kc_user_repo,
212-
group_repo=config.group_repo,
211+
search_reprovision=SearchReprovision(
212+
search_updates_repo=config.search_updates_repo,
213+
reprovisioning_repo=config.reprovisioning_repo,
214+
solr_config=config.solr_config,
215+
user_repo=config.kc_user_repo,
216+
group_repo=config.group_repo,
217+
project_repo=config.project_repo,
218+
),
213219
solr_config=config.solr_config,
214-
project_repo=config.project_repo,
215-
search_updates_repo=config.search_updates_repo,
216220
authz=config.authz,
217221
)
218222
data_connectors = DataConnectorsBP(

bases/renku_data_services/data_api/main.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
from sentry_sdk.integrations.sanic import SanicIntegration, _context_enter, _context_exit, _set_transaction
1717

1818
import renku_data_services.search.core as search_core
19+
import renku_data_services.solr.entity_schema as entity_schema
1920
from renku_data_services.app_config import Config
2021
from renku_data_services.authz.admin_sync import sync_admins_from_keycloak
22+
from renku_data_services.base_models.core import APIUser
2123
from renku_data_services.data_api.app import register_all_handlers
2224
from renku_data_services.data_api.prometheus import collect_system_metrics, setup_app_metrics, setup_prometheus
2325
from renku_data_services.errors.errors import (
@@ -27,7 +29,6 @@
2729
ValidationError,
2830
)
2931
from renku_data_services.migrations.core import run_migrations_for_app
30-
from renku_data_services.solr import entity_schema
3132
from renku_data_services.solr.solr_client import DefaultSolrClient
3233
from renku_data_services.solr.solr_migrate import SchemaMigrator
3334
from renku_data_services.storage.rclone import RCloneValidator
@@ -72,6 +73,17 @@ async def _update_search(app: Sanic) -> None:
7273
raise
7374

7475

76+
async def _solr_reindex(app: Sanic) -> None:
    """Reindex all data into SOLR.

    A reindex might be required after the SOLR schema has been migrated.
    The configuration is loaded from the environment and the reprovisioning
    is executed on behalf of a synthetic admin user.

    Args:
        app: The Sanic application; accepted for symmetry with the call site
            but not read inside this function.
    """
    reprovisioner = Config.from_env().search_reprovisioning
    # The reprovisioning run needs a requesting user; use an admin identity.
    await reprovisioner.run_reprovision(APIUser(is_admin=True))
85+
86+
7587
def send_pending_events(app_name: str) -> None:
7688
"""Send pending messages to redis."""
7789
app = Sanic(app_name)
@@ -94,6 +106,16 @@ def update_search(app_name: str) -> None:
94106
asyncio.run(_update_search(app))
95107

96108

109+
def solr_reindex(app_name: str) -> None:
    """Synchronous entry point that runs a full SOLR reindex.

    Builds a Sanic app with the given name, sets up its metrics and then
    drives the ``_solr_reindex`` coroutine to completion.

    Args:
        app_name: Name used to construct the Sanic application.
    """
    sanic_app = Sanic(app_name)
    setup_app_metrics(sanic_app)

    logger.info("Running SOLR reindex triggered by a migration")
    # NOTE(review): asyncio.run() creates (and closes) its own event loop, so
    # the uvloop loop installed here is presumably not the one the coroutine
    # runs on -- confirm this is intended.
    uv_loop = uvloop.new_event_loop()
    asyncio.set_event_loop(uv_loop)
    asyncio.run(_solr_reindex(sanic_app))
117+
118+
97119
def create_app() -> Sanic:
98120
"""Create a Sanic application."""
99121
config = Config.from_env()
@@ -171,11 +193,13 @@ async def do_migrations(_: Sanic) -> None:
171193
await config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool)
172194

173195
@app.main_process_start
174-
async def do_solr_migrations(_: Sanic) -> None:
196+
async def do_solr_migrations(app: Sanic) -> None:
175197
logger.info(f"Running SOLR migrations at: {config.solr_config}")
176198
migrator = SchemaMigrator(config.solr_config)
177199
await migrator.ensure_core()
178200
result = await migrator.migrate(entity_schema.all_migrations)
201+
# starting background tasks can only be done in `main_process_ready`
202+
app.ctx.solr_reindex = result.requires_reindex
179203
logger.info(f"SOLR migration done: {result}")
180204

181205
@app.before_server_start
@@ -186,10 +210,11 @@ async def setup_rclone_validator(app: Sanic) -> None:
186210
@app.main_process_ready
187211
async def ready(app: Sanic) -> None:
188212
"""Application ready event handler."""
189-
config = Config.from_env()
190213
logger.info("starting events background job.")
191-
app.manager.manage("SendEvents", send_pending_events, {"app_name": config.app_name}, transient=True)
192-
app.manager.manage("UpdateSearch", update_search, {"app_name": config.app_name}, transient=True)
214+
app.manager.manage("SendEvents", send_pending_events, {"app_name": app.name}, transient=True)
215+
app.manager.manage("UpdateSearch", update_search, {"app_name": app.name}, transient=True)
216+
if getattr(app.ctx, "solr_reindex", False):
217+
app.manager.manage("SolrReindex", solr_reindex, {"app_name": app.name}, transient=True)
193218

194219
return app
195220

components/renku_data_services/app_config/config.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
)
7373
from renku_data_services.repositories.db import GitRepositoriesRepository
7474
from renku_data_services.search.db import SearchUpdatesRepo
75+
from renku_data_services.search.reprovision import SearchReprovision
7576
from renku_data_services.secrets.db import LowLevelUserSecretsRepo, UserSecretsRepo
7677
from renku_data_services.session import crs as session_crs
7778
from renku_data_services.session.db import SessionRepository
@@ -285,6 +286,7 @@ class Config:
285286
_event_repo: EventRepository | None = field(default=None, repr=False, init=False)
286287
_reprovisioning_repo: ReprovisioningRepository | None = field(default=None, repr=False, init=False)
287288
_search_updates_repo: SearchUpdatesRepo | None = field(default=None, repr=False, init=False)
289+
_search_reprovisioning: SearchReprovision | None = field(default=None, repr=False, init=False)
288290
_session_repo: SessionRepository | None = field(default=None, repr=False, init=False)
289291
_user_preferences_repo: UserPreferencesRepository | None = field(default=None, repr=False, init=False)
290292
_kc_user_repo: KcUserRepo | None = field(default=None, repr=False, init=False)
@@ -400,6 +402,20 @@ def search_updates_repo(self) -> SearchUpdatesRepo:
400402
self._search_updates_repo = SearchUpdatesRepo(session_maker=self.db.async_session_maker)
401403
return self._search_updates_repo
402404

405+
@property
def search_reprovisioning(self) -> SearchReprovision:
    """The search reprovisioning helper, created on first access and then cached."""
    cached = self._search_reprovisioning
    if cached:
        return cached

    # First access: wire the helper from the repositories and SOLR config
    # exposed by this configuration object, then remember it.
    created = SearchReprovision(
        search_updates_repo=self.search_updates_repo,
        reprovisioning_repo=self.reprovisioning_repo,
        solr_config=self.solr_config,
        user_repo=self.kc_user_repo,
        group_repo=self.group_repo,
        project_repo=self.project_repo,
    )
    self._search_reprovisioning = created
    return created
418+
403419
@property
404420
def project_repo(self) -> ProjectRepository:
405421
"""The DB adapter for Renku native projects."""

components/renku_data_services/search/blueprints.py

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,19 @@
1111
from renku_data_services.base_api.auth import authenticate, only_admins
1212
from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint
1313
from renku_data_services.base_api.misc import validate_query
14-
from renku_data_services.message_queue.db import ReprovisioningRepository
15-
from renku_data_services.namespace.db import GroupRepository
16-
from renku_data_services.project.db import ProjectRepository
1714
from renku_data_services.search.apispec import SearchQuery
18-
from renku_data_services.search.db import SearchUpdatesRepo
15+
from renku_data_services.search.reprovision import SearchReprovision
1916
from renku_data_services.search.user_query_parser import QueryParser
2017
from renku_data_services.solr.solr_client import SolrClientConfig
21-
from renku_data_services.users.db import UserRepo
2218

2319

2420
@dataclass(kw_only=True)
2521
class SearchBP(CustomBlueprint):
2622
"""Handlers for search."""
2723

2824
authenticator: base_models.Authenticator
29-
reprovisioning_repo: ReprovisioningRepository
30-
user_repo: UserRepo
31-
group_repo: GroupRepository
32-
project_repo: ProjectRepository
33-
search_updates_repo: SearchUpdatesRepo
3425
solr_config: SolrClientConfig
26+
search_reprovision: SearchReprovision
3527
authz: Authz
3628

3729
def post(self) -> BlueprintFactoryResponse:
@@ -40,19 +32,10 @@ def post(self) -> BlueprintFactoryResponse:
4032
@authenticate(self.authenticator)
4133
@only_admins
4234
async def _post(request: Request, user: base_models.APIUser) -> HTTPResponse | JSONResponse:
43-
reprovisioning = await self.reprovisioning_repo.start()
35+
reprovisioning = await self.search_reprovision.acquire_reprovision()
4436

4537
request.app.add_task(
46-
core.reprovision(
47-
requested_by=user,
48-
reprovisioning=reprovisioning,
49-
search_updates_repo=self.search_updates_repo,
50-
reprovisioning_repo=self.reprovisioning_repo,
51-
solr_config=self.solr_config,
52-
user_repo=self.user_repo,
53-
group_repo=self.group_repo,
54-
project_repo=self.project_repo,
55-
),
38+
self.search_reprovision.init_reprovision(requested_by=user, reprovisioning=reprovisioning),
5639
name=f"reprovisioning-{reprovisioning.id}",
5740
)
5841

@@ -65,7 +48,7 @@ def get_status(self) -> BlueprintFactoryResponse:
6548

6649
@authenticate(self.authenticator)
6750
async def _get_status(_: Request, __: base_models.APIUser) -> JSONResponse | HTTPResponse:
68-
reprovisioning = await self.reprovisioning_repo.get_active_reprovisioning()
51+
reprovisioning = await self.search_reprovision.get_current_reprovision()
6952
if not reprovisioning:
7053
return HTTPResponse(status=404)
7154
return json({"id": str(reprovisioning.id), "start_date": reprovisioning.start_date.isoformat()})
@@ -78,7 +61,7 @@ def delete(self) -> BlueprintFactoryResponse:
7861
@authenticate(self.authenticator)
7962
@only_admins
8063
async def _delete(_: Request, __: base_models.APIUser) -> HTTPResponse:
81-
await self.reprovisioning_repo.stop()
64+
await self.search_reprovision.kill_reprovision_lock()
8265
return HTTPResponse(status=204)
8366

8467
return "/search/reprovision", ["DELETE"], _delete

components/renku_data_services/search/core.py

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99
import renku_data_services.search.solr_token as st
1010
from renku_data_services.authz.models import Role
1111
from renku_data_services.base_models import APIUser
12-
from renku_data_services.message_queue.db import ReprovisioningRepository
13-
from renku_data_services.message_queue.models import Reprovisioning
14-
from renku_data_services.namespace.db import GroupRepository
15-
from renku_data_services.project.db import ProjectRepository
1612
from renku_data_services.search import authz, converters
1713
from renku_data_services.search.db import SearchUpdatesRepo
1814
from renku_data_services.search.models import DeleteDoc
@@ -37,57 +33,6 @@
3733
SolrQuery,
3834
SubQuery,
3935
)
40-
from renku_data_services.users.db import UserRepo
41-
42-
43-
async def reprovision(
44-
requested_by: APIUser,
45-
reprovisioning: Reprovisioning,
46-
search_updates_repo: SearchUpdatesRepo,
47-
reprovisioning_repo: ReprovisioningRepository,
48-
solr_config: SolrClientConfig,
49-
user_repo: UserRepo,
50-
group_repo: GroupRepository,
51-
project_repo: ProjectRepository,
52-
) -> None:
53-
"""Initiates reprovisioning by inserting documents into the staging table."""
54-
55-
def log_counter(c: int) -> None:
56-
if c % 50 == 0:
57-
logger.info(f"Inserted {c}. entities into staging table...")
58-
59-
try:
60-
logger.info(f"Starting reprovisioning with ID {reprovisioning.id}")
61-
started = datetime.now()
62-
await search_updates_repo.clear_all()
63-
async with DefaultSolrClient(solr_config) as client:
64-
await client.delete("_type:*")
65-
counter = 0
66-
all_users = user_repo.get_all_users(requested_by=requested_by)
67-
async for user_entity in all_users:
68-
await search_updates_repo.insert(user_entity, started)
69-
counter += 1
70-
log_counter(counter)
71-
72-
all_groups = group_repo.get_all_groups(requested_by=requested_by)
73-
async for group_entity in all_groups:
74-
await search_updates_repo.insert(group_entity, started)
75-
counter += 1
76-
log_counter(counter)
77-
78-
all_projects = project_repo.get_all_projects(requested_by=requested_by)
79-
async for project_entity in all_projects:
80-
await search_updates_repo.insert(project_entity, started)
81-
counter += 1
82-
log_counter(counter)
83-
84-
logger.info(f"Inserted {counter} entities into the staging table.")
85-
86-
except Exception as e:
87-
logger.error("Error while reprovisioning entities!", exc_info=e)
88-
## TODO error handling. skip or fail?
89-
finally:
90-
await reprovisioning_repo.stop()
9136

9237

9338
async def update_solr(search_updates_repo: SearchUpdatesRepo, solr_client: SolrClient, batch_size: int) -> None:

0 commit comments

Comments
 (0)