diff --git a/backend/src/index.ts b/backend/src/index.ts index b50306b..aa90483 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -58,6 +58,7 @@ import {loadSecrets, secrets} from "./utils/secrets"; import {initMailer} from "./utils/mailer" import {initS3} from "./utils/s3"; import { runBootstrap } from "./modules/bootstrap.service"; +import { startMatrixPushWorker } from "./modules/matrix-push-worker.service"; //Services @@ -83,6 +84,7 @@ async function main() { await app.register(dbPlugin); await app.register(servicesPlugin); await runBootstrap(app); + startMatrixPushWorker(app); app.addHook('preHandler', (req, reply, done) => { console.log(req.method) diff --git a/backend/src/modules/matrix-push-worker.service.ts b/backend/src/modules/matrix-push-worker.service.ts new file mode 100644 index 0000000..e728155 --- /dev/null +++ b/backend/src/modules/matrix-push-worker.service.ts @@ -0,0 +1,237 @@ +import { createHash } from "node:crypto" +import type { FastifyInstance } from "fastify" +import { and, desc, eq, inArray, isNotNull, ne } from "drizzle-orm" +import { authProfiles, authTenantUsers, authUsers, communicationRooms, notificationsItems } from "../../db/schema" +import { matrixService } from "./matrix.service" +import { NotificationService, UserDirectory } from "./notification.service" + +type ChatRecipient = { + userId: string + email?: string | null + firstName?: string | null + lastName?: string | null + fullName?: string | null + matrixUserId?: string +} + +const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => { + const rows = await server.db + .select({ email: authUsers.email }) + .from(authUsers) + .where(eq(authUsers.id, userId)) + .limit(1) + + return rows[0] || null +} + +const displayUserName = (user: { fullName?: string | null; firstName?: string | null; lastName?: string | null; email?: string | null }) => { + const name = user.fullName || [user.firstName, user.lastName].filter(Boolean).join(" ") + return name || user.email || "Benutzer" +} + +const directRoomKey = (firstUserId: string, secondUserId: string) => { + const hash = createHash("sha256") + .update([firstUserId, secondUserId].sort().join(":")) + .digest("hex") + .slice(0, 16) + + return `direct_${hash}` +} + +const mentionAliasesForUser = (user: ChatRecipient) => { + const name = displayUserName(user) + return Array.from(new Set([ + name, + user.fullName, + [user.firstName, user.lastName].filter(Boolean).join(" "), + user.firstName, + user.email, + ].filter(Boolean).map((value) => String(value).toLowerCase()))) +} + +const mentionedRecipientIds = (text: string, recipients: ChatRecipient[]) => { + const normalizedText = text.toLowerCase() + + return recipients + .filter((recipient) => mentionAliasesForUser(recipient).some((alias) => + normalizedText.includes(`@${alias}`) + )) + .map((recipient) => recipient.userId) +} + +export function startMatrixPushWorker(server: FastifyInstance) { + if (process.env.MATRIX_PUSH_WORKER_DISABLED === "1") { + server.log.info("Matrix-Push-Worker ist deaktiviert") + return + } + + const matrix = matrixService(server) + const notifications = new NotificationService(server, getUserDirectory) + const intervalMs = Math.max(Number(process.env.MATRIX_PUSH_WORKER_INTERVAL_MS || 3000), 1000) + let since: string | undefined + let running = false + let stopped = false + let timer: ReturnType | undefined + + const getTenantRecipients = async (tenantId: number) => { + const rows = await server.db + .select({ + userId: authTenantUsers.user_id, + email: authUsers.email, + firstName: authProfiles.first_name, + lastName: authProfiles.last_name, + fullName: authProfiles.full_name, + }) + .from(authTenantUsers) + .innerJoin(authUsers, eq(authUsers.id, authTenantUsers.user_id)) + .leftJoin(authProfiles, and( + eq(authProfiles.user_id, authTenantUsers.user_id), + eq(authProfiles.tenant_id, tenantId) + )) + .where(eq(authTenantUsers.tenant_id, tenantId)) + + return await Promise.all(rows.map(async (row) => ({ + ...row, + matrixUserId: await matrix.matrixUserIdForUser(row.userId, tenantId), + }))) + } + + const hasChatNotificationForMessage = async (tenantId: number, userId: string, messageId: string) => { + const rows = await server.db + .select({ + payload: notificationsItems.payload, + }) + .from(notificationsItems) + .where(and( + eq(notificationsItems.tenantId, tenantId), + eq(notificationsItems.userId, userId), + eq(notificationsItems.eventType, "communication.message.new") + )) + .orderBy(desc(notificationsItems.createdAt)) + .limit(200) + + return rows.some((row) => (row.payload as any)?.messageId === messageId) + } + + const recipientsForMessage = ( + room: typeof communicationRooms.$inferSelect, + recipients: ChatRecipient[], + senderUserId: string | null, + text: string + ) => { + const candidates = senderUserId + ? recipients.filter((recipient) => recipient.userId !== senderUserId) + : recipients + const mentioned = new Set(mentionedRecipientIds(text, candidates)) + const directRecipients = new Set() + + if (room.type === "direct" && room.entityUuid && room.entityUuid !== senderUserId) { + directRecipients.add(room.entityUuid) + } else if (room.type === "direct" && senderUserId) { + candidates + .filter((recipient) => directRoomKey(senderUserId, recipient.userId) === room.key) + .forEach((recipient) => directRecipients.add(recipient.userId)) + } + + return candidates + .filter((recipient) => directRecipients.has(recipient.userId) || mentioned.has(recipient.userId)) + .map((recipient) => ({ + ...recipient, + mentioned: mentioned.has(recipient.userId), + direct: directRecipients.has(recipient.userId), + })) + } + + const deliverMessageNotification = async ( + room: typeof communicationRooms.$inferSelect, + message: any, + recipients: ChatRecipient[] + ) => { + if (!message.id || message.own) return + + const sender = recipients.find((recipient) => recipient.matrixUserId === message.sender) || null + const text = message.body || message.attachment?.fileName || "Neue Nachricht" + const targets = recipientsForMessage(room, recipients, sender?.userId || null, text) + if (!targets.length) return + + const senderName = sender ? displayUserName(sender) : message.senderDisplayName || message.sender || "Matrix" + const preview = text.length > 160 ? `${text.slice(0, 157)}...` : text + + for (const target of targets) { + if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) continue + + await notifications.trigger({ + tenantId: room.tenantId, + userId: target.userId, + eventType: "communication.message.new", + title: target.mentioned ? `${senderName} hat dich erwähnt` : `Neue Direktnachricht von ${senderName}`, + message: preview, + payload: { + link: `/communication/chat?room=${encodeURIComponent(room.key)}`, + roomKey: room.key, + roomName: room.name, + roomType: room.type, + messageId: message.id, + matrixSender: message.sender, + mentioned: target.mentioned, + direct: target.direct, + }, + channels: ["inapp", "push"], + }) + } + } + + const runOnce = async () => { + if (running || stopped) return + running = true + + try { + const initial = !since + const sync = await matrix.syncServiceRoomEvents(since, initial) + since = sync.nextBatch || since + + if (!initial && sync.rooms?.length) { + const roomIds = sync.rooms.map((room: any) => room.roomId).filter(Boolean) + const rooms = roomIds.length + ? await server.db + .select() + .from(communicationRooms) + .where(and( + inArray(communicationRooms.matrixRoomId, roomIds), + ne(communicationRooms.archived, true), + isNotNull(communicationRooms.matrixRoomId) + )) + : [] + const roomsByMatrixId = new Map(rooms.map((room) => [room.matrixRoomId, room])) + const recipientsByTenant = new Map() + + for (const syncedRoom of sync.rooms) { + const room = roomsByMatrixId.get(syncedRoom.roomId) + if (!room || !syncedRoom.messages?.length) continue + + if (!recipientsByTenant.has(room.tenantId)) { + recipientsByTenant.set(room.tenantId, await getTenantRecipients(room.tenantId)) + } + + const recipients = recipientsByTenant.get(room.tenantId) || [] + for (const message of syncedRoom.messages) { + await deliverMessageNotification(room, message, recipients) + } + } + } + } catch (err) { + server.log.error({ err }, "Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten") + } finally { + running = false + if (!stopped) { + timer = setTimeout(() => void runOnce(), since ? 0 : intervalMs) + } + } + } + + timer = setTimeout(() => void runOnce(), intervalMs) + server.addHook("onClose", async () => { + stopped = true + if (timer) clearTimeout(timer) + }) +} diff --git a/backend/src/modules/matrix.service.ts b/backend/src/modules/matrix.service.ts index bf4e529..19ab7ed 100644 --- a/backend/src/modules/matrix.service.ts +++ b/backend/src/modules/matrix.service.ts @@ -1512,6 +1512,69 @@ export function matrixService(server: FastifyInstance) { } } + const syncServiceRoomEvents = async (since?: string, initial = false) => { + const service = await ensureServiceAccessToken() + const filter = { + room: { + timeline: { + limit: 50, + }, + }, + presence: { + types: [], + }, + account_data: { + types: [], + }, + } + const params = new URLSearchParams({ + timeout: since && !initial ? "25000" : "0", + filter: JSON.stringify(filter), + }) + + if (since) params.set("since", since) + + const response = await requestMatrixJson( + `/_matrix/client/v3/sync?${params.toString()}`, + service.accessToken + ) + const joinedRooms = response.rooms?.join || {} + + return { + nextBatch: response.next_batch || since || "", + serviceUserId: service.matrixUserId, + rooms: Object.entries(joinedRooms).map(([roomId, joinedRoom]) => { + const timelineEvents = joinedRoom.timeline?.events || [] + const messages = initial + ? [] + : timelineEvents + .filter((event) => + event.type === "m.room.message" && + ["m.text", "m.file", "m.image"].includes(event.content?.msgtype || "") && + event.content?.["m.relates_to"]?.rel_type !== "m.replace" + ) + .map((event) => ({ + id: event.event_id, + roomId, + sender: event.sender, + senderDisplayName: event.sender, + body: event.content?.body || "", + attachment: attachmentFromEvent(event), + timestamp: event.origin_server_ts, + own: event.sender === service.matrixUserId, + replyToEventId: event.content?.["m.relates_to"]?.["m.in_reply_to"]?.event_id || null, + })) + + return { + roomId, + messages, + membersChanged: [...timelineEvents, ...(joinedRoom.state?.events || [])] + .some((event) => event.type === "m.room.member"), + } + }), + } + } + const sendTenantRoomMessage = async ( userId: string, tenantId: number | null, @@ -2189,6 +2252,7 @@ export function matrixService(server: FastifyInstance) { getTenantRoomMembers, searchTenantRoomMessages, syncTenantRoomEvents, + syncServiceRoomEvents, sendTenantRoomMessage, sendTenantRoomReaction, editTenantRoomMessage,