feat(import): ERP-Sync in DB dokumentieren + Startup-Catch-up
Jeder ERPframe-Import (Scheduler, Startup, manuell) wird in der neuen Tabelle erp_sync_runs protokolliert (target_date, trigger, success, Zaehler, Fehler). Beim Serverstart synct das Backend das Zieldatum (heute + offset, i.d.R. morgen) nach, falls dafuer noch kein erfolgreicher Lauf dokumentiert ist — deckt Erststart UND laengere Unterbrechungen ab, bei denen der Cron-Zeitpunkt verpasst wurde. Gated ueber [import] enabled. - Migration 0030_erp_sync_runs - Port SyncRunRepository (+ SyncRunRecord, SyncTrigger) - Adapter PgSyncRunRepository - ImportErpToursUseCase dokumentiert jeden Lauf; neues execute_with(date, trigger) - main.rs: Repo verdrahtet, Scheduler-Trigger, Startup-Catch-up (tokio::spawn) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@ -46,6 +46,7 @@ use holzleitner_application::usecases::{
|
||||
UpdateServiceUseCase, UploadDeliveryNoteImageUseCase,
|
||||
};
|
||||
use holzleitner_application::ports::DeliveryReportJobRepository;
|
||||
use holzleitner_application::ports::{SyncRunRepository, SyncTrigger};
|
||||
use holzleitner_infrastructure::auth::{
|
||||
KeycloakAdapterConfig, KeycloakAdminClient, KeycloakAdminConfig, KeycloakAuthService,
|
||||
};
|
||||
@ -60,7 +61,7 @@ use holzleitner_infrastructure::persistence::{
|
||||
PgAccountRepository, PgAttachmentRepository, PgCarRepository, PgDeliveryCompletionRepository,
|
||||
PgDeliveryCreditRepository, PgDeliveryNoteRepository, PgDeliveryReportJobRepository,
|
||||
PgDeliveryRepository, PgDeliveryServiceRepository, PgPaymentMethodRepository, PgScanRepository,
|
||||
PgServiceRepository, PgTourRepository, PoolConfig, connect_and_migrate,
|
||||
PgServiceRepository, PgSyncRunRepository, PgTourRepository, PoolConfig, connect_and_migrate,
|
||||
};
|
||||
use holzleitner_infrastructure::storage::{LocalAttachmentStorage, LocalSignatureStorage};
|
||||
use tokio_cron_scheduler::{Job, JobScheduler};
|
||||
@ -214,6 +215,7 @@ pub(crate) async fn run_app(
|
||||
let delivery_service_repository = Arc::new(PgDeliveryServiceRepository::new(pool.clone()));
|
||||
let report_repository = Arc::new(PgDeliveryReportRepository::new(pool.clone()));
|
||||
let report_job_repository = Arc::new(PgDeliveryReportJobRepository::new(pool.clone()));
|
||||
let sync_run_repository = Arc::new(PgSyncRunRepository::new(pool.clone()));
|
||||
|
||||
// --- Lokaler Unterschriften-Speicher (Dateisystem) -----------------
|
||||
let signature_storage = Arc::new(
|
||||
@ -333,6 +335,7 @@ pub(crate) async fn run_app(
|
||||
erp_source,
|
||||
sync_tour.clone(),
|
||||
driver_provisioner,
|
||||
sync_run_repository.clone(),
|
||||
));
|
||||
// DEV-ONLY: überschreibender Resync (löscht Postgres-Tourdaten + Import).
|
||||
// Wird immer gebaut, der Endpoint aber nur bei dev.sync_enabled gemountet.
|
||||
@ -522,7 +525,7 @@ pub(crate) async fn run_app(
|
||||
let import = import.clone();
|
||||
Box::pin(async move {
|
||||
let date = (chrono::Utc::now() + chrono::Duration::days(offset)).date_naive();
|
||||
match import.execute(date).await {
|
||||
match import.execute_with(date, SyncTrigger::Scheduler).await {
|
||||
Ok(summary) => tracing::info!(
|
||||
date = %summary.date,
|
||||
total = summary.tours_total,
|
||||
@ -542,6 +545,48 @@ pub(crate) async fn run_app(
|
||||
offset_days = cfg.import.date_offset_days,
|
||||
"erp_import scheduler gestartet"
|
||||
);
|
||||
|
||||
// Startup-Catch-up: beim Serverstart nachsynchronisieren, falls für das
|
||||
// Zieldatum (heute + offset, i. d. R. morgen) noch KEIN erfolgreicher
|
||||
// Lauf dokumentiert ist. Deckt Erststart UND längere Unterbrechungen ab,
|
||||
// bei denen der Cron-Zeitpunkt (03:00) verpasst wurde. Läuft im
|
||||
// Hintergrund, damit der Serverstart nicht am (evtl. langsamen oder
|
||||
// abwesenden) ERP hängt.
|
||||
{
|
||||
let import = import_erp_tours.clone();
|
||||
let sync_runs = sync_run_repository.clone();
|
||||
let offset = cfg.import.date_offset_days;
|
||||
tokio::spawn(async move {
|
||||
let date = (chrono::Utc::now() + chrono::Duration::days(offset)).date_naive();
|
||||
match sync_runs.has_successful_run_for(date).await {
|
||||
Ok(true) => tracing::info!(
|
||||
%date,
|
||||
"erp_import.catchup: bereits erfolgreich gesynct — übersprungen"
|
||||
),
|
||||
Ok(false) => {
|
||||
tracing::info!(%date, "erp_import.catchup: kein Sync vorhanden — synchronisiere nach");
|
||||
match import.execute_with(date, SyncTrigger::Startup).await {
|
||||
Ok(s) => tracing::info!(
|
||||
date = %s.date,
|
||||
total = s.tours_total,
|
||||
ok = s.tours_ok,
|
||||
failed = s.tours_failed,
|
||||
"erp_import.catchup.done"
|
||||
),
|
||||
Err(e) => {
|
||||
tracing::error!(%date, error = %e, "erp_import.catchup.failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::error!(
|
||||
%date,
|
||||
error = %e,
|
||||
"erp_import.catchup: Statusabfrage fehlgeschlagen"
|
||||
),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Some(scheduler)
|
||||
} else {
|
||||
tracing::info!("erp_import deaktiviert (IMPORT_ENABLED!=true)");
|
||||
|
||||
@ -27,6 +27,7 @@ pub mod erp_delivery_writeback;
|
||||
pub mod scan_repository;
|
||||
pub mod service_repository;
|
||||
pub mod signature_storage;
|
||||
pub mod sync_run_repository;
|
||||
pub mod tour_repository;
|
||||
|
||||
pub use account_repository::AccountRepository;
|
||||
@ -59,4 +60,5 @@ pub use payment_method_repository::PaymentMethodRepository;
|
||||
pub use scan_repository::{ApplyScanOutcome, ScanRepository};
|
||||
pub use service_repository::ServiceRepository;
|
||||
pub use signature_storage::{SignatureRole, SignatureStorage};
|
||||
pub use sync_run_repository::{SyncRunRecord, SyncRunRepository, SyncTrigger};
|
||||
pub use tour_repository::TourRepository;
|
||||
|
||||
68
crates/application/src/ports/sync_run_repository.rs
Normal file
68
crates/application/src/ports/sync_run_repository.rs
Normal file
@ -0,0 +1,68 @@
|
||||
//! Port: Dokumentation der ERP-Import-Läufe (Sync mit ERPframe).
|
||||
//!
|
||||
//! Spiegelt die Tabelle `erp_sync_runs`. Jeder Import — vom täglichen
|
||||
//! Scheduler, vom Startup-Catch-up oder manuell (Admin-Endpunkt) — wird hier
|
||||
//! protokolliert. Zusätzlich Grundlage für den Startup-Catch-up: prüft, ob für
|
||||
//! ein Zieldatum bereits ein erfolgreicher Lauf existiert.
|
||||
//!
|
||||
//! Konkrete Impl (Postgres via sqlx) lebt in `holzleitner-infrastructure`.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDate;
|
||||
|
||||
use crate::error::ApplicationError;
|
||||
|
||||
/// Auslöser eines Import-Laufs — fürs Audit in der DB hinterlegt.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SyncTrigger {
|
||||
/// Manuell über den Admin-Endpunkt (oder Dev-Resync).
|
||||
Manual,
|
||||
/// Täglicher Cron-Scheduler.
|
||||
Scheduler,
|
||||
/// Catch-up beim Serverstart (Erststart / nach längerer Unterbrechung).
|
||||
Startup,
|
||||
}
|
||||
|
||||
impl SyncTrigger {
|
||||
pub fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
SyncTrigger::Manual => "manual",
|
||||
SyncTrigger::Scheduler => "scheduler",
|
||||
SyncTrigger::Startup => "startup",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ein abgeschlossener Import-Lauf, wie er dokumentiert wird (eine Zeile in
|
||||
/// `erp_sync_runs`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SyncRunRecord {
|
||||
/// Tourdatum, für das gesynct wurde.
|
||||
pub target_date: NaiveDate,
|
||||
pub trigger: SyncTrigger,
|
||||
/// `true` = der Lauf lief durch (ERP gelesen + verarbeitet). Ein einzelner
|
||||
/// fehlerhafter Beleg (`tours_failed > 0`) zählt weiterhin als erfolgreicher
|
||||
/// Lauf. `false` = der Lauf scheiterte komplett (z. B. ERP nicht
|
||||
/// erreichbar), dann ist `error` gesetzt.
|
||||
pub success: bool,
|
||||
pub tours_total: i32,
|
||||
pub tours_ok: i32,
|
||||
pub tours_failed: i32,
|
||||
pub drivers_provisioned: i32,
|
||||
/// Fehlertext bei `success = false`.
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait SyncRunRepository: Send + Sync {
|
||||
/// Schreibt die Dokumentation eines (abgeschlossenen oder gescheiterten)
|
||||
/// Laufs.
|
||||
async fn record(&self, run: SyncRunRecord) -> Result<(), ApplicationError>;
|
||||
|
||||
/// `true`, wenn für `date` bereits ein **erfolgreicher** Lauf dokumentiert
|
||||
/// ist. Basis für den Startup-Catch-up ("kein Sync für morgen ⇒ syncen").
|
||||
async fn has_successful_run_for(
|
||||
&self,
|
||||
date: NaiveDate,
|
||||
) -> Result<bool, ApplicationError>;
|
||||
}
|
||||
@ -4,7 +4,9 @@ use chrono::NaiveDate;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::ApplicationError;
|
||||
use crate::ports::{DriverIdentityProvisioner, ErpDeliverySource};
|
||||
use crate::ports::{
|
||||
DriverIdentityProvisioner, ErpDeliverySource, SyncRunRecord, SyncRunRepository, SyncTrigger,
|
||||
};
|
||||
use crate::usecases::SyncTourUseCase;
|
||||
|
||||
/// Ergebnis eines Import-Laufs — pro Fahrer-Tour Erfolg/Fehler getrennt,
|
||||
@ -45,6 +47,9 @@ pub struct ImportErpToursUseCase {
|
||||
/// Optionaler Identity-Provisioner (Keycloak). `None` ⇒ Konto-Anlage
|
||||
/// deaktiviert (`KEYCLOAK_PROVISIONING_ENABLED=false`).
|
||||
provisioner: Option<Arc<dyn DriverIdentityProvisioner>>,
|
||||
/// Dokumentiert jeden Lauf in `erp_sync_runs` (Audit + Basis für den
|
||||
/// Startup-Catch-up „kein Sync für morgen ⇒ syncen").
|
||||
sync_runs: Arc<dyn SyncRunRepository>,
|
||||
}
|
||||
|
||||
impl ImportErpToursUseCase {
|
||||
@ -52,15 +57,62 @@ impl ImportErpToursUseCase {
|
||||
source: Arc<dyn ErpDeliverySource>,
|
||||
sync_tour: Arc<SyncTourUseCase>,
|
||||
provisioner: Option<Arc<dyn DriverIdentityProvisioner>>,
|
||||
sync_runs: Arc<dyn SyncRunRepository>,
|
||||
) -> Self {
|
||||
Self {
|
||||
source,
|
||||
sync_tour,
|
||||
provisioner,
|
||||
sync_runs,
|
||||
}
|
||||
}
|
||||
|
||||
/// Import mit manuellem Auslöser (Admin-Endpunkt / Dev-Resync).
|
||||
pub async fn execute(&self, date: NaiveDate) -> Result<ImportSummary, ApplicationError> {
|
||||
self.execute_with(date, SyncTrigger::Manual).await
|
||||
}
|
||||
|
||||
/// Import mit explizitem Auslöser. Dokumentiert den Lauf in JEDEM Fall
|
||||
/// (Erfolg ODER kompletter Fehlschlag) in `erp_sync_runs`. Best-effort: ein
|
||||
/// Schreibfehler bei der Dokumentation verändert den Import-Ausgang nicht.
|
||||
pub async fn execute_with(
|
||||
&self,
|
||||
date: NaiveDate,
|
||||
trigger: SyncTrigger,
|
||||
) -> Result<ImportSummary, ApplicationError> {
|
||||
let result = self.run(date).await;
|
||||
|
||||
let record = match &result {
|
||||
Ok(s) => SyncRunRecord {
|
||||
target_date: date,
|
||||
trigger,
|
||||
success: true,
|
||||
tours_total: s.tours_total as i32,
|
||||
tours_ok: s.tours_ok as i32,
|
||||
tours_failed: s.tours_failed as i32,
|
||||
drivers_provisioned: s.drivers_provisioned as i32,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => SyncRunRecord {
|
||||
target_date: date,
|
||||
trigger,
|
||||
success: false,
|
||||
tours_total: 0,
|
||||
tours_ok: 0,
|
||||
tours_failed: 0,
|
||||
drivers_provisioned: 0,
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
};
|
||||
// `application` loggt bewusst nicht selbst → Doku-Schreibfehler still
|
||||
// verschlucken (das eigentliche Import-Ergebnis hat Vorrang).
|
||||
let _ = self.sync_runs.record(record).await;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Eigentlicher Import-Lauf (ohne Dokumentation).
|
||||
async fn run(&self, date: NaiveDate) -> Result<ImportSummary, ApplicationError> {
|
||||
let tours = self.source.fetch_tours_for_date(date).await?;
|
||||
let tours_total = tours.len();
|
||||
let mut tours_ok = 0usize;
|
||||
|
||||
@ -17,6 +17,7 @@ pub mod payment_method_repository;
|
||||
pub mod pool;
|
||||
pub mod scan_repository;
|
||||
pub mod service_repository;
|
||||
pub mod sync_run_repository;
|
||||
pub mod tour_repository;
|
||||
|
||||
pub use account_repository::PgAccountRepository;
|
||||
@ -32,4 +33,5 @@ pub use payment_method_repository::PgPaymentMethodRepository;
|
||||
pub use pool::{connect_and_migrate, PoolConfig};
|
||||
pub use scan_repository::PgScanRepository;
|
||||
pub use service_repository::PgServiceRepository;
|
||||
pub use sync_run_repository::PgSyncRunRepository;
|
||||
pub use tour_repository::PgTourRepository;
|
||||
|
||||
60
crates/infrastructure/src/persistence/sync_run_repository.rs
Normal file
60
crates/infrastructure/src/persistence/sync_run_repository.rs
Normal file
@ -0,0 +1,60 @@
|
||||
//! Postgres-Implementierung von `SyncRunRepository` (Tabelle `erp_sync_runs`).
|
||||
//!
|
||||
//! Reines Append-/Audit-Log der ERP-Import-Läufe + die Catch-up-Abfrage
|
||||
//! „gibt es schon einen erfolgreichen Sync für Datum X?".
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::NaiveDate;
|
||||
use sqlx::PgPool;
|
||||
|
||||
use holzleitner_application::error::ApplicationError;
|
||||
use holzleitner_application::ports::{SyncRunRecord, SyncRunRepository};
|
||||
|
||||
pub struct PgSyncRunRepository {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PgSyncRunRepository {
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
fn db<E: std::fmt::Display>(e: E) -> ApplicationError {
|
||||
ApplicationError::Repository(e.to_string())
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SyncRunRepository for PgSyncRunRepository {
|
||||
async fn record(&self, run: SyncRunRecord) -> Result<(), ApplicationError> {
|
||||
sqlx::query(
|
||||
"INSERT INTO erp_sync_runs \
|
||||
(target_date, trigger, success, tours_total, tours_ok, tours_failed, \
|
||||
drivers_provisioned, error) \
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
|
||||
)
|
||||
.bind(run.target_date)
|
||||
.bind(run.trigger.as_str())
|
||||
.bind(run.success)
|
||||
.bind(run.tours_total)
|
||||
.bind(run.tours_ok)
|
||||
.bind(run.tours_failed)
|
||||
.bind(run.drivers_provisioned)
|
||||
.bind(run.error)
|
||||
.execute(&self.pool)
|
||||
.await
|
||||
.map_err(db)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn has_successful_run_for(&self, date: NaiveDate) -> Result<bool, ApplicationError> {
|
||||
let exists: bool = sqlx::query_scalar(
|
||||
"SELECT EXISTS(SELECT 1 FROM erp_sync_runs WHERE target_date = $1 AND success)",
|
||||
)
|
||||
.bind(date)
|
||||
.fetch_one(&self.pool)
|
||||
.await
|
||||
.map_err(db)?;
|
||||
Ok(exists)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user