feat: implement ProcessVerarbeiteteDocuments (Upload-Check)
Build and Push Multi-Platform Images / build-and-push (push) Successful in 37s

Ported ProcessVerarbeiteteDocuments() from C# ProcessUploads.cs:
- Checks docs tagged "hochgeladen" → eingangsrechnungVorhanden()
- On match: livesearch, update title/type/created/correspondent/tags,
  set custom fields (externeBelegnummer, AgrarmonitorLink), addNote
- Tag "hochgeladen" → "fertig" swap; owner via Client.AgrarmonitorBetriebId
- 401/403 guard: clearClient() + break (same pattern as runPolling)
- Cron: AGRARMONITOR_UPLOAD_CHECK_CRON (default: 0 * * * * *)
- New settings: agrarmonitor_tag_hochgeladen, agrarmonitor_link_field
- Endpoint: POST /api/agrarmonitor/process-uploads
- Frontend: polling-config extended with tagHochgeladen + linkField select,
  new card "Dokumenten-Verarbeitung" with run button + result display

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-25 12:11:44 +02:00
parent a726f863f0
commit 8c5a81ed27
6 changed files with 312 additions and 11 deletions
@@ -9,7 +9,9 @@ import { Client } from '../database/entities/client.entity';
const INTERN_BELEGNUMMER_FIELD_ID = 7;
const EINGANGSDATUM_FIELD_ID = 9;
const EXTERN_BELEGNUMMER_FIELD_ID = 3;
const DOCS_PAGE_SIZE = 500;
const AGRARMONITOR_BASE_URL = 'https://admin7.agrarmonitor.de';
export interface PollingResult {
processed: number;
@@ -22,6 +24,7 @@ export interface PollingResult {
export class AgrarmonitorPollingService implements OnModuleInit {
private readonly logger = new Logger(AgrarmonitorPollingService.name);
private pollingRunning = false;
private uploadCheckRunning = false;
constructor(
private readonly agrarmonitorService: AgrarmonitorService,
@@ -33,6 +36,8 @@ export class AgrarmonitorPollingService implements OnModuleInit {
async onModuleInit() {
await this.upsertSetting('agrarmonitor_tag_fertig', '4');
await this.upsertSetting('agrarmonitor_tag_verbucht', '9');
await this.upsertSetting('agrarmonitor_tag_hochgeladen', '');
await this.upsertSetting('agrarmonitor_link_field', '');
}
@Cron(process.env['AGRARMONITOR_POLLING_CRON'] || '0 */30 * * * *')
@@ -41,21 +46,40 @@ export class AgrarmonitorPollingService implements OnModuleInit {
this.runPolling().catch((err) => this.logger.error('Cron-Polling-Fehler:', err));
}
async getPollingConfig(): Promise<{ tagFertig: string; tagVerbucht: string }> {
const [fertig, verbucht] = await Promise.all([
@Cron(process.env['AGRARMONITOR_UPLOAD_CHECK_CRON'] || '0 * * * * *')
async scheduledUploadCheck() {
if (!process.env['AGRARMONITOR_UPLOAD_CHECK_CRON']) return;
this.processVerarbeiteteDocuments().catch((err) => this.logger.error('Cron-Upload-Check-Fehler:', err));
}
async getPollingConfig(): Promise<{ tagFertig: string; tagVerbucht: string; tagHochgeladen: string; linkField: string }> {
const [fertig, verbucht, hochgeladen, linkField] = await Promise.all([
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_verbucht' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_hochgeladen' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_link_field' }),
]);
return {
tagFertig: fertig?.Wert ?? '4',
tagVerbucht: verbucht?.Wert ?? '9',
tagHochgeladen: hochgeladen?.Wert ?? '',
linkField: linkField?.Wert ?? '',
};
}
async updatePollingConfig(tagFertig: string, tagVerbucht: string): Promise<{ tagFertig: string; tagVerbucht: string }> {
await this.settingRepo.update({ Tag: 'agrarmonitor_tag_fertig' }, { Wert: tagFertig });
await this.settingRepo.update({ Tag: 'agrarmonitor_tag_verbucht' }, { Wert: tagVerbucht });
return { tagFertig, tagVerbucht };
async updatePollingConfig(
tagFertig: string,
tagVerbucht: string,
tagHochgeladen: string,
linkField: string,
): Promise<{ tagFertig: string; tagVerbucht: string; tagHochgeladen: string; linkField: string }> {
await Promise.all([
this.settingRepo.update({ Tag: 'agrarmonitor_tag_fertig' }, { Wert: tagFertig }),
this.settingRepo.update({ Tag: 'agrarmonitor_tag_verbucht' }, { Wert: tagVerbucht }),
this.settingRepo.update({ Tag: 'agrarmonitor_tag_hochgeladen' }, { Wert: tagHochgeladen }),
this.settingRepo.update({ Tag: 'agrarmonitor_link_field' }, { Wert: linkField }),
]);
return { tagFertig, tagVerbucht, tagHochgeladen, linkField };
}
async runPolling(): Promise<PollingResult> {
@@ -210,9 +234,8 @@ export class AgrarmonitorPollingService implements OnModuleInit {
const customer = customers.find((c) => Number(c.id) === amDoc.kundenId);
if (customer) {
const lieferantennummer = (customer['lieferantennummer'] as string) ?? '';
const searchName = `(${lieferantennummer})`;
const displayName = this.buildCustomerName(customer, lieferantennummer);
let corr = await this.paperlessService.getCorrespondentByName(searchName);
let corr = await this.paperlessService.getCorrespondentByName(displayName);
if (!corr) {
corr = await this.paperlessService.addCorrespondent({
name: displayName,
@@ -264,6 +287,213 @@ export class AgrarmonitorPollingService implements OnModuleInit {
return result;
}
async processVerarbeiteteDocuments(): Promise<PollingResult> {
if (this.uploadCheckRunning) {
this.logger.warn('Upload-Check läuft bereits, überspringe');
return { processed: 0, updated: 0, skipped: 0, errors: ['Upload-Check bereits aktiv'] };
}
this.uploadCheckRunning = true;
const result: PollingResult = { processed: 0, updated: 0, skipped: 0, errors: [] };
this.logger.log('Starte Upload-Check');
try {
const [hochgeladenSetting, fertigSetting, linkFieldSetting] = await Promise.all([
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_hochgeladen' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_link_field' }),
]);
const tagHochgeladenId = parseInt(hochgeladenSetting?.Wert ?? '', 10);
const tagFertigId = parseInt(fertigSetting?.Wert ?? '4', 10);
const linkFieldId = parseInt(linkFieldSetting?.Wert ?? '', 10);
if (isNaN(tagHochgeladenId)) {
this.logger.warn('Tag "hochgeladen" nicht konfiguriert — Upload-Check übersprungen');
return { ...result, errors: ['Tag "hochgeladen" nicht konfiguriert'] };
}
let amClient: Awaited<ReturnType<typeof this.agrarmonitorService.getClient>>;
try {
amClient = await this.agrarmonitorService.getClient();
} catch (err: unknown) {
const msg = `Connector-Fehler: ${err instanceof Error ? err.message : 'unbekannt'}`;
this.logger.error(msg);
return { ...result, errors: [msg] };
}
const docsResponse = await this.paperlessService.getDocuments({
page: 1,
page_size: DOCS_PAGE_SIZE,
truncate_content: true,
tags__id__all: tagHochgeladenId,
});
const docs: any[] = docsResponse?.results ?? [];
if ((docsResponse?.count ?? 0) > DOCS_PAGE_SIZE) {
this.logger.warn(`Mehr als ${DOCS_PAGE_SIZE} Dokumente hochgeladen — nur erste ${DOCS_PAGE_SIZE} werden geprüft`);
}
this.logger.log(`${docs.length} Dokumente laut Paperless im Dateieingang`);
for (const doc of docs) {
result.processed++;
const interneBelegnummer =
((doc.custom_fields as any[]) ?? []).find(
(cf: any) => cf.field === INTERN_BELEGNUMMER_FIELD_ID,
)?.value as string ?? '';
if (!interneBelegnummer) {
this.logger.log(`Dokument ${doc.id as number} hat keine interne Belegnummer`);
result.skipped++;
await this.delay(500);
continue;
}
let vorhanden: boolean;
try {
vorhanden = await amClient.eingangsrechnungVorhanden(interneBelegnummer);
} catch (err: unknown) {
const status = (err as any)?.response?.status;
if (status === 401 || status === 403) {
this.agrarmonitorService.clearClient();
const msg = `Session abgelaufen (${status}) — Upload-Check abgebrochen`;
this.logger.warn(msg);
result.errors.push(msg);
break;
}
const msg = `${interneBelegnummer}: Vorhanden-Check fehlgeschlagen`;
this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`);
result.errors.push(msg);
await this.delay(500);
continue;
}
if (!vorhanden) {
result.skipped++;
await this.delay(500);
continue;
}
this.logger.log(`Dokument ${interneBelegnummer} ist bereits verarbeitet, aktualisiere Paperless`);
let amResults: Awaited<ReturnType<typeof amClient.eingangsrechnungenLivesearch>>;
try {
amResults = await amClient.eingangsrechnungenLivesearch(interneBelegnummer);
} catch (err: unknown) {
const status = (err as any)?.response?.status;
if (status === 401 || status === 403) {
this.agrarmonitorService.clearClient();
const msg = `Session abgelaufen (${status}) — Upload-Check abgebrochen`;
this.logger.warn(msg);
result.errors.push(msg);
break;
}
const msg = `${interneBelegnummer}: Livesearch-Fehler`;
this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`);
result.errors.push(msg);
await this.delay(500);
continue;
}
if (amResults.length > 1) {
this.logger.log(`Dokument ${interneBelegnummer} ist doppelt vorhanden`);
result.skipped++;
await this.delay(500);
continue;
}
const amDoc = amResults[0];
try {
// Kundendaten abrufen
const customer = await amClient.getCustomerById(amDoc.kundenId);
const lieferantennummer = (customer['lieferantennummer'] as string) ?? '';
if (!lieferantennummer) {
this.logger.log(`Kunde ${amDoc.kundenId} hat keine Lieferantennummer — Dokument wird übersprungen`);
result.skipped++;
await this.delay(500);
continue;
}
// Korrespondent ermitteln oder anlegen
const displayName = this.buildCustomerName(customer, lieferantennummer);
let corr = await this.paperlessService.getCorrespondentByName(displayName);
if (!corr) {
corr = await this.paperlessService.addCorrespondent({
name: displayName,
match: '',
matching_algorithm: 0,
is_insensitive: true,
owner: null,
});
}
// Owner aus Client-Tabelle
let ownerId: number | undefined;
const matchedClient = await this.clientRepo.findOneBy({ AgrarmonitorBetriebId: amDoc.betriebId });
if (matchedClient) ownerId = matchedClient.PaperlessUserId;
// Tags: hochgeladen entfernen, fertig hinzufügen
const currentTags: number[] = (doc.tags as number[]) ?? [];
const newTags = [...new Set(currentTags.filter((t) => t !== tagHochgeladenId).concat([tagFertigId]))];
// Custom fields aufbauen: bestehende behalten, extern + link setzen
const existingFields: any[] = ((doc.custom_fields as any[]) ?? []).map((f: any) => ({ ...f }));
this.setCustomField(existingFields, EXTERN_BELEGNUMMER_FIELD_ID, amDoc.belegNummer);
if (!isNaN(linkFieldId)) {
this.setCustomField(
existingFields,
linkFieldId,
`${AGRARMONITOR_BASE_URL}/rechnungen/detail/${amDoc.eingangId}`,
);
}
const updateData: Record<string, any> = {
title: (amDoc.dokumentTyp === 0 ? 'ERG ' : 'EGU ') + amDoc.belegNummer,
document_type: amDoc.dokumentTyp === 0 ? 1 : 2,
tags: newTags,
custom_fields: existingFields,
};
if (amDoc.belegDatum) updateData.created = amDoc.belegDatum.toISOString().slice(0, 10);
if (corr) updateData.correspondent = corr.id as number;
if (ownerId !== undefined) updateData.owner = ownerId;
await this.paperlessService.updateDocument(doc.id as number, updateData);
await this.paperlessService.addNote(
doc.id as number,
`Beleg in Agrarmonitor verarbeitet: ${new Date().toLocaleString('de-DE')}`,
);
this.logger.log(`Beleg ${interneBelegnummer} auf AMfertig gesetzt`);
result.updated++;
} catch (err: unknown) {
const msg = `${interneBelegnummer}: Update-Fehler`;
this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`);
result.errors.push(msg);
}
await this.delay(500);
}
this.logger.log(
`Upload-Check abgeschlossen: ${result.processed} geprüft, ${result.updated} aktualisiert, ` +
`${result.skipped} übersprungen, ${result.errors.length} Fehler`,
);
} finally {
this.uploadCheckRunning = false;
}
return result;
}
private setCustomField(fields: any[], fieldId: number, value: any): void {
const existing = fields.find((f) => f.field === fieldId);
if (existing) {
existing.value = value;
} else {
fields.push({ field: fieldId, value });
}
}
private buildCustomerName(customer: Record<string, unknown>, nummer: string): string {
const firma = (customer['firma'] as string) ?? '';
const nachname = (customer['nachname'] as string) ?? '';
@@ -32,8 +32,8 @@ export class AgrarmonitorController {
@Put('polling-config')
@RequirePermissions(Permission.MANAGE_SETTINGS)
async updatePollingConfig(@Body() body: { tagFertig: string; tagVerbucht: string }) {
return this.pollingService.updatePollingConfig(body.tagFertig, body.tagVerbucht);
async updatePollingConfig(@Body() body: { tagFertig: string; tagVerbucht: string; tagHochgeladen: string; linkField: string }) {
return this.pollingService.updatePollingConfig(body.tagFertig, body.tagVerbucht, body.tagHochgeladen, body.linkField);
}
@Post('run-polling')
@@ -42,4 +42,11 @@ export class AgrarmonitorController {
async runPolling() {
return this.pollingService.runPolling();
}
@Post('process-uploads')
@HttpCode(200)
@RequirePermissions(Permission.MANAGE_SETTINGS)
async processUploads() {
return this.pollingService.processVerarbeiteteDocuments();
}
}