Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- Fix: 「D」キーでダークモードを切り替える際にsyncDeviceDarkModeのチェックがバイパスされる問題を修正

### Server
- Fix: 管理画面で `enableFanoutTimeline` を切り替えるとタイムラインにギャップが生じノートが取りこぼされる問題を修正 (過渡期フラグ `fanoutTimelineActive` を導入し、トグル中は BullMQ ジョブで Redis 上の `list:*` をパージしてからデータプレーンを切り替える方式に変更。過渡期中はデータプレーンが FTTL を一切使用せず DB 直行となり、過渡期中の `enableFanoutTimeline` 再変更は 409 で拒否される)
- Enhance: リモートノートクリーニングジョブのスキップ処理のパフォーマンス改善
- Fix: PerUserDriveChart がシステム所有ファイル (userId が null) の更新で `"group"` の非NULL制約違反によりクラッシュする問題を修正 (#17498)
- Enhance: リモートノートクリーニングジョブの削除対象検索処理のパフォーマンス改善
Expand Down
1 change: 1 addition & 0 deletions locales/ja-JP.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,7 @@ _serverSettings:
shortName: "略称"
shortNameDescription: "サーバーの正式名称が長い場合に、代わりに表示することのできる略称や通称。"
fanoutTimelineDescription: "有効にすると、各種タイムラインを取得する際のパフォーマンスが大幅に向上し、データベースへの負荷を軽減することが可能です。ただし、Redisのメモリ使用量は増加します。サーバーのメモリ容量が少ない場合、または動作が不安定な場合は無効にすることができます。"
fanoutTimelineTransitionInProgress: "切り替えを適用中です。完了するまでこのトグルは操作できません。適用中、タイムラインは一時的にデータベースから取得されます。"
fanoutTimelineDbFallback: "データベースへのフォールバック"
fanoutTimelineDbFallbackDescription: "有効にすると、タイムラインがキャッシュされていない場合にDBへ追加で問い合わせを行うフォールバック処理を行います。無効にすると、フォールバック処理を行わないことでさらにサーバーの負荷を軽減することができますが、タイムラインが取得できる範囲に制限が生じます。"
reactionsBufferingDescription: "有効にすると、リアクション作成時のパフォーマンスが大幅に向上し、データベースへの負荷を軽減することが可能です。ただし、Redisのメモリ使用量は増加します。"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/

export class AddFanoutTimelineActive1780626317299 {
name = 'AddFanoutTimelineActive1780626317299';

/**
* @param {QueryRunner} queryRunner
*/
async up(queryRunner) {
await queryRunner.query('ALTER TABLE "meta" ADD "fanoutTimelineActive" boolean NOT NULL DEFAULT true');
}

/**
* @param {QueryRunner} queryRunner
*/
async down(queryRunner) {
await queryRunner.query('ALTER TABLE "meta" DROP COLUMN "fanoutTimelineActive"');
}
};
25 changes: 25 additions & 0 deletions packages/backend/src/core/FanoutTimelineService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,31 @@ export class FanoutTimelineService {
return this.redisForTimelines.del('list:' + name);
}

/**
* Redis 上のすべてのタイムラインリスト (`list:*`) を一括削除する。
* `enableFanoutTimeline` の有効/無効を切り替えた直後など、Redis 上のキャッシュが
* DB と整合しなくなる可能性があるタイミングで呼ぶことを想定。
* 削除後は各タイムライン取得時にFanoutTimelineEndpointServiceが `noteIds.length === 0`
* を検知してDB直行するため、結果としてDBから自然に再構築される。
*/
@bindThis
public async purgeAll(): Promise<number> {
let cursor = '0';
let totalDeleted = 0;
do {
const [next, keys] = await this.redisForTimelines.scan(cursor, 'MATCH', this.redisForTimelines.options.keyPrefix + 'list:*', 'COUNT', 100);
cursor = next;
if (keys.length === 0) continue;
// ioredis の keyPrefix は SCAN で返るキーには含まれているが DEL 渡し時に
// 二重に付加されるのを防ぐため prefix を剥がして渡す
const prefix = this.redisForTimelines.options.keyPrefix ?? '';
const stripped = prefix !== '' ? keys.map(k => k.startsWith(prefix) ? k.slice(prefix.length) : k) : keys;
await this.redisForTimelines.del(...stripped);
totalDeleted += stripped.length;
} while (cursor !== '0');
return totalDeleted;
}

@bindThis
public remove(name: FanoutTimelineName, id: string) {
return this.redisForTimelines.lrem('list:' + name, 1, id);
Expand Down
4 changes: 3 additions & 1 deletion packages/backend/src/core/NoteCreateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,9 @@ export class NoteCreateService implements OnApplicationShutdown {

@bindThis
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
if (!this.meta.enableFanoutTimeline) return;
// fanoutTimelineActive=false は admin がトグルを切り替えた直後の過渡期。
// その間は FTTL への push を止め、データプレーンは DB のみで動かす。
if (!this.meta.enableFanoutTimeline || !this.meta.fanoutTimelineActive) return;

const r = this.redisForTimelines.pipeline();

Expand Down
24 changes: 24 additions & 0 deletions packages/backend/src/core/QueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,30 @@ export class QueueService {
});
}

/**
* `enableFanoutTimeline` トグル時に Redis 上の list:* キャッシュを一掃するジョブ。
* 完了後に `MetaService.update({ fanoutTimelineActive: targetState })` で
* データプレーン側のスイッチを入れ直す。
* 固定 jobId にしているので、過渡期中の二重 enqueue は BullMQ 側で抑止される。
*/
@bindThis
public createPurgeFanoutTimelinesJob(targetState: boolean) {
return this.systemQueue.add('purgeFanoutTimelines', { targetState }, {
jobId: 'purgeFanoutTimelines',
attempts: 3,
backoff: {
type: 'exponential',
delay: 5000,
},
removeOnComplete: {
age: 3600 * 24 * 7, // keep up to 7 days
},
removeOnFail: {
age: 3600 * 24 * 7, // keep up to 7 days
},
});
}

@bindThis
public createDeleteAccountJob(user: ThinUser, opts: { soft?: boolean; } = {}) {
return this.dbQueue.add('deleteAccount', {
Expand Down
11 changes: 11 additions & 0 deletions packages/backend/src/models/Meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,17 @@ export class MiMeta {
})
public enableFanoutTimeline: boolean;

/**
* `enableFanoutTimeline` をトグルした直後の過渡期 (Redis 上の list:* キャッシュを
* BullMQ ジョブで purge している間) は false になり、データプレーンは FTTL を一切
* 読み書きしない (= OFF と同じ挙動)。purge ジョブ完了で true に戻る。
* 過渡期中の `enableFanoutTimeline` 変更は admin endpoint 側で 409 で拒否する。
*/
@Column('boolean', {
default: true,
})
public fanoutTimelineActive: boolean;

@Column('boolean', {
default: true,
})
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/queue/QueueProcessorModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { CleanChartsProcessorService } from './processors/CleanChartsProcessorSe
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { CheckModeratorsActivityProcessorService } from './processors/CheckModeratorsActivityProcessorService.js';
import { CleanRemoteNotesProcessorService } from './processors/CleanRemoteNotesProcessorService.js';
import { PurgeFanoutTimelinesProcessorService } from './processors/PurgeFanoutTimelinesProcessorService.js';
import { CleanRemoteFilesProcessorService } from './processors/CleanRemoteFilesProcessorService.js';
import { DeleteAccountProcessorService } from './processors/DeleteAccountProcessorService.js';
import { DeleteDriveFilesProcessorService } from './processors/DeleteDriveFilesProcessorService.js';
Expand Down Expand Up @@ -87,6 +88,7 @@ import { RelationshipProcessorService } from './processors/RelationshipProcessor
CheckExpiredMutingsProcessorService,
CheckModeratorsActivityProcessorService,
CleanRemoteNotesProcessorService,
PurgeFanoutTimelinesProcessorService,
QueueProcessorService,
],
exports: [
Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/queue/QueueProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered
import { CleanProcessorService } from './processors/CleanProcessorService.js';
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
import { CleanRemoteNotesProcessorService } from './processors/CleanRemoteNotesProcessorService.js';
import { PurgeFanoutTimelinesProcessorService } from './processors/PurgeFanoutTimelinesProcessorService.js';
import { QueueLoggerService } from './QueueLoggerService.js';
import { QUEUE, baseWorkerOptions } from './const.js';

Expand Down Expand Up @@ -127,6 +128,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
private checkModeratorsActivityProcessorService: CheckModeratorsActivityProcessorService,
private cleanProcessorService: CleanProcessorService,
private cleanRemoteNotesProcessorService: CleanRemoteNotesProcessorService,
private purgeFanoutTimelinesProcessorService: PurgeFanoutTimelinesProcessorService,
) {
this.logger = this.queueLoggerService.logger;

Expand Down Expand Up @@ -176,6 +178,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
case 'checkModeratorsActivity': return this.checkModeratorsActivityProcessorService.process();
case 'clean': return this.cleanProcessorService.process();
case 'cleanRemoteNotes': return this.cleanRemoteNotesProcessorService.process(job);
case 'purgeFanoutTimelines': return this.purgeFanoutTimelinesProcessorService.process(job);
default: throw new Error(`unrecognized job type ${job.name} for system`);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-FileCopyrightText: syuilo and misskey-project
* SPDX-License-Identifier: AGPL-3.0-only
*/

import { Injectable } from '@nestjs/common';
import type Logger from '@/logger.js';
import { bindThis } from '@/decorators.js';
import { FanoutTimelineService } from '@/core/FanoutTimelineService.js';
import { MetaService } from '@/core/MetaService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';

@Injectable()
export class PurgeFanoutTimelinesProcessorService {
private logger: Logger;

constructor(
private fanoutTimelineService: FanoutTimelineService,
private metaService: MetaService,
private queueLoggerService: QueueLoggerService,
) {
this.logger = this.queueLoggerService.logger.createSubLogger('purge-fanout-timelines');
}

@bindThis
public async process(job: Bull.Job<{ targetState: boolean }>): Promise<void> {
const { targetState } = job.data;
this.logger.info(`Purging fanout timelines (targetState=${targetState})...`);

const deleted = await this.fanoutTimelineService.purgeAll();
await this.metaService.update({ fanoutTimelineActive: targetState });

this.logger.succ(`Purged ${deleted} fanout timeline keys. fanoutTimelineActive=${targetState}`);
}
}
2 changes: 1 addition & 1 deletion packages/backend/src/server/ActivityPubServerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ export class ActivityPubServerService {
const partOf = `${this.config.url}/users/${userId}/outbox`;

if (page) {
const notes = this.meta.enableFanoutTimeline ? await this.fanoutTimelineEndpointService.getMiNotes({
const notes = (this.meta.enableFanoutTimeline && this.meta.fanoutTimelineActive) ? await this.fanoutTimelineEndpointService.getMiNotes({
sinceId: sinceId ?? null,
untilId: untilId ?? null,
limit: limit,
Expand Down
5 changes: 5 additions & 0 deletions packages/backend/src/server/api/endpoints/admin/meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ export const meta = {
type: 'boolean',
optional: false, nullable: false,
},
fanoutTimelineActive: {
type: 'boolean',
optional: false, nullable: false,
},
enableFanoutTimelineDbFallback: {
type: 'boolean',
optional: false, nullable: false,
Expand Down Expand Up @@ -725,6 +729,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
policies: { ...DEFAULT_POLICIES, ...instance.policies },
manifestJsonOverride: instance.manifestJsonOverride,
enableFanoutTimeline: instance.enableFanoutTimeline,
fanoutTimelineActive: instance.fanoutTimelineActive,
enableFanoutTimelineDbFallback: instance.enableFanoutTimelineDbFallback,
perLocalUserUserTimelineCacheMax: instance.perLocalUserUserTimelineCacheMax,
perRemoteUserUserTimelineCacheMax: instance.perRemoteUserUserTimelineCacheMax,
Expand Down
39 changes: 34 additions & 5 deletions packages/backend/src/server/api/endpoints/admin/update-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,24 @@ import type { MiMeta } from '@/models/Meta.js';
import { ModerationLogService } from '@/core/ModerationLogService.js';
import { Endpoint } from '@/server/api/endpoint-base.js';
import { MetaService } from '@/core/MetaService.js';
import { QueueService } from '@/core/QueueService.js';
import { ApiError } from '@/server/api/error.js';

export const meta = {
tags: ['admin'],

requireCredential: true,
requireAdmin: true,
kind: 'write:admin:meta',

errors: {
fanoutTimelineTransitionInProgress: {
message: 'A fanout timeline toggle is currently being applied. Please wait until the purge job completes before changing enableFanoutTimeline again.',
code: 'FANOUT_TIMELINE_TRANSITION_IN_PROGRESS',
id: '65b9b26a-700c-41ea-96f7-96b9a7902301',
httpStatusCode: 409,
},
},
} as const;

export const paramDef = {
Expand Down Expand Up @@ -230,6 +241,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

private metaService: MetaService,
private moderationLogService: ModerationLogService,
private queueService: QueueService,
) {
super(meta, paramDef, async (ps, me) => {
const set = {} as Partial<MiMeta>;
Expand Down Expand Up @@ -648,10 +660,6 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
set.manifestJsonOverride = ps.manifestJsonOverride;
}

if (ps.enableFanoutTimeline !== undefined) {
set.enableFanoutTimeline = ps.enableFanoutTimeline;
}

if (ps.enableFanoutTimelineDbFallback !== undefined) {
set.enableFanoutTimelineDbFallback = ps.enableFanoutTimelineDbFallback;
}
Expand Down Expand Up @@ -764,11 +772,32 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

const before = await this.metaService.fetch(true);

// enableFanoutTimeline をトグルする場合、Redis 上の list:* キャッシュを BullMQ
// ジョブで一括 purge してから fanoutTimelineActive=true に戻す。過渡期 (active=false)
// 中はデータプレーンが FTTL を読み書きしないため、push と purge の競合・MetaService
// キャッシュ伝播ラグ・大規模インスタンスでの HTTP タイムアウト等を一掃できる。
// 過渡期中に enableFanoutTimeline をさらに変更するリクエストは 409 で拒否する。
if (ps.enableFanoutTimeline !== undefined) {
if (ps.enableFanoutTimeline !== before.enableFanoutTimeline) {
// 過渡期 = enableFanoutTimeline と fanoutTimelineActive が乖離している状態
// (purge ジョブ進行中)。stable な OFF/OFF や ON/ON からのトグルは受理する。
if (before.enableFanoutTimeline !== before.fanoutTimelineActive) {
throw new ApiError(meta.errors.fanoutTimelineTransitionInProgress);
}
set.fanoutTimelineActive = false;
}
set.enableFanoutTimeline = ps.enableFanoutTimeline;
}

await this.metaService.update(set);

const after = await this.metaService.fetch(true);

this.moderationLogService.log(me, 'updateServerSettings', {
if (before.enableFanoutTimeline !== after.enableFanoutTimeline) {
await this.queueService.createPurgeFanoutTimelinesJob(after.enableFanoutTimeline);
}

await this.moderationLogService.log(me, 'updateServerSettings', {
before,
after,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

if (me) this.activeUsersChart.read(me);

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
return await this.noteEntityService.packMany(await this.getFromDb({ untilId, sinceId, limit: ps.limit, channelId: channel.id }, me), me);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

if (ps.withReplies && ps.withFiles) throw new ApiError(meta.errors.bothWithRepliesAndWithFiles);

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
const timeline = await this.getFromDb({
untilId,
sinceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-

if (ps.withReplies && ps.withFiles) throw new ApiError(meta.errors.bothWithRepliesAndWithFiles);

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
const timeline = await this.getFromDb({
untilId,
sinceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
const untilId = ps.untilId ?? (ps.untilDate ? this.idService.gen(ps.untilDate!) : null);
const sinceId = ps.sinceId ?? (ps.sinceDate ? this.idService.gen(ps.sinceDate!) : null);

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
const timeline = await this.getFromDb({
untilId,
sinceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
throw new ApiError(meta.errors.noSuchList);
}

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
const timeline = await this.getFromDb(list, {
untilId,
sinceId,
Expand Down
2 changes: 1 addition & 1 deletion packages/backend/src/server/api/endpoints/users/notes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export default class extends Endpoint<typeof meta, typeof paramDef> { // eslint-
}
}

if (!this.serverSettings.enableFanoutTimeline) {
if (!this.serverSettings.enableFanoutTimeline || !this.serverSettings.fanoutTimelineActive) {
const timeline = await this.getFromDb({
untilId,
sinceId,
Expand Down
Loading
Loading