KI-AGENT: Zentrale Benachrichtigungsengine mit Desktop Push umsetzen
This commit is contained in:
@@ -1,25 +1,51 @@
|
||||
// services/notification.service.ts
|
||||
import type { FastifyInstance } from 'fastify';
|
||||
import {secrets} from "../utils/secrets";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { notificationsEventTypes, notificationsItems } from "../../db/schema";
|
||||
import type { FastifyInstance } from "fastify"
|
||||
import webPush from "web-push"
|
||||
import { and, desc, eq, inArray, isNull, sql } from "drizzle-orm"
|
||||
import {
|
||||
authUsers,
|
||||
notificationPushSubscriptions,
|
||||
notificationsEventTypes,
|
||||
notificationsItems,
|
||||
notificationsPreferences,
|
||||
notificationsPreferencesDefaults,
|
||||
} from "../../db/schema"
|
||||
import { secrets } from "../utils/secrets"
|
||||
|
||||
export type NotificationStatus = 'queued' | 'sent' | 'failed';
|
||||
export type NotificationChannel = "inapp" | "email" | "push" | "webhook" | "sms"
|
||||
export type NotificationStatus = "queued" | "sent" | "failed" | "read"
|
||||
|
||||
export interface TriggerInput {
|
||||
tenantId: number;
|
||||
userId: string; // muss auf public.auth_users.id zeigen
|
||||
eventType: string; // muss in notifications_event_types existieren
|
||||
title: string; // Betreff/Title
|
||||
message: string; // Klartext-Inhalt
|
||||
payload?: Record<string, unknown>;
|
||||
tenantId: number
|
||||
userId?: string
|
||||
userIds?: string[]
|
||||
eventType: string
|
||||
title: string
|
||||
message: string
|
||||
payload?: Record<string, unknown>
|
||||
channels?: NotificationChannel[]
|
||||
}
|
||||
|
||||
export interface PushSubscriptionInput {
|
||||
endpoint: string
|
||||
keys: {
|
||||
p256dh: string
|
||||
auth: string
|
||||
}
|
||||
deviceLabel?: string
|
||||
meta?: Record<string, unknown>
|
||||
}
|
||||
|
||||
export interface UserDirectoryInfo {
|
||||
email?: string;
|
||||
email?: string
|
||||
}
|
||||
|
||||
export type UserDirectory = (server: FastifyInstance, userId: string, tenantId: number) => Promise<UserDirectoryInfo | null>;
|
||||
export type UserDirectory = (
|
||||
server: FastifyInstance,
|
||||
userId: string,
|
||||
tenantId: number
|
||||
) => Promise<UserDirectoryInfo | null>
|
||||
|
||||
const DEFAULT_CHANNELS: NotificationChannel[] = ["inapp"]
|
||||
|
||||
export class NotificationService {
|
||||
constructor(
|
||||
@@ -27,99 +53,355 @@ export class NotificationService {
|
||||
private getUser: UserDirectory
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Löst eine E-Mail-Benachrichtigung aus:
|
||||
* - Validiert den Event-Typ
|
||||
* - Legt einen Datensatz in notifications_items an (status: queued)
|
||||
* - Versendet E-Mail (FEDEO Branding)
|
||||
* - Aktualisiert status/sent_at bzw. error
|
||||
*/
|
||||
async trigger(input: TriggerInput) {
|
||||
const { tenantId, userId, eventType, title, message, payload } = input;
|
||||
const tenantId = input.tenantId
|
||||
const userIds = Array.from(new Set([...(input.userIds || []), input.userId].filter(Boolean))) as string[]
|
||||
|
||||
// 1) Event-Typ prüfen (aktiv?)
|
||||
const eventTypeRows = await this.server.db
|
||||
if (!tenantId) throw new Error("tenantId fehlt")
|
||||
if (!userIds.length) throw new Error("Keine Empfänger angegeben")
|
||||
|
||||
const eventType = await this.getActiveEventType(input.eventType)
|
||||
const allowedChannels = this.normalizeChannels(eventType.allowedChannels)
|
||||
const requestedChannels = input.channels?.length ? input.channels : allowedChannels
|
||||
const channels = requestedChannels.filter((channel) => allowedChannels.includes(channel))
|
||||
|
||||
if (!channels.length) {
|
||||
return { success: true, created: 0, delivered: 0, skipped: userIds.length }
|
||||
}
|
||||
|
||||
const results = []
|
||||
|
||||
for (const userId of userIds) {
|
||||
const enabledChannels = await this.resolveEnabledChannels({
|
||||
tenantId,
|
||||
userId,
|
||||
eventType: input.eventType,
|
||||
channels,
|
||||
})
|
||||
|
||||
for (const channel of enabledChannels) {
|
||||
const itemRows = await this.server.db
|
||||
.insert(notificationsItems)
|
||||
.values({
|
||||
tenantId,
|
||||
userId,
|
||||
eventType: input.eventType,
|
||||
title: input.title,
|
||||
message: input.message,
|
||||
payload: input.payload ?? null,
|
||||
channel,
|
||||
status: "queued",
|
||||
})
|
||||
.returning()
|
||||
|
||||
const item = itemRows[0]
|
||||
if (!item) continue
|
||||
|
||||
results.push(await this.deliver(item))
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: results.every((result) => result.success),
|
||||
created: results.length,
|
||||
delivered: results.filter((result) => result.success).length,
|
||||
failed: results.filter((result) => !result.success).length,
|
||||
}
|
||||
}
|
||||
|
||||
async listForUser(tenantId: number, userId: string, limit = 50) {
|
||||
return await this.server.db
|
||||
.select()
|
||||
.from(notificationsItems)
|
||||
.where(and(
|
||||
eq(notificationsItems.tenantId, tenantId),
|
||||
eq(notificationsItems.userId, userId),
|
||||
eq(notificationsItems.channel, "inapp")
|
||||
))
|
||||
.orderBy(desc(notificationsItems.createdAt))
|
||||
.limit(Math.min(Math.max(limit, 1), 100))
|
||||
}
|
||||
|
||||
async markRead(tenantId: number, userId: string, notificationId: string) {
|
||||
const rows = await this.server.db
|
||||
.update(notificationsItems)
|
||||
.set({ readAt: new Date(), status: "read" })
|
||||
.where(and(
|
||||
eq(notificationsItems.id, notificationId),
|
||||
eq(notificationsItems.tenantId, tenantId),
|
||||
eq(notificationsItems.userId, userId)
|
||||
))
|
||||
.returning()
|
||||
|
||||
return rows[0] || null
|
||||
}
|
||||
|
||||
async registerPushSubscription(
|
||||
tenantId: number,
|
||||
userId: string,
|
||||
subscription: PushSubscriptionInput,
|
||||
userAgent?: string
|
||||
) {
|
||||
if (!subscription.endpoint || !subscription.keys?.p256dh || !subscription.keys?.auth) {
|
||||
throw new Error("Push-Subscription ist unvollständig")
|
||||
}
|
||||
|
||||
const rows = await this.server.db
|
||||
.insert(notificationPushSubscriptions)
|
||||
.values({
|
||||
tenantId,
|
||||
userId,
|
||||
endpoint: subscription.endpoint,
|
||||
p256dh: subscription.keys.p256dh,
|
||||
auth: subscription.keys.auth,
|
||||
userAgent,
|
||||
deviceLabel: subscription.deviceLabel,
|
||||
meta: subscription.meta ?? null,
|
||||
lastSeenAt: new Date(),
|
||||
disabledAt: null,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: notificationPushSubscriptions.endpoint,
|
||||
set: {
|
||||
tenantId,
|
||||
userId,
|
||||
p256dh: subscription.keys.p256dh,
|
||||
auth: subscription.keys.auth,
|
||||
userAgent,
|
||||
deviceLabel: subscription.deviceLabel,
|
||||
meta: subscription.meta ?? null,
|
||||
lastSeenAt: new Date(),
|
||||
disabledAt: null,
|
||||
},
|
||||
})
|
||||
.returning()
|
||||
|
||||
return rows[0]
|
||||
}
|
||||
|
||||
async disablePushSubscription(tenantId: number, userId: string, endpoint: string) {
|
||||
await this.server.db
|
||||
.update(notificationPushSubscriptions)
|
||||
.set({ disabledAt: new Date() })
|
||||
.where(and(
|
||||
eq(notificationPushSubscriptions.tenantId, tenantId),
|
||||
eq(notificationPushSubscriptions.userId, userId),
|
||||
eq(notificationPushSubscriptions.endpoint, endpoint)
|
||||
))
|
||||
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
getPublicPushConfig() {
|
||||
return {
|
||||
configured: Boolean(secrets.WEB_PUSH_PUBLIC_KEY && secrets.WEB_PUSH_PRIVATE_KEY),
|
||||
publicKey: secrets.WEB_PUSH_PUBLIC_KEY || "",
|
||||
}
|
||||
}
|
||||
|
||||
private async getActiveEventType(eventType: string) {
|
||||
const rows = await this.server.db
|
||||
.select()
|
||||
.from(notificationsEventTypes)
|
||||
.where(eq(notificationsEventTypes.eventKey, eventType))
|
||||
.limit(1)
|
||||
const eventTypeRow = eventTypeRows[0]
|
||||
|
||||
if (!eventTypeRow || eventTypeRow.isActive !== true) {
|
||||
throw new Error(`Unbekannter oder inaktiver Event-Typ: ${eventType}`);
|
||||
const row = rows[0]
|
||||
if (!row || row.isActive !== true) {
|
||||
throw new Error(`Unbekannter oder inaktiver Event-Typ: ${eventType}`)
|
||||
}
|
||||
|
||||
// 2) Zieladresse beschaffen
|
||||
const user = await this.getUser(this.server, userId, tenantId);
|
||||
if (!user?.email) {
|
||||
throw new Error(`Nutzer ${userId} hat keine E-Mail-Adresse`);
|
||||
return row
|
||||
}
|
||||
|
||||
private normalizeChannels(value: unknown): NotificationChannel[] {
|
||||
if (!Array.isArray(value)) return DEFAULT_CHANNELS
|
||||
|
||||
const valid = new Set(["inapp", "email", "push", "webhook", "sms"])
|
||||
const channels = value.filter((channel): channel is NotificationChannel =>
|
||||
typeof channel === "string" && valid.has(channel)
|
||||
)
|
||||
|
||||
return channels.length ? channels : DEFAULT_CHANNELS
|
||||
}
|
||||
|
||||
private async resolveEnabledChannels(input: {
|
||||
tenantId: number
|
||||
userId: string
|
||||
eventType: string
|
||||
channels: NotificationChannel[]
|
||||
}) {
|
||||
const prefs = await this.server.db
|
||||
.select()
|
||||
.from(notificationsPreferences)
|
||||
.where(and(
|
||||
eq(notificationsPreferences.tenantId, input.tenantId),
|
||||
eq(notificationsPreferences.userId, input.userId),
|
||||
eq(notificationsPreferences.eventType, input.eventType),
|
||||
inArray(notificationsPreferences.channel, input.channels)
|
||||
))
|
||||
|
||||
const defaults = await this.server.db
|
||||
.select()
|
||||
.from(notificationsPreferencesDefaults)
|
||||
.where(and(
|
||||
eq(notificationsPreferencesDefaults.tenantId, input.tenantId),
|
||||
eq(notificationsPreferencesDefaults.eventKey, input.eventType),
|
||||
inArray(notificationsPreferencesDefaults.channel, input.channels)
|
||||
))
|
||||
|
||||
return input.channels.filter((channel) => {
|
||||
const userPref = prefs.find((pref) => pref.channel === channel)
|
||||
if (userPref) return userPref.enabled
|
||||
|
||||
const defaultPref = defaults.find((pref) => pref.channel === channel)
|
||||
if (defaultPref) return defaultPref.enabled
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
private async deliver(item: typeof notificationsItems.$inferSelect) {
|
||||
if (item.channel === "inapp") {
|
||||
await this.markSent(item.id)
|
||||
return { success: true, id: item.id, channel: item.channel }
|
||||
}
|
||||
|
||||
// 3) Notification anlegen (status: queued)
|
||||
const insertedRows = await this.server.db
|
||||
.insert(notificationsItems)
|
||||
.values({
|
||||
tenantId,
|
||||
userId,
|
||||
eventType,
|
||||
title,
|
||||
message,
|
||||
payload: payload ?? null,
|
||||
channel: 'email',
|
||||
status: 'queued'
|
||||
})
|
||||
.returning({ id: notificationsItems.id })
|
||||
const inserted = insertedRows[0]
|
||||
|
||||
if (!inserted) {
|
||||
throw new Error("Fehler beim Einfügen der Notification");
|
||||
if (item.channel === "push") {
|
||||
return await this.deliverPush(item)
|
||||
}
|
||||
|
||||
// 4) E-Mail versenden
|
||||
if (item.channel === "email") {
|
||||
return await this.deliverEmail(item)
|
||||
}
|
||||
|
||||
await this.markFailed(item.id, `Kein Zusteller für Kanal ${item.channel}`)
|
||||
return { success: false, id: item.id, channel: item.channel }
|
||||
}
|
||||
|
||||
private async deliverPush(item: typeof notificationsItems.$inferSelect) {
|
||||
if (!secrets.WEB_PUSH_PUBLIC_KEY || !secrets.WEB_PUSH_PRIVATE_KEY) {
|
||||
await this.markFailed(item.id, "Web Push ist nicht konfiguriert")
|
||||
return { success: false, id: item.id, channel: item.channel }
|
||||
}
|
||||
|
||||
webPush.setVapidDetails(
|
||||
secrets.WEB_PUSH_SUBJECT || "mailto:admin@example.com",
|
||||
secrets.WEB_PUSH_PUBLIC_KEY,
|
||||
secrets.WEB_PUSH_PRIVATE_KEY
|
||||
)
|
||||
|
||||
const subscriptions = await this.server.db
|
||||
.select()
|
||||
.from(notificationPushSubscriptions)
|
||||
.where(and(
|
||||
eq(notificationPushSubscriptions.tenantId, item.tenantId),
|
||||
eq(notificationPushSubscriptions.userId, item.userId),
|
||||
isNull(notificationPushSubscriptions.disabledAt)
|
||||
))
|
||||
|
||||
if (!subscriptions.length) {
|
||||
await this.markFailed(item.id, "Keine aktive Push-Subscription")
|
||||
return { success: false, id: item.id, channel: item.channel }
|
||||
}
|
||||
|
||||
const payload = JSON.stringify({
|
||||
id: item.id,
|
||||
title: item.title,
|
||||
message: item.message,
|
||||
payload: item.payload || {},
|
||||
})
|
||||
|
||||
let delivered = 0
|
||||
const errors: string[] = []
|
||||
|
||||
for (const subscription of subscriptions) {
|
||||
try {
|
||||
await webPush.sendNotification({
|
||||
endpoint: subscription.endpoint,
|
||||
keys: {
|
||||
p256dh: subscription.p256dh,
|
||||
auth: subscription.auth,
|
||||
},
|
||||
}, payload)
|
||||
|
||||
delivered++
|
||||
await this.server.db
|
||||
.update(notificationPushSubscriptions)
|
||||
.set({ lastSeenAt: new Date() })
|
||||
.where(eq(notificationPushSubscriptions.id, subscription.id))
|
||||
} catch (error: any) {
|
||||
errors.push(error?.message || String(error))
|
||||
|
||||
if (error?.statusCode === 404 || error?.statusCode === 410) {
|
||||
await this.server.db
|
||||
.update(notificationPushSubscriptions)
|
||||
.set({ disabledAt: new Date() })
|
||||
.where(eq(notificationPushSubscriptions.id, subscription.id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (delivered > 0) {
|
||||
await this.markSent(item.id)
|
||||
return { success: true, id: item.id, channel: item.channel, delivered }
|
||||
}
|
||||
|
||||
await this.markFailed(item.id, errors.join("; ") || "Push konnte nicht zugestellt werden")
|
||||
return { success: false, id: item.id, channel: item.channel }
|
||||
}
|
||||
|
||||
private async deliverEmail(item: typeof notificationsItems.$inferSelect) {
|
||||
try {
|
||||
await this.sendEmail(user.email, title, message);
|
||||
const user = await this.getUser(this.server, item.userId, item.tenantId)
|
||||
if (!user?.email) throw new Error(`Nutzer ${item.userId} hat keine E-Mail-Adresse`)
|
||||
|
||||
await this.server.db
|
||||
.update(notificationsItems)
|
||||
.set({ status: 'sent', sentAt: new Date() })
|
||||
.where(eq(notificationsItems.id, inserted.id));
|
||||
|
||||
return { success: true, id: inserted.id };
|
||||
} catch (err: any) {
|
||||
await this.server.db
|
||||
.update(notificationsItems)
|
||||
.set({ status: 'failed', error: String(err?.message || err) })
|
||||
.where(eq(notificationsItems.id, inserted.id));
|
||||
|
||||
this.server.log.error({ err, notificationId: inserted.id }, 'E-Mail Versand fehlgeschlagen');
|
||||
return { success: false, error: err?.message || 'E-Mail Versand fehlgeschlagen' };
|
||||
await this.sendEmail(user.email, item.title, item.message)
|
||||
await this.markSent(item.id)
|
||||
return { success: true, id: item.id, channel: item.channel }
|
||||
} catch (error: any) {
|
||||
await this.markFailed(item.id, error?.message || "E-Mail Versand fehlgeschlagen")
|
||||
this.server.log.error({ err: error, notificationId: item.id }, "E-Mail Versand fehlgeschlagen")
|
||||
return { success: false, id: item.id, channel: item.channel }
|
||||
}
|
||||
}
|
||||
|
||||
// ---- private helpers ------------------------------------------------------
|
||||
private async markSent(id: string) {
|
||||
await this.server.db
|
||||
.update(notificationsItems)
|
||||
.set({ status: "sent", sentAt: new Date() })
|
||||
.where(eq(notificationsItems.id, id))
|
||||
}
|
||||
|
||||
private async markFailed(id: string, error: string) {
|
||||
await this.server.db
|
||||
.update(notificationsItems)
|
||||
.set({ status: "failed", error })
|
||||
.where(eq(notificationsItems.id, id))
|
||||
}
|
||||
|
||||
private async sendEmail(to: string, subject: string, message: string) {
|
||||
const nodemailer = await import('nodemailer');
|
||||
const nodemailer = await import("nodemailer")
|
||||
|
||||
const transporter = nodemailer.createTransport({
|
||||
host: secrets.MAILER_SMTP_HOST,
|
||||
port: Number(secrets.MAILER_SMTP_PORT),
|
||||
secure: secrets.MAILER_SMTP_SSL === 'true',
|
||||
secure: secrets.MAILER_SMTP_SSL === "true",
|
||||
auth: {
|
||||
user: secrets.MAILER_SMTP_USER,
|
||||
pass: secrets.MAILER_SMTP_PASS
|
||||
}
|
||||
});
|
||||
pass: secrets.MAILER_SMTP_PASS,
|
||||
},
|
||||
})
|
||||
|
||||
const html = this.renderFedeoHtml(subject, message);
|
||||
const html = this.renderFedeoHtml(subject, message)
|
||||
|
||||
await transporter.sendMail({
|
||||
from: secrets.MAILER_FROM,
|
||||
to,
|
||||
subject,
|
||||
text: message,
|
||||
html
|
||||
});
|
||||
html,
|
||||
})
|
||||
}
|
||||
|
||||
private renderFedeoHtml(title: string, message: string) {
|
||||
@@ -133,18 +415,17 @@ export class NotificationService {
|
||||
<p style="font-size:12px;color:#666">Automatisch generiert von FEDEO</p>
|
||||
</div>
|
||||
</body></html>
|
||||
`;
|
||||
`
|
||||
}
|
||||
|
||||
// simple escaping (ausreichend für unser Template)
|
||||
private escapeHtml(s: string) {
|
||||
return s
|
||||
.replace(/&/g, '&')
|
||||
.replace(/</g, '<')
|
||||
.replace(/>/g, '>');
|
||||
.replace(/&/g, "&")
|
||||
.replace(/</g, "<")
|
||||
.replace(/>/g, ">")
|
||||
}
|
||||
|
||||
private nl2br(s: string) {
|
||||
return s.replace(/\n/g, '<br/>');
|
||||
return s.replace(/\n/g, "<br/>")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user