KI-AGENT: Matrix Push Worker Rate Limit entschärfen
This commit is contained in:
@@ -125,6 +125,7 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
let stopped = false
|
let stopped = false
|
||||||
let timer: ReturnType<typeof setTimeout> | undefined
|
let timer: ReturnType<typeof setTimeout> | undefined
|
||||||
let lastServiceJoinSyncAt = 0
|
let lastServiceJoinSyncAt = 0
|
||||||
|
let errorBackoffMs = 0
|
||||||
|
|
||||||
const getTenantRecipients = async (tenantId: number) => {
|
const getTenantRecipients = async (tenantId: number) => {
|
||||||
const rows = await server.db
|
const rows = await server.db
|
||||||
@@ -344,8 +345,14 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
errorBackoffMs = 0
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
matrixPushWorkerState.lastError = err instanceof Error ? err.message : String(err)
|
matrixPushWorkerState.lastError = err instanceof Error ? err.message : String(err)
|
||||||
|
const retryAfterMs = Number((err as any)?.retryAfterMs || (err as any)?.body?.retry_after_ms || 0)
|
||||||
|
errorBackoffMs = Math.min(
|
||||||
|
Math.max(retryAfterMs || (errorBackoffMs ? errorBackoffMs * 2 : 30_000), 30_000),
|
||||||
|
5 * 60_000
|
||||||
|
)
|
||||||
rememberWorkerEvent({
|
rememberWorkerEvent({
|
||||||
at: new Date().toISOString(),
|
at: new Date().toISOString(),
|
||||||
type: "error",
|
type: "error",
|
||||||
@@ -356,7 +363,8 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
} finally {
|
} finally {
|
||||||
running = false
|
running = false
|
||||||
if (!stopped) {
|
if (!stopped) {
|
||||||
timer = setTimeout(() => void runOnce(), since ? 0 : intervalMs)
|
const nextDelay = errorBackoffMs || (since ? 0 : intervalMs)
|
||||||
|
timer = setTimeout(() => void runOnce(), nextDelay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import jwt from "jsonwebtoken"
|
|||||||
type MatrixErrorResponse = {
|
type MatrixErrorResponse = {
|
||||||
errcode?: string
|
errcode?: string
|
||||||
error?: string
|
error?: string
|
||||||
|
retry_after_ms?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
type MatrixRoomEvent = {
|
type MatrixRoomEvent = {
|
||||||
@@ -382,13 +383,23 @@ export function matrixService(server: FastifyInstance) {
|
|||||||
const password = serviceUserPassword()
|
const password = serviceUserPassword()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await registerWithSharedSecret(username, password, true)
|
const login = await loginMatrixUser(username, password)
|
||||||
} catch (err: any) {
|
matrixServiceSessionCache = {
|
||||||
if (err.errcode !== "M_USER_IN_USE") {
|
accessToken: login.access_token,
|
||||||
throw err
|
matrixUserId: login.user_id,
|
||||||
|
validUntilMs: Date.now() + 30 * 60 * 1000,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return matrixServiceSessionCache
|
||||||
|
} catch (loginErr: any) {
|
||||||
|
if (loginErr.statusCode === 429) throw loginErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await registerWithSharedSecret(username, password, true)
|
||||||
|
} catch (registerErr: any) {
|
||||||
|
if (registerErr.errcode !== "M_USER_IN_USE") throw registerErr
|
||||||
|
}
|
||||||
const login = await loginMatrixUser(username, password)
|
const login = await loginMatrixUser(username, password)
|
||||||
matrixServiceSessionCache = {
|
matrixServiceSessionCache = {
|
||||||
accessToken: login.access_token,
|
accessToken: login.access_token,
|
||||||
@@ -442,6 +453,7 @@ export function matrixService(server: FastifyInstance) {
|
|||||||
{
|
{
|
||||||
statusCode: response.status,
|
statusCode: response.status,
|
||||||
errcode: error.errcode,
|
errcode: error.errcode,
|
||||||
|
retryAfterMs: error.retry_after_ms,
|
||||||
body,
|
body,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user