Files
import_alfen_transactions/import_to_alfendb.py
2025-11-19 17:20:03 +01:00

343 lines
12 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Import charging station transaction data from CSV to existing MySQL database
Aangepast voor bestaande 'transactions' tabel structuur
"""
import io
import os
import re
import sys
from datetime import datetime
from decimal import Decimal

import mysql.connector
# Force UTF-8 encoding for stdout/stderr on Windows so the status symbols
# printed by this script (✓, ✗, ⊘) do not fail on a legacy console code page.
# - A stream is only re-wrapped when it exposes a binary ``buffer``; it can be
#   None (pythonw) or a replacement object without one.
# - line_buffering=True keeps progress output appearing line by line instead
#   of only when the underlying buffer fills or the process exits.
if sys.platform == 'win32':
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout = io.TextIOWrapper(
            sys.stdout.buffer, encoding='utf-8', line_buffering=True
        )
    if hasattr(sys.stderr, 'buffer'):
        sys.stderr = io.TextIOWrapper(
            sys.stderr.buffer, encoding='utf-8', line_buffering=True
        )
# Database configuration, passed as keyword arguments to
# mysql.connector.connect().
#
# NOTE(security): the defaults below (including the password) are hard-coded
# in source. They are kept only so existing deployments keep working; prefer
# supplying the ALFEN_DB_* environment variables, then rotate the password
# and remove it from this file.
DB_CONFIG = {
    'host': os.environ.get('ALFEN_DB_HOST', '192.168.178.201'),
    'port': int(os.environ.get('ALFEN_DB_PORT', '3307')),
    'user': os.environ.get('ALFEN_DB_USER', 'alfen_user'),
    'password': os.environ.get('ALFEN_DB_PASSWORD', '5uVgr%f%s2P5GR@3q!'),
    'database': os.environ.get('ALFEN_DB_NAME', 'alfen'),  # your database name
    'charset': 'utf8mb4',
}
class ChargingDataImporter:
    """Import charging-station transactions from a log/CSV file into MySQL.

    The workflow is driven by :meth:`import_file`: scan the file for
    ``txstart2:`` and ``txstop2:`` lines, pair every start with a stop, and
    insert the resulting rows into the existing ``transactions`` table,
    skipping rows whose generated transaction ID is already present.
    """

    # Compiled once at class level instead of once per parsed line.
    # Example line:
    # txstart2: id 0x0000000000000001, socket 1, 2025-10-28 18:27:42 5518.267kWh 04BB29EAFD0F94 3 2 Y
    _TXSTART_RE = re.compile(
        r'txstart2: id (0x[0-9a-fA-F]+), socket (\d+), ([\d-]+ [\d:]+) ([\d.]+)kWh (\w+)'
    )
    # Example line:
    # txstop2: id 0x0000000000000001, socket 1, 2025-10-31 15:59:29 5540.316kWh 04BB29EAFD0F94 6 5 Y
    _TXSTOP_RE = re.compile(
        r'txstop2: id (0x[0-9a-fA-F]+), socket (\d+), ([\d-]+ [\d:]+) ([\d.]+)kWh'
    )
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
    _INSERT_SQL = """
        INSERT INTO transactions
        (transaction_id, socket, start_timestamp, start_kWh,
        stop_timestamp, stop_kWh, total_kWh, card)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    def __init__(self, config):
        """Store the connection settings; no I/O happens here.

        Args:
            config: Keyword arguments for ``mysql.connector.connect``.
        """
        self.config = config
        self.conn = None
        self.cursor = None
        # Unique transaction IDs already present in the database. Created
        # here (not only in connect()) so transaction_exists() is usable on
        # an instance that has not connected yet.
        self.existing_transaction_ids = set()
        # Legacy attribute name, kept as an alias of the same set.
        self.existing_transactions = self.existing_transaction_ids
        self.starts = []  # All transaction starts parsed so far
        self.stops = []   # All transaction stops parsed so far

    def connect(self):
        """Connect to MySQL and load the IDs of all stored transactions.

        Prints an error and exits the process with status 1 when the
        connection or the initial query fails.
        """
        try:
            self.conn = mysql.connector.connect(**self.config)
            self.cursor = self.conn.cursor()
            print("✓ Database verbinding succesvol")
            # Load existing transaction IDs from the database. The set is
            # updated in place so the legacy alias keeps pointing at it.
            self.cursor.execute("SELECT transaction_id FROM transactions")
            self.existing_transaction_ids.clear()
            self.existing_transaction_ids.update(
                tx_id for (tx_id,) in self.cursor.fetchall()
            )
            print(f"{len(self.existing_transaction_ids)} bestaande transacties geladen")
        except mysql.connector.Error as err:
            print(f"✗ Fout bij verbinden met database: {err}")
            sys.exit(1)

    def transaction_exists(self, unique_tx_id):
        """Return True if *unique_tx_id* is already known to be stored."""
        return unique_tx_id in self.existing_transaction_ids

    def close(self):
        """Close the cursor and the connection if they were opened."""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()

    def _event_from_match(self, match):
        """Build the fields shared by start and stop events from a regex match.

        Returns None when the timestamp or kWh text matched the loose regex
        but is not a valid value (e.g. month 13, or ``1.2.3``).
        """
        try:
            timestamp = datetime.strptime(match.group(3), self._TIMESTAMP_FORMAT)
            # decimal.InvalidOperation is a subclass of ArithmeticError.
            kwh = Decimal(match.group(4))
        except (ValueError, ArithmeticError):
            return None
        return {
            'transaction_id': match.group(1),
            'socket': int(match.group(2)),
            'timestamp': timestamp,
            'kwh': kwh,
        }

    def parse_txstart(self, line):
        """Parse a ``txstart2:`` line and append it to ``self.starts``.

        Returns:
            True if the line was parsed and collected, False if it does not
            match the expected format or contains malformed values.
        """
        match = self._TXSTART_RE.match(line)
        if not match:
            return False
        event = self._event_from_match(match)
        if event is None:
            return False
        event['card'] = match.group(5)
        self.starts.append(event)
        return True

    def parse_txstop(self, line):
        """Parse a ``txstop2:`` line and append it to ``self.stops``.

        Returns:
            True if the line was parsed and collected, False if it does not
            match the expected format or contains malformed values.
        """
        match = self._TXSTOP_RE.match(line)
        if not match:
            return False
        event = self._event_from_match(match)
        if event is None:
            return False
        self.stops.append(event)
        return True

    def generate_unique_transaction_id(self, original_id, start_timestamp):
        """Combine the original ID with the start time into a unique ID.

        Format: ``<original_id>_YYYYMMDDHHMMSS``.
        """
        timestamp_str = start_timestamp.strftime('%Y%m%d%H%M%S')
        return f"{original_id}_{timestamp_str}"

    def match_transactions(self):
        """Pair each collected start with the earliest later stop of the same ID.

        Starts are processed in the order they were collected, and each stop
        is used for at most one start. ``self.starts`` and ``self.stops`` are
        left untouched, so the method can be called repeatedly (for example
        after importing another file) and gives consistent results.

        NOTE(review): a start whose own stop is missing will take the stop of
        a later transaction that reuses the same ID — confirm whether the
        charger can reuse IDs this way.

        Returns:
            A list of dicts with the keys ``transaction_id``,
            ``original_transaction_id``, ``socket``, ``start_timestamp``,
            ``start_kWh``, ``stop_timestamp``, ``stop_kWh``, ``total_kWh``
            and ``card``.
        """
        matched_transactions = []
        unmatched_starts = []
        print("\n=== Transacties matchen ===")
        print(f"Gevonden: {len(self.starts)} starts, {len(self.stops)} stops")

        # Work on per-ID copies so matching never consumes self.stops and
        # each start only scans stops that share its transaction ID.
        stops_by_id = {}
        for stop in self.stops:
            stops_by_id.setdefault(stop['transaction_id'], []).append(stop)

        for start in self.starts:
            candidates = [
                stop for stop in stops_by_id.get(start['transaction_id'], [])
                if stop['timestamp'] > start['timestamp']
            ]
            if not candidates:
                unmatched_starts.append(start)
                continue
            # The closest stop after the start is the earliest candidate.
            closest_stop = min(candidates, key=lambda s: s['timestamp'])
            unique_id = self.generate_unique_transaction_id(
                start['transaction_id'], start['timestamp']
            )
            matched_transactions.append({
                'transaction_id': unique_id,
                'original_transaction_id': start['transaction_id'],
                'socket': start['socket'],
                'start_timestamp': start['timestamp'],
                'start_kWh': start['kwh'],
                'stop_timestamp': closest_stop['timestamp'],
                'stop_kWh': closest_stop['kwh'],
                'total_kWh': closest_stop['kwh'] - start['kwh'],
                'card': start['card'],
            })
            # A stop may only be matched once.
            stops_by_id[start['transaction_id']].remove(closest_stop)

        unmatched_stop_count = sum(len(group) for group in stops_by_id.values())
        print(f"{len(matched_transactions)} transacties gematched")
        if unmatched_starts:
            print(f"{len(unmatched_starts)} starts zonder stop")
        if unmatched_stop_count:
            print(f"{unmatched_stop_count} stops zonder start")
        return matched_transactions

    def save_transactions(self, transactions):
        """Insert matched transactions, skipping IDs that already exist.

        Each row is committed individually, so rows saved before a failure
        stay in the database. Database errors are reported per row and do
        not stop the loop.

        Args:
            transactions: Dicts as returned by :meth:`match_transactions`.

        Returns:
            The number of rows that were inserted.
        """
        print("\n=== Transacties opslaan ===")
        saved_count = 0
        skipped_count = 0
        error_count = 0
        for tx in transactions:
            unique_id = tx['transaction_id']
            original_id = tx['original_transaction_id']
            if self.transaction_exists(unique_id):
                print(f"{original_id} -> {unique_id} bestaat al")
                skipped_count += 1
                continue
            try:
                self.cursor.execute(self._INSERT_SQL, (
                    unique_id,
                    tx['socket'],
                    tx['start_timestamp'],
                    tx['start_kWh'],
                    tx['stop_timestamp'],
                    tx['stop_kWh'],
                    tx['total_kWh'],
                    tx['card'],
                ))
                self.conn.commit()
            except mysql.connector.Error as err:
                print(f" ✗ Fout bij opslaan {unique_id}: {err}")
                error_count += 1
                continue
            # Remember the ID so a repeat within this run is also skipped.
            self.existing_transaction_ids.add(unique_id)
            print(f"{original_id} -> {unique_id} opgeslagen ({tx['total_kWh']:.3f} kWh)")
            saved_count += 1
        print(f"\n✓ Opgeslagen: {saved_count}")
        print(f"⊘ Overgeslagen (duplicaat): {skipped_count}")
        if error_count:
            print(f"✗ Fouten: {error_count}")
        return saved_count

    def import_file(self, filepath):
        """Scan *filepath*, match starts with stops and store the result.

        Empty lines and lines starting with ``#`` are ignored, as is every
        line that is neither a ``txstart2:`` nor a ``txstop2:`` record (for
        example ``mv:`` meter values). Start/stop lines that cannot be parsed
        are reported and skipped instead of being dropped silently.

        Exits the process with status 1 if the file is missing or an
        unexpected error occurs.
        """
        print(f"\n=== Import gestart: {filepath} ===")
        line_count = 0
        skipped_lines = 0
        try:
            # Phase 1: scan the file and collect all starts and stops.
            print("\nFase 1: CSV scannen...")
            # utf-8-sig strips a leading BOM that would otherwise hide the
            # first record; errors='replace' keeps stray bytes from aborting
            # the import.
            with open(filepath, 'r', encoding='utf-8-sig', errors='replace') as handle:
                for line_num, raw_line in enumerate(handle, 1):
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue
                    line_count += 1
                    if line.startswith('txstart2:'):
                        parsed = self.parse_txstart(line)
                    elif line.startswith('txstop2:'):
                        parsed = self.parse_txstop(line)
                    else:
                        # Meter values (mv:) and any other record type.
                        continue
                    if not parsed:
                        skipped_lines += 1
                        print(f"⚠ Regel {line_num} overgeslagen (ongeldig formaat): {line}")
            print(f"{line_count} regels gescand")
            print(f"{len(self.starts)} starts gevonden")
            print(f"{len(self.stops)} stops gevonden")
            if skipped_lines:
                print(f"{skipped_lines} regels overgeslagen (ongeldig formaat)")
            # Phase 2: match starts with stops.
            matched_transactions = self.match_transactions()
            # Phase 3: save to the database.
            self.save_transactions(matched_transactions)
            # Show summary statistics.
            self.show_statistics()
        except FileNotFoundError:
            print(f"✗ Bestand niet gevonden: {filepath}")
            sys.exit(1)
        except Exception as err:
            print(f"✗ Onverwachte fout: {err}")
            import traceback
            traceback.print_exc()
            # There is nothing to roll back when connect() was never called.
            if self.conn is not None:
                self.conn.rollback()
            sys.exit(1)

    def show_statistics(self):
        """Print row count, total consumption and the most recent transaction."""
        print("\n=== Database Statistieken ===")
        # Total transactions
        self.cursor.execute("SELECT COUNT(*) FROM transactions")
        total_tx = self.cursor.fetchone()[0]
        print(f"Totaal transacties in database: {total_tx}")
        # Total consumption; SUM() yields NULL on an empty table.
        self.cursor.execute("SELECT SUM(total_kWh) FROM transactions")
        result = self.cursor.fetchone()
        total_consumption = result[0] if result and result[0] else 0
        print(f"Totaal verbruik: {total_consumption:.3f} kWh")
        # Latest transaction
        self.cursor.execute("""
            SELECT transaction_id, start_timestamp, stop_timestamp, total_kWh
            FROM transactions
            ORDER BY stop_timestamp DESC
            LIMIT 1
        """)
        latest = self.cursor.fetchone()
        if latest:
            print("\nLaatste transactie:")
            print(f" ID: {latest[0]}")
            print(f" Periode: {latest[1]} - {latest[2]}")
            # total_kWh may be NULL for rows not written by this importer;
            # formatting None with :.3f would raise TypeError.
            if latest[3] is None:
                print(" Verbruik: onbekend")
            else:
                print(f" Verbruik: {latest[3]:.3f} kWh")
def main():
    """Command-line entry point: import the CSV file named in ``sys.argv[1]``.

    Prints usage information and exits with status 1 when no file argument
    is given. The database connection is closed even if the import exits
    early.
    """
    if len(sys.argv) < 2:
        # Derive the script name from how it was invoked, so the usage text
        # cannot drift from the real file name.
        script_name = os.path.basename(sys.argv[0]) or 'import_to_alfendb.py'
        print(f"Gebruik: python3 {script_name} <csv_bestand>")
        print(f"\nVoorbeeld: python3 {script_name} VAN_01971_Transactions.csv")
        sys.exit(1)
    csv_file = sys.argv[1]
    print("=" * 60)
    print("Charging Station Data Importer")
    print("Import naar bestaande 'transactions' tabel")
    print("=" * 60)
    # Create importer instance
    importer = ChargingDataImporter(DB_CONFIG)
    try:
        # Connect to database
        importer.connect()
        # Import the file
        importer.import_file(csv_file)
    finally:
        # Runs on sys.exit() from connect()/import_file() as well.
        importer.close()
        print("\n✓ Database verbinding gesloten")


if __name__ == "__main__":
    main()