From c699d2ade8a93bde074de834a11fcb586317c2ac Mon Sep 17 00:00:00 2001 From: florianfederspiel Date: Fri, 22 May 2026 21:35:33 +0200 Subject: [PATCH] =?UTF-8?q?KI-AGENT:=20Matrix=20Push=20Worker=20Rate=20Lim?= =?UTF-8?q?it=20entsch=C3=A4rfen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/modules/matrix-push-worker.service.ts | 10 +++++++++- backend/src/modules/matrix.service.ts | 20 +++++++++++++++---- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/backend/src/modules/matrix-push-worker.service.ts b/backend/src/modules/matrix-push-worker.service.ts index 8bec483..44cd35f 100644 --- a/backend/src/modules/matrix-push-worker.service.ts +++ b/backend/src/modules/matrix-push-worker.service.ts @@ -125,6 +125,7 @@ export function startMatrixPushWorker(server: FastifyInstance) { let stopped = false let timer: ReturnType | undefined let lastServiceJoinSyncAt = 0 + let errorBackoffMs = 0 const getTenantRecipients = async (tenantId: number) => { const rows = await server.db @@ -344,8 +345,14 @@ export function startMatrixPushWorker(server: FastifyInstance) { } } } + errorBackoffMs = 0 } catch (err) { matrixPushWorkerState.lastError = err instanceof Error ? err.message : String(err) + const retryAfterMs = Number((err as any)?.retryAfterMs || (err as any)?.body?.retry_after_ms || 0) + errorBackoffMs = Math.min( + Math.max(retryAfterMs || (errorBackoffMs ? errorBackoffMs * 2 : 30_000), 30_000), + 5 * 60_000 + ) rememberWorkerEvent({ at: new Date().toISOString(), type: "error", @@ -356,7 +363,8 @@ export function startMatrixPushWorker(server: FastifyInstance) { } finally { running = false if (!stopped) { - timer = setTimeout(() => void runOnce(), since ? 0 : intervalMs) + const nextDelay = errorBackoffMs || (since ? 0 : intervalMs) + timer = setTimeout(() => void runOnce(), nextDelay) } } } diff --git a/backend/src/modules/matrix.service.ts b/backend/src/modules/matrix.service.ts index be37dc7..882dc0b 100644 --- a/backend/src/modules/matrix.service.ts +++ b/backend/src/modules/matrix.service.ts @@ -10,6 +10,7 @@ import jwt from "jsonwebtoken" type MatrixErrorResponse = { errcode?: string error?: string + retry_after_ms?: number } type MatrixRoomEvent = { @@ -382,13 +383,23 @@ export function matrixService(server: FastifyInstance) { const password = serviceUserPassword() try { - await registerWithSharedSecret(username, password, true) - } catch (err: any) { - if (err.errcode !== "M_USER_IN_USE") { - throw err + const login = await loginMatrixUser(username, password) + matrixServiceSessionCache = { + accessToken: login.access_token, + matrixUserId: login.user_id, + validUntilMs: Date.now() + 30 * 60 * 1000, } + + return matrixServiceSessionCache + } catch (loginErr: any) { + if (loginErr.statusCode === 429) throw loginErr } + try { + await registerWithSharedSecret(username, password, true) + } catch (registerErr: any) { + if (registerErr.errcode !== "M_USER_IN_USE") throw registerErr + } const login = await loginMatrixUser(username, password) matrixServiceSessionCache = { accessToken: login.access_token, @@ -442,6 +453,7 @@ export function matrixService(server: FastifyInstance) { { statusCode: response.status, errcode: error.errcode, + retryAfterMs: error.retry_after_ms, body, } )