Initial commit with Email Import Wizard and Task Processor updates
This commit is contained in:
@@ -0,0 +1,248 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
import { ImapFlow, type FetchMessageObject } from 'imapflow';
|
||||
import { simpleParser, type AddressObject, type Attachment as MailAttachment } from 'mailparser';
|
||||
import * as crypto from 'crypto';
|
||||
import * as os from 'os';
|
||||
import * as path from 'path';
|
||||
import * as fs from 'fs/promises';
|
||||
import { PdfService } from '../preprocessing/pdf.service';
|
||||
import { EmailPageCacheService } from '../email/email-page-cache.service';
|
||||
import { Email } from '../database/entities/email.entity';
|
||||
import { Attachment } from '../database/entities/attachment.entity';
|
||||
import { Content } from '../database/entities/content.entity';
|
||||
import { isERechnung } from './zugferd.util';
|
||||
|
||||
/**
 * Periodically downloads e-mails from an IMAP inbox and persists them.
 *
 * Every five minutes the INBOX is scanned; messages whose Message-ID is not
 * yet stored are parsed, saved together with their PDF / `message/rfc822`
 * attachments, and page previews are generated for PDF attachments.
 */
@Injectable()
export class EmailDownloadService {
  private readonly logger = new Logger(EmailDownloadService.name);

  /** Guards against overlapping cron runs within this process. */
  private running = false;

  constructor(
    private readonly configService: ConfigService,
    private readonly pdfService: PdfService,
    private readonly pageCache: EmailPageCacheService,
    @InjectRepository(Email) private readonly emailRepo: Repository<Email>,
    @InjectRepository(Attachment) private readonly attachmentRepo: Repository<Attachment>,
    @InjectRepository(Content) private readonly contentRepo: Repository<Content>,
  ) {}

  /**
   * Cron entry point. Skips the tick when the previous run is still active
   * and never throws: failures are logged.
   */
  @Cron(CronExpression.EVERY_5_MINUTES)
  async handleCron(): Promise<void> {
    if (this.running) {
      this.logger.warn('E-Mail-Download läuft noch – überspringe Tick.');
      return;
    }

    this.running = true;
    try {
      await this.fetchAndStore();
    } catch (err: unknown) {
      const { message, stack } = EmailDownloadService.describeError(err);
      this.logger.error(`Fehler im E-Mail-Download-Job: ${message}`, stack);
    } finally {
      this.running = false;
    }
  }

  /**
   * Connects to the configured IMAP server and stores every INBOX message
   * that is not yet in the database.
   *
   * The mailbox is scanned in two phases: first only envelopes are fetched to
   * find unknown Message-IDs, then the raw source is downloaded just for
   * those messages. This avoids transferring the whole mailbox on every run.
   *
   * @throws if connecting or opening the INBOX fails; per-message errors are
   *         logged and do not abort the run.
   */
  private async fetchAndStore(): Promise<void> {
    const host = this.configService.get<string>('IMAP_HOST');
    const secure = this.configService.get<string>('IMAP_USE_SSL', 'true') === 'true';
    const user = this.configService.get<string>('IMAP_USERNAME');
    const pass = this.configService.get<string>('IMAP_PASSWORD');

    if (!host || !user || !pass) {
      this.logger.warn('IMAP-Konfiguration unvollständig – Job wird übersprungen.');
      return;
    }

    // Values coming from environment variables are strings unless the config
    // module transforms them, so coerce explicitly.
    const rawPort = this.configService.get<string | number>('IMAP_PORT', 993);
    let port = Number(rawPort);
    if (!Number.isInteger(port) || port <= 0) {
      this.logger.warn(`Ungültiger IMAP_PORT "${rawPort}" – verwende 993.`);
      port = 993;
    }

    this.logger.log('E-Mail Fetch Job gestartet.');

    const client = new ImapFlow({
      host,
      port,
      secure,
      auth: { user, pass },
      logger: false,
    });

    await client.connect();
    this.logger.log(`Verbunden mit IMAP-Server ${host}:${port}`);

    try {
      // Acquire the lock inside the outer try so that a failure here still
      // logs the client out instead of leaking the connection.
      const lock = await client.getMailboxLock('INBOX');
      try {
        const status = await client.status('INBOX', { messages: true });
        this.logger.log(`Posteingang geöffnet. Anzahl der Nachrichten: ${status.messages ?? 0}`);

        if (!status.messages || status.messages === 0) {
          return;
        }

        // Phase 1: envelopes only – cheap scan to find unknown messages.
        const pending: Array<{ uid: number; messageId: string }> = [];
        for await (const msg of client.fetch('1:*', { envelope: true, uid: true })) {
          const messageId = msg.envelope?.messageId;
          if (!messageId) continue;

          try {
            const existing = await this.emailRepo.findOne({ where: { MessageId: messageId } });
            if (existing) {
              this.logger.debug(`E-Mail mit MessageId ${messageId} bereits vorhanden.`);
              continue;
            }
            pending.push({ uid: msg.uid, messageId });
          } catch (err: unknown) {
            const { message, stack } = EmailDownloadService.describeError(err);
            this.logger.error(`Fehler beim Abrufen der Nachricht ${messageId}: ${message}`, stack);
          }
        }

        // Phase 2: download and store the unknown messages one by one. This
        // runs after the fetch iterator has finished, so no IMAP command is
        // issued while a fetch is still streaming.
        for (const { uid, messageId } of pending) {
          try {
            const msg = await client.fetchOne(
              String(uid),
              { envelope: true, uid: true, source: true },
              { uid: true },
            );
            if (!msg) continue; // message vanished between the two phases

            await this.processMessage(msg);
          } catch (err: unknown) {
            const { message, stack } = EmailDownloadService.describeError(err);
            this.logger.error(`Fehler beim Abrufen der Nachricht ${messageId}: ${message}`, stack);
          }
        }

        this.logger.log('Alle neuen E-Mails und deren Anhänge wurden in der Datenbank gespeichert.');
      } finally {
        lock.release();
      }
    } finally {
      await client.logout().catch(() => undefined);
      this.logger.log('Verbindung zum IMAP-Server geschlossen.');
    }
  }

  /**
   * Parses one fetched message and persists it with its accepted attachments.
   *
   * The email row, attachment rows and content rows are written in a single
   * transaction so a failure cannot leave a stored email without its
   * attachments (which would never be retried, because messages are
   * de-duplicated by Message-ID). Thumbnails are generated after the commit
   * on a best-effort basis.
   *
   * @param msg fetched message; ignored when it carries no raw source.
   */
  private async processMessage(msg: FetchMessageObject): Promise<void> {
    if (!msg.source) return;

    const parsed = await simpleParser(msg.source);
    const messageId = msg.envelope?.messageId ?? parsed.messageId ?? '';

    const email = new Email();
    email.MessageId = messageId;
    email.SenderAddress = formatAddress(parsed.from);
    email.RecipientAddress = formatAddress(parsed.to);
    email.Subject = (msg.envelope?.subject ?? parsed.subject ?? '').slice(0, 500);
    email.Date = msg.envelope?.date ?? parsed.date ?? new Date();
    email.Body = parsed.html || parsed.text || '';
    // NOTE(review): 0 is presumably the "new / unprocessed" status – confirm against the entity.
    email.Status = 0;

    const attachmentsToPersist: Array<{ attachment: Attachment; buffer: Buffer }> = [];
    for (const att of parsed.attachments) {
      const entry = await this.buildAttachment(att);
      if (entry) attachmentsToPersist.push(entry);
    }

    // Double check against the database right before saving (race-condition
    // guard carried over from the C# implementation).
    const alreadyStored = await this.emailRepo.findOne({ where: { MessageId: messageId } });
    if (alreadyStored) {
      this.logger.debug(`E-Mail mit MessageId ${messageId} nach dem Download bereits vorhanden.`);
      return;
    }

    // NOTE(review): assumes all three repositories share one data source,
    // which holds for the un-named @InjectRepository calls above.
    const pdfAttachments = await this.emailRepo.manager.transaction(async (manager) => {
      const savedEmail = await manager.save(email);
      const pdfs: Array<{ attachment: Attachment; buffer: Buffer }> = [];

      for (const { attachment, buffer } of attachmentsToPersist) {
        attachment.EmailMessageId = savedEmail.Id;
        const savedAttachment = await manager.save(attachment);

        const content = new Content();
        content.AttachmentEntityId = savedAttachment.Id;
        content.Content1 = buffer;
        content.ContentLength = buffer.length;
        await manager.save(content);

        if (savedAttachment.ContentType === 'application/pdf') {
          pdfs.push({ attachment: savedAttachment, buffer });
        }
      }

      return pdfs;
    });

    // Generate PDF thumbnails only after the rows are committed.
    for (const { attachment, buffer } of pdfAttachments) {
      await this.generateThumbnailsForAttachment(attachment, buffer);
    }

    this.logger.debug(`Neue E-Mail mit MessageId ${messageId} hinzugefügt (${attachmentsToPersist.length} Anhänge).`);
  }

  /**
   * Renders page previews for a PDF attachment and stores its page count.
   *
   * Best effort: any failure is logged as a warning and never thrown.
   *
   * @param attachment persisted attachment (its `Id` must be set).
   * @param buffer raw PDF bytes.
   */
  public async generateThumbnailsForAttachment(attachment: Attachment, buffer: Buffer): Promise<void> {
    try {
      await this.renderThumbnails(attachment, buffer);
    } catch (err: unknown) {
      const { message } = EmailDownloadService.describeError(err);
      this.logger.warn(`Konnte Vorschaubilder für Anhang ${attachment.Id} nicht generieren: ${message}`);
    }
  }

  /**
   * Generates previews for PDF attachments of emails with `Status` 0 whose
   * `PageCount` is still 0.
   *
   * @returns how many attachments were rendered successfully and how many failed.
   */
  public async backfillThumbnailsForNewEmails(): Promise<{ processed: number; failed: number }> {
    const emails = await this.emailRepo.find({
      where: { Status: 0 },
      relations: ['Attachments', 'Attachments.Content'],
    });

    let processed = 0;
    let failed = 0;

    for (const email of emails) {
      for (const attachment of email.Attachments ?? []) {
        const pdfBytes = attachment.Content?.Content1;
        if (attachment.ContentType !== 'application/pdf' || attachment.PageCount !== 0 || !pdfBytes) {
          continue;
        }

        this.logger.log(`Backfill: Generiere Thumbnails für Attachment ${attachment.Id} (Email ${email.Id})`);
        try {
          // Call the throwing helper directly: the public wrapper swallows
          // errors, which would make every attempt count as "processed".
          await this.renderThumbnails(attachment, pdfBytes);
          processed++;
        } catch (err: unknown) {
          failed++;
          const { message } = EmailDownloadService.describeError(err);
          this.logger.warn(`Konnte Vorschaubilder für Anhang ${attachment.Id} nicht generieren: ${message}`);
        }
      }
    }

    this.logger.log(`Backfill abgeschlossen: ${processed} erfolgreich, ${failed} fehlgeschlagen.`);
    return { processed, failed };
  }

  /**
   * Writes the PDF to a temp file, renders it to images, hands them to the
   * page cache and stores the page count on the attachment.
   *
   * The temp file and the rendered images are cleaned up even when a step fails.
   *
   * @throws whatever the PDF service, page cache, repository or file system throws.
   */
  private async renderThumbnails(attachment: Attachment, buffer: Buffer): Promise<void> {
    const tempPdfPath = path.join(os.tmpdir(), `email-att-${attachment.Id}.pdf`);
    try {
      await fs.writeFile(tempPdfPath, buffer);

      // NOTE(review): 400 is passed through to PdfService.pdfToImages; its unit (DPI / pixels) is not visible here.
      const images = await this.pdfService.pdfToImages(tempPdfPath, 400);
      try {
        await this.pageCache.generate(attachment.Id, images);

        attachment.PageCount = images.length;
        await this.attachmentRepo.save(attachment);
      } finally {
        await this.pdfService.cleanup(images);
      }
    } finally {
      await fs.unlink(tempPdfPath).catch(() => undefined);
    }
  }

  /**
   * Converts a parsed mail attachment into an `Attachment` entity plus its bytes.
   *
   * Only PDFs (including PDFs declared as `application/octet-stream` but named
   * `*.pdf`) and embedded `message/rfc822` parts are accepted.
   *
   * @param att attachment as produced by mailparser.
   * @returns the unsaved entity and its content, or `null` when the attachment
   *          is empty or of an unsupported type.
   */
  private async buildAttachment(
    att: MailAttachment,
  ): Promise<{ attachment: Attachment; buffer: Buffer } | null> {
    const buffer = att.content;
    if (!Buffer.isBuffer(buffer) || buffer.length === 0) return null;

    const filename = att.filename ?? att.cid ?? 'unbenannt';
    // Normalise once so the stored value matches the case-sensitive
    // `=== 'application/pdf'` checks used elsewhere in this service.
    let contentType = (att.contentType ?? 'application/octet-stream').toLowerCase();
    const isEmbedded = att.contentDisposition === 'inline';

    const isPdfByName = filename.toLowerCase().endsWith('.pdf');
    const isPdfByType = contentType === 'application/pdf';
    const isOctetPdf = contentType === 'application/octet-stream' && isPdfByName;
    const isRfc822 = contentType === 'message/rfc822';

    if (!isPdfByType && !isRfc822 && !isOctetPdf) {
      return null;
    }

    // Relabel only mis-declared octet-stream PDFs; an rfc822 part that merely
    // has a ".pdf" file name must keep its real type.
    if (isOctetPdf) {
      contentType = 'application/pdf';
    }

    const attachment = new Attachment();
    attachment.FileName = filename.slice(0, 255);
    attachment.ContentType = contentType.slice(0, 100);
    attachment.IsEmbedded = isEmbedded;
    attachment.ContentId = att.cid ? att.cid.slice(0, 255) : null;
    attachment.Checksum = crypto.createHash('md5').update(buffer).digest('hex');
    attachment.Erechnung = contentType === 'application/pdf' ? isERechnung(buffer) : false;
    attachment.ParentId = null;

    return { attachment, buffer };
  }

  /** Extracts a loggable message and stack from an arbitrary thrown value. */
  private static describeError(err: unknown): { message: string; stack?: string } {
    if (err instanceof Error) {
      return { message: err.message, stack: err.stack };
    }
    return { message: String(err) };
  }
}
|
||||
|
||||
/**
 * Returns the display text of a parsed address header, capped at 255 characters.
 *
 * When several address objects are given, only the first one is used.
 *
 * @param addr parsed address header (single object, list, or undefined).
 * @returns the address text, or an empty string when none is available.
 */
function formatAddress(addr: AddressObject | AddressObject[] | undefined): string {
  if (!addr) return '';
  const first = Array.isArray(addr) ? addr[0] : addr;
  return (first?.text ?? '').slice(0, 255);
}
|
||||
Reference in New Issue
Block a user