66aeab282c
Build and Push Multi-Platform Images / build-and-push (push) Successful in 19s
This reverts commit 07dfd7e840.
312 lines
9.6 KiB
TypeScript
312 lines
9.6 KiB
TypeScript
import { Injectable, Logger } from '@nestjs/common';
|
||
import { ConfigService } from '@nestjs/config';
|
||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||
import { InjectRepository } from '@nestjs/typeorm';
|
||
import { Repository } from 'typeorm';
|
||
import { ImapFlow, type FetchMessageObject } from 'imapflow';
|
||
import {
|
||
simpleParser,
|
||
type AddressObject,
|
||
type Attachment as MailAttachment,
|
||
} from 'mailparser';
|
||
import * as crypto from 'crypto';
|
||
import * as os from 'os';
|
||
import * as path from 'path';
|
||
import * as fs from 'fs/promises';
|
||
import { PdfService } from '../preprocessing/pdf.service';
|
||
import { EmailPageCacheService } from '../email/email-page-cache.service';
|
||
import { Email } from '../database/entities/email.entity';
|
||
import { Attachment } from '../database/entities/attachment.entity';
|
||
import { Content } from '../database/entities/content.entity';
|
||
import { isERechnung } from './zugferd.util';
|
||
|
||
@Injectable()
|
||
export class EmailDownloadService {
|
||
private readonly logger = new Logger(EmailDownloadService.name);
|
||
private running = false;
|
||
|
||
constructor(
|
||
private readonly configService: ConfigService,
|
||
private readonly pdfService: PdfService,
|
||
private readonly pageCache: EmailPageCacheService,
|
||
@InjectRepository(Email) private readonly emailRepo: Repository<Email>,
|
||
@InjectRepository(Attachment)
|
||
private readonly attachmentRepo: Repository<Attachment>,
|
||
@InjectRepository(Content)
|
||
private readonly contentRepo: Repository<Content>,
|
||
) {}
|
||
|
||
@Cron(CronExpression.EVERY_5_MINUTES)
|
||
async handleCron() {
|
||
if (this.running) {
|
||
this.logger.warn('E-Mail-Download läuft noch – überspringe Tick.');
|
||
return;
|
||
}
|
||
this.running = true;
|
||
try {
|
||
await this.fetchAndStore();
|
||
} catch (err: any) {
|
||
this.logger.error(
|
||
`Fehler im E-Mail-Download-Job: ${err.message}`,
|
||
err.stack,
|
||
);
|
||
} finally {
|
||
this.running = false;
|
||
}
|
||
}
|
||
|
||
private async fetchAndStore(): Promise<void> {
|
||
const host = this.configService.get<string>('IMAP_HOST');
|
||
const port = this.configService.get<number>('IMAP_PORT', 993);
|
||
const secure =
|
||
this.configService.get<string>('IMAP_USE_SSL', 'true') === 'true';
|
||
const user = this.configService.get<string>('IMAP_USERNAME');
|
||
const pass = this.configService.get<string>('IMAP_PASSWORD');
|
||
|
||
if (!host || !user || !pass) {
|
||
this.logger.warn(
|
||
'IMAP-Konfiguration unvollständig – Job wird übersprungen.',
|
||
);
|
||
return;
|
||
}
|
||
|
||
this.logger.log('E-Mail Fetch Job gestartet.');
|
||
|
||
const client = new ImapFlow({
|
||
host,
|
||
port,
|
||
secure,
|
||
auth: { user, pass },
|
||
logger: false,
|
||
});
|
||
|
||
await client.connect();
|
||
this.logger.log(`Verbunden mit IMAP-Server ${host}:${port}`);
|
||
|
||
const lock = await client.getMailboxLock('INBOX');
|
||
try {
|
||
const status = await client.status('INBOX', { messages: true });
|
||
this.logger.log(
|
||
`Posteingang geöffnet. Anzahl der Nachrichten: ${status.messages ?? 0}`,
|
||
);
|
||
|
||
if (!status.messages || status.messages === 0) {
|
||
return;
|
||
}
|
||
|
||
const iter = client.fetch('1:*', {
|
||
envelope: true,
|
||
uid: true,
|
||
source: true,
|
||
});
|
||
|
||
for await (const msg of iter) {
|
||
const messageId = msg.envelope?.messageId;
|
||
if (!messageId) continue;
|
||
|
||
try {
|
||
const existing = await this.emailRepo.findOne({
|
||
where: { MessageId: messageId },
|
||
});
|
||
if (existing) {
|
||
this.logger.debug(
|
||
`E-Mail mit MessageId ${messageId} bereits vorhanden.`,
|
||
);
|
||
continue;
|
||
}
|
||
|
||
await this.processMessage(msg);
|
||
} catch (err: any) {
|
||
this.logger.error(
|
||
`Fehler beim Abrufen der Nachricht ${messageId}: ${err.message}`,
|
||
err.stack,
|
||
);
|
||
}
|
||
}
|
||
|
||
this.logger.log(
|
||
'Alle neuen E-Mails und deren Anhänge wurden in der Datenbank gespeichert.',
|
||
);
|
||
} finally {
|
||
lock.release();
|
||
await client.logout().catch(() => undefined);
|
||
this.logger.log('Verbindung zum IMAP-Server geschlossen.');
|
||
}
|
||
}
|
||
|
||
private async processMessage(msg: FetchMessageObject): Promise<void> {
|
||
if (!msg.source) return;
|
||
const parsed = await simpleParser(msg.source);
|
||
const messageId = msg.envelope?.messageId ?? parsed.messageId ?? '';
|
||
|
||
const email = new Email();
|
||
email.MessageId = messageId;
|
||
email.SenderAddress = formatAddress(parsed.from);
|
||
email.RecipientAddress = formatAddress(parsed.to);
|
||
email.Subject = (msg.envelope?.subject ?? parsed.subject ?? '').slice(
|
||
0,
|
||
500,
|
||
);
|
||
email.Date = msg.envelope?.date ?? parsed.date ?? new Date();
|
||
email.Body = parsed.html || parsed.text || '';
|
||
email.Status = 0;
|
||
|
||
const attachmentsToPersist: Array<{
|
||
attachment: Attachment;
|
||
buffer: Buffer;
|
||
}> = [];
|
||
|
||
for (const att of parsed.attachments) {
|
||
const entry = await this.buildAttachment(att);
|
||
if (entry) attachmentsToPersist.push(entry);
|
||
}
|
||
|
||
// Double-Check: nochmal gegen DB prüfen (Race-Condition-Schutz wie in C#)
|
||
const existing2 = await this.emailRepo.findOne({
|
||
where: { MessageId: messageId },
|
||
});
|
||
if (existing2) {
|
||
this.logger.debug(
|
||
`E-Mail mit MessageId ${messageId} nach dem Download bereits vorhanden.`,
|
||
);
|
||
return;
|
||
}
|
||
|
||
const savedEmail = await this.emailRepo.save(email);
|
||
|
||
for (const { attachment, buffer } of attachmentsToPersist) {
|
||
attachment.EmailMessageId = savedEmail.Id;
|
||
const savedAttachment = await this.attachmentRepo.save(attachment);
|
||
|
||
const content = new Content();
|
||
content.AttachmentEntityId = savedAttachment.Id;
|
||
content.Content1 = buffer;
|
||
content.ContentLength = buffer.length;
|
||
await this.contentRepo.save(content);
|
||
|
||
// Generate PDF thumbnails if it's a PDF
|
||
if (savedAttachment.ContentType === 'application/pdf') {
|
||
await this.generateThumbnailsForAttachment(savedAttachment, buffer);
|
||
}
|
||
}
|
||
|
||
this.logger.debug(
|
||
`Neue E-Mail mit MessageId ${messageId} hinzugefügt (${attachmentsToPersist.length} Anhänge).`,
|
||
);
|
||
}
|
||
|
||
public async generateThumbnailsForAttachment(
|
||
attachment: Attachment,
|
||
buffer: Buffer,
|
||
): Promise<void> {
|
||
try {
|
||
const tempPdfPath = path.join(
|
||
os.tmpdir(),
|
||
`email-att-${attachment.Id}.pdf`,
|
||
);
|
||
await fs.writeFile(tempPdfPath, buffer);
|
||
|
||
const images = await this.pdfService.pdfToImages(tempPdfPath, 400);
|
||
await this.pageCache.generate(attachment.Id, images);
|
||
|
||
attachment.PageCount = images.length;
|
||
await this.attachmentRepo.save(attachment);
|
||
|
||
await this.pdfService.cleanup(images);
|
||
await fs.unlink(tempPdfPath).catch(() => {});
|
||
} catch (err: any) {
|
||
this.logger.warn(
|
||
`Konnte Vorschaubilder für Anhang ${attachment.Id} nicht generieren: ${err.message}`,
|
||
);
|
||
}
|
||
}
|
||
|
||
public async backfillThumbnailsForNewEmails(): Promise<{
|
||
processed: number;
|
||
failed: number;
|
||
}> {
|
||
const emails = await this.emailRepo.find({
|
||
where: { Status: 0 },
|
||
relations: ['Attachments', 'Attachments.Content'],
|
||
});
|
||
|
||
let processed = 0;
|
||
let failed = 0;
|
||
|
||
for (const email of emails) {
|
||
for (const attachment of email.Attachments) {
|
||
if (
|
||
attachment.ContentType === 'application/pdf' &&
|
||
attachment.PageCount === 0 &&
|
||
attachment.Content?.Content1
|
||
) {
|
||
this.logger.log(
|
||
`Backfill: Generiere Thumbnails für Attachment ${attachment.Id} (Email ${email.Id})`,
|
||
);
|
||
try {
|
||
await this.generateThumbnailsForAttachment(
|
||
attachment,
|
||
attachment.Content.Content1,
|
||
);
|
||
processed++;
|
||
} catch (err) {
|
||
failed++;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
this.logger.log(
|
||
`Backfill abgeschlossen: ${processed} erfolgreich, ${failed} fehlgeschlagen.`,
|
||
);
|
||
return { processed, failed };
|
||
}
|
||
|
||
private async buildAttachment(
|
||
att: MailAttachment,
|
||
): Promise<{ attachment: Attachment; buffer: Buffer } | null> {
|
||
const buffer = att.content;
|
||
if (!Buffer.isBuffer(buffer) || buffer.length === 0) return null;
|
||
|
||
const filename = att.filename ?? att.cid ?? 'unbenannt';
|
||
let contentType = att.contentType ?? 'application/octet-stream';
|
||
const isEmbedded = att.contentDisposition === 'inline';
|
||
|
||
const isPdfByName = filename.toLowerCase().endsWith('.pdf');
|
||
const isPdfByType = contentType.toLowerCase() === 'application/pdf';
|
||
const isOctet = contentType.toLowerCase() === 'application/octet-stream';
|
||
|
||
// Nur PDFs (inkl. als octet-stream deklarierter PDFs) und embedded rfc822 werden übernommen.
|
||
const isRfc822 = contentType.toLowerCase() === 'message/rfc822';
|
||
if (!isPdfByType && !isRfc822 && !(isOctet && isPdfByName)) {
|
||
return null;
|
||
}
|
||
|
||
if (isPdfByName && !isPdfByType) {
|
||
contentType = 'application/pdf';
|
||
}
|
||
|
||
const attachment = new Attachment();
|
||
attachment.FileName = filename.slice(0, 255);
|
||
attachment.ContentType = contentType.slice(0, 100);
|
||
attachment.IsEmbedded = isEmbedded;
|
||
attachment.ContentId = att.cid ? att.cid.slice(0, 255) : null;
|
||
attachment.Checksum = crypto.createHash('md5').update(buffer).digest('hex');
|
||
attachment.Erechnung =
|
||
contentType.toLowerCase() === 'application/pdf'
|
||
? isERechnung(buffer)
|
||
: false;
|
||
attachment.ParentId = null;
|
||
|
||
return { attachment, buffer };
|
||
}
|
||
}
|
||
|
||
function formatAddress(
|
||
addr: AddressObject | AddressObject[] | undefined,
|
||
): string {
|
||
if (!addr) return '';
|
||
const first = Array.isArray(addr) ? addr[0] : addr;
|
||
return (first?.text ?? '').slice(0, 255);
|
||
}
|