//! Postgres-Implementierung des `TourRepository`-Ports. //! //! Drei Operationen, getrennt umgesetzt: //! * `find_today_for_driver` — eine Query (Tour + Lieferzahl pro Tour). //! * `find_details_by_id` — pro Aggregat ein paar gezielte Queries, //! anschließend in-memory zusammenbauen. Bewusst keine eine-Big-Join- //! Query: das vervielfacht Daten über die Leitung, die wir clientseitig //! wieder deduplizieren müssten — und Postgres handhabt 5–7 kleine //! Prepared Statements im selben Pool effizient genug. //! * `upsert_from_sync` — eine Transaktion, idempotent per UPSERT auf //! den fachlichen Keys. Bestehende `scan_state`-Werte bleiben //! unangetastet: das ERP weiß nichts davon und darf sie nicht //! überschreiben. use std::collections::HashMap; use async_trait::async_trait; use chrono::{DateTime, NaiveDate, Utc}; use sqlx::{PgPool, Postgres, Transaction}; use uuid::Uuid; use holzleitner_application::dto::{ DeliveryOrderEntry, DeliveryWithItems, SyncDelivery, SyncDeliveryItem, SyncTourRequest, TourDetails, TourSummary, }; use holzleitner_application::error::ApplicationError; use holzleitner_application::ports::TourRepository; use holzleitner_domain::{ Address, Article, Customer, CustomerContact, Delivery, DeliveryItem, DeliveryNote, DeliveryState, ScanState, ScanStatus, Tour, Warehouse, }; pub struct PgTourRepository { pool: PgPool, } impl PgTourRepository { pub fn new(pool: PgPool) -> Self { Self { pool } } } // ===== Row-Typen ========================================================= // // Eigene FromRow-Strukturen in der Infrastructure halten die Domain-Typen // frei von `sqlx`-Traits. Mapping in private Helfer. #[derive(sqlx::FromRow)] struct TourRow { id: Uuid, account_id: i64, tour_date: NaiveDate, synced_at: DateTime, } #[derive(sqlx::FromRow)] struct TourSummaryRow { id: Uuid, tour_date: NaiveDate, delivery_count: i64, } #[derive(sqlx::FromRow)] struct DeliveryRow { id: Uuid, tour_id: Uuid, erp_belegart_id: i64, erp_belegnummer: String, customer_id: Uuid, snap_street: String, snap_house_number: String, snap_postal_code: String, snap_city: String, snap_country: String, assigned_car_id: Option, desired_time: Option, special_agreements: Option, state: String, state_reason: Option, sort_order: i32, } #[derive(sqlx::FromRow)] struct DeliveryItemRow { id: Uuid, delivery_id: Uuid, article_id: Uuid, required_quantity: i32, warehouse_id: Uuid, belegzeilen_nr: i32, komponenten_artikel_nr: Option, scanned_quantity: i32, scan_status: String, held_reason: Option, scan_last_updated_at: DateTime, } #[derive(sqlx::FromRow)] struct CustomerRow { id: Uuid, erp_customer_id: i64, name: String, street: String, house_number: String, postal_code: String, city: String, country: String, } #[derive(sqlx::FromRow)] struct CustomerContactRow { id: Uuid, customer_id: Uuid, name: String, phone: Option, email: Option, } #[derive(sqlx::FromRow)] struct ArticleRow { id: Uuid, article_number: String, name: String, scannable: bool, default_warehouse_id: Option, } #[derive(sqlx::FromRow)] struct WarehouseRow { id: Uuid, code: String, name: String, is_standard: bool, } #[derive(sqlx::FromRow)] struct ContactLinkRow { delivery_id: Uuid, customer_contact_id: Uuid, } #[derive(sqlx::FromRow)] struct DeliveryNoteRow { id: Uuid, delivery_id: Uuid, text: Option, image_attachment: Option, author_personalnummer: i64, author_car_id: Option, created_at: DateTime, } // ===== Mapping Row -> Domain ============================================= fn map_tour(row: TourRow) -> Tour { Tour { id: row.id, account_id: row.account_id, date: row.tour_date, synced_at: row.synced_at, } } fn parse_delivery_state(value: &str) -> Result { match value { "active" => Ok(DeliveryState::Active), "held" => Ok(DeliveryState::Held), "canceled" => Ok(DeliveryState::Canceled), "completed" => Ok(DeliveryState::Completed), other => Err(ApplicationError::Repository(format!( "unknown delivery state '{other}'" ))), } } fn parse_scan_status(value: &str) -> Result { match value { "in_progress" => Ok(ScanStatus::InProgress), "done" => Ok(ScanStatus::Done), "held" => Ok(ScanStatus::Held), "removed" => Ok(ScanStatus::Removed), other => Err(ApplicationError::Repository(format!( "unknown scan status '{other}'" ))), } } fn map_item(row: DeliveryItemRow) -> Result { Ok(DeliveryItem { id: row.id, delivery_id: row.delivery_id, article_id: row.article_id, required_quantity: row.required_quantity, warehouse_id: row.warehouse_id, belegzeilen_nr: row.belegzeilen_nr, komponenten_artikel_nr: row.komponenten_artikel_nr, scan_state: ScanState { scanned_quantity: row.scanned_quantity, status: parse_scan_status(&row.scan_status)?, held_reason: row.held_reason, last_updated_at: row.scan_last_updated_at, }, }) } fn map_customer(row: CustomerRow) -> Customer { Customer { id: row.id, erp_customer_id: row.erp_customer_id, name: row.name, address: Address { street: row.street, house_number: row.house_number, postal_code: row.postal_code, city: row.city, country: row.country, }, } } fn map_contact(row: CustomerContactRow) -> CustomerContact { CustomerContact { id: row.id, customer_id: row.customer_id, name: row.name, phone: row.phone, email: row.email, } } fn map_article(row: ArticleRow) -> Article { Article { id: row.id, article_number: row.article_number, name: row.name, scannable: row.scannable, default_warehouse_id: row.default_warehouse_id, } } fn map_note(row: DeliveryNoteRow) -> DeliveryNote { DeliveryNote { id: row.id, delivery_id: row.delivery_id, text: row.text, image_attachment: row.image_attachment, author_personalnummer: row.author_personalnummer, author_car_id: row.author_car_id, created_at: row.created_at, } } fn map_warehouse(row: WarehouseRow) -> Warehouse { Warehouse { id: row.id, code: row.code, name: row.name, is_standard: row.is_standard, } } fn map_delivery( row: DeliveryRow, contact_person_ids: Vec, ) -> Result<(Delivery, i32), ApplicationError> { let state = parse_delivery_state(&row.state)?; let delivery = Delivery { id: row.id, tour_id: row.tour_id, erp_belegart_id: row.erp_belegart_id, erp_belegnummer: row.erp_belegnummer, customer_id: row.customer_id, delivery_address_snapshot: Address { street: row.snap_street, house_number: row.snap_house_number, postal_code: row.snap_postal_code, city: row.snap_city, country: row.snap_country, }, assigned_car_id: row.assigned_car_id, contact_person_ids, desired_time: row.desired_time, special_agreements: row.special_agreements, state, state_reason: row.state_reason, }; Ok((delivery, row.sort_order)) } // ===== Helfer: Error-Mapping ============================================= fn db(e: E) -> ApplicationError { ApplicationError::Repository(e.to_string()) } // ===== Trait-Implementierung ============================================= #[async_trait] impl TourRepository for PgTourRepository { async fn find_today_for_driver( &self, personalnummer: i64, today: NaiveDate, ) -> Result, ApplicationError> { let rows = sqlx::query_as::<_, TourSummaryRow>( r#" SELECT t.id, t.tour_date, COUNT(d.id) AS delivery_count FROM tours t LEFT JOIN deliveries d ON d.tour_id = t.id WHERE t.account_id = $1 AND t.tour_date = $2 GROUP BY t.id, t.tour_date ORDER BY t.tour_date "#, ) .bind(personalnummer) .bind(today) .fetch_all(&self.pool) .await .map_err(db)?; Ok(rows .into_iter() .map(|r| TourSummary { tour_id: r.id, tour_date: r.tour_date, delivery_count: r.delivery_count, }) .collect()) } async fn find_details_by_id( &self, tour_id: Uuid, ) -> Result, ApplicationError> { // 1. Tour selbst let Some(tour_row) = sqlx::query_as::<_, TourRow>( "SELECT id, account_id, tour_date, synced_at FROM tours WHERE id = $1", ) .bind(tour_id) .fetch_optional(&self.pool) .await .map_err(db)? else { return Ok(None); }; let tour = map_tour(tour_row); // 2. Lieferungen let delivery_rows = sqlx::query_as::<_, DeliveryRow>( r#" SELECT id, tour_id, erp_belegart_id, erp_belegnummer, customer_id, snap_street, snap_house_number, snap_postal_code, snap_city, snap_country, assigned_car_id, desired_time, special_agreements, state, state_reason, sort_order FROM deliveries WHERE tour_id = $1 ORDER BY sort_order, erp_belegnummer "#, ) .bind(tour_id) .fetch_all(&self.pool) .await .map_err(db)?; let delivery_ids: Vec = delivery_rows.iter().map(|d| d.id).collect(); // 3. Kontakt-Person-Verknüpfungen let contact_links = sqlx::query_as::<_, ContactLinkRow>( r#" SELECT delivery_id, customer_contact_id FROM delivery_contact_persons WHERE delivery_id = ANY($1) "#, ) .bind(&delivery_ids) .fetch_all(&self.pool) .await .map_err(db)?; let mut contacts_per_delivery: HashMap> = HashMap::new(); for link in contact_links { contacts_per_delivery .entry(link.delivery_id) .or_default() .push(link.customer_contact_id); } // 4. Positionen let item_rows = sqlx::query_as::<_, DeliveryItemRow>( r#" SELECT id, delivery_id, article_id, required_quantity, warehouse_id, belegzeilen_nr, komponenten_artikel_nr, scanned_quantity, scan_status, held_reason, scan_last_updated_at FROM delivery_items WHERE delivery_id = ANY($1) ORDER BY delivery_id, belegzeilen_nr, komponenten_artikel_nr NULLS FIRST "#, ) .bind(&delivery_ids) .fetch_all(&self.pool) .await .map_err(db)?; let mut items_per_delivery: HashMap> = HashMap::new(); let mut article_ids = std::collections::BTreeSet::new(); let mut warehouse_ids = std::collections::BTreeSet::new(); for row in item_rows { article_ids.insert(row.article_id); warehouse_ids.insert(row.warehouse_id); let delivery_id = row.delivery_id; let item = map_item(row)?; items_per_delivery .entry(delivery_id) .or_default() .push(item); } // 5. Lieferungen + Items kombinieren let mut customer_ids = std::collections::BTreeSet::new(); let mut deliveries = Vec::with_capacity(delivery_rows.len()); for row in delivery_rows { customer_ids.insert(row.customer_id); let delivery_id = row.id; let contact_ids = contacts_per_delivery.remove(&delivery_id).unwrap_or_default(); let items = items_per_delivery.remove(&delivery_id).unwrap_or_default(); let (delivery, sort_order) = map_delivery(row, contact_ids)?; deliveries.push(DeliveryWithItems { delivery, sort_order, items, }); } // 6. Lookup-Stammdaten let customer_ids_vec: Vec = customer_ids.into_iter().collect(); let customers = sqlx::query_as::<_, CustomerRow>( r#" SELECT id, erp_customer_id, name, street, house_number, postal_code, city, country FROM customers WHERE id = ANY($1) ORDER BY name "#, ) .bind(&customer_ids_vec) .fetch_all(&self.pool) .await .map_err(db)? .into_iter() .map(map_customer) .collect::>(); let customer_contacts = sqlx::query_as::<_, CustomerContactRow>( r#" SELECT id, customer_id, name, phone, email FROM customer_contacts WHERE customer_id = ANY($1) ORDER BY name "#, ) .bind(&customer_ids_vec) .fetch_all(&self.pool) .await .map_err(db)? .into_iter() .map(map_contact) .collect::>(); let article_ids_vec: Vec = article_ids.into_iter().collect(); let articles = sqlx::query_as::<_, ArticleRow>( r#" SELECT id, article_number, name, scannable, default_warehouse_id FROM articles WHERE id = ANY($1) ORDER BY article_number "#, ) .bind(&article_ids_vec) .fetch_all(&self.pool) .await .map_err(db)? .into_iter() .map(map_article) .collect::>(); let warehouse_ids_vec: Vec = warehouse_ids.into_iter().collect(); let warehouses = sqlx::query_as::<_, WarehouseRow>( r#" SELECT id, code, name, is_standard FROM warehouses WHERE id = ANY($1) ORDER BY code "#, ) .bind(&warehouse_ids_vec) .fetch_all(&self.pool) .await .map_err(db)? .into_iter() .map(map_warehouse) .collect::>(); // 7. Notizen aller Lieferungen dieser Tour. let notes = sqlx::query_as::<_, DeliveryNoteRow>( r#" SELECT id, delivery_id, text, image_attachment, author_personalnummer, author_car_id, created_at FROM delivery_notes WHERE delivery_id = ANY($1) ORDER BY delivery_id, created_at "#, ) .bind(&delivery_ids) .fetch_all(&self.pool) .await .map_err(db)? .into_iter() .map(map_note) .collect::>(); Ok(Some(TourDetails { tour, deliveries, customers, customer_contacts, articles, warehouses, notes, })) } async fn set_delivery_order( &self, tour_id: Uuid, delivery_ids: &[Uuid], ) -> Result, ApplicationError> { let mut tx = self.pool.begin().await.map_err(db)?; // 1. Lock alle Lieferungen dieser Tour. Liefert leer, wenn Tour // nicht existiert oder keine Lieferungen hat. let existing: Vec = sqlx::query_scalar( "SELECT id FROM deliveries WHERE tour_id = $1 FOR UPDATE", ) .bind(tour_id) .fetch_all(&mut *tx) .await .map_err(db)?; if existing.is_empty() { tx.rollback().await.map_err(db)?; return Err(ApplicationError::NotFound); } // 2. Mengen-Match: Input muss exakt der Tour entsprechen. let existing_set: std::collections::HashSet = existing.iter().copied().collect(); let input_set: std::collections::HashSet = delivery_ids.iter().copied().collect(); if existing_set != input_set { tx.rollback().await.map_err(db)?; let fremde: Vec = input_set.difference(&existing_set).copied().collect(); let fehlende: Vec = existing_set.difference(&input_set).copied().collect(); return Err(ApplicationError::Validation(format!( "delivery_ids match nicht zur tour (fehlende: {fehlende:?}, fremde: {fremde:?})" ))); } // 3. Bulk-Update via UNNEST. let positions: Vec = (1..=delivery_ids.len() as i32).collect(); sqlx::query( r#" UPDATE deliveries AS d SET sort_order = data.new_order FROM ( SELECT UNNEST($1::uuid[]) AS id, UNNEST($2::int[]) AS new_order ) AS data WHERE d.id = data.id "#, ) .bind(delivery_ids) .bind(&positions) .execute(&mut *tx) .await .map_err(db)?; tx.commit().await.map_err(db)?; Ok(delivery_ids .iter() .zip(positions.iter()) .map(|(id, pos)| DeliveryOrderEntry { delivery_id: *id, sort_order: *pos, }) .collect()) } async fn upsert_from_sync( &self, request: &SyncTourRequest, ) -> Result { let mut tx = self.pool.begin().await.map_err(db)?; // 1. Tour upserten — Identität: (account_id, tour_date) let tour_id: Uuid = sqlx::query_scalar( r#" INSERT INTO tours (account_id, tour_date) VALUES ($1, $2) ON CONFLICT (account_id, tour_date) DO UPDATE SET synced_at = now() RETURNING id "#, ) .bind(request.driver_personalnummer) .bind(request.tour_date) .fetch_one(&mut *tx) .await .map_err(db)?; for delivery in &request.deliveries { // 2. Kunde upserten — Identität: erp_customer_id let customer_id = upsert_customer(&mut tx, delivery).await?; // 3. Lieferung upserten — Identität: (belegart_id, belegnummer). // Bestehende Lieferung bleibt mit ihrem state/cancellation // erhalten; nur Stammdaten + sort_order werden refresht. let delivery_id = upsert_delivery(&mut tx, tour_id, customer_id, delivery).await?; for item in &delivery.items { let warehouse_id = upsert_warehouse(&mut tx, item).await?; let article_id = upsert_article(&mut tx, item, warehouse_id).await?; upsert_delivery_item(&mut tx, delivery_id, article_id, warehouse_id, item).await?; } } tx.commit().await.map_err(db)?; Ok(tour_id) } } // ===== Upsert-Helfer ===================================================== async fn upsert_warehouse( tx: &mut Transaction<'_, Postgres>, item: &SyncDeliveryItem, ) -> Result { let id: Uuid = sqlx::query_scalar( r#" INSERT INTO warehouses (code, name) VALUES ($1, $2) ON CONFLICT (code) DO UPDATE SET name = EXCLUDED.name RETURNING id "#, ) .bind(&item.warehouse_code) .bind(&item.warehouse_name) .fetch_one(&mut **tx) .await .map_err(db)?; Ok(id) } async fn upsert_article( tx: &mut Transaction<'_, Postgres>, item: &SyncDeliveryItem, fallback_warehouse_id: Uuid, ) -> Result { // Optional: explizites Default-Lager aus dem ERP. Wenn nicht // geliefert, nehmen wir das Lager dieser Position als Default — das // ist eine pragmatische Wahl, die wir später korrigieren können. let default_warehouse_id = if let Some(code) = &item.article_default_warehouse_code { sqlx::query_scalar::<_, Uuid>("SELECT id FROM warehouses WHERE code = $1") .bind(code) .fetch_optional(&mut **tx) .await .map_err(db)? .unwrap_or(fallback_warehouse_id) } else { fallback_warehouse_id }; let id: Uuid = sqlx::query_scalar( r#" INSERT INTO articles (article_number, name, scannable, default_warehouse_id) VALUES ($1, $2, $3, $4) ON CONFLICT (article_number) DO UPDATE SET name = EXCLUDED.name, scannable = EXCLUDED.scannable, default_warehouse_id = EXCLUDED.default_warehouse_id RETURNING id "#, ) .bind(&item.article_number) .bind(&item.article_name) .bind(item.article_scannable) .bind(default_warehouse_id) .fetch_one(&mut **tx) .await .map_err(db)?; Ok(id) } async fn upsert_customer( tx: &mut Transaction<'_, Postgres>, delivery: &SyncDelivery, ) -> Result { let id: Uuid = sqlx::query_scalar( r#" INSERT INTO customers ( erp_customer_id, name, street, house_number, postal_code, city, country ) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (erp_customer_id) DO UPDATE SET name = EXCLUDED.name, street = EXCLUDED.street, house_number = EXCLUDED.house_number, postal_code = EXCLUDED.postal_code, city = EXCLUDED.city, country = EXCLUDED.country RETURNING id "#, ) .bind(delivery.erp_customer_id) .bind(&delivery.customer_name) .bind(&delivery.customer_address.street) .bind(&delivery.customer_address.house_number) .bind(&delivery.customer_address.postal_code) .bind(&delivery.customer_address.city) .bind(&delivery.customer_address.country) .fetch_one(&mut **tx) .await .map_err(db)?; Ok(id) } async fn upsert_delivery( tx: &mut Transaction<'_, Postgres>, tour_id: Uuid, customer_id: Uuid, delivery: &SyncDelivery, ) -> Result { let id: Uuid = sqlx::query_scalar( r#" INSERT INTO deliveries ( tour_id, erp_belegart_id, erp_belegnummer, customer_id, snap_street, snap_house_number, snap_postal_code, snap_city, snap_country, sort_order, desired_time, special_agreements ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (erp_belegart_id, erp_belegnummer) DO UPDATE SET tour_id = EXCLUDED.tour_id, customer_id = EXCLUDED.customer_id, snap_street = EXCLUDED.snap_street, snap_house_number = EXCLUDED.snap_house_number, snap_postal_code = EXCLUDED.snap_postal_code, snap_city = EXCLUDED.snap_city, snap_country = EXCLUDED.snap_country, sort_order = EXCLUDED.sort_order, desired_time = EXCLUDED.desired_time, special_agreements = EXCLUDED.special_agreements RETURNING id "#, ) .bind(tour_id) .bind(delivery.belegart_id) .bind(&delivery.belegnummer) .bind(customer_id) .bind(&delivery.delivery_address.street) .bind(&delivery.delivery_address.house_number) .bind(&delivery.delivery_address.postal_code) .bind(&delivery.delivery_address.city) .bind(&delivery.delivery_address.country) .bind(delivery.sort_order) .bind(delivery.desired_time.as_deref()) .bind(delivery.special_agreements.as_deref()) .fetch_one(&mut **tx) .await .map_err(db)?; Ok(id) } async fn upsert_delivery_item( tx: &mut Transaction<'_, Postgres>, delivery_id: Uuid, article_id: Uuid, warehouse_id: Uuid, item: &SyncDeliveryItem, ) -> Result<(), ApplicationError> { // Identität: (delivery_id, belegzeilen_nr, komponenten_artikel_nr). // scan_state-Felder bleiben beim UPDATE bewusst unberührt — das ERP // weiß nichts über Scans. sqlx::query( r#" INSERT INTO delivery_items ( delivery_id, article_id, required_quantity, warehouse_id, belegzeilen_nr, komponenten_artikel_nr ) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (delivery_id, belegzeilen_nr, komponenten_artikel_nr) DO UPDATE SET article_id = EXCLUDED.article_id, required_quantity = EXCLUDED.required_quantity, warehouse_id = EXCLUDED.warehouse_id "#, ) .bind(delivery_id) .bind(article_id) .bind(item.required_quantity) .bind(warehouse_id) .bind(item.belegzeilen_nr) .bind(item.komponenten_artikel_nr.as_deref()) .execute(&mut **tx) .await .map_err(db)?; Ok(()) }