/* * SPDX-FileCopyrightText: syuilo and misskey-project * SPDX-License-Identifier: AGPL-3.0-only */ import * as Redis from 'ioredis'; import { bindThis } from '@/decorators.js'; export class RedisKVCache { private readonly lifetime: number; private readonly memoryCache: MemoryKVCache; private readonly fetcher: (key: string) => Promise; private readonly toRedisConverter: (value: T) => string; private readonly fromRedisConverter: (value: string) => T | undefined; constructor( private redisClient: Redis.Redis, private name: string, opts: { lifetime: RedisKVCache['lifetime']; memoryCacheLifetime: number; fetcher: RedisKVCache['fetcher']; toRedisConverter: RedisKVCache['toRedisConverter']; fromRedisConverter: RedisKVCache['fromRedisConverter']; }, ) { this.lifetime = opts.lifetime; this.memoryCache = new MemoryKVCache(opts.memoryCacheLifetime); this.fetcher = opts.fetcher; this.toRedisConverter = opts.toRedisConverter; this.fromRedisConverter = opts.fromRedisConverter; } @bindThis public async set(key: string, value: T): Promise { this.memoryCache.set(key, value); if (this.lifetime === Infinity) { await this.redisClient.set( `kvcache:${this.name}:${key}`, this.toRedisConverter(value), ); } else { await this.redisClient.set( `kvcache:${this.name}:${key}`, this.toRedisConverter(value), 'EX', Math.round(this.lifetime / 1000), ); } } @bindThis public async get(key: string): Promise { const memoryCached = this.memoryCache.get(key); if (memoryCached !== undefined) return memoryCached; const cached = await this.redisClient.get(`kvcache:${this.name}:${key}`); if (cached == null) return undefined; const value = this.fromRedisConverter(cached); if (value !== undefined) { this.memoryCache.set(key, value); } return value; } @bindThis public async delete(key: string): Promise { this.memoryCache.delete(key); await this.redisClient.del(`kvcache:${this.name}:${key}`); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons: * * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster. * * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value. * * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses. */ @bindThis public async fetch(key: string): Promise { const cachedValue = await this.get(key); if (cachedValue !== undefined) { // Cache HIT return cachedValue; } // Cache MISS const value = await this.fetcher(key); await this.set(key, value); return value; } @bindThis public async refresh(key: string) { const value = await this.fetcher(key); await this.set(key, value); // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする } @bindThis public gc() { this.memoryCache.gc(); } @bindThis public dispose() { this.memoryCache.dispose(); } } export class RedisSingleCache { private readonly lifetime: number; private readonly memoryCache: MemorySingleCache; private readonly fetcher: () => Promise; private readonly toRedisConverter: (value: T) => string; private readonly fromRedisConverter: (value: string) => T | undefined; constructor( private redisClient: Redis.Redis, private name: string, opts: { lifetime: number; memoryCacheLifetime: number; fetcher: RedisSingleCache['fetcher']; toRedisConverter: RedisSingleCache['toRedisConverter']; fromRedisConverter: RedisSingleCache['fromRedisConverter']; }, ) { this.lifetime = opts.lifetime; this.memoryCache = new MemorySingleCache(opts.memoryCacheLifetime); this.fetcher = opts.fetcher; this.toRedisConverter = opts.toRedisConverter; this.fromRedisConverter = opts.fromRedisConverter; } @bindThis public async set(value: T): Promise { this.memoryCache.set(value); if (this.lifetime === Infinity) { await this.redisClient.set( `singlecache:${this.name}`, this.toRedisConverter(value), ); } else { await this.redisClient.set( `singlecache:${this.name}`, this.toRedisConverter(value), 'EX', Math.round(this.lifetime / 1000), ); } } @bindThis public async get(): Promise { const memoryCached = this.memoryCache.get(); if (memoryCached !== undefined) return memoryCached; const cached = await this.redisClient.get(`singlecache:${this.name}`); if (cached == null) return undefined; const value = this.fromRedisConverter(cached); if (value !== undefined) { this.memoryCache.set(value); } return value; } @bindThis public async delete(): Promise { this.memoryCache.delete(); await this.redisClient.del(`singlecache:${this.name}`); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * This awaits the call to Redis to ensure that the write succeeded, which is important for a few reasons: * * Other code uses this to synchronize changes between worker processes. A failed write can internally de-sync the cluster. * * Without an `await`, consecutive calls could race. An unlucky race could result in the older write overwriting the newer value. * * Not awaiting here makes the entire cache non-consistent. The prevents many possible uses. */ @bindThis public async fetch(): Promise { const cachedValue = await this.get(); if (cachedValue !== undefined) { // Cache HIT return cachedValue; } // Cache MISS const value = await this.fetcher(); await this.set(value); return value; } @bindThis public async refresh() { const value = await this.fetcher(); await this.set(value); // TODO: イベント発行して他プロセスのメモリキャッシュも更新できるようにする } } // TODO: メモリ節約のためあまり参照されないキーを定期的に削除できるようにする? export class MemoryKVCache { private readonly cache = new Map(); private readonly gcIntervalHandle = setInterval(() => this.gc(), 1000 * 60 * 3); // 3m constructor( private readonly lifetime: number, ) {} @bindThis /** * Mapにキャッシュをセットします * @deprecated これを直接呼び出すべきではない。InternalEventなどで変更を全てのプロセス/マシンに通知するべき */ public set(key: string, value: T): void { this.cache.set(key, { date: Date.now(), value, }); } @bindThis public get(key: string): T | undefined { const cached = this.cache.get(key); if (cached == null) return undefined; if ((Date.now() - cached.date) > this.lifetime) { this.cache.delete(key); return undefined; } return cached.value; } @bindThis public delete(key: string): void { this.cache.delete(key); } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetch(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); this.set(key, value); return value; } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetchMaybe(key: string, fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); if (value !== undefined) { this.set(key, value); } return value; } @bindThis public gc(): void { const now = Date.now(); for (const [key, { date }] of this.cache.entries()) { // The map is ordered from oldest to youngest. // We can stop once we find an entry that's still active, because all following entries must *also* be active. const age = now - date; if (age < this.lifetime) break; this.cache.delete(key); } } @bindThis public dispose(): void { clearInterval(this.gcIntervalHandle); } public get entries() { return this.cache.entries(); } } export class MemorySingleCache { private cachedAt: number | null = null; private value: T | undefined; constructor( private lifetime: number, ) {} @bindThis public set(value: T): void { this.cachedAt = Date.now(); this.value = value; } @bindThis public get(): T | undefined { if (this.cachedAt == null) return undefined; if ((Date.now() - this.cachedAt) > this.lifetime) { this.value = undefined; this.cachedAt = null; return undefined; } return this.value; } @bindThis public delete() { this.value = undefined; this.cachedAt = null; } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetch(fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); this.set(value); return value; } /** * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします */ @bindThis public async fetchMaybe(fetcher: () => Promise, validator?: (cachedValue: T) => boolean): Promise { const cachedValue = this.get(); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { // Cache HIT return cachedValue; } } else { // Cache HIT return cachedValue; } } // Cache MISS const value = await fetcher(); if (value !== undefined) { this.set(value); } return value; } }