KI-AGENT: Matrix Push Worker Diagnose ergänzen
This commit is contained in:
@@ -14,6 +14,54 @@ type ChatRecipient = {
|
|||||||
matrixUserId?: string
|
matrixUserId?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MatrixPushWorkerEvent = {
|
||||||
|
at: string
|
||||||
|
type: string
|
||||||
|
roomKey?: string
|
||||||
|
roomId?: string | null
|
||||||
|
messageId?: string
|
||||||
|
sender?: string
|
||||||
|
targets?: number
|
||||||
|
created?: number
|
||||||
|
delivered?: number
|
||||||
|
failed?: number
|
||||||
|
error?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
const matrixPushWorkerState = {
|
||||||
|
enabled: false,
|
||||||
|
startedAt: null as string | null,
|
||||||
|
lastRunAt: null as string | null,
|
||||||
|
lastJoinAt: null as string | null,
|
||||||
|
lastJoinTotal: 0,
|
||||||
|
lastJoinJoined: 0,
|
||||||
|
lastJoinFailed: 0,
|
||||||
|
hasSyncToken: false,
|
||||||
|
lastSyncRooms: 0,
|
||||||
|
lastSyncMessages: 0,
|
||||||
|
lastMatchedRooms: 0,
|
||||||
|
lastNotificationsCreated: 0,
|
||||||
|
lastNotificationsDelivered: 0,
|
||||||
|
lastNotificationsFailed: 0,
|
||||||
|
lastError: null as string | null,
|
||||||
|
events: [] as MatrixPushWorkerEvent[],
|
||||||
|
}
|
||||||
|
|
||||||
|
const rememberWorkerEvent = (event: MatrixPushWorkerEvent) => {
|
||||||
|
matrixPushWorkerState.events = [
|
||||||
|
{
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
...event,
|
||||||
|
},
|
||||||
|
...matrixPushWorkerState.events,
|
||||||
|
].slice(0, 25)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const getMatrixPushWorkerState = () => ({
|
||||||
|
...matrixPushWorkerState,
|
||||||
|
events: [...matrixPushWorkerState.events],
|
||||||
|
})
|
||||||
|
|
||||||
const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => {
|
const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => {
|
||||||
const rows = await server.db
|
const rows = await server.db
|
||||||
.select({ email: authUsers.email })
|
.select({ email: authUsers.email })
|
||||||
@@ -65,6 +113,10 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
matrixPushWorkerState.enabled = true
|
||||||
|
matrixPushWorkerState.startedAt = new Date().toISOString()
|
||||||
|
rememberWorkerEvent({ at: new Date().toISOString(), type: "started" })
|
||||||
|
|
||||||
const matrix = matrixService(server)
|
const matrix = matrixService(server)
|
||||||
const notifications = new NotificationService(server, getUserDirectory)
|
const notifications = new NotificationService(server, getUserDirectory)
|
||||||
const intervalMs = Math.max(Number(process.env.MATRIX_PUSH_WORKER_INTERVAL_MS || 3000), 1000)
|
const intervalMs = Math.max(Number(process.env.MATRIX_PUSH_WORKER_INTERVAL_MS || 3000), 1000)
|
||||||
@@ -153,15 +205,35 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
const sender = recipients.find((recipient) => recipient.matrixUserId === message.sender) || null
|
const sender = recipients.find((recipient) => recipient.matrixUserId === message.sender) || null
|
||||||
const text = message.body || message.attachment?.fileName || "Neue Nachricht"
|
const text = message.body || message.attachment?.fileName || "Neue Nachricht"
|
||||||
const targets = recipientsForMessage(room, recipients, sender?.userId || null, text)
|
const targets = recipientsForMessage(room, recipients, sender?.userId || null, text)
|
||||||
|
rememberWorkerEvent({
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
type: "message_seen",
|
||||||
|
roomKey: room.key,
|
||||||
|
roomId: room.matrixRoomId,
|
||||||
|
messageId: message.id,
|
||||||
|
sender: message.sender,
|
||||||
|
targets: targets.length,
|
||||||
|
})
|
||||||
if (!targets.length) return
|
if (!targets.length) return
|
||||||
|
|
||||||
const senderName = sender ? displayUserName(sender) : message.senderDisplayName || message.sender || "Matrix"
|
const senderName = sender ? displayUserName(sender) : message.senderDisplayName || message.sender || "Matrix"
|
||||||
const preview = text.length > 160 ? `${text.slice(0, 157)}...` : text
|
const preview = text.length > 160 ? `${text.slice(0, 157)}...` : text
|
||||||
|
|
||||||
for (const target of targets) {
|
for (const target of targets) {
|
||||||
if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) continue
|
if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) {
|
||||||
|
rememberWorkerEvent({
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
type: "notification_skipped_duplicate",
|
||||||
|
roomKey: room.key,
|
||||||
|
roomId: room.matrixRoomId,
|
||||||
|
messageId: message.id,
|
||||||
|
sender: message.sender,
|
||||||
|
targets: 1,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
await notifications.trigger({
|
const result = await notifications.trigger({
|
||||||
tenantId: room.tenantId,
|
tenantId: room.tenantId,
|
||||||
userId: target.userId,
|
userId: target.userId,
|
||||||
eventType: "communication.message.new",
|
eventType: "communication.message.new",
|
||||||
@@ -179,6 +251,21 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
},
|
},
|
||||||
channels: ["inapp", "push"],
|
channels: ["inapp", "push"],
|
||||||
})
|
})
|
||||||
|
matrixPushWorkerState.lastNotificationsCreated += result.created || 0
|
||||||
|
matrixPushWorkerState.lastNotificationsDelivered += result.delivered || 0
|
||||||
|
matrixPushWorkerState.lastNotificationsFailed += result.failed || 0
|
||||||
|
rememberWorkerEvent({
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
type: "notification_triggered",
|
||||||
|
roomKey: room.key,
|
||||||
|
roomId: room.matrixRoomId,
|
||||||
|
messageId: message.id,
|
||||||
|
sender: message.sender,
|
||||||
|
targets: 1,
|
||||||
|
created: result.created || 0,
|
||||||
|
delivered: result.delivered || 0,
|
||||||
|
failed: result.failed || 0,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -187,9 +274,29 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
running = true
|
running = true
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
matrixPushWorkerState.lastRunAt = new Date().toISOString()
|
||||||
|
matrixPushWorkerState.lastError = null
|
||||||
|
matrixPushWorkerState.lastSyncRooms = 0
|
||||||
|
matrixPushWorkerState.lastSyncMessages = 0
|
||||||
|
matrixPushWorkerState.lastMatchedRooms = 0
|
||||||
|
matrixPushWorkerState.lastNotificationsCreated = 0
|
||||||
|
matrixPushWorkerState.lastNotificationsDelivered = 0
|
||||||
|
matrixPushWorkerState.lastNotificationsFailed = 0
|
||||||
|
|
||||||
if (!lastServiceJoinSyncAt || Date.now() - lastServiceJoinSyncAt > 60_000) {
|
if (!lastServiceJoinSyncAt || Date.now() - lastServiceJoinSyncAt > 60_000) {
|
||||||
const joinResult = await matrix.syncServiceJoinedTenantRooms()
|
const joinResult = await matrix.syncServiceJoinedTenantRooms()
|
||||||
lastServiceJoinSyncAt = Date.now()
|
lastServiceJoinSyncAt = Date.now()
|
||||||
|
matrixPushWorkerState.lastJoinAt = new Date().toISOString()
|
||||||
|
matrixPushWorkerState.lastJoinTotal = joinResult.total
|
||||||
|
matrixPushWorkerState.lastJoinJoined = joinResult.joined
|
||||||
|
matrixPushWorkerState.lastJoinFailed = joinResult.failed
|
||||||
|
rememberWorkerEvent({
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
type: "service_join_sync",
|
||||||
|
targets: joinResult.total,
|
||||||
|
delivered: joinResult.joined,
|
||||||
|
failed: joinResult.failed,
|
||||||
|
})
|
||||||
if (joinResult.failed) {
|
if (joinResult.failed) {
|
||||||
console.warn("Matrix-Push-Worker: Service-User konnte nicht alle Räume joinen", {
|
console.warn("Matrix-Push-Worker: Service-User konnte nicht alle Räume joinen", {
|
||||||
total: joinResult.total,
|
total: joinResult.total,
|
||||||
@@ -202,6 +309,10 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
const initial = !since
|
const initial = !since
|
||||||
const sync = await matrix.syncServiceRoomEvents(since, initial)
|
const sync = await matrix.syncServiceRoomEvents(since, initial)
|
||||||
since = sync.nextBatch || since
|
since = sync.nextBatch || since
|
||||||
|
matrixPushWorkerState.hasSyncToken = Boolean(since)
|
||||||
|
matrixPushWorkerState.lastSyncRooms = sync.rooms?.length || 0
|
||||||
|
matrixPushWorkerState.lastSyncMessages = (sync.rooms || [])
|
||||||
|
.reduce((sum: number, room: any) => sum + (room.messages?.length || 0), 0)
|
||||||
|
|
||||||
if (!initial && sync.rooms?.length) {
|
if (!initial && sync.rooms?.length) {
|
||||||
const roomIds = sync.rooms.map((room: any) => room.roomId).filter(Boolean)
|
const roomIds = sync.rooms.map((room: any) => room.roomId).filter(Boolean)
|
||||||
@@ -216,6 +327,7 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
))
|
))
|
||||||
: []
|
: []
|
||||||
const roomsByMatrixId = new Map(rooms.map((room) => [room.matrixRoomId, room]))
|
const roomsByMatrixId = new Map(rooms.map((room) => [room.matrixRoomId, room]))
|
||||||
|
matrixPushWorkerState.lastMatchedRooms = rooms.length
|
||||||
const recipientsByTenant = new Map<number, ChatRecipient[]>()
|
const recipientsByTenant = new Map<number, ChatRecipient[]>()
|
||||||
|
|
||||||
for (const syncedRoom of sync.rooms) {
|
for (const syncedRoom of sync.rooms) {
|
||||||
@@ -233,6 +345,13 @@ export function startMatrixPushWorker(server: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
matrixPushWorkerState.lastError = err instanceof Error ? err.message : String(err)
|
||||||
|
rememberWorkerEvent({
|
||||||
|
at: new Date().toISOString(),
|
||||||
|
type: "error",
|
||||||
|
error: matrixPushWorkerState.lastError,
|
||||||
|
})
|
||||||
|
console.error("Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten", err)
|
||||||
server.log.error({ err }, "Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten")
|
server.log.error({ err }, "Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten")
|
||||||
} finally {
|
} finally {
|
||||||
running = false
|
running = false
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import multipart from "@fastify/multipart"
|
|||||||
import { and, desc, eq, inArray, ne } from "drizzle-orm"
|
import { and, desc, eq, inArray, ne } from "drizzle-orm"
|
||||||
import { authProfiles, authTenantUsers, authUsers, notificationsItems, projects } from "../../db/schema"
|
import { authProfiles, authTenantUsers, authUsers, notificationsItems, projects } from "../../db/schema"
|
||||||
import { matrixService } from "../modules/matrix.service"
|
import { matrixService } from "../modules/matrix.service"
|
||||||
|
import { getMatrixPushWorkerState } from "../modules/matrix-push-worker.service"
|
||||||
import { NotificationService, UserDirectory } from "../modules/notification.service"
|
import { NotificationService, UserDirectory } from "../modules/notification.service"
|
||||||
|
|
||||||
const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => {
|
const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => {
|
||||||
@@ -448,6 +449,10 @@ export default async function communicationRoutes(server: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
server.get("/communication/matrix/push-worker", async () => {
|
||||||
|
return getMatrixPushWorkerState()
|
||||||
|
})
|
||||||
|
|
||||||
server.get("/communication/matrix/media", async (req, reply) => {
|
server.get("/communication/matrix/media", async (req, reply) => {
|
||||||
try {
|
try {
|
||||||
const query = req.query as { uri?: string; name?: string }
|
const query = req.query as { uri?: string; name?: string }
|
||||||
|
|||||||
Reference in New Issue
Block a user