From 51e0ae95b177dc5b971e4e00ed23a113628d00ee Mon Sep 17 00:00:00 2001 From: florianfederspiel Date: Fri, 22 May 2026 21:32:47 +0200 Subject: [PATCH] =?UTF-8?q?KI-AGENT:=20Matrix=20Push=20Worker=20Diagnose?= =?UTF-8?q?=20erg=C3=A4nzen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/modules/matrix-push-worker.service.ts | 123 +++++++++++++++++- backend/src/routes/communication.ts | 5 + 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/backend/src/modules/matrix-push-worker.service.ts b/backend/src/modules/matrix-push-worker.service.ts index 308226e..8bec483 100644 --- a/backend/src/modules/matrix-push-worker.service.ts +++ b/backend/src/modules/matrix-push-worker.service.ts @@ -14,6 +14,54 @@ type ChatRecipient = { matrixUserId?: string } +type MatrixPushWorkerEvent = { + at: string + type: string + roomKey?: string + roomId?: string | null + messageId?: string + sender?: string + targets?: number + created?: number + delivered?: number + failed?: number + error?: string +} + +const matrixPushWorkerState = { + enabled: false, + startedAt: null as string | null, + lastRunAt: null as string | null, + lastJoinAt: null as string | null, + lastJoinTotal: 0, + lastJoinJoined: 0, + lastJoinFailed: 0, + hasSyncToken: false, + lastSyncRooms: 0, + lastSyncMessages: 0, + lastMatchedRooms: 0, + lastNotificationsCreated: 0, + lastNotificationsDelivered: 0, + lastNotificationsFailed: 0, + lastError: null as string | null, + events: [] as MatrixPushWorkerEvent[], +} + +const rememberWorkerEvent = (event: MatrixPushWorkerEvent) => { + matrixPushWorkerState.events = [ + { + at: new Date().toISOString(), + ...event, + }, + ...matrixPushWorkerState.events, + ].slice(0, 25) +} + +export const getMatrixPushWorkerState = () => ({ + ...matrixPushWorkerState, + events: [...matrixPushWorkerState.events], +}) + const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => { const rows = await server.db .select({ email: authUsers.email }) @@ -65,6 +113,10 @@ export function startMatrixPushWorker(server: FastifyInstance) { return } + matrixPushWorkerState.enabled = true + matrixPushWorkerState.startedAt = new Date().toISOString() + rememberWorkerEvent({ at: new Date().toISOString(), type: "started" }) + const matrix = matrixService(server) const notifications = new NotificationService(server, getUserDirectory) const intervalMs = Math.max(Number(process.env.MATRIX_PUSH_WORKER_INTERVAL_MS || 3000), 1000) @@ -153,15 +205,35 @@ export function startMatrixPushWorker(server: FastifyInstance) { const sender = recipients.find((recipient) => recipient.matrixUserId === message.sender) || null const text = message.body || message.attachment?.fileName || "Neue Nachricht" const targets = recipientsForMessage(room, recipients, sender?.userId || null, text) + rememberWorkerEvent({ + at: new Date().toISOString(), + type: "message_seen", + roomKey: room.key, + roomId: room.matrixRoomId, + messageId: message.id, + sender: message.sender, + targets: targets.length, + }) if (!targets.length) return const senderName = sender ? displayUserName(sender) : message.senderDisplayName || message.sender || "Matrix" const preview = text.length > 160 ? `${text.slice(0, 157)}...` : text for (const target of targets) { - if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) continue + if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) { + rememberWorkerEvent({ + at: new Date().toISOString(), + type: "notification_skipped_duplicate", + roomKey: room.key, + roomId: room.matrixRoomId, + messageId: message.id, + sender: message.sender, + targets: 1, + }) + continue + } - await notifications.trigger({ + const result = await notifications.trigger({ tenantId: room.tenantId, userId: target.userId, eventType: "communication.message.new", @@ -179,6 +251,21 @@ export function startMatrixPushWorker(server: FastifyInstance) { }, channels: ["inapp", "push"], }) + matrixPushWorkerState.lastNotificationsCreated += result.created || 0 + matrixPushWorkerState.lastNotificationsDelivered += result.delivered || 0 + matrixPushWorkerState.lastNotificationsFailed += result.failed || 0 + rememberWorkerEvent({ + at: new Date().toISOString(), + type: "notification_triggered", + roomKey: room.key, + roomId: room.matrixRoomId, + messageId: message.id, + sender: message.sender, + targets: 1, + created: result.created || 0, + delivered: result.delivered || 0, + failed: result.failed || 0, + }) } } @@ -187,9 +274,29 @@ export function startMatrixPushWorker(server: FastifyInstance) { running = true try { + matrixPushWorkerState.lastRunAt = new Date().toISOString() + matrixPushWorkerState.lastError = null + matrixPushWorkerState.lastSyncRooms = 0 + matrixPushWorkerState.lastSyncMessages = 0 + matrixPushWorkerState.lastMatchedRooms = 0 + matrixPushWorkerState.lastNotificationsCreated = 0 + matrixPushWorkerState.lastNotificationsDelivered = 0 + matrixPushWorkerState.lastNotificationsFailed = 0 + if (!lastServiceJoinSyncAt || Date.now() - lastServiceJoinSyncAt > 60_000) { const joinResult = await matrix.syncServiceJoinedTenantRooms() lastServiceJoinSyncAt = Date.now() + matrixPushWorkerState.lastJoinAt = new Date().toISOString() + matrixPushWorkerState.lastJoinTotal = joinResult.total + matrixPushWorkerState.lastJoinJoined = joinResult.joined + matrixPushWorkerState.lastJoinFailed = joinResult.failed + rememberWorkerEvent({ + at: new Date().toISOString(), + type: "service_join_sync", + targets: joinResult.total, + delivered: joinResult.joined, + failed: joinResult.failed, + }) if (joinResult.failed) { console.warn("Matrix-Push-Worker: Service-User konnte nicht alle Räume joinen", { total: joinResult.total, @@ -202,6 +309,10 @@ export function startMatrixPushWorker(server: FastifyInstance) { const initial = !since const sync = await matrix.syncServiceRoomEvents(since, initial) since = sync.nextBatch || since + matrixPushWorkerState.hasSyncToken = Boolean(since) + matrixPushWorkerState.lastSyncRooms = sync.rooms?.length || 0 + matrixPushWorkerState.lastSyncMessages = (sync.rooms || []) + .reduce((sum: number, room: any) => sum + (room.messages?.length || 0), 0) if (!initial && sync.rooms?.length) { const roomIds = sync.rooms.map((room: any) => room.roomId).filter(Boolean) @@ -216,6 +327,7 @@ export function startMatrixPushWorker(server: FastifyInstance) { )) : [] const roomsByMatrixId = new Map(rooms.map((room) => [room.matrixRoomId, room])) + matrixPushWorkerState.lastMatchedRooms = rooms.length const recipientsByTenant = new Map() for (const syncedRoom of sync.rooms) { @@ -233,6 +345,13 @@ export function startMatrixPushWorker(server: FastifyInstance) { } } } catch (err) { + matrixPushWorkerState.lastError = err instanceof Error ? err.message : String(err) + rememberWorkerEvent({ + at: new Date().toISOString(), + type: "error", + error: matrixPushWorkerState.lastError, + }) + console.error("Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten", err) server.log.error({ err }, "Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten") } finally { running = false diff --git a/backend/src/routes/communication.ts b/backend/src/routes/communication.ts index 2b12897..d08df09 100644 --- a/backend/src/routes/communication.ts +++ b/backend/src/routes/communication.ts @@ -4,6 +4,7 @@ import multipart from "@fastify/multipart" import { and, desc, eq, inArray, ne } from "drizzle-orm" import { authProfiles, authTenantUsers, authUsers, notificationsItems, projects } from "../../db/schema" import { matrixService } from "../modules/matrix.service" +import { getMatrixPushWorkerState } from "../modules/matrix-push-worker.service" import { NotificationService, UserDirectory } from "../modules/notification.service" const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => { @@ -448,6 +449,10 @@ export default async function communicationRoutes(server: FastifyInstance) { } }) + server.get("/communication/matrix/push-worker", async () => { + return getMatrixPushWorkerState() + }) + server.get("/communication/matrix/media", async (req, reply) => { try { const query = req.query as { uri?: string; name?: string }