import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { Cron } from '@nestjs/schedule'; import { InjectRepository } from '@nestjs/typeorm'; import { IsNull, Not, Repository } from 'typeorm'; import { AgrarmonitorService } from './agrarmonitor.service'; import { PaperlessService } from '../paperless/paperless.service'; import { Setting } from '../database/entities/setting.entity'; import { Client } from '../database/entities/client.entity'; import { CorrespondentSetting } from '../database/entities/correspondent-setting.entity'; const INTERN_BELEGNUMMER_FIELD_ID = 7; const EINGANGSDATUM_FIELD_ID = 9; const EXTERN_BELEGNUMMER_FIELD_ID = 3; const DOCS_PAGE_SIZE = 500; const AGRARMONITOR_BASE_URL = 'https://admin7.agrarmonitor.de'; export interface PollingResult { processed: number; updated: number; skipped: number; errors: string[]; } export interface SyncConflict { agrarmonitorId: number; correspondents: Array<{ id: number; name: string; documentCount: number }>; } export interface SyncCorrespondentsResult { total: number; matched: number; unmatched: number; autoMerged: number; conflicts: SyncConflict[]; } @Injectable() export class AgrarmonitorPollingService implements OnModuleInit { private readonly logger = new Logger(AgrarmonitorPollingService.name); private pollingRunning = false; private uploadCheckRunning = false; constructor( private readonly agrarmonitorService: AgrarmonitorService, private readonly paperlessService: PaperlessService, @InjectRepository(Setting) private readonly settingRepo: Repository, @InjectRepository(Client) private readonly clientRepo: Repository, @InjectRepository(CorrespondentSetting) private readonly corrSettingRepo: Repository, ) {} async onModuleInit() { await this.upsertSetting('agrarmonitor_tag_fertig', '4'); await this.upsertSetting('agrarmonitor_tag_verbucht', '9'); await this.upsertSetting('agrarmonitor_tag_hochgeladen', ''); await this.upsertSetting('agrarmonitor_link_field', ''); } @Cron(process.env['AGRARMONITOR_POLLING_CRON'] || '0 */30 * * * *') async scheduledPolling() { if (!process.env['AGRARMONITOR_POLLING_CRON']) return; this.runPolling().catch((err) => this.logger.error('Cron-Polling-Fehler:', err)); } @Cron(process.env['AGRARMONITOR_UPLOAD_CHECK_CRON'] || '0 * * * * *') async scheduledUploadCheck() { if (!process.env['AGRARMONITOR_UPLOAD_CHECK_CRON']) return; this.processVerarbeiteteDocuments().catch((err) => this.logger.error('Cron-Upload-Check-Fehler:', err)); } async getPollingConfig(): Promise<{ tagFertig: string; tagVerbucht: string; tagHochgeladen: string; linkField: string }> { const [fertig, verbucht, hochgeladen, linkField] = await Promise.all([ this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_verbucht' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_hochgeladen' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_link_field' }), ]); return { tagFertig: fertig?.Wert ?? '4', tagVerbucht: verbucht?.Wert ?? '9', tagHochgeladen: hochgeladen?.Wert ?? '', linkField: linkField?.Wert ?? '', }; } async updatePollingConfig( tagFertig: string, tagVerbucht: string, tagHochgeladen: string, linkField: string, ): Promise<{ tagFertig: string; tagVerbucht: string; tagHochgeladen: string; linkField: string }> { await Promise.all([ this.settingRepo.update({ Tag: 'agrarmonitor_tag_fertig' }, { Wert: tagFertig }), this.settingRepo.update({ Tag: 'agrarmonitor_tag_verbucht' }, { Wert: tagVerbucht }), this.settingRepo.update({ Tag: 'agrarmonitor_tag_hochgeladen' }, { Wert: tagHochgeladen }), this.settingRepo.update({ Tag: 'agrarmonitor_link_field' }, { Wert: linkField }), ]); return { tagFertig, tagVerbucht, tagHochgeladen, linkField }; } async runPolling(): Promise { if (this.pollingRunning) { this.logger.warn('Polling läuft bereits, überspringe'); return { processed: 0, updated: 0, skipped: 0, errors: ['Polling bereits aktiv'] }; } this.pollingRunning = true; const result: PollingResult = { processed: 0, updated: 0, skipped: 0, errors: [] }; this.logger.log('Starte Agrarmonitor-Polling'); try { const [tagFertigSetting, tagVerbuchtSetting] = await Promise.all([ this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_verbucht' }), ]); const tagFertigId = parseInt(tagFertigSetting?.Wert ?? '4', 10); const tagVerbuchtId = parseInt(tagVerbuchtSetting?.Wert ?? '9', 10); if (isNaN(tagFertigId) || isNaN(tagVerbuchtId)) { const msg = 'Tag-IDs ungültig (keine Zahlen)'; this.logger.error(msg); return { ...result, errors: [msg] }; } let amClient: Awaited>; try { amClient = await this.agrarmonitorService.getClient(); } catch (err: unknown) { const msg = `Connector-Fehler: ${err instanceof Error ? err.message : 'unbekannt'}`; this.logger.error(msg); return { ...result, errors: [msg] }; } let customers: Awaited>; try { customers = await amClient.fetchCustomers(); } catch (err: unknown) { const msg = `Kunden-Abruf fehlgeschlagen: ${err instanceof Error ? err.message : 'unbekannt'}`; this.logger.error(msg); return { ...result, errors: [msg] }; } for (const customer of customers.filter( (c) => Number(c['ist_lieferant']) === 1 && Number(c['ist_aktiv']) === 1, )) { try { await this.getOrCreateCorrespondent(customer); } catch (err: unknown) { this.logger.warn(`Korrespondenten-Sync fehlgeschlagen: ${err instanceof Error ? err.message : err}`); } } const docsResponse = await this.paperlessService.getDocuments({ page: 1, page_size: DOCS_PAGE_SIZE, truncate_content: true, tags__id__all: tagFertigId, }); const docs: any[] = docsResponse?.results ?? []; if ((docsResponse?.count ?? 0) > DOCS_PAGE_SIZE) { this.logger.warn(`Mehr als ${DOCS_PAGE_SIZE} Dokumente bereit — nur erste ${DOCS_PAGE_SIZE} werden verarbeitet`); } this.logger.log(`${docs.length} Dokumente fertig in Agrarmonitor`); for (const doc of docs) { result.processed++; const interneBelegnummer = ((doc.custom_fields as any[]) ?? []).find( (cf: any) => cf.field === INTERN_BELEGNUMMER_FIELD_ID, )?.value as string ?? ''; if (!interneBelegnummer) { this.logger.log(`Dokument ${doc.id as number} hat keine interne Belegnummer`); result.skipped++; await this.delay(500); continue; } let amResults: Awaited>; try { amResults = await amClient.eingangsrechnungenLivesearch(interneBelegnummer); } catch (err: unknown) { const status = (err as any)?.response?.status; if (status === 401 || status === 403) { this.agrarmonitorService.clearClient(); const msg = `Session abgelaufen (${status}) — Polling abgebrochen, nächster Lauf meldet sich neu an`; this.logger.warn(msg); result.errors.push(msg); break; } const msg = `${interneBelegnummer}: Livesearch-Fehler`; this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`); result.errors.push(msg); await this.delay(500); continue; } if (amResults.length === 0) { this.logger.log(`${interneBelegnummer} nicht in Agrarmonitor gefunden`); result.skipped++; await this.delay(500); continue; } if (amResults.length > 1) { const msg = `${interneBelegnummer}: Mehrfach gefunden`; this.logger.error(msg); result.errors.push(msg); await this.delay(500); continue; } const amDoc = amResults[0]; if (!amDoc.interneBelegNummer && interneBelegnummer) { try { await amClient.setLieferscheinNummer(amDoc.eingangId, interneBelegnummer); } catch (err: unknown) { this.logger.warn(`${interneBelegnummer}: Lieferscheinnummer setzen fehlgeschlagen: ${err instanceof Error ? err.message : err}`); } } if (!amDoc.eingangsDatum) { const eingangsdatumField = ((doc.custom_fields as any[]) ?? []).find( (cf: any) => cf.field === EINGANGSDATUM_FIELD_ID, ); if (eingangsdatumField?.value) { const eingangsdatum = new Date(eingangsdatumField.value as string); if (!isNaN(eingangsdatum.getTime())) { await amClient.setEingangsdatum(amDoc.eingangId, eingangsdatum); this.logger.log(`Eingangsdatum für ${interneBelegnummer} gesetzt`); } } result.skipped++; } else if (amDoc.buchungsDatum) { try { let correspondentId: number | undefined; const customer = customers.find((c) => Number(c.id) === amDoc.kundenId); if (customer) { const corr = await this.getOrCreateCorrespondent(customer); if (corr) correspondentId = corr.id as number; } let ownerId: number | undefined; const matchedClient = await this.clientRepo.findOneBy({ AgrarmonitorBetriebId: amDoc.betriebId, }); if (matchedClient) ownerId = matchedClient.PaperlessUserId; const currentTags: number[] = (doc.tags as number[]) ?? []; const newTags = [...new Set(currentTags.filter((t) => t !== tagFertigId).concat([tagVerbuchtId]))]; const updateData: Record = { tags: newTags }; if (correspondentId !== undefined) updateData.correspondent = correspondentId; if (ownerId !== undefined) updateData.owner = ownerId; await this.paperlessService.updateDocument(doc.id as number, updateData); this.logger.log(`Beleg ${interneBelegnummer} gebucht`); result.updated++; } catch (err: unknown) { const msg = `${interneBelegnummer}: Update-Fehler`; this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`); result.errors.push(msg); } } else { result.skipped++; } await this.delay(500); } this.logger.log( `Polling abgeschlossen: ${result.processed} verarbeitet, ${result.updated} aktualisiert, ` + `${result.skipped} übersprungen, ${result.errors.length} Fehler`, ); } finally { this.pollingRunning = false; } return result; } async processVerarbeiteteDocuments(): Promise { if (this.uploadCheckRunning) { this.logger.warn('Upload-Check läuft bereits, überspringe'); return { processed: 0, updated: 0, skipped: 0, errors: ['Upload-Check bereits aktiv'] }; } this.uploadCheckRunning = true; const result: PollingResult = { processed: 0, updated: 0, skipped: 0, errors: [] }; this.logger.log('Starte Upload-Check'); try { const [hochgeladenSetting, fertigSetting, linkFieldSetting] = await Promise.all([ this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_hochgeladen' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }), this.settingRepo.findOneBy({ Tag: 'agrarmonitor_link_field' }), ]); const tagHochgeladenId = parseInt(hochgeladenSetting?.Wert ?? '', 10); const tagFertigId = parseInt(fertigSetting?.Wert ?? '4', 10); const linkFieldId = parseInt(linkFieldSetting?.Wert ?? '', 10); if (isNaN(tagHochgeladenId)) { this.logger.warn('Tag "hochgeladen" nicht konfiguriert — Upload-Check übersprungen'); return { ...result, errors: ['Tag "hochgeladen" nicht konfiguriert'] }; } let amClient: Awaited>; try { amClient = await this.agrarmonitorService.getClient(); } catch (err: unknown) { const msg = `Connector-Fehler: ${err instanceof Error ? err.message : 'unbekannt'}`; this.logger.error(msg); return { ...result, errors: [msg] }; } const docsResponse = await this.paperlessService.getDocuments({ page: 1, page_size: DOCS_PAGE_SIZE, truncate_content: true, tags__id__all: tagHochgeladenId, }); const docs: any[] = docsResponse?.results ?? []; if ((docsResponse?.count ?? 0) > DOCS_PAGE_SIZE) { this.logger.warn(`Mehr als ${DOCS_PAGE_SIZE} Dokumente hochgeladen — nur erste ${DOCS_PAGE_SIZE} werden geprüft`); } this.logger.log(`${docs.length} Dokumente laut Paperless im Dateieingang`); for (const doc of docs) { result.processed++; const interneBelegnummer = ((doc.custom_fields as any[]) ?? []).find( (cf: any) => cf.field === INTERN_BELEGNUMMER_FIELD_ID, )?.value as string ?? ''; if (!interneBelegnummer) { this.logger.log(`Dokument ${doc.id as number} hat keine interne Belegnummer`); result.skipped++; await this.delay(500); continue; } let vorhanden: boolean; try { vorhanden = await amClient.eingangsrechnungVorhanden(interneBelegnummer); } catch (err: unknown) { const status = (err as any)?.response?.status; if (status === 401 || status === 403) { this.agrarmonitorService.clearClient(); const msg = `Session abgelaufen (${status}) — Upload-Check abgebrochen`; this.logger.warn(msg); result.errors.push(msg); break; } const msg = `${interneBelegnummer}: Vorhanden-Check fehlgeschlagen`; this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`); result.errors.push(msg); await this.delay(500); continue; } if (!vorhanden) { result.skipped++; await this.delay(500); continue; } this.logger.log(`Dokument ${interneBelegnummer} ist bereits verarbeitet, aktualisiere Paperless`); let amResults: Awaited>; try { amResults = await amClient.eingangsrechnungenLivesearch(interneBelegnummer); } catch (err: unknown) { const status = (err as any)?.response?.status; if (status === 401 || status === 403) { this.agrarmonitorService.clearClient(); const msg = `Session abgelaufen (${status}) — Upload-Check abgebrochen`; this.logger.warn(msg); result.errors.push(msg); break; } const msg = `${interneBelegnummer}: Livesearch-Fehler`; this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`); result.errors.push(msg); await this.delay(500); continue; } if (amResults.length > 1) { this.logger.log(`Dokument ${interneBelegnummer} ist doppelt vorhanden`); result.skipped++; await this.delay(500); continue; } const amDoc = amResults[0]; try { // Kundendaten abrufen const customer = await amClient.getCustomerById(amDoc.kundenId); const lieferantennummer = (customer['lieferantennummer'] as string) ?? ''; if (!lieferantennummer) { this.logger.log(`Kunde ${amDoc.kundenId} hat keine Lieferantennummer — Dokument wird übersprungen`); result.skipped++; await this.delay(500); continue; } // Korrespondent ermitteln oder anlegen const corr = await this.getOrCreateCorrespondent(customer); // Owner aus Client-Tabelle let ownerId: number | undefined; const matchedClient = await this.clientRepo.findOneBy({ AgrarmonitorBetriebId: amDoc.betriebId }); if (matchedClient) ownerId = matchedClient.PaperlessUserId; // Tags: hochgeladen entfernen, fertig hinzufügen const currentTags: number[] = (doc.tags as number[]) ?? []; const newTags = [...new Set(currentTags.filter((t) => t !== tagHochgeladenId).concat([tagFertigId]))]; // Custom fields aufbauen: bestehende behalten, extern + link setzen const existingFields: any[] = ((doc.custom_fields as any[]) ?? []).map((f: any) => ({ ...f })); this.setCustomField(existingFields, EXTERN_BELEGNUMMER_FIELD_ID, amDoc.belegNummer); if (!isNaN(linkFieldId)) { this.setCustomField( existingFields, linkFieldId, `${AGRARMONITOR_BASE_URL}/rechnungen/detail/${amDoc.eingangId}`, ); } const updateData: Record = { title: (amDoc.dokumentTyp === 0 ? 'ERG ' : 'EGU ') + amDoc.belegNummer, document_type: amDoc.dokumentTyp === 0 ? 1 : 2, tags: newTags, custom_fields: existingFields, }; if (amDoc.belegDatum) updateData.created = amDoc.belegDatum.toISOString().slice(0, 10); if (corr) updateData.correspondent = corr.id as number; if (ownerId !== undefined) updateData.owner = ownerId; await this.paperlessService.updateDocument(doc.id as number, updateData); await this.paperlessService.addNote( doc.id as number, `Beleg in Agrarmonitor verarbeitet: ${new Date().toLocaleString('de-DE')}`, ); this.logger.log(`Beleg ${interneBelegnummer} auf AMfertig gesetzt`); result.updated++; } catch (err: unknown) { const msg = `${interneBelegnummer}: Update-Fehler`; this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`); result.errors.push(msg); } await this.delay(500); } this.logger.log( `Upload-Check abgeschlossen: ${result.processed} geprüft, ${result.updated} aktualisiert, ` + `${result.skipped} übersprungen, ${result.errors.length} Fehler`, ); } finally { this.uploadCheckRunning = false; } return result; } private setCustomField(fields: any[], fieldId: number, value: any): void { const existing = fields.find((f) => f.field === fieldId); if (existing) { existing.value = value; } else { fields.push({ field: fieldId, value }); } } async syncCorrespondentIds(): Promise { let amClient: Awaited>; try { amClient = await this.agrarmonitorService.getClient(); } catch (err: unknown) { throw new Error(`Connector-Fehler: ${err instanceof Error ? err.message : 'unbekannt'}`); } const customers = await amClient.fetchCustomers(); const lieferantMap = new Map(); // lieferantennummer → AM-ID const kundenMap = new Map(); // kundennummer → AM-ID for (const c of customers) { const liefNr = String(c['lieferantennummer'] ?? '').trim(); if (liefNr) lieferantMap.set(liefNr, Number(c.id)); const kdNr = String(c['kundennummer'] ?? '').trim(); if (kdNr) kundenMap.set(kdNr, Number(c.id)); } const allCorrespondents: any[] = []; let page = 1; while (true) { const resp = await this.paperlessService.getCorrespondents({ page, page_size: 250 }); allCorrespondents.push(...(resp.results ?? [])); if (!resp.next) break; page++; } const lieferantRegex = /\((\d+)\)$/; // reine Zahl → Lieferantennummer const kundenRegex = /\(KD(\d+)\)$/; // KD-Prefix → Kundennummer let matched = 0; let unmatched = 0; for (const corr of allCorrespondents) { const name = corr.name as string; let amId: number | undefined; const kdMatch = kundenRegex.exec(name); if (kdMatch) { amId = kundenMap.get(kdMatch[1]); } else { const liefMatch = lieferantRegex.exec(name); if (liefMatch) amId = lieferantMap.get(liefMatch[1]); } if (amId === undefined) { unmatched++; continue; } let setting = await this.corrSettingRepo.findOneBy({ CorrespondentId: corr.id as number }); if (!setting) { setting = this.corrSettingRepo.create({ CorrespondentId: corr.id as number, AgrarmonitorId: amId }); } else { setting.AgrarmonitorId = amId; } await this.corrSettingRepo.save(setting); matched++; } // Duplikate ermitteln: mehrere Paperless-Korrespondenten mit derselben AgrarmonitorId const allLinked = await this.corrSettingRepo.find({ where: { AgrarmonitorId: Not(IsNull()) }, }); const byAmId = new Map(); for (const s of allLinked) { const amId = s.AgrarmonitorId!; const ids = byAmId.get(amId) ?? []; ids.push(s.CorrespondentId); byAmId.set(amId, ids); } let autoMerged = 0; const conflicts: SyncConflict[] = []; for (const [amId, corrIds] of byAmId) { if (corrIds.length <= 1) continue; const corrs = await Promise.all(corrIds.map(id => this.paperlessService.getCorrespondent(id))); const uniqueNames = new Set(corrs.map((c: any) => c.name as string)); if (uniqueNames.size === 1) { // Gleicher Name — automatisch zusammenführen const withoutDocs = corrs.filter((c: any) => Number(c.document_count) === 0); const withDocs = corrs.filter((c: any) => Number(c.document_count) > 0); if (withoutDocs.length > 0) { for (const toDelete of withoutDocs) { await this.paperlessService.deleteCorrespondent(toDelete.id as number); await this.corrSettingRepo.delete({ CorrespondentId: toDelete.id as number }); autoMerged++; this.logger.log(`Duplikat gelöscht (keine Dokumente): ${toDelete.name as string} (ID ${toDelete.id as number})`); } } else { // Alle haben Dokumente — in den mit den meisten Dokumenten zusammenführen const sorted = [...withDocs].sort((a: any, b: any) => Number(b.document_count) - Number(a.document_count)); const keep = sorted[0] as any; for (const toMerge of sorted.slice(1)) { await this.mergeCorrespondents(keep.id as number, toMerge.id as number); autoMerged++; this.logger.log(`Duplikat zusammengeführt in ${keep.name as string} (ID ${keep.id as number})`); } } } else { // Unterschiedliche Namen — Nutzerentscheidung erforderlich conflicts.push({ agrarmonitorId: amId, correspondents: corrs.map((c: any) => ({ id: c.id as number, name: c.name as string, documentCount: Number(c.document_count), })), }); } } this.logger.log( `Korrespondenten-Abgleich: ${matched} zugeordnet, ${unmatched} ohne Treffer, ` + `${autoMerged} automatisch zusammengeführt, ${conflicts.length} Konflikte`, ); return { total: allCorrespondents.length, matched, unmatched, autoMerged, conflicts }; } async mergeCorrespondents(keepId: number, deleteId: number): Promise<{ mergedDocuments: number }> { let mergedDocuments = 0; let page = 1; while (true) { const resp = await this.paperlessService.getDocuments({ correspondent__id: deleteId, page, page_size: 250, truncate_content: true, }); const docs: any[] = resp?.results ?? []; for (const doc of docs) { await this.paperlessService.updateDocument(doc.id as number, { correspondent: keepId }); mergedDocuments++; } if (!resp?.next) break; page++; } await this.paperlessService.deleteCorrespondent(deleteId); await this.corrSettingRepo.delete({ CorrespondentId: deleteId }); this.logger.log(`Korrespondent ${deleteId} → ${keepId} zusammengeführt (${mergedDocuments} Dokumente)`); return { mergedDocuments }; } private async getOrCreateCorrespondent(customer: Record): Promise { const lieferantennummer = (customer['lieferantennummer'] as string) ?? ''; const displayName = this.buildCustomerName(customer, lieferantennummer); let corr = await this.paperlessService.getCorrespondentByName(displayName); if (!corr) { corr = await this.paperlessService.addCorrespondent({ name: displayName, match: '', matching_algorithm: 0, is_insensitive: true, owner: null, }); } return corr; } private buildCustomerName(customer: Record, nummer: string): string { const firma = (customer['firma'] as string) ?? ''; const nachname = (customer['nachname'] as string) ?? ''; const vorname = (customer['vorname'] as string) ?? ''; const name = firma || (nachname + (vorname ? ', ' + vorname : '')); return `${name} (${nummer})`; } private delay(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } private async upsertSetting(tag: string, defaultValue: string): Promise { const existing = await this.settingRepo.findOneBy({ Tag: tag }); if (!existing) { await this.settingRepo.save( this.settingRepo.create({ Typ: 1, Wert: defaultValue, Tag: tag }), ); } } }