#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import charging station transaction data from a CSV export into an existing
MySQL database.

Adapted for the existing 'transactions' table structure. The script reads
``txstart2:`` / ``txstop2:`` records, pairs each start with its stop, and
inserts one row per completed transaction. Rows whose generated unique ID is
already present in the table are skipped.
"""
import io
import os
import re
import sys
import traceback
from bisect import bisect_right
from datetime import datetime
from decimal import Decimal, InvalidOperation

import mysql.connector

# Force UTF-8 encoding for stdout/stderr on Windows so the status symbols
# printed below (✓, ✗, ⚠, ⊘) do not fail on a legacy console code page.
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# Database configuration. Every value can be overridden with an environment
# variable; the literals are the previously hard-coded defaults.
# NOTE(review): a plaintext password is committed in source. Prefer setting
# ALFEN_DB_PASSWORD in the environment and removing the default below.
DB_CONFIG = {
    'host': os.environ.get('ALFEN_DB_HOST', '192.168.178.201'),
    'port': int(os.environ.get('ALFEN_DB_PORT', '3307')),
    'user': os.environ.get('ALFEN_DB_USER', 'alfen_user'),
    'password': os.environ.get('ALFEN_DB_PASSWORD', '5uVgr%f%s2P5GR@3q!'),
    'database': os.environ.get('ALFEN_DB_NAME', 'alfen'),  # your database name
    'charset': 'utf8mb4',
}

# Timestamp layout inside txstart2/txstop2 records, e.g. "2025-10-28 18:27:42".
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Example:
# txstart2: id 0x0000000000000001, socket 1, 2025-10-28 18:27:42 5518.267kWh 04BB29EAFD0F94 3 2 Y
TXSTART_PATTERN = re.compile(
    r'txstart2: id (0x[0-9a-fA-F]+), socket (\d+), ([\d-]+ [\d:]+) ([\d.]+)kWh (\w+)'
)

# Example:
# txstop2: id 0x0000000000000001, socket 1, 2025-10-31 15:59:29 5540.316kWh 04BB29EAFD0F94 6 5 Y
TXSTOP_PATTERN = re.compile(
    r'txstop2: id (0x[0-9a-fA-F]+), socket (\d+), ([\d-]+ [\d:]+) ([\d.]+)kWh'
)


class ChargingDataImporter:
    """Pair txstart2/txstop2 records from a CSV export and store them in MySQL.

    Typical use: ``connect()``, then ``import_file(path)``, then ``close()``.
    """

    def __init__(self, config):
        """Store the connection settings; no database connection is opened yet.

        Args:
            config: keyword arguments passed to ``mysql.connector.connect``.
        """
        self.config = config
        self.conn = None
        self.cursor = None
        # Unique transaction IDs already in the database; filled by connect().
        # Initialised here so transaction_exists() is safe before connect().
        self.existing_transaction_ids = set()
        self.starts = []  # All transaction starts from the CSV
        self.stops = []   # All transaction stops from the CSV

    def connect(self):
        """Open the database connection and load the existing transaction IDs.

        Exits the process with status 1 if the connection or the initial
        query fails.
        """
        try:
            self.conn = mysql.connector.connect(**self.config)
            self.cursor = self.conn.cursor()
            print("✓ Database verbinding succesvol")

            # Load existing transaction IDs so duplicates can be skipped on insert.
            self.cursor.execute("SELECT transaction_id FROM transactions")
            self.existing_transaction_ids = {
                tx_id for (tx_id,) in self.cursor.fetchall()
            }
            print(f"✓ {len(self.existing_transaction_ids)} bestaande transacties geladen")

        except mysql.connector.Error as err:
            print(f"✗ Fout bij verbinden met database: {err}")
            sys.exit(1)

    def transaction_exists(self, unique_tx_id):
        """Return True if ``unique_tx_id`` is already stored in the database."""
        return unique_tx_id in self.existing_transaction_ids

    def close(self):
        """Close the cursor and connection, if open. Safe to call repeatedly."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def parse_txstart(self, line):
        """Parse a ``txstart2:`` line and append it to ``self.starts``.

        Returns:
            True if the line was recognised and stored. False if it does not
            match the expected layout, or its timestamp or kWh value is invalid.
        """
        match = TXSTART_PATTERN.match(line)
        if not match:
            return False
        try:
            timestamp = datetime.strptime(match.group(3), TIMESTAMP_FORMAT)
            kwh = Decimal(match.group(4))
        except (ValueError, InvalidOperation):
            # The regex accepts e.g. "2025-99-99" or "1.2.3"; treat as unparseable.
            return False
        self.starts.append({
            'transaction_id': match.group(1),
            'socket': int(match.group(2)),
            'timestamp': timestamp,
            'kwh': kwh,
            'card': match.group(5),
        })
        return True

    def parse_txstop(self, line):
        """Parse a ``txstop2:`` line and append it to ``self.stops``.

        Returns:
            True if the line was recognised and stored. False if it does not
            match the expected layout, or its timestamp or kWh value is invalid.
        """
        match = TXSTOP_PATTERN.match(line)
        if not match:
            return False
        try:
            timestamp = datetime.strptime(match.group(3), TIMESTAMP_FORMAT)
            kwh = Decimal(match.group(4))
        except (ValueError, InvalidOperation):
            return False
        self.stops.append({
            'transaction_id': match.group(1),
            'socket': int(match.group(2)),
            'timestamp': timestamp,
            'kwh': kwh,
        })
        return True

    def generate_unique_transaction_id(self, original_id, start_timestamp):
        """Build a unique ID by combining the charger's ID with the start time.

        The charger's own ID alone is not unique. Format:
        ``<original_id>_YYYYMMDDHHMMSS``.
        """
        timestamp_str = start_timestamp.strftime('%Y%m%d%H%M%S')
        return f"{original_id}_{timestamp_str}"

    def match_transactions(self):
        """Pair every start with its stop and build the rows to insert.

        A start is paired with the earliest not-yet-used stop that meets all of:

        - it has the same (original) transaction ID;
        - its timestamp is strictly after the start;
        - its timestamp is no later than the next start carrying that same ID.

        The upper bound matters because the charger's IDs repeat. Without it, a
        start whose stop was never logged would take the stop belonging to the
        next session with the same ID.

        Side effect: ``self.stops`` is reduced (in place) to the unmatched stops.

        Returns:
            List of dicts holding the ``transactions`` columns plus
            ``original_transaction_id``, in the order the starts appear in the
            CSV.
        """
        matched_transactions = []
        unmatched_starts = []

        print("\n=== Transacties matchen ===")
        print(f"Gevonden: {len(self.starts)} starts, {len(self.stops)} stops")

        # Sorted start timestamps per original ID, used to find the next start.
        start_times_by_id = {}
        for start in self.starts:
            start_times_by_id.setdefault(start['transaction_id'], []).append(start['timestamp'])
        for times in start_times_by_id.values():
            times.sort()

        # Stops per original ID as (timestamp, index into self.stops), sorted by
        # time. Ties keep file order, matching min() over the remaining stops.
        stops_by_id = {}
        for index, stop in enumerate(self.stops):
            stops_by_id.setdefault(stop['transaction_id'], []).append((stop['timestamp'], index))
        for candidates in stops_by_id.values():
            candidates.sort()

        used_stop_indices = set()

        for start in self.starts:
            tx_id = start['transaction_id']
            start_ts = start['timestamp']

            # Timestamp of the next strictly later start with the same ID, if any.
            times = start_times_by_id[tx_id]
            next_pos = bisect_right(times, start_ts)
            next_start_ts = times[next_pos] if next_pos < len(times) else None

            closest_index = None
            for stop_ts, index in stops_by_id.get(tx_id, ()):
                if index in used_stop_indices or stop_ts <= start_ts:
                    continue
                if next_start_ts is not None and stop_ts > next_start_ts:
                    break  # Belongs to a later session with the same ID.
                closest_index = index
                break

            if closest_index is None:
                unmatched_starts.append(start)
                continue

            used_stop_indices.add(closest_index)
            closest_stop = self.stops[closest_index]

            unique_id = self.generate_unique_transaction_id(tx_id, start_ts)

            matched_transactions.append({
                'transaction_id': unique_id,
                'original_transaction_id': tx_id,
                'socket': start['socket'],
                'start_timestamp': start_ts,
                'start_kWh': start['kwh'],
                'stop_timestamp': closest_stop['timestamp'],
                'stop_kWh': closest_stop['kwh'],
                'total_kWh': closest_stop['kwh'] - start['kwh'],
                'card': start['card'],
            })

        # Keep only unmatched stops (in file order), as the previous version did.
        self.stops[:] = [
            stop for index, stop in enumerate(self.stops)
            if index not in used_stop_indices
        ]
        unmatched_stops = self.stops

        print(f"✓ {len(matched_transactions)} transacties gematched")
        if unmatched_starts:
            print(f"⚠ {len(unmatched_starts)} starts zonder stop")
        if unmatched_stops:
            print(f"⚠ {len(unmatched_stops)} stops zonder start")

        return matched_transactions

    def save_transactions(self, transactions):
        """Insert matched transactions, skipping IDs that already exist.

        Each insert is committed on its own, so one failing row does not undo
        the others.

        Returns:
            Number of rows inserted.
        """
        print("\n=== Transacties opslaan ===")

        saved_count = 0
        skipped_count = 0
        error_count = 0

        for tx in transactions:
            unique_id = tx['transaction_id']
            original_id = tx['original_transaction_id']

            if self.transaction_exists(unique_id):
                print(f" ⊘ {original_id} -> {unique_id} bestaat al")
                skipped_count += 1
                continue

            try:
                self.cursor.execute("""
                    INSERT INTO transactions
                    (transaction_id, socket, start_timestamp, start_kWh,
                     stop_timestamp, stop_kWh, total_kWh, card)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """, (
                    unique_id,
                    tx['socket'],
                    tx['start_timestamp'],
                    tx['start_kWh'],
                    tx['stop_timestamp'],
                    tx['stop_kWh'],
                    tx['total_kWh'],
                    tx['card'],
                ))
                self.conn.commit()

                # Remember it so a duplicate later in the same run is skipped too.
                self.existing_transaction_ids.add(unique_id)

                print(f" ✓ {original_id} -> {unique_id} opgeslagen ({tx['total_kWh']:.3f} kWh)")
                saved_count += 1

            except mysql.connector.Error as err:
                # Discard the failed statement so the next insert starts clean.
                self.conn.rollback()
                print(f" ✗ Fout bij opslaan {unique_id}: {err}")
                error_count += 1

        print(f"\n✓ Opgeslagen: {saved_count}")
        print(f"⊘ Overgeslagen (duplicaat): {skipped_count}")
        if error_count:
            print(f"✗ Fouten: {error_count}")

        return saved_count

    def import_file(self, filepath):
        """Scan ``filepath``, match starts with stops, save them, print stats.

        Exits the process with status 1 if the file is missing or an
        unexpected error occurs. Requires ``connect()`` to have succeeded.
        """
        print(f"\n=== Import gestart: {filepath} ===")

        line_count = 0
        unparsed_count = 0

        try:
            # Phase 1: scan the file and collect all starts and stops.
            print("\nFase 1: CSV scannen...")
            # Explicit encoding, because the platform default differs per
            # machine (cp1252 on Windows). 'utf-8-sig' also strips a BOM that
            # would otherwise hide a leading 'txstart2:' record. Undecodable
            # bytes are replaced so they cannot abort the import.
            with open(filepath, 'r', encoding='utf-8-sig', errors='replace') as file:
                for raw_line in file:
                    line = raw_line.strip()

                    # Skip empty lines and comments.
                    if not line or line.startswith('#'):
                        continue

                    line_count += 1

                    if line.startswith('txstart2:'):
                        parsed = self.parse_txstart(line)
                    elif line.startswith('txstop2:'):
                        parsed = self.parse_txstop(line)
                    else:
                        # Meter values (mv:) and other record types are ignored.
                        continue

                    if not parsed:
                        unparsed_count += 1

            print(f"✓ {line_count} regels gescand")
            print(f"✓ {len(self.starts)} starts gevonden")
            print(f"✓ {len(self.stops)} stops gevonden")
            if unparsed_count:
                print(f"⚠ {unparsed_count} txstart2/txstop2 regels konden niet worden gelezen")

            # Phase 2: match starts with stops.
            matched_transactions = self.match_transactions()

            # Phase 3: save to the database.
            self.save_transactions(matched_transactions)

            self.show_statistics()

        except FileNotFoundError:
            print(f"✗ Bestand niet gevonden: {filepath}")
            sys.exit(1)
        except Exception as err:
            print(f"✗ Onverwachte fout: {err}")
            traceback.print_exc()
            # Best-effort rollback: the connection may be missing or already broken.
            if self.conn is not None:
                try:
                    self.conn.rollback()
                except mysql.connector.Error as rollback_err:
                    print(f"✗ Rollback mislukt: {rollback_err}")
            sys.exit(1)

    def show_statistics(self):
        """Print the row count, total consumption and latest transaction."""
        print("\n=== Database Statistieken ===")

        self.cursor.execute("SELECT COUNT(*) FROM transactions")
        total_tx = self.cursor.fetchone()[0]
        print(f"Totaal transacties in database: {total_tx}")

        # SUM() yields NULL on an empty table.
        self.cursor.execute("SELECT SUM(total_kWh) FROM transactions")
        result = self.cursor.fetchone()
        total_consumption = result[0] if result and result[0] else 0
        print(f"Totaal verbruik: {total_consumption:.3f} kWh")

        self.cursor.execute("""
            SELECT transaction_id, start_timestamp, stop_timestamp, total_kWh
            FROM transactions
            ORDER BY stop_timestamp DESC
            LIMIT 1
        """)
        latest = self.cursor.fetchone()
        if latest:
            print("\nLaatste transactie:")
            print(f" ID: {latest[0]}")
            print(f" Periode: {latest[1]} - {latest[2]}")
            # total_kWh may be NULL; formatting None with :.3f would raise.
            if latest[3] is not None:
                print(f" Verbruik: {latest[3]:.3f} kWh")
            else:
                print(" Verbruik: onbekend")


def main():
    """Command-line entry point: import the CSV file given as argv[1]."""
    if len(sys.argv) < 2:
        print("Gebruik: python3 import_to_existing_db.py <csv_bestand>")
        print("\nVoorbeeld: python3 import_to_existing_db.py VAN_01971_Transactions.csv")
        sys.exit(1)

    csv_file = sys.argv[1]

    print("=" * 60)
    print("Charging Station Data Importer")
    print("Import naar bestaande 'transactions' tabel")
    print("=" * 60)

    importer = ChargingDataImporter(DB_CONFIG)

    try:
        importer.connect()
        importer.import_file(csv_file)
    finally:
        # Runs on normal completion and on sys.exit() from the importer.
        importer.close()
        print("\n✓ Database verbinding gesloten")


if __name__ == "__main__":
    main()