""" Script to read and write holding registers from a Solarman inverter using the pysolarmanv5 library. Usage: Read registers: python read_registers.py Write to register 230: python read_registers.py --write 230 75 Write to register 293: python read_registers.py --write 293 5000 Write to multiple registers: python read_registers.py --write 230 75 --write 293 5000 Writable registers: - Register 230: range 0-115 - Register 293: range 0-6500 """ import sys import os import argparse import ipaddress # Import required libraries with error handling try: from pysolarmanv5 import PySolarmanV5 from pysolarmanv5.pysolarmanv5 import V5FrameError import umodbus.exceptions except ImportError as e: print(f"ERROR: Required library not installed: {e}") print("Install required libraries with: pip install pysolarmanv5 umodbus") sys.exit(1) # Helper functions for validated configuration def get_env_int(name, default, min_val=None, max_val=None): """ Get integer from environment variable with validation. Args: name: Environment variable name default: Default value if not set or invalid min_val: Minimum allowed value (optional) max_val: Maximum allowed value (optional) Returns: int: Validated integer value """ value_str = os.getenv(name) if value_str is None: return default try: value = int(value_str) if min_val is not None and value < min_val: print(f"WARNING: {name}={value} is below minimum ({min_val}). Using default: {default}") return default if max_val is not None and value > max_val: print(f"WARNING: {name}={value} exceeds maximum ({max_val}). Using default: {default}") return default return value except ValueError: print(f"WARNING: Invalid {name}='{value_str}' (must be an integer). Using default: {default}") return default def get_env_ip(name, default): """ Get IP address from environment variable with validation. Args: name: Environment variable name default: Default IP address Returns: str: Validated IP address """ value = os.getenv(name) if value is None: return default try: # Validate IP address format ipaddress.ip_address(value) return value except ValueError: print(f"WARNING: Invalid IP address in {name}='{value}'. Using default: {default}") return default # Configuration - Can be overridden with environment variables INVERTER_IP = get_env_ip("INVERTER_IP", "192.168.0.203") INVERTER_SERIAL = get_env_int("INVERTER_SERIAL", 2722455016, min_val=0, max_val=4294967295) INVERTER_PORT = get_env_int("INVERTER_PORT", 8899, min_val=1, max_val=65535) MODBUS_SLAVE_ID = get_env_int("MODBUS_SLAVE_ID", 1, min_val=0, max_val=247) SOCKET_TIMEOUT = get_env_int("SOCKET_TIMEOUT", 5, min_val=1, max_val=60) # Registers to read REGISTERS = [227, 230, 292, 293] # Writable registers whitelist with validation ranges # Format: {register_number: (min_value, max_value)} WRITABLE_REGISTERS = { 230: (0, 115), # Register 230: valid range 0-115 293: (0, 6500), # Register 293: valid range 0-6500 } def validate_writable_registers(): """Validate WRITABLE_REGISTERS configuration at startup.""" if not WRITABLE_REGISTERS: raise ValueError("WRITABLE_REGISTERS cannot be empty. At least one register must be configured.") for reg, (min_val, max_val) in WRITABLE_REGISTERS.items(): if not isinstance(reg, int) or reg < 0 or reg > 65535: raise ValueError(f"Invalid register address: {reg}. Must be 0-65535.") if not isinstance(min_val, int) or not isinstance(max_val, int): raise ValueError(f"Register {reg} range values must be integers") if min_val > max_val: raise ValueError(f"Invalid range for register {reg}: min ({min_val}) > max ({max_val})") if min_val < 0 or max_val > 65535: raise ValueError(f"Register {reg} range [{min_val}, {max_val}] outside valid Modbus range [0, 65535]") def validate_registers(): """Validate REGISTERS configuration at startup.""" if not isinstance(REGISTERS, list): raise ValueError("REGISTERS must be a list") seen = set() for reg in REGISTERS: if not isinstance(reg, int): raise ValueError(f"Invalid register address: {reg}. Must be an integer.") if reg < 0 or reg > 65535: raise ValueError(f"Invalid register address: {reg}. Must be 0-65535.") if reg in seen: raise ValueError(f"Duplicate register address: {reg}") seen.add(reg) # Validate configuration on module load try: validate_writable_registers() validate_registers() except ValueError as e: print(f"ERROR: Configuration validation failed: {e}") print("Please check WRITABLE_REGISTERS and REGISTERS configuration.") sys.exit(1) def validate_register_arg(value_str): """ Validate register address from command line. Args: value_str: String value from command line Returns: int: Validated register address Raises: ValueError: If value is invalid """ try: int_val = int(value_str) if int_val < 0 or int_val > 65535: raise ValueError(f"Register address must be 0-65535, got {int_val}") return int_val except ValueError as e: if "invalid literal" in str(e): raise ValueError(f"Invalid integer: {value_str}") raise def validate_value_arg(value_str): """ Validate register value from command line. Args: value_str: String value from command line Returns: int: Validated register value Raises: ValueError: If value is invalid """ try: int_val = int(value_str) if int_val < 0 or int_val > 65535: raise ValueError(f"Register value must be 0-65535, got {int_val}") return int_val except ValueError as e: if "invalid literal" in str(e): raise ValueError(f"Invalid integer: {value_str}") raise def validate_write_operations(write_operations): """ Validate all write operations before connecting to device. Args: write_operations: List of (register_addr, value) tuples Returns: bool: True if all operations are valid Raises: SystemExit: If any validation fails """ if not write_operations: return True print("Validating write operations...") for register_addr, value in write_operations: # Check if register is in the whitelist if register_addr not in WRITABLE_REGISTERS: print(f"\nERROR: Register {register_addr} is not in the writable registers whitelist") print(f"Allowed registers: {list(WRITABLE_REGISTERS.keys())}") sys.exit(1) # Validate value is not negative if value < 0: print(f"\nERROR: Negative values not allowed. Got: {value}") sys.exit(1) # Validate value fits in Modbus register if value > 65535: print(f"\nERROR: Value {value} exceeds maximum Modbus register value (65535)") sys.exit(1) # Validate value range for this specific register min_val, max_val = WRITABLE_REGISTERS[register_addr] if not min_val <= value <= max_val: print(f"\nERROR: Value {value} out of range for register {register_addr}") print(f"Valid range: {min_val} to {max_val}") sys.exit(1) print("All write operations validated.\n") return True def write_register(modbus, register_addr, value): """ Write a value to a holding register. Note: This function assumes the register address and value have already been validated by validate_write_operations() before calling. Args: modbus: PySolarmanV5 instance register_addr: Register address to write to (must be pre-validated) value: Integer value to write (must be pre-validated) Returns: bool: True if write succeeded, False otherwise """ try: print(f"\nWriting value {value} to register {register_addr}...") # Read current value first print("Reading current value...") current = modbus.read_holding_registers(register_addr=register_addr, quantity=1) # Check for empty response if not current or len(current) == 0: print("ERROR: Failed to read current value (empty response)") return False print(f"Current value: {current[0]}") try: result = modbus.write_multiple_holding_registers(register_addr, [value]) print(f"Write command sent (method: multi)({result})") except V5FrameError as e: print(f"V5FrameError: {e}") return False except umodbus.exceptions.ModbusError as e: print(f"Modbus protocol error: {e}") return False # Verify the write succeeded print("Verifying write...") try: updated = modbus.read_holding_registers(register_addr=register_addr, quantity=1) # Check for empty response if not updated or len(updated) == 0: print("ERROR: Failed to verify write (empty response)") return False updated_value = updated[0] if updated_value == value: print(f"SUCCESS: Register {register_addr} = {updated_value}") return True else: print(f"WARNING: Expected {value}, but register {register_addr} = {updated_value}") print("Note: Value may have been modified by device or another client") return False except TimeoutError: print(f"ERROR: Timeout reading back register {register_addr}") return False except Exception as e: print(f"ERROR writing to register {register_addr}: {type(e).__name__}: {e}") return False def read_and_display_registers(modbus): """ Read and display the configured registers. Args: modbus: PySolarmanV5 instance """ try: # Check if REGISTERS list is empty if not REGISTERS: print("WARNING: No registers configured to read") return # Read and display each register print("Reading holding registers:") print("-" * 50) for register_addr in REGISTERS: try: # Read single register result = modbus.read_holding_registers( register_addr=register_addr, quantity=1 ) # Check for empty response if not result or len(result) == 0: print(f"Register {register_addr:>3}: Error - empty response") continue # Extract value (result is a list) value = result[0] # Display the register and its value print(f"Register {register_addr:>3}: {value:>5} (0x{value:04X})") except (V5FrameError, umodbus.exceptions.IllegalDataAddressError) as e: print(f"Register {register_addr:>3}: Error reading - {e}") except Exception as e: print(f"Register {register_addr:>3}: Unexpected error - {e}") print("-" * 50) print("\nRead complete!") except Exception as e: print(f"\nError reading registers: {e}") def main(): """Main function to parse arguments and execute operations.""" # Parse command line arguments parser = argparse.ArgumentParser( description="Read and write Solarman inverter holding registers", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=f""" Examples: Read registers: python read_registers.py Write to register 230: python read_registers.py --write 230 75 Write to register 293: python read_registers.py --write 293 5000 Write to multiple registers: python read_registers.py --write 230 75 --write 293 5000 Enable verbose output: python read_registers.py --verbose Writable registers and their valid ranges: {chr(10).join(f' Register {reg}: {min_val}-{max_val}' for reg, (min_val, max_val) in WRITABLE_REGISTERS.items())} Environment Variables: INVERTER_IP - IP address of Solarman data logger (default: {INVERTER_IP}) INVERTER_SERIAL - Serial number of data logger (default: {INVERTER_SERIAL}) INVERTER_PORT - TCP port (default: {INVERTER_PORT}) MODBUS_SLAVE_ID - Modbus slave ID (default: {MODBUS_SLAVE_ID}) SOCKET_TIMEOUT - Socket timeout in seconds (default: {SOCKET_TIMEOUT}) """ ) parser.add_argument( '--write', nargs=2, type=str, # Parse as strings for better validation metavar=('REGISTER', 'VALUE'), action='append', dest='write_operations', help='Write VALUE to REGISTER (can be used multiple times)' ) parser.add_argument( '--verbose', action='store_true', help='Enable verbose protocol output for debugging' ) args = parser.parse_args() # Validate and convert write operations if args.write_operations: validated_operations = [] for reg_str, val_str in args.write_operations: try: register_addr = validate_register_arg(reg_str) value = validate_value_arg(val_str) validated_operations.append((register_addr, value)) except ValueError as e: print(f"ERROR: Invalid write argument: {e}") sys.exit(1) args.write_operations = validated_operations print("=" * 50) print("Solarman Register Reader/Writer") print("=" * 50) print() # Pre-validate write operations before connecting validate_write_operations(args.write_operations) modbus = None write_failures = [] try: # Initialize connection print(f"Connecting to {INVERTER_IP}:{INVERTER_PORT} (Serial: {INVERTER_SERIAL})...") modbus = PySolarmanV5( address=INVERTER_IP, serial=INVERTER_SERIAL, port=INVERTER_PORT, mb_slave_id=MODBUS_SLAVE_ID, socket_timeout=SOCKET_TIMEOUT, verbose=args.verbose, v5_error_correction=True ) print("Connected successfully!") # Validate connection with a test read if REGISTERS: try: test_read = modbus.read_holding_registers(register_addr=REGISTERS[0], quantity=1) if test_read and len(test_read) > 0: print(f"Connection verified (read register {REGISTERS[0]})!\n") else: print(f"WARNING: Connection test to register {REGISTERS[0]} returned empty response\n") except Exception as e: print(f"WARNING: Connection test to register {REGISTERS[0]} failed: {e}") print("This may indicate a connection issue or invalid register address.") print("Continuing anyway...\n") else: print("Skipping connection test (no registers configured)\n") # Perform write operations if requested if args.write_operations: for register_addr, value in args.write_operations: success = write_register(modbus, register_addr, value) if not success: print(f"Write operation failed for register {register_addr}!") write_failures.append(register_addr) # Always read registers to show current values print() read_and_display_registers(modbus) except Exception as e: print(f"\nConnection error: {e}") print("Please check your IP address, serial number, and network connection.") sys.exit(1) finally: # Ensure disconnection if modbus: try: modbus.disconnect() print("\nDisconnected from inverter.") except Exception as e: # Log but don't raise - we're already cleaning up print(f"\nWarning: Error during disconnect: {e}") # Exit with error code if any write operations failed if write_failures: print(f"\nERROR: Failed to write to {len(write_failures)} register(s): {write_failures}") sys.exit(1) if __name__ == "__main__": main()