feat: add AgrarmonitorPollingService with cron and runPolling

This commit is contained in:
2026-05-23 14:45:47 +02:00
parent 79874bf54f
commit dd0fcfc2e5
@@ -0,0 +1,252 @@
// paperless-backend/src/agrarmonitor/agrarmonitor-polling.service.ts
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AgrarmonitorService } from './agrarmonitor.service';
import { AgrarmonitorWebService } from './agrarmonitor-web.service';
import { PaperlessService } from '../paperless/paperless.service';
import { Setting } from '../database/entities/setting.entity';
import { Client } from '../database/entities/client.entity';
const INTERN_BELEGNUMMER_FIELD_ID = 7;
const EINGANGSDATUM_FIELD_ID = 9;
export interface PollingResult {
processed: number;
updated: number;
skipped: number;
errors: string[];
}
@Injectable()
export class AgrarmonitorPollingService implements OnModuleInit {
private readonly logger = new Logger(AgrarmonitorPollingService.name);
constructor(
private readonly agrarmonitorService: AgrarmonitorService,
private readonly webService: AgrarmonitorWebService,
private readonly paperlessService: PaperlessService,
@InjectRepository(Setting) private readonly settingRepo: Repository<Setting>,
@InjectRepository(Client) private readonly clientRepo: Repository<Client>,
) {}
async onModuleInit() {
await this.upsertSetting('agrarmonitor_tag_fertig', '4');
await this.upsertSetting('agrarmonitor_tag_verbucht', '9');
}
@Cron(process.env['AGRARMONITOR_POLLING_CRON'] || '0 */30 * * * *')
async scheduledPolling() {
if (!process.env['AGRARMONITOR_POLLING_CRON']) return;
this.runPolling().catch((err) => this.logger.error('Cron-Polling-Fehler:', err));
}
async getPollingConfig(): Promise<{ tagFertig: string; tagVerbucht: string }> {
const [fertig, verbucht] = await Promise.all([
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_verbucht' }),
]);
return {
tagFertig: fertig?.Wert ?? '4',
tagVerbucht: verbucht?.Wert ?? '9',
};
}
async updatePollingConfig(tagFertig: string, tagVerbucht: string): Promise<{ tagFertig: string; tagVerbucht: string }> {
await this.settingRepo.update({ Tag: 'agrarmonitor_tag_fertig' }, { Wert: tagFertig });
await this.settingRepo.update({ Tag: 'agrarmonitor_tag_verbucht' }, { Wert: tagVerbucht });
return { tagFertig, tagVerbucht };
}
async runPolling(): Promise<PollingResult> {
const result: PollingResult = { processed: 0, updated: 0, skipped: 0, errors: [] };
this.logger.log('Starte Agrarmonitor-Polling');
const [tagFertigSetting, tagVerbuchtSetting] = await Promise.all([
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_fertig' }),
this.settingRepo.findOneBy({ Tag: 'agrarmonitor_tag_verbucht' }),
]);
const tagFertigId = parseInt(tagFertigSetting?.Wert ?? '4', 10);
const tagVerbuchtId = parseInt(tagVerbuchtSetting?.Wert ?? '9', 10);
let amClient: Awaited<ReturnType<typeof this.agrarmonitorService.getClient>>;
try {
amClient = await this.agrarmonitorService.getClient();
} catch (err: unknown) {
const msg = `Connector-Fehler: ${err instanceof Error ? err.message : 'unbekannt'}`;
this.logger.error(msg);
return { ...result, errors: [msg] };
}
let customers: Awaited<ReturnType<typeof amClient.fetchCustomers>>;
try {
customers = await amClient.fetchCustomers();
} catch (err: unknown) {
const msg = `Kunden-Abruf fehlgeschlagen: ${err instanceof Error ? err.message : 'unbekannt'}`;
this.logger.error(msg);
return { ...result, errors: [msg] };
}
for (const customer of customers.filter(
(c) => Number(c['ist_lieferant']) === 1 && Number(c['ist_aktiv']) === 1,
)) {
const lieferantennummer = (customer['lieferantennummer'] as string) ?? '';
const searchName = `(${lieferantennummer})`;
const displayName = this.buildCustomerName(customer, lieferantennummer);
const existing = await this.paperlessService.getCorrespondentByName(searchName);
if (!existing) {
await this.paperlessService.addCorrespondent({
name: displayName,
match: '',
matching_algorithm: 0,
is_insensitive: true,
owner: null,
});
}
}
const docsResponse = await this.paperlessService.getDocuments({
page: 1,
page_size: 9999,
truncate_content: true,
tags__id__all: tagFertigId,
});
const docs: any[] = docsResponse?.results ?? [];
this.logger.log(`${docs.length} Dokumente fertig in Agrarmonitor`);
for (const doc of docs) {
result.processed++;
const interneBelegnummer =
((doc.custom_fields as any[]) ?? []).find(
(cf: any) => cf.field === INTERN_BELEGNUMMER_FIELD_ID,
)?.value as string ?? '';
if (!interneBelegnummer) {
this.logger.log(`Dokument ${doc.id as number} hat keine interne Belegnummer`);
result.skipped++;
await this.delay(500);
continue;
}
let amResults: Awaited<ReturnType<typeof this.webService.eingangsrechnungenLivesearch>>;
try {
amResults = await this.webService.eingangsrechnungenLivesearch(interneBelegnummer);
} catch (err: unknown) {
const msg = `${interneBelegnummer}: Livesearch-Fehler`;
this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`);
result.errors.push(msg);
await this.delay(500);
continue;
}
if (amResults.length === 0) {
this.logger.log(`${interneBelegnummer} nicht in Agrarmonitor gefunden`);
result.skipped++;
await this.delay(500);
continue;
}
if (amResults.length > 1) {
const msg = `${interneBelegnummer}: Mehrfach gefunden`;
this.logger.error(msg);
result.errors.push(msg);
await this.delay(500);
continue;
}
const amDoc = amResults[0];
if (!amDoc.interneBelegNummer && interneBelegnummer) {
await this.webService.setLieferscheinNummer(amDoc.eingangId, interneBelegnummer);
}
if (!amDoc.eingangsDatum) {
const eingangsdatumField = ((doc.custom_fields as any[]) ?? []).find(
(cf: any) => cf.field === EINGANGSDATUM_FIELD_ID,
);
if (eingangsdatumField?.value) {
const eingangsdatum = new Date(eingangsdatumField.value as string);
if (!isNaN(eingangsdatum.getTime())) {
await this.webService.setEingangsdatum(amDoc.eingangId, eingangsdatum);
this.logger.log(`Eingangsdatum für ${interneBelegnummer} gesetzt`);
}
}
} else if (amDoc.buchungsDatum) {
try {
let correspondentId: number | undefined;
const customer = customers.find((c) => Number(c.id) === amDoc.kundenId);
if (customer) {
const lieferantennummer = (customer['lieferantennummer'] as string) ?? '';
const searchName = `(${lieferantennummer})`;
const displayName = this.buildCustomerName(customer, lieferantennummer);
let corr = await this.paperlessService.getCorrespondentByName(searchName);
if (!corr) {
corr = await this.paperlessService.addCorrespondent({
name: displayName,
match: '',
matching_algorithm: 0,
is_insensitive: true,
owner: null,
});
}
if (corr) correspondentId = corr.id as number;
}
let ownerId: number | undefined;
const matchedClient = await this.clientRepo.findOneBy({
AgrarmonitorBetriebId: amDoc.betriebId,
});
if (matchedClient) ownerId = matchedClient.PaperlessUserId;
const currentTags: number[] = (doc.tags as number[]) ?? [];
const newTags = currentTags.filter((t) => t !== tagFertigId).concat([tagVerbuchtId]);
const updateData: Record<string, any> = { tags: newTags };
if (correspondentId !== undefined) updateData.correspondent = correspondentId;
if (ownerId !== undefined) updateData.owner = ownerId;
await this.paperlessService.updateDocument(doc.id as number, updateData);
this.logger.log(`Beleg ${interneBelegnummer} gebucht`);
result.updated++;
} catch (err: unknown) {
const msg = `${interneBelegnummer}: Update-Fehler`;
this.logger.error(`${msg}: ${err instanceof Error ? err.message : err}`);
result.errors.push(msg);
}
} else {
result.skipped++;
}
await this.delay(500);
}
this.logger.log(
`Polling abgeschlossen: ${result.processed} verarbeitet, ${result.updated} aktualisiert, ` +
`${result.skipped} übersprungen, ${result.errors.length} Fehler`,
);
return result;
}
private buildCustomerName(customer: Record<string, unknown>, nummer: string): string {
const firma = (customer['firma'] as string) ?? '';
const nachname = (customer['nachname'] as string) ?? '';
const vorname = (customer['vorname'] as string) ?? '';
const name = firma || (nachname + (vorname ? ', ' + vorname : ''));
return `${name} (${nummer})`;
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
private async upsertSetting(tag: string, defaultValue: string): Promise<void> {
const existing = await this.settingRepo.findOneBy({ Tag: tag });
if (!existing) {
await this.settingRepo.save(
this.settingRepo.create({ Typ: 1, Wert: defaultValue, Tag: tag }),
);
}
}
}