diff --git a/test/read_registers.py b/test/read_registers.py new file mode 100644 index 0000000..dd282b5 --- /dev/null +++ b/test/read_registers.py @@ -0,0 +1,486 @@ +""" +Script to read and write holding registers from a Solarman inverter using the pysolarmanv5 library. + +Usage: + Read registers: python read_registers.py + Write to register 230: python read_registers.py --write 230 75 + Write to register 293: python read_registers.py --write 293 5000 + Write to multiple registers: python read_registers.py --write 230 75 --write 293 5000 + +Writable registers: + - Register 230: range 0-115 + - Register 293: range 0-6500 +""" + +import sys +import os +import argparse +import ipaddress + +# Import required libraries with error handling +try: + from pysolarmanv5 import PySolarmanV5 + from pysolarmanv5.pysolarmanv5 import V5FrameError + import umodbus.exceptions +except ImportError as e: + print(f"ERROR: Required library not installed: {e}") + print("Install required libraries with: pip install pysolarmanv5 umodbus") + sys.exit(1) + + +# Helper functions for validated configuration +def get_env_int(name, default, min_val=None, max_val=None): + """ + Get integer from environment variable with validation. + + Args: + name: Environment variable name + default: Default value if not set or invalid + min_val: Minimum allowed value (optional) + max_val: Maximum allowed value (optional) + + Returns: + int: Validated integer value + """ + value_str = os.getenv(name) + if value_str is None: + return default + + try: + value = int(value_str) + if min_val is not None and value < min_val: + print(f"WARNING: {name}={value} is below minimum ({min_val}). Using default: {default}") + return default + if max_val is not None and value > max_val: + print(f"WARNING: {name}={value} exceeds maximum ({max_val}). Using default: {default}") + return default + return value + except ValueError: + print(f"WARNING: Invalid {name}='{value_str}' (must be an integer). Using default: {default}") + return default + + +def get_env_ip(name, default): + """ + Get IP address from environment variable with validation. + + Args: + name: Environment variable name + default: Default IP address + + Returns: + str: Validated IP address + """ + value = os.getenv(name) + if value is None: + return default + + try: + # Validate IP address format + ipaddress.ip_address(value) + return value + except ValueError: + print(f"WARNING: Invalid IP address in {name}='{value}'. Using default: {default}") + return default + + +# Configuration - Can be overridden with environment variables +INVERTER_IP = get_env_ip("INVERTER_IP", "192.168.0.203") +INVERTER_SERIAL = get_env_int("INVERTER_SERIAL", 2722455016, min_val=0, max_val=4294967295) +INVERTER_PORT = get_env_int("INVERTER_PORT", 8899, min_val=1, max_val=65535) +MODBUS_SLAVE_ID = get_env_int("MODBUS_SLAVE_ID", 1, min_val=0, max_val=247) +SOCKET_TIMEOUT = get_env_int("SOCKET_TIMEOUT", 5, min_val=1, max_val=60) + +# Registers to read +REGISTERS = [227, 230, 292, 293] + +# Writable registers whitelist with validation ranges +# Format: {register_number: (min_value, max_value)} +WRITABLE_REGISTERS = { + 230: (0, 115), # Register 230: valid range 0-115 + 293: (0, 6500), # Register 293: valid range 0-6500 +} + + +def validate_writable_registers(): + """Validate WRITABLE_REGISTERS configuration at startup.""" + if not WRITABLE_REGISTERS: + raise ValueError("WRITABLE_REGISTERS cannot be empty. At least one register must be configured.") + + for reg, (min_val, max_val) in WRITABLE_REGISTERS.items(): + if not isinstance(reg, int) or reg < 0 or reg > 65535: + raise ValueError(f"Invalid register address: {reg}. Must be 0-65535.") + if not isinstance(min_val, int) or not isinstance(max_val, int): + raise ValueError(f"Register {reg} range values must be integers") + if min_val > max_val: + raise ValueError(f"Invalid range for register {reg}: min ({min_val}) > max ({max_val})") + if min_val < 0 or max_val > 65535: + raise ValueError(f"Register {reg} range [{min_val}, {max_val}] outside valid Modbus range [0, 65535]") + + +def validate_registers(): + """Validate REGISTERS configuration at startup.""" + if not isinstance(REGISTERS, list): + raise ValueError("REGISTERS must be a list") + + seen = set() + for reg in REGISTERS: + if not isinstance(reg, int): + raise ValueError(f"Invalid register address: {reg}. Must be an integer.") + if reg < 0 or reg > 65535: + raise ValueError(f"Invalid register address: {reg}. Must be 0-65535.") + if reg in seen: + raise ValueError(f"Duplicate register address: {reg}") + seen.add(reg) + + +# Validate configuration on module load +try: + validate_writable_registers() + validate_registers() +except ValueError as e: + print(f"ERROR: Configuration validation failed: {e}") + print("Please check WRITABLE_REGISTERS and REGISTERS configuration.") + sys.exit(1) + + +def validate_register_arg(value_str): + """ + Validate register address from command line. + + Args: + value_str: String value from command line + + Returns: + int: Validated register address + + Raises: + ValueError: If value is invalid + """ + try: + int_val = int(value_str) + if int_val < 0 or int_val > 65535: + raise ValueError(f"Register address must be 0-65535, got {int_val}") + return int_val + except ValueError as e: + if "invalid literal" in str(e): + raise ValueError(f"Invalid integer: {value_str}") + raise + + +def validate_value_arg(value_str): + """ + Validate register value from command line. + + Args: + value_str: String value from command line + + Returns: + int: Validated register value + + Raises: + ValueError: If value is invalid + """ + try: + int_val = int(value_str) + if int_val < 0 or int_val > 65535: + raise ValueError(f"Register value must be 0-65535, got {int_val}") + return int_val + except ValueError as e: + if "invalid literal" in str(e): + raise ValueError(f"Invalid integer: {value_str}") + raise + + +def validate_write_operations(write_operations): + """ + Validate all write operations before connecting to device. + + Args: + write_operations: List of (register_addr, value) tuples + + Returns: + bool: True if all operations are valid + + Raises: + SystemExit: If any validation fails + """ + if not write_operations: + return True + + print("Validating write operations...") + + for register_addr, value in write_operations: + # Check if register is in the whitelist + if register_addr not in WRITABLE_REGISTERS: + print(f"\nERROR: Register {register_addr} is not in the writable registers whitelist") + print(f"Allowed registers: {list(WRITABLE_REGISTERS.keys())}") + sys.exit(1) + + # Validate value is not negative + if value < 0: + print(f"\nERROR: Negative values not allowed. Got: {value}") + sys.exit(1) + + # Validate value fits in Modbus register + if value > 65535: + print(f"\nERROR: Value {value} exceeds maximum Modbus register value (65535)") + sys.exit(1) + + # Validate value range for this specific register + min_val, max_val = WRITABLE_REGISTERS[register_addr] + if not min_val <= value <= max_val: + print(f"\nERROR: Value {value} out of range for register {register_addr}") + print(f"Valid range: {min_val} to {max_val}") + sys.exit(1) + + print("All write operations validated.\n") + return True + + +def write_register(modbus, register_addr, value): + """ + Write a value to a holding register. + + Note: This function assumes the register address and value have already been + validated by validate_write_operations() before calling. + + Args: + modbus: PySolarmanV5 instance + register_addr: Register address to write to (must be pre-validated) + value: Integer value to write (must be pre-validated) + + Returns: + bool: True if write succeeded, False otherwise + """ + try: + print(f"\nWriting value {value} to register {register_addr}...") + + # Read current value first + print("Reading current value...") + current = modbus.read_holding_registers(register_addr=register_addr, quantity=1) + + # Check for empty response + if not current or len(current) == 0: + print("ERROR: Failed to read current value (empty response)") + return False + + print(f"Current value: {current[0]}") + + try: + result = modbus.write_multiple_holding_registers(register_addr, [value]) + print(f"Write command sent (method: multi)({result})") + except V5FrameError as e: + print(f"V5FrameError: {e}") + return False + except umodbus.exceptions.ModbusError as e: + print(f"Modbus protocol error: {e}") + return False + + # Verify the write succeeded + print("Verifying write...") + try: + updated = modbus.read_holding_registers(register_addr=register_addr, quantity=1) + + # Check for empty response + if not updated or len(updated) == 0: + print("ERROR: Failed to verify write (empty response)") + return False + + updated_value = updated[0] + + if updated_value == value: + print(f"SUCCESS: Register {register_addr} = {updated_value}") + return True + else: + print(f"WARNING: Expected {value}, but register {register_addr} = {updated_value}") + print("Note: Value may have been modified by device or another client") + return False + + except TimeoutError: + print(f"ERROR: Timeout reading back register {register_addr}") + return False + + except Exception as e: + print(f"ERROR writing to register {register_addr}: {type(e).__name__}: {e}") + return False + + +def read_and_display_registers(modbus): + """ + Read and display the configured registers. + + Args: + modbus: PySolarmanV5 instance + """ + try: + # Check if REGISTERS list is empty + if not REGISTERS: + print("WARNING: No registers configured to read") + return + + # Read and display each register + print("Reading holding registers:") + print("-" * 50) + + for register_addr in REGISTERS: + try: + # Read single register + result = modbus.read_holding_registers( + register_addr=register_addr, + quantity=1 + ) + + # Check for empty response + if not result or len(result) == 0: + print(f"Register {register_addr:>3}: Error - empty response") + continue + + # Extract value (result is a list) + value = result[0] + + # Display the register and its value + print(f"Register {register_addr:>3}: {value:>5} (0x{value:04X})") + + except (V5FrameError, umodbus.exceptions.IllegalDataAddressError) as e: + print(f"Register {register_addr:>3}: Error reading - {e}") + except Exception as e: + print(f"Register {register_addr:>3}: Unexpected error - {e}") + + print("-" * 50) + print("\nRead complete!") + + except Exception as e: + print(f"\nError reading registers: {e}") + + +def main(): + """Main function to parse arguments and execute operations.""" + # Parse command line arguments + parser = argparse.ArgumentParser( + description="Read and write Solarman inverter holding registers", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=f""" +Examples: + Read registers: python read_registers.py + Write to register 230: python read_registers.py --write 230 75 + Write to register 293: python read_registers.py --write 293 5000 + Write to multiple registers: python read_registers.py --write 230 75 --write 293 5000 + Enable verbose output: python read_registers.py --verbose + +Writable registers and their valid ranges: +{chr(10).join(f' Register {reg}: {min_val}-{max_val}' for reg, (min_val, max_val) in WRITABLE_REGISTERS.items())} + +Environment Variables: + INVERTER_IP - IP address of Solarman data logger (default: {INVERTER_IP}) + INVERTER_SERIAL - Serial number of data logger (default: {INVERTER_SERIAL}) + INVERTER_PORT - TCP port (default: {INVERTER_PORT}) + MODBUS_SLAVE_ID - Modbus slave ID (default: {MODBUS_SLAVE_ID}) + SOCKET_TIMEOUT - Socket timeout in seconds (default: {SOCKET_TIMEOUT}) + """ + ) + parser.add_argument( + '--write', + nargs=2, + type=str, # Parse as strings for better validation + metavar=('REGISTER', 'VALUE'), + action='append', + dest='write_operations', + help='Write VALUE to REGISTER (can be used multiple times)' + ) + parser.add_argument( + '--verbose', + action='store_true', + help='Enable verbose protocol output for debugging' + ) + + args = parser.parse_args() + + # Validate and convert write operations + if args.write_operations: + validated_operations = [] + for reg_str, val_str in args.write_operations: + try: + register_addr = validate_register_arg(reg_str) + value = validate_value_arg(val_str) + validated_operations.append((register_addr, value)) + except ValueError as e: + print(f"ERROR: Invalid write argument: {e}") + sys.exit(1) + args.write_operations = validated_operations + + print("=" * 50) + print("Solarman Register Reader/Writer") + print("=" * 50) + print() + + # Pre-validate write operations before connecting + validate_write_operations(args.write_operations) + + modbus = None + write_failures = [] + + try: + # Initialize connection + print(f"Connecting to {INVERTER_IP}:{INVERTER_PORT} (Serial: {INVERTER_SERIAL})...") + modbus = PySolarmanV5( + address=INVERTER_IP, + serial=INVERTER_SERIAL, + port=INVERTER_PORT, + mb_slave_id=MODBUS_SLAVE_ID, + socket_timeout=SOCKET_TIMEOUT, + verbose=args.verbose, + v5_error_correction=True + ) + print("Connected successfully!") + + # Validate connection with a test read + if REGISTERS: + try: + test_read = modbus.read_holding_registers(register_addr=REGISTERS[0], quantity=1) + if test_read and len(test_read) > 0: + print(f"Connection verified (read register {REGISTERS[0]})!\n") + else: + print(f"WARNING: Connection test to register {REGISTERS[0]} returned empty response\n") + except Exception as e: + print(f"WARNING: Connection test to register {REGISTERS[0]} failed: {e}") + print("This may indicate a connection issue or invalid register address.") + print("Continuing anyway...\n") + else: + print("Skipping connection test (no registers configured)\n") + + # Perform write operations if requested + if args.write_operations: + for register_addr, value in args.write_operations: + success = write_register(modbus, register_addr, value) + if not success: + print(f"Write operation failed for register {register_addr}!") + write_failures.append(register_addr) + + # Always read registers to show current values + print() + read_and_display_registers(modbus) + + except Exception as e: + print(f"\nConnection error: {e}") + print("Please check your IP address, serial number, and network connection.") + sys.exit(1) + + finally: + # Ensure disconnection + if modbus: + try: + modbus.disconnect() + print("\nDisconnected from inverter.") + except Exception as e: + # Log but don't raise - we're already cleaning up + print(f"\nWarning: Error during disconnect: {e}") + + # Exit with error code if any write operations failed + if write_failures: + print(f"\nERROR: Failed to write to {len(write_failures)} register(s): {write_failures}") + sys.exit(1) + + +if __name__ == "__main__": + main()