Files
E-Manager/client/energymeter.py

116 lines
4.5 KiB
Python

import sys
import json
import requests
class EnergyMeter:
    """Base class for energy meters plus factory and aggregation helpers.

    A meter is configured from a mapping that may provide the keys
    ``"ID"``, ``"IP"`` and ``"Adr"``; keys that are absent are stored as
    ``None``. Concrete meter types override :meth:`get_current_value`.
    """

    # Reading keys that are copied per meter and accumulated in "Meter_Sum".
    _VALUE_KEYS = (
        "Current_AC_Phase_1",
        "Current_AC_Phase_2",
        "Current_AC_Phase_3",
        "PowerReal_P_Sum",
    )

    def __init__(self, config):
        """Store the meter's ID, IP and address taken from *config*."""
        self.ID = config.get("ID")
        self.IP = config.get("IP")
        self.Adr = config.get("Adr")

    def get_current_value(self):
        """Return the current reading as a dict; must be overridden.

        Overrides are expected to return a dict with a ``"success"`` entry
        and, optionally, the keys listed in ``_VALUE_KEYS`` (that is what
        :meth:`get_all_current_values` reads).

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement get_current_value method")

    @staticmethod
    def create_meterlist_from_json(json_data):
        """Build meter instances from a JSON document.

        Args:
            json_data: JSON text with a top-level ``"meters"`` list whose
                entries each carry a ``"type"`` and a ``"config"``.

        Returns:
            A list of meter instances, in document order. Entries whose
            type matches no subclass contribute nothing.

        Raises:
            KeyError: if ``"meters"``, ``"type"`` or ``"config"`` is missing.
            ValueError: if *json_data* is not valid JSON.
        """
        # Only EnergyMeter subclasses bound in this module's namespace are
        # candidates.
        namespace = sys.modules[__name__].__dict__
        subclasses = [
            cls
            for cls in namespace.values()
            if isinstance(cls, type)
            and issubclass(cls, EnergyMeter)
            and cls is not EnergyMeter
        ]

        energy_meters = []
        for entry in json.loads(json_data)["meters"]:
            meter_type = entry["type"]
            meter_config = entry["config"]
            # Prefer an exact class-name match so that e.g. "Meter1" does
            # not also instantiate a class named "Meter10". Fall back to
            # the historical substring match so existing configs that use
            # partial names keep working.
            matches = [cls for cls in subclasses if cls.__name__ == meter_type]
            if not matches:
                matches = [cls for cls in subclasses if meter_type in cls.__name__]
            energy_meters.extend(cls(meter_config) for cls in matches)
        return energy_meters

    @staticmethod
    def get_all_current_values(meters):
        """Read every meter and return per-meter values plus their sum.

        Args:
            meters: iterable of objects; anything that is not an
                ``EnergyMeter`` instance is skipped.

        Returns:
            A dict keyed by meter ID holding each meter's reading (missing
            values default to 0), plus a ``"Meter_Sum"`` entry. Only
            successful readings are added to the sum, and the sum's
            ``"success"`` flag becomes ``False`` as soon as one meter fails.
        """
        data = {"Meter_Sum": {"success": True}}
        for key in EnergyMeter._VALUE_KEYS:
            data["Meter_Sum"][key] = 0

        for meter in meters:
            if not isinstance(meter, EnergyMeter):
                continue
            current_value = meter.get_current_value()
            # A reading without a "success" flag is treated as failed
            # instead of aborting the whole aggregation with a KeyError.
            succeeded = current_value.get("success", False)

            meter_data = {"success": succeeded}
            for key in EnergyMeter._VALUE_KEYS:
                meter_data[key] = current_value.get(key, 0)
            data[meter.ID] = meter_data

            summary = data["Meter_Sum"]
            if succeeded:
                for key in EnergyMeter._VALUE_KEYS:
                    summary[key] += meter_data[key]
            else:
                summary["success"] = False
        return data
class Meter1(EnergyMeter):
    """Placeholder meter type whose reading logic is not implemented yet."""

    def __init__(self, config):
        """Initialise ID, IP and address through the base class."""
        # Delegate instead of duplicating the base-class attribute setup.
        super().__init__(config)

    def get_current_value(self):
        """Read the meter; not available for this meter type.

        Raises:
            NotImplementedError: always, until reading is implemented.
        """
        raise NotImplementedError("Meter1 does not implement get_current_value yet")
class Meter2(EnergyMeter):
    """Placeholder meter type whose reading logic is not implemented yet."""

    def __init__(self, config):
        """Initialise ID, IP and address through the base class."""
        # Delegate instead of duplicating the base-class attribute setup.
        super().__init__(config)

    def get_current_value(self):
        """Read the meter; not available for this meter type.

        Raises:
            NotImplementedError: always, until reading is implemented.
        """
        raise NotImplementedError("Meter2 does not implement get_current_value yet")
class Fronius(EnergyMeter):
    """Meter read over HTTP from a Fronius ``GetMeterRealtimeData`` endpoint."""

    # Seconds to wait for the device. Without a timeout ``requests.get``
    # can block indefinitely on an unresponsive host, stalling every poll.
    REQUEST_TIMEOUT = 5

    def __init__(self, config):
        """Initialise ID, IP and address through the base class."""
        # Delegate instead of duplicating the base-class attribute setup.
        super().__init__(config)

    def get_current_value(self):
        """Fetch the phase currents and real power for this meter.

        Queries ``http://<IP>/solar_api/v1/GetMeterRealtimeData.cgi`` and
        picks the entry under ``Body.Data`` whose key equals ``str(Adr)``.

        Returns:
            ``{"success": True, ...}`` with the four reading values
            (defaulting to 0 when a value is absent), or
            ``{"success": False}`` on a non-200 status, an unexpected
            payload, an unknown address, or any error during the request.
        """
        url = f"http://{self.IP}/solar_api/v1/GetMeterRealtimeData.cgi?Scope=System"
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                return {"success": False}

            data = response.json()
            if "Body" not in data or "Data" not in data["Body"]:
                return {"success": False}

            meter_data = data["Body"]["Data"]
            address = str(self.Adr)
            if address not in meter_data:
                return {"success": False}

            meter_values = meter_data[address]
            return {
                "success": True,
                "Current_AC_Phase_1": meter_values.get("Current_AC_Phase_1", 0),
                "Current_AC_Phase_2": meter_values.get("Current_AC_Phase_2", 0),
                "Current_AC_Phase_3": meter_values.get("Current_AC_Phase_3", 0),
                "PowerReal_P_Sum": meter_values.get("PowerReal_P_Sum", 0),
            }
        except Exception as e:
            # Deliberate best-effort boundary: any network, decoding or
            # payload-shape failure is reported as an unsuccessful reading.
            print(f"An error occurred: {str(e)}")
            return {"success": False}