Files
synology_energy_scripts/ocpp_logger.py
2026-03-02 14:41:27 +01:00

236 lines
8.4 KiB
Python

import asyncio
import websockets
import json
from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16 import call_result
from datetime import datetime
import os
def get_log_filename():
"""Genereer log bestandsnaam met datum"""
date_str = datetime.now().strftime("%Y-%m-%d")
return f"alfen_ocpp_{date_str}.log"
def log_message(message, level="INFO"):
"""Log naar bestand en console"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] [{level}] {message}"
# Print naar console
print(log_entry)
# Append naar dagelijks log bestand
log_file = get_log_filename()
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry + "\n")
def log_json(title, data):
"""Log JSON data mooi geformatteerd"""
log_message(f"\n{title}:")
formatted = json.dumps(data, indent=2, ensure_ascii=False)
for line in formatted.split('\n'):
log_message(f" {line}")
class ChargePoint(cp):
@on('BootNotification')
def on_boot_notification(self, charge_point_model, charge_point_vendor, **kwargs):
log_message("=" * 60, "SUCCESS")
log_message("LAADPAAL VERBONDEN!", "SUCCESS")
log_message("=" * 60, "SUCCESS")
log_json("Boot Notification data", {
"model": charge_point_model,
"vendor": charge_point_vendor,
**kwargs
})
return call_result.BootNotification(
current_time=datetime.utcnow().isoformat(),
interval=300,
status='Accepted'
)
@on('Heartbeat')
def on_heartbeat(self, **kwargs):
log_message("💓 Heartbeat ontvangen", "HEARTBEAT")
if kwargs:
log_json("Heartbeat data", kwargs)
return call_result.Heartbeat(
current_time=datetime.utcnow().isoformat()
)
@on('StatusNotification')
def on_status_notification(self, connector_id, error_code, status, **kwargs):
log_message(f"📊 Status Update - Connector {connector_id}", "STATUS")
log_json("Status details", {
"connector_id": connector_id,
"status": status,
"error_code": error_code,
**kwargs
})
return call_result.StatusNotification()
@on('StartTransaction')
def on_start_transaction(self, connector_id, id_tag, meter_start, timestamp, **kwargs):
log_message("=" * 60, "TRANSACTION")
log_message("🔌 LAADSESSIE GESTART!", "TRANSACTION")
log_message("=" * 60, "TRANSACTION")
log_json("Start Transaction data", {
"connector_id": connector_id,
"id_tag": id_tag,
"meter_start": meter_start,
"timestamp": timestamp,
**kwargs
})
return call_result.StartTransaction(
transaction_id=int(datetime.now().timestamp()),
id_tag_info={'status': 'Accepted'}
)
@on('StopTransaction')
def on_stop_transaction(self, meter_stop, timestamp, transaction_id, **kwargs):
log_message("=" * 60, "TRANSACTION")
log_message("🔋 LAADSESSIE GESTOPT!", "TRANSACTION")
log_message("=" * 60, "TRANSACTION")
# Bereken verbruik als meter_start beschikbaar is
transaction_data = {
"transaction_id": transaction_id,
"meter_stop": meter_stop,
"timestamp": timestamp,
**kwargs
}
# Check of reason aanwezig is in kwargs
if 'reason' in kwargs:
transaction_data['reason'] = kwargs['reason']
# Check of transaction_data aanwezig is in kwargs
if 'transaction_data' in kwargs:
# Parse meter values uit transaction_data
for data_item in kwargs['transaction_data']:
if 'sampled_value' in data_item:
for value in data_item['sampled_value']:
if value.get('measurand') == 'Energy.Active.Import.Register':
meter_start_value = int(value.get('value', 0))
verbruik_wh = meter_stop - meter_start_value
transaction_data['meter_start'] = meter_start_value
transaction_data['verbruik_wh'] = verbruik_wh
transaction_data['verbruik_kwh'] = round(verbruik_wh / 1000, 3)
log_json("Stop Transaction data", transaction_data)
return call_result.StopTransaction(
id_tag_info={'status': 'Accepted'}
)
@on('MeterValues')
def on_meter_values(self, connector_id, meter_value, **kwargs):
log_message(f"⚡ Meter waarden - Connector {connector_id}", "METER")
log_json("Meter Values", {
"connector_id": connector_id,
"meter_value": meter_value,
**kwargs
})
return call_result.MeterValues()
@on('Authorize')
def on_authorize(self, id_tag, **kwargs):
log_message(f"🔐 Autorisatie aanvraag - Tag: {id_tag}", "AUTH")
if kwargs:
log_json("Authorize data", kwargs)
return call_result.Authorize(
id_tag_info={'status': 'Accepted'}
)
@on('DataTransfer')
def on_data_transfer(self, vendor_id, **kwargs):
log_message(f"📦 Data Transfer - Vendor: {vendor_id}", "DATA")
log_json("Data Transfer", {
"vendor_id": vendor_id,
**kwargs
})
return call_result.DataTransfer(status='Accepted')
@on('DiagnosticsStatusNotification')
def on_diagnostics_status(self, status, **kwargs):
log_message(f"🔧 Diagnostics Status: {status}", "DIAG")
if kwargs:
log_json("Diagnostics data", kwargs)
return call_result.DiagnosticsStatusNotification()
@on('FirmwareStatusNotification')
def on_firmware_status(self, status, **kwargs):
log_message(f"🔄 Firmware Status: {status}", "FIRMWARE")
if kwargs:
log_json("Firmware data", kwargs)
return call_result.FirmwareStatusNotification()
async def on_connect(websocket, path):
charge_point_id = path.strip('/')
log_message("\n" + "=" * 60, "CONNECTION")
log_message(f"Nieuwe verbinding!", "CONNECTION")
log_message(f" Charge Point ID: {charge_point_id}", "CONNECTION")
log_message(f" IP adres: {websocket.remote_address[0]}", "CONNECTION")
log_message(f" Poort: {websocket.remote_address[1]}", "CONNECTION")
log_message("=" * 60, "CONNECTION")
cp = ChargePoint(charge_point_id, websocket)
try:
await cp.start()
except websockets.exceptions.ConnectionClosed:
log_message("Verbinding verbroken", "WARNING")
except Exception as e:
log_message(f"Fout opgetreden: {e}", "ERROR")
import traceback
log_message(traceback.format_exc(), "ERROR")
async def main():
log_file = get_log_filename()
# Check of log bestand voor vandaag al bestaat
if not os.path.exists(log_file):
# Nieuw dagelijks log bestand
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"OCPP Log gestart op {datetime.now()}\n")
f.write("=" * 60 + "\n\n")
else:
# Bestand bestaat al, voeg scheidingslijn toe
with open(log_file, 'a', encoding='utf-8') as f:
f.write("\n" + "=" * 60 + "\n")
f.write(f"Server herstart op {datetime.now()}\n")
f.write("=" * 60 + "\n\n")
log_message("🚀 OCPP Test Server gestart", "INFO")
log_message("=" * 60, "INFO")
log_message("Poort: 9000", "INFO")
log_message(f"Log bestand: {os.path.abspath(log_file)}", "INFO")
log_message("Ondersteunde OCPP versie: 1.6", "INFO")
log_message("=" * 60, "INFO")
log_message("\nWacht op verbinding van laadpaal...\n", "INFO")
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp1.6'],
ping_interval=None # Disable websocket pings, OCPP heeft eigen heartbeat
)
await server.wait_closed()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
log_message("\n\n👋 Server gestopt door gebruiker", "INFO")
except Exception as e:
log_message(f"Fatale fout: {e}", "ERROR")
import traceback
log_message(traceback.format_exc(), "ERROR")