diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index 5bdf893..4267016 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -46,6 +46,7 @@ use holzleitner_application::usecases::{ UpdateServiceUseCase, UploadDeliveryNoteImageUseCase, }; use holzleitner_application::ports::DeliveryReportJobRepository; +use holzleitner_application::ports::{SyncRunRepository, SyncTrigger}; use holzleitner_infrastructure::auth::{ KeycloakAdapterConfig, KeycloakAdminClient, KeycloakAdminConfig, KeycloakAuthService, }; @@ -60,7 +61,7 @@ use holzleitner_infrastructure::persistence::{ PgAccountRepository, PgAttachmentRepository, PgCarRepository, PgDeliveryCompletionRepository, PgDeliveryCreditRepository, PgDeliveryNoteRepository, PgDeliveryReportJobRepository, PgDeliveryRepository, PgDeliveryServiceRepository, PgPaymentMethodRepository, PgScanRepository, - PgServiceRepository, PgTourRepository, PoolConfig, connect_and_migrate, + PgServiceRepository, PgSyncRunRepository, PgTourRepository, PoolConfig, connect_and_migrate, }; use holzleitner_infrastructure::storage::{LocalAttachmentStorage, LocalSignatureStorage}; use tokio_cron_scheduler::{Job, JobScheduler}; @@ -214,6 +215,7 @@ pub(crate) async fn run_app( let delivery_service_repository = Arc::new(PgDeliveryServiceRepository::new(pool.clone())); let report_repository = Arc::new(PgDeliveryReportRepository::new(pool.clone())); let report_job_repository = Arc::new(PgDeliveryReportJobRepository::new(pool.clone())); + let sync_run_repository = Arc::new(PgSyncRunRepository::new(pool.clone())); // --- Lokaler Unterschriften-Speicher (Dateisystem) ----------------- let signature_storage = Arc::new( @@ -333,6 +335,7 @@ pub(crate) async fn run_app( erp_source, sync_tour.clone(), driver_provisioner, + sync_run_repository.clone(), )); // DEV-ONLY: überschreibender Resync (löscht Postgres-Tourdaten + Import). // Wird immer gebaut, der Endpoint aber nur bei dev.sync_enabled gemountet. @@ -522,7 +525,7 @@ pub(crate) async fn run_app( let import = import.clone(); Box::pin(async move { let date = (chrono::Utc::now() + chrono::Duration::days(offset)).date_naive(); - match import.execute(date).await { + match import.execute_with(date, SyncTrigger::Scheduler).await { Ok(summary) => tracing::info!( date = %summary.date, total = summary.tours_total, @@ -542,6 +545,48 @@ pub(crate) async fn run_app( offset_days = cfg.import.date_offset_days, "erp_import scheduler gestartet" ); + + // Startup-Catch-up: beim Serverstart nachsynchronisieren, falls für das + // Zieldatum (heute + offset, i. d. R. morgen) noch KEIN erfolgreicher + // Lauf dokumentiert ist. Deckt Erststart UND längere Unterbrechungen ab, + // bei denen der Cron-Zeitpunkt (03:00) verpasst wurde. Läuft im + // Hintergrund, damit der Serverstart nicht am (evtl. langsamen oder + // abwesenden) ERP hängt. + { + let import = import_erp_tours.clone(); + let sync_runs = sync_run_repository.clone(); + let offset = cfg.import.date_offset_days; + tokio::spawn(async move { + let date = (chrono::Utc::now() + chrono::Duration::days(offset)).date_naive(); + match sync_runs.has_successful_run_for(date).await { + Ok(true) => tracing::info!( + %date, + "erp_import.catchup: bereits erfolgreich gesynct — übersprungen" + ), + Ok(false) => { + tracing::info!(%date, "erp_import.catchup: kein Sync vorhanden — synchronisiere nach"); + match import.execute_with(date, SyncTrigger::Startup).await { + Ok(s) => tracing::info!( + date = %s.date, + total = s.tours_total, + ok = s.tours_ok, + failed = s.tours_failed, + "erp_import.catchup.done" + ), + Err(e) => { + tracing::error!(%date, error = %e, "erp_import.catchup.failed") + } + } + } + Err(e) => tracing::error!( + %date, + error = %e, + "erp_import.catchup: Statusabfrage fehlgeschlagen" + ), + } + }); + } + Some(scheduler) } else { tracing::info!("erp_import deaktiviert (IMPORT_ENABLED!=true)"); diff --git a/crates/application/src/ports/mod.rs b/crates/application/src/ports/mod.rs index 43a2195..8296da6 100644 --- a/crates/application/src/ports/mod.rs +++ b/crates/application/src/ports/mod.rs @@ -27,6 +27,7 @@ pub mod erp_delivery_writeback; pub mod scan_repository; pub mod service_repository; pub mod signature_storage; +pub mod sync_run_repository; pub mod tour_repository; pub use account_repository::AccountRepository; @@ -59,4 +60,5 @@ pub use payment_method_repository::PaymentMethodRepository; pub use scan_repository::{ApplyScanOutcome, ScanRepository}; pub use service_repository::ServiceRepository; pub use signature_storage::{SignatureRole, SignatureStorage}; +pub use sync_run_repository::{SyncRunRecord, SyncRunRepository, SyncTrigger}; pub use tour_repository::TourRepository; diff --git a/crates/application/src/ports/sync_run_repository.rs b/crates/application/src/ports/sync_run_repository.rs new file mode 100644 index 0000000..484b0b9 --- /dev/null +++ b/crates/application/src/ports/sync_run_repository.rs @@ -0,0 +1,68 @@ +//! Port: Dokumentation der ERP-Import-Läufe (Sync mit ERPframe). +//! +//! Spiegelt die Tabelle `erp_sync_runs`. Jeder Import — vom täglichen +//! Scheduler, vom Startup-Catch-up oder manuell (Admin-Endpunkt) — wird hier +//! protokolliert. Zusätzlich Grundlage für den Startup-Catch-up: prüft, ob für +//! ein Zieldatum bereits ein erfolgreicher Lauf existiert. +//! +//! Konkrete Impl (Postgres via sqlx) lebt in `holzleitner-infrastructure`. + +use async_trait::async_trait; +use chrono::NaiveDate; + +use crate::error::ApplicationError; + +/// Auslöser eines Import-Laufs — fürs Audit in der DB hinterlegt. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncTrigger { + /// Manuell über den Admin-Endpunkt (oder Dev-Resync). + Manual, + /// Täglicher Cron-Scheduler. + Scheduler, + /// Catch-up beim Serverstart (Erststart / nach längerer Unterbrechung). + Startup, +} + +impl SyncTrigger { + pub fn as_str(self) -> &'static str { + match self { + SyncTrigger::Manual => "manual", + SyncTrigger::Scheduler => "scheduler", + SyncTrigger::Startup => "startup", + } + } +} + +/// Ein abgeschlossener Import-Lauf, wie er dokumentiert wird (eine Zeile in +/// `erp_sync_runs`). +#[derive(Debug, Clone)] +pub struct SyncRunRecord { + /// Tourdatum, für das gesynct wurde. + pub target_date: NaiveDate, + pub trigger: SyncTrigger, + /// `true` = der Lauf lief durch (ERP gelesen + verarbeitet). Ein einzelner + /// fehlerhafter Beleg (`tours_failed > 0`) zählt weiterhin als erfolgreicher + /// Lauf. `false` = der Lauf scheiterte komplett (z. B. ERP nicht + /// erreichbar), dann ist `error` gesetzt. + pub success: bool, + pub tours_total: i32, + pub tours_ok: i32, + pub tours_failed: i32, + pub drivers_provisioned: i32, + /// Fehlertext bei `success = false`. + pub error: Option, +} + +#[async_trait] +pub trait SyncRunRepository: Send + Sync { + /// Schreibt die Dokumentation eines (abgeschlossenen oder gescheiterten) + /// Laufs. + async fn record(&self, run: SyncRunRecord) -> Result<(), ApplicationError>; + + /// `true`, wenn für `date` bereits ein **erfolgreicher** Lauf dokumentiert + /// ist. Basis für den Startup-Catch-up ("kein Sync für morgen ⇒ syncen"). + async fn has_successful_run_for( + &self, + date: NaiveDate, + ) -> Result; +} diff --git a/crates/application/src/usecases/import_erp_tours.rs b/crates/application/src/usecases/import_erp_tours.rs index 0b198ec..53705c8 100644 --- a/crates/application/src/usecases/import_erp_tours.rs +++ b/crates/application/src/usecases/import_erp_tours.rs @@ -4,7 +4,9 @@ use chrono::NaiveDate; use serde::{Deserialize, Serialize}; use crate::error::ApplicationError; -use crate::ports::{DriverIdentityProvisioner, ErpDeliverySource}; +use crate::ports::{ + DriverIdentityProvisioner, ErpDeliverySource, SyncRunRecord, SyncRunRepository, SyncTrigger, +}; use crate::usecases::SyncTourUseCase; /// Ergebnis eines Import-Laufs — pro Fahrer-Tour Erfolg/Fehler getrennt, @@ -45,6 +47,9 @@ pub struct ImportErpToursUseCase { /// Optionaler Identity-Provisioner (Keycloak). `None` ⇒ Konto-Anlage /// deaktiviert (`KEYCLOAK_PROVISIONING_ENABLED=false`). provisioner: Option>, + /// Dokumentiert jeden Lauf in `erp_sync_runs` (Audit + Basis für den + /// Startup-Catch-up „kein Sync für morgen ⇒ syncen"). + sync_runs: Arc, } impl ImportErpToursUseCase { @@ -52,15 +57,62 @@ impl ImportErpToursUseCase { source: Arc, sync_tour: Arc, provisioner: Option>, + sync_runs: Arc, ) -> Self { Self { source, sync_tour, provisioner, + sync_runs, } } + /// Import mit manuellem Auslöser (Admin-Endpunkt / Dev-Resync). pub async fn execute(&self, date: NaiveDate) -> Result { + self.execute_with(date, SyncTrigger::Manual).await + } + + /// Import mit explizitem Auslöser. Dokumentiert den Lauf in JEDEM Fall + /// (Erfolg ODER kompletter Fehlschlag) in `erp_sync_runs`. Best-effort: ein + /// Schreibfehler bei der Dokumentation verändert den Import-Ausgang nicht. + pub async fn execute_with( + &self, + date: NaiveDate, + trigger: SyncTrigger, + ) -> Result { + let result = self.run(date).await; + + let record = match &result { + Ok(s) => SyncRunRecord { + target_date: date, + trigger, + success: true, + tours_total: s.tours_total as i32, + tours_ok: s.tours_ok as i32, + tours_failed: s.tours_failed as i32, + drivers_provisioned: s.drivers_provisioned as i32, + error: None, + }, + Err(e) => SyncRunRecord { + target_date: date, + trigger, + success: false, + tours_total: 0, + tours_ok: 0, + tours_failed: 0, + drivers_provisioned: 0, + error: Some(e.to_string()), + }, + }; + // `application` loggt bewusst nicht selbst → Doku-Schreibfehler still + // verschlucken (das eigentliche Import-Ergebnis hat Vorrang). + let _ = self.sync_runs.record(record).await; + + result + } + + /// Eigentlicher Import-Lauf (ohne Dokumentation). + async fn run(&self, date: NaiveDate) -> Result { let tours = self.source.fetch_tours_for_date(date).await?; let tours_total = tours.len(); let mut tours_ok = 0usize; diff --git a/crates/infrastructure/src/persistence/mod.rs b/crates/infrastructure/src/persistence/mod.rs index 8a5f39b..69005c8 100644 --- a/crates/infrastructure/src/persistence/mod.rs +++ b/crates/infrastructure/src/persistence/mod.rs @@ -17,6 +17,7 @@ pub mod payment_method_repository; pub mod pool; pub mod scan_repository; pub mod service_repository; +pub mod sync_run_repository; pub mod tour_repository; pub use account_repository::PgAccountRepository; @@ -32,4 +33,5 @@ pub use payment_method_repository::PgPaymentMethodRepository; pub use pool::{connect_and_migrate, PoolConfig}; pub use scan_repository::PgScanRepository; pub use service_repository::PgServiceRepository; +pub use sync_run_repository::PgSyncRunRepository; pub use tour_repository::PgTourRepository; diff --git a/crates/infrastructure/src/persistence/sync_run_repository.rs b/crates/infrastructure/src/persistence/sync_run_repository.rs new file mode 100644 index 0000000..a9567b9 --- /dev/null +++ b/crates/infrastructure/src/persistence/sync_run_repository.rs @@ -0,0 +1,60 @@ +//! Postgres-Implementierung von `SyncRunRepository` (Tabelle `erp_sync_runs`). +//! +//! Reines Append-/Audit-Log der ERP-Import-Läufe + die Catch-up-Abfrage +//! „gibt es schon einen erfolgreichen Sync für Datum X?". + +use async_trait::async_trait; +use chrono::NaiveDate; +use sqlx::PgPool; + +use holzleitner_application::error::ApplicationError; +use holzleitner_application::ports::{SyncRunRecord, SyncRunRepository}; + +pub struct PgSyncRunRepository { + pool: PgPool, +} + +impl PgSyncRunRepository { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +fn db(e: E) -> ApplicationError { + ApplicationError::Repository(e.to_string()) +} + +#[async_trait] +impl SyncRunRepository for PgSyncRunRepository { + async fn record(&self, run: SyncRunRecord) -> Result<(), ApplicationError> { + sqlx::query( + "INSERT INTO erp_sync_runs \ + (target_date, trigger, success, tours_total, tours_ok, tours_failed, \ + drivers_provisioned, error) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + ) + .bind(run.target_date) + .bind(run.trigger.as_str()) + .bind(run.success) + .bind(run.tours_total) + .bind(run.tours_ok) + .bind(run.tours_failed) + .bind(run.drivers_provisioned) + .bind(run.error) + .execute(&self.pool) + .await + .map_err(db)?; + Ok(()) + } + + async fn has_successful_run_for(&self, date: NaiveDate) -> Result { + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM erp_sync_runs WHERE target_date = $1 AND success)", + ) + .bind(date) + .fetch_one(&self.pool) + .await + .map_err(db)?; + Ok(exists) + } +} diff --git a/migrations/0030_erp_sync_runs.sql b/migrations/0030_erp_sync_runs.sql new file mode 100644 index 0000000..a2cd4a3 --- /dev/null +++ b/migrations/0030_erp_sync_runs.sql @@ -0,0 +1,40 @@ +-- 0030_erp_sync_runs.sql +-- +-- Dokumentation der ERP-Import-Läufe (Sync mit ERPframe). Jeder Lauf — egal ob +-- vom täglichen Scheduler, vom Startup-Catch-up oder manuell über den +-- Admin-Endpunkt (`POST /admin/import-erp`) — hinterlässt hier eine Zeile. +-- Zweck: Audit/Nachvollziehbarkeit ("wann wurde für welches Tourdatum gesynct, +-- mit welchem Ergebnis"). +-- +-- Zusätzlich Grundlage für den Startup-Catch-up: beim Serverstart (Erststart +-- ODER nach längerer Unterbrechung, wenn der Cron-Zeitpunkt verpasst wurde) +-- prüft das Backend, ob für das Zieldatum (morgen) bereits ein ERFOLGREICHER +-- Lauf dokumentiert ist — wenn nicht, synct es nach. + +CREATE TABLE erp_sync_runs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + -- Tourdatum, für das gesynct wurde (NICHT der Zeitpunkt des Laufs). + target_date date NOT NULL, + -- Auslöser: 'scheduler' | 'startup' | 'manual'. + trigger text NOT NULL, + -- true = Lauf lief durch (ERP gelesen + verarbeitet); ein einzelner + -- fehlerhafter Beleg (tours_failed > 0) zählt weiterhin als erfolgreicher + -- Lauf. false = Lauf komplett gescheitert (z. B. ERP nicht erreichbar), + -- dann ist `error` gesetzt. + success boolean NOT NULL, + tours_total integer NOT NULL DEFAULT 0, + tours_ok integer NOT NULL DEFAULT 0, + tours_failed integer NOT NULL DEFAULT 0, + drivers_provisioned integer NOT NULL DEFAULT 0, + error text, + created_at timestamptz NOT NULL DEFAULT now() +); + +-- Schnelle Abfrage „gibt es einen ERFOLGREICHEN Sync für Datum X?" (Catch-up). +CREATE INDEX erp_sync_runs_date_success + ON erp_sync_runs (target_date) + WHERE success; + +-- Chronologische Sicht fürs Audit (neueste zuerst). +CREATE INDEX erp_sync_runs_created_at + ON erp_sync_runs (created_at DESC);