KI-AGENT: Matrix Push Worker für ungelesene Nachrichten ergänzen
This commit is contained in:
@@ -58,6 +58,7 @@ import {loadSecrets, secrets} from "./utils/secrets";
|
|||||||
import {initMailer} from "./utils/mailer"
|
import {initMailer} from "./utils/mailer"
|
||||||
import {initS3} from "./utils/s3";
|
import {initS3} from "./utils/s3";
|
||||||
import { runBootstrap } from "./modules/bootstrap.service";
|
import { runBootstrap } from "./modules/bootstrap.service";
|
||||||
|
import { startMatrixPushWorker } from "./modules/matrix-push-worker.service";
|
||||||
|
|
||||||
|
|
||||||
//Services
|
//Services
|
||||||
@@ -83,6 +84,7 @@ async function main() {
|
|||||||
await app.register(dbPlugin);
|
await app.register(dbPlugin);
|
||||||
await app.register(servicesPlugin);
|
await app.register(servicesPlugin);
|
||||||
await runBootstrap(app);
|
await runBootstrap(app);
|
||||||
|
startMatrixPushWorker(app);
|
||||||
|
|
||||||
app.addHook('preHandler', (req, reply, done) => {
|
app.addHook('preHandler', (req, reply, done) => {
|
||||||
console.log(req.method)
|
console.log(req.method)
|
||||||
|
|||||||
237
backend/src/modules/matrix-push-worker.service.ts
Normal file
237
backend/src/modules/matrix-push-worker.service.ts
Normal file
@@ -0,0 +1,237 @@
|
|||||||
|
import { createHash } from "node:crypto"
|
||||||
|
import type { FastifyInstance } from "fastify"
|
||||||
|
import { and, desc, eq, inArray, isNotNull, ne } from "drizzle-orm"
|
||||||
|
import { authProfiles, authTenantUsers, authUsers, communicationRooms, notificationsItems } from "../../db/schema"
|
||||||
|
import { matrixService } from "./matrix.service"
|
||||||
|
import { NotificationService, UserDirectory } from "./notification.service"
|
||||||
|
|
||||||
|
type ChatRecipient = {
|
||||||
|
userId: string
|
||||||
|
email?: string | null
|
||||||
|
firstName?: string | null
|
||||||
|
lastName?: string | null
|
||||||
|
fullName?: string | null
|
||||||
|
matrixUserId?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
const getUserDirectory: UserDirectory = async (server: FastifyInstance, userId) => {
|
||||||
|
const rows = await server.db
|
||||||
|
.select({ email: authUsers.email })
|
||||||
|
.from(authUsers)
|
||||||
|
.where(eq(authUsers.id, userId))
|
||||||
|
.limit(1)
|
||||||
|
|
||||||
|
return rows[0] || null
|
||||||
|
}
|
||||||
|
|
||||||
|
const displayUserName = (user: { fullName?: string | null; firstName?: string | null; lastName?: string | null; email?: string | null }) => {
|
||||||
|
const name = user.fullName || [user.firstName, user.lastName].filter(Boolean).join(" ")
|
||||||
|
return name || user.email || "Benutzer"
|
||||||
|
}
|
||||||
|
|
||||||
|
const directRoomKey = (firstUserId: string, secondUserId: string) => {
|
||||||
|
const hash = createHash("sha256")
|
||||||
|
.update([firstUserId, secondUserId].sort().join(":"))
|
||||||
|
.digest("hex")
|
||||||
|
.slice(0, 16)
|
||||||
|
|
||||||
|
return `direct_${hash}`
|
||||||
|
}
|
||||||
|
|
||||||
|
const mentionAliasesForUser = (user: ChatRecipient) => {
|
||||||
|
const name = displayUserName(user)
|
||||||
|
return Array.from(new Set([
|
||||||
|
name,
|
||||||
|
user.fullName,
|
||||||
|
[user.firstName, user.lastName].filter(Boolean).join(" "),
|
||||||
|
user.firstName,
|
||||||
|
user.email,
|
||||||
|
].filter(Boolean).map((value) => String(value).toLowerCase())))
|
||||||
|
}
|
||||||
|
|
||||||
|
const mentionedRecipientIds = (text: string, recipients: ChatRecipient[]) => {
|
||||||
|
const normalizedText = text.toLowerCase()
|
||||||
|
|
||||||
|
return recipients
|
||||||
|
.filter((recipient) => mentionAliasesForUser(recipient).some((alias) =>
|
||||||
|
normalizedText.includes(`@${alias}`)
|
||||||
|
))
|
||||||
|
.map((recipient) => recipient.userId)
|
||||||
|
}
|
||||||
|
|
||||||
|
export function startMatrixPushWorker(server: FastifyInstance) {
|
||||||
|
if (process.env.MATRIX_PUSH_WORKER_DISABLED === "1") {
|
||||||
|
server.log.info("Matrix-Push-Worker ist deaktiviert")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const matrix = matrixService(server)
|
||||||
|
const notifications = new NotificationService(server, getUserDirectory)
|
||||||
|
const intervalMs = Math.max(Number(process.env.MATRIX_PUSH_WORKER_INTERVAL_MS || 3000), 1000)
|
||||||
|
let since: string | undefined
|
||||||
|
let running = false
|
||||||
|
let stopped = false
|
||||||
|
let timer: ReturnType<typeof setTimeout> | undefined
|
||||||
|
|
||||||
|
const getTenantRecipients = async (tenantId: number) => {
|
||||||
|
const rows = await server.db
|
||||||
|
.select({
|
||||||
|
userId: authTenantUsers.user_id,
|
||||||
|
email: authUsers.email,
|
||||||
|
firstName: authProfiles.first_name,
|
||||||
|
lastName: authProfiles.last_name,
|
||||||
|
fullName: authProfiles.full_name,
|
||||||
|
})
|
||||||
|
.from(authTenantUsers)
|
||||||
|
.innerJoin(authUsers, eq(authUsers.id, authTenantUsers.user_id))
|
||||||
|
.leftJoin(authProfiles, and(
|
||||||
|
eq(authProfiles.user_id, authTenantUsers.user_id),
|
||||||
|
eq(authProfiles.tenant_id, tenantId)
|
||||||
|
))
|
||||||
|
.where(eq(authTenantUsers.tenant_id, tenantId))
|
||||||
|
|
||||||
|
return await Promise.all(rows.map(async (row) => ({
|
||||||
|
...row,
|
||||||
|
matrixUserId: await matrix.matrixUserIdForUser(row.userId, tenantId),
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
const hasChatNotificationForMessage = async (tenantId: number, userId: string, messageId: string) => {
|
||||||
|
const rows = await server.db
|
||||||
|
.select({
|
||||||
|
payload: notificationsItems.payload,
|
||||||
|
})
|
||||||
|
.from(notificationsItems)
|
||||||
|
.where(and(
|
||||||
|
eq(notificationsItems.tenantId, tenantId),
|
||||||
|
eq(notificationsItems.userId, userId),
|
||||||
|
eq(notificationsItems.eventType, "communication.message.new")
|
||||||
|
))
|
||||||
|
.orderBy(desc(notificationsItems.createdAt))
|
||||||
|
.limit(200)
|
||||||
|
|
||||||
|
return rows.some((row) => (row.payload as any)?.messageId === messageId)
|
||||||
|
}
|
||||||
|
|
||||||
|
const recipientsForMessage = (
|
||||||
|
room: typeof communicationRooms.$inferSelect,
|
||||||
|
recipients: ChatRecipient[],
|
||||||
|
senderUserId: string | null,
|
||||||
|
text: string
|
||||||
|
) => {
|
||||||
|
const candidates = senderUserId
|
||||||
|
? recipients.filter((recipient) => recipient.userId !== senderUserId)
|
||||||
|
: recipients
|
||||||
|
const mentioned = new Set(mentionedRecipientIds(text, candidates))
|
||||||
|
const directRecipients = new Set<string>()
|
||||||
|
|
||||||
|
if (room.type === "direct" && room.entityUuid && room.entityUuid !== senderUserId) {
|
||||||
|
directRecipients.add(room.entityUuid)
|
||||||
|
} else if (room.type === "direct" && senderUserId) {
|
||||||
|
candidates
|
||||||
|
.filter((recipient) => directRoomKey(senderUserId, recipient.userId) === room.key)
|
||||||
|
.forEach((recipient) => directRecipients.add(recipient.userId))
|
||||||
|
}
|
||||||
|
|
||||||
|
return candidates
|
||||||
|
.filter((recipient) => directRecipients.has(recipient.userId) || mentioned.has(recipient.userId))
|
||||||
|
.map((recipient) => ({
|
||||||
|
...recipient,
|
||||||
|
mentioned: mentioned.has(recipient.userId),
|
||||||
|
direct: directRecipients.has(recipient.userId),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
const deliverMessageNotification = async (
|
||||||
|
room: typeof communicationRooms.$inferSelect,
|
||||||
|
message: any,
|
||||||
|
recipients: ChatRecipient[]
|
||||||
|
) => {
|
||||||
|
if (!message.id || message.own) return
|
||||||
|
|
||||||
|
const sender = recipients.find((recipient) => recipient.matrixUserId === message.sender) || null
|
||||||
|
const text = message.body || message.attachment?.fileName || "Neue Nachricht"
|
||||||
|
const targets = recipientsForMessage(room, recipients, sender?.userId || null, text)
|
||||||
|
if (!targets.length) return
|
||||||
|
|
||||||
|
const senderName = sender ? displayUserName(sender) : message.senderDisplayName || message.sender || "Matrix"
|
||||||
|
const preview = text.length > 160 ? `${text.slice(0, 157)}...` : text
|
||||||
|
|
||||||
|
for (const target of targets) {
|
||||||
|
if (await hasChatNotificationForMessage(room.tenantId, target.userId, message.id)) continue
|
||||||
|
|
||||||
|
await notifications.trigger({
|
||||||
|
tenantId: room.tenantId,
|
||||||
|
userId: target.userId,
|
||||||
|
eventType: "communication.message.new",
|
||||||
|
title: target.mentioned ? `${senderName} hat dich erwähnt` : `Neue Direktnachricht von ${senderName}`,
|
||||||
|
message: preview,
|
||||||
|
payload: {
|
||||||
|
link: `/communication/chat?room=${encodeURIComponent(room.key)}`,
|
||||||
|
roomKey: room.key,
|
||||||
|
roomName: room.name,
|
||||||
|
roomType: room.type,
|
||||||
|
messageId: message.id,
|
||||||
|
matrixSender: message.sender,
|
||||||
|
mentioned: target.mentioned,
|
||||||
|
direct: target.direct,
|
||||||
|
},
|
||||||
|
channels: ["inapp", "push"],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const runOnce = async () => {
|
||||||
|
if (running || stopped) return
|
||||||
|
running = true
|
||||||
|
|
||||||
|
try {
|
||||||
|
const initial = !since
|
||||||
|
const sync = await matrix.syncServiceRoomEvents(since, initial)
|
||||||
|
since = sync.nextBatch || since
|
||||||
|
|
||||||
|
if (!initial && sync.rooms?.length) {
|
||||||
|
const roomIds = sync.rooms.map((room: any) => room.roomId).filter(Boolean)
|
||||||
|
const rooms = roomIds.length
|
||||||
|
? await server.db
|
||||||
|
.select()
|
||||||
|
.from(communicationRooms)
|
||||||
|
.where(and(
|
||||||
|
inArray(communicationRooms.matrixRoomId, roomIds),
|
||||||
|
ne(communicationRooms.archived, true),
|
||||||
|
isNotNull(communicationRooms.matrixRoomId)
|
||||||
|
))
|
||||||
|
: []
|
||||||
|
const roomsByMatrixId = new Map(rooms.map((room) => [room.matrixRoomId, room]))
|
||||||
|
const recipientsByTenant = new Map<number, ChatRecipient[]>()
|
||||||
|
|
||||||
|
for (const syncedRoom of sync.rooms) {
|
||||||
|
const room = roomsByMatrixId.get(syncedRoom.roomId)
|
||||||
|
if (!room || !syncedRoom.messages?.length) continue
|
||||||
|
|
||||||
|
if (!recipientsByTenant.has(room.tenantId)) {
|
||||||
|
recipientsByTenant.set(room.tenantId, await getTenantRecipients(room.tenantId))
|
||||||
|
}
|
||||||
|
|
||||||
|
const recipients = recipientsByTenant.get(room.tenantId) || []
|
||||||
|
for (const message of syncedRoom.messages) {
|
||||||
|
await deliverMessageNotification(room, message, recipients)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
server.log.error({ err }, "Matrix-Push-Worker konnte Matrix-Events nicht verarbeiten")
|
||||||
|
} finally {
|
||||||
|
running = false
|
||||||
|
if (!stopped) {
|
||||||
|
timer = setTimeout(() => void runOnce(), since ? 0 : intervalMs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
timer = setTimeout(() => void runOnce(), intervalMs)
|
||||||
|
server.addHook("onClose", async () => {
|
||||||
|
stopped = true
|
||||||
|
if (timer) clearTimeout(timer)
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -1512,6 +1512,69 @@ export function matrixService(server: FastifyInstance) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const syncServiceRoomEvents = async (since?: string, initial = false) => {
|
||||||
|
const service = await ensureServiceAccessToken()
|
||||||
|
const filter = {
|
||||||
|
room: {
|
||||||
|
timeline: {
|
||||||
|
limit: 50,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
presence: {
|
||||||
|
types: [],
|
||||||
|
},
|
||||||
|
account_data: {
|
||||||
|
types: [],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
const params = new URLSearchParams({
|
||||||
|
timeout: since && !initial ? "25000" : "0",
|
||||||
|
filter: JSON.stringify(filter),
|
||||||
|
})
|
||||||
|
|
||||||
|
if (since) params.set("since", since)
|
||||||
|
|
||||||
|
const response = await requestMatrixJson<MatrixSyncResponse>(
|
||||||
|
`/_matrix/client/v3/sync?${params.toString()}`,
|
||||||
|
service.accessToken
|
||||||
|
)
|
||||||
|
const joinedRooms = response.rooms?.join || {}
|
||||||
|
|
||||||
|
return {
|
||||||
|
nextBatch: response.next_batch || since || "",
|
||||||
|
serviceUserId: service.matrixUserId,
|
||||||
|
rooms: Object.entries(joinedRooms).map(([roomId, joinedRoom]) => {
|
||||||
|
const timelineEvents = joinedRoom.timeline?.events || []
|
||||||
|
const messages = initial
|
||||||
|
? []
|
||||||
|
: timelineEvents
|
||||||
|
.filter((event) =>
|
||||||
|
event.type === "m.room.message" &&
|
||||||
|
["m.text", "m.file", "m.image"].includes(event.content?.msgtype || "") &&
|
||||||
|
event.content?.["m.relates_to"]?.rel_type !== "m.replace"
|
||||||
|
)
|
||||||
|
.map((event) => ({
|
||||||
|
id: event.event_id,
|
||||||
|
roomId,
|
||||||
|
sender: event.sender,
|
||||||
|
senderDisplayName: event.sender,
|
||||||
|
body: event.content?.body || "",
|
||||||
|
attachment: attachmentFromEvent(event),
|
||||||
|
timestamp: event.origin_server_ts,
|
||||||
|
own: event.sender === service.matrixUserId,
|
||||||
|
replyToEventId: event.content?.["m.relates_to"]?.["m.in_reply_to"]?.event_id || null,
|
||||||
|
}))
|
||||||
|
|
||||||
|
return {
|
||||||
|
roomId,
|
||||||
|
messages,
|
||||||
|
membersChanged: [...timelineEvents, ...(joinedRoom.state?.events || [])]
|
||||||
|
.some((event) => event.type === "m.room.member"),
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const sendTenantRoomMessage = async (
|
const sendTenantRoomMessage = async (
|
||||||
userId: string,
|
userId: string,
|
||||||
tenantId: number | null,
|
tenantId: number | null,
|
||||||
@@ -2189,6 +2252,7 @@ export function matrixService(server: FastifyInstance) {
|
|||||||
getTenantRoomMembers,
|
getTenantRoomMembers,
|
||||||
searchTenantRoomMessages,
|
searchTenantRoomMessages,
|
||||||
syncTenantRoomEvents,
|
syncTenantRoomEvents,
|
||||||
|
syncServiceRoomEvents,
|
||||||
sendTenantRoomMessage,
|
sendTenantRoomMessage,
|
||||||
sendTenantRoomReaction,
|
sendTenantRoomReaction,
|
||||||
editTenantRoomMessage,
|
editTenantRoomMessage,
|
||||||
|
|||||||
Reference in New Issue
Block a user