#!/usr/bin/python3 ''' W3SA inventory agent (c) 2025 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Usage: w3sa_agent.py http://inventory.server/url [default_group] ''' import sys import os import subprocess import re import json import gzip from urllib import request from urllib.error import HTTPError agent_version = "lin-1.0" def readfile(fn): return open(fn).read().strip() def json_popen(cmd): f = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) if f.wait()==0: return json.load(f.stdout) return {} def parse_section(section, separator): for row in section: if separator in row: key, value = row.strip().split(separator, 1) if value.endswith(" kB"): yield key, int(value.strip().split(" ")[0]) * 1024 else: yield key.strip(), value.strip('" \n') def parse_file(filename, separator="="): for key, value in parse_section(open(filename).readlines(), separator): yield key, value def get_cpuinfo(): cpuinfo = open("/proc/cpuinfo").read() for cpu in cpuinfo.strip().split("\n\n"): yield dict( parse_section(cpu.strip().split("\n"), separator=": ") ) class dmidecoder(): def __init__(self): data = {} handle = False multi_sections = [ "Processor Information", "Physical Memory Array", "Memory Device", "Memory Array Mapped Address", "Cache Information", "Port Connector Information", "System Slot Information", "System Power Supply", "Onboard Device", "OEM-specific Type" ] section_data = {} for row in os.popen("/usr/sbin/dmidecode").readlines(): row = row.rstrip() if row.startswith("Handle "): handle = True continue if not row: handle = False if section_data: if section_name in multi_sections: if not section_name in data: data[section_name] = [] data[section_name].append(section_data) else: data[section_name] = section_data continue if row.startswith("\t\t"): subsection_data.append(row.strip()) elif row.startswith("\t"): if row.endswith(":"): # subsection subsection_name = row.strip() subsection_data = [] else: # section data key, value = row.strip().split(": ", 1) section_data[key] = value elif handle: section_name = row.strip() section_data = {} subsection_name = "" subsection_data = [] self.data = data def mem_info(self): if "Memory Device" not in self.data: meminfo = dict(parse_file("/proc/meminfo", ": ")) yield dict(Capacity=meminfo.get("MemTotal")) return units = dict(kb=1024, mb=1024**2, gb=1024**3, tb=1024**3) for dev in self.data["Memory Device"]: if dev["Size"] == "No Module Installed": continue size, unit = dev["Size"].rsplit(" ", 1) size = int(size)*units[unit.lower()] yield { "Capacity": size, "PartNumber": dev["Type"]+" "+dev["Speed"], "SerialNumber": dev["Serial Number"] } def get_computer(self): computer = self.data.get("System Information") if not computer: return {} return dict( Vendor=computer["Manufacturer"], Name=computer["Product Name"] ) def get_card(self): card = self.data.get("Chassis Information") if not card: return {} return dict( Manufacturer=card["Manufacturer"], Version=card["Version"], SerialNumber=card["Serial Number"] ) def get_bios(self): bios = self.data.get("BIOS Information") if not bios: return {} return dict( Manufacturer=bios["Vendor"], Description=bios["Release Date"], Version=bios["Version"], SerialNumber=None ) def disk_drives(): js = json_popen("lsblk --json --nodeps --bytes") for dev in js.get("blockdevices", []): yield dict( Size=int(dev["size"]), Caption=dev["name"], Model=None ) def logical_disks(): col_names = "source,target,fstype,size,avail" f = os.popen( "LANG=C df -x tmpfs -x efivarfs -x devtmpfs" " -k -l --output="+col_names ) for row in f.readlines()[1:]: cols = re.split(r'\s+', row.strip()) yield dict( Name=cols[1], Description="(%s)" % cols[0], Filesystem=cols[2], Size=int(cols[3]) * 1024, FreeSpace=int(cols[4]) * 1024 ) def netdevs(): js = json_popen("/sbin/ip --json addr show") for dev in js: yield dict( Name=dev.get("ifname"), MACAddress=dev.get("address"), PhysicalAdapter=dev.get("link_type") in ["ether"] ) def packages(): ''' Get list of RPM packages ''' cmd = os.popen("rpm -qa --qf '%{NAME} %{VERSION}-%{RELEASE} %{ARCH}\n'") for pkg in cmd.readlines(): name, version, arch = pkg.strip().split(" ", 2) yield {"Name": name, "Version": version, "Arch": arch} os_info = dict(parse_file("/etc/os-release")) dmi = dmidecoder() data = dict( Version=agent_version, UUID=readfile("/sys/devices/virtual/dmi/id/product_uuid"), ComputerName=readfile("/etc/hostname"), Username=os.environ.get("USER"), Computer=dmi.get_computer(), OS=dict( Name=os_info.get("NAME")+" "+os_info.get("VERSION_ID"), Caption=os_info.get("PRETTY_NAME"), Version=os_info.get("VERSION"), MUILanguages=[os.environ.get("LANG")], MachineID=readfile("/etc/machine-id") ), CPU=[ dict( Name=cpu.get("model name"), NumberOfCores=cpu.get("cpu cores"), NumberOfThreads=cpu.get("siblings") ) for cpu in get_cpuinfo() ], Memory=list(dmi.mem_info()), DiskDrive=list(disk_drives()), LogicalDisk=list(logical_disks()), BIOS=dmi.get_bios(), Card=dmi.get_card(), Network=list(netdevs()), Package=list(packages()), dmidecode=dmi.data ) if len(sys.argv) < 2: print(__doc__) sys.exit() inventory_url = sys.argv[1] if len(sys.argv) > 2: data["_w3sa_default_group"] = sys.argv[2] if "--debug" in sys.argv: print(json.dumps(data, indent=2)) data = gzip.compress(json.dumps(data, separators=(",", ":")).encode("utf8")) try: response = request.urlopen(request.Request(inventory_url, data)) except HTTPError as err: print( "HTTP ERROR %s: %s\n%s" % ( err.code, err.reason, err.fp.read().decode("utf8") ), file=sys.stderr ) sys.exit(1) print("%s: %s" % (response.code, response.msg)) if response.code == 200: print(json.dumps(json.loads(response.read().decode("utf8")), indent=2))