Coverage for mindsdb / utilities / ps.py: 16%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import sys 

2import time 

3from collections import namedtuple 

4import psutil 

5 

6 

7def get_child_pids(pid): 

8 p = psutil.Process(pid=pid) 

9 return p.children(recursive=True) 

10 

11 

12def net_connections(): 

13 """Cross-platform psutil.net_connections like interface""" 

14 if sys.platform.lower().startswith("linux"): 

15 return psutil.net_connections() 

16 

17 all_connections = [] 

18 Pconn = None 

19 for p in psutil.process_iter(["pid"]): 

20 try: 

21 process = psutil.Process(p.pid) 

22 connections = process.net_connections() 

23 if connections: 

24 for conn in connections: 

25 # Adding pid to the returned instance 

26 # for consistency with psutil.net_connections() 

27 if Pconn is None: 

28 fields = list(conn._fields) 

29 fields.append("pid") 

30 _conn = namedtuple("Pconn", fields) 

31 for attr in conn._fields: 

32 setattr(_conn, attr, getattr(conn, attr)) 

33 _conn.pid = p.pid 

34 all_connections.append(_conn) 

35 

36 except (psutil.AccessDenied, psutil.ZombieProcess, psutil.NoSuchProcess): 

37 pass 

38 return all_connections 

39 

40 

41def is_port_in_use(port_num): 

42 """Check does any of child process uses specified port.""" 

43 parent_process = psutil.Process() 

44 child_pids = [x.pid for x in parent_process.children(recursive=True)] 

45 conns = net_connections() 

46 portsinuse = [x.laddr[1] for x in conns if x.pid in child_pids and x.status == "LISTEN"] 

47 portsinuse.sort() 

48 return int(port_num) in portsinuse 

49 

50 

51def wait_func_is_true(func, timeout, *args, **kwargs): 

52 start_time = time.time() 

53 

54 result = func(*args, **kwargs) 

55 while result is False and (time.time() - start_time) < timeout: 

56 time.sleep(2) 

57 result = func(*args, **kwargs) 

58 

59 return result 

60 

61 

62def wait_port(port_num, timeout): 

63 return wait_func_is_true(func=is_port_in_use, timeout=timeout, port_num=port_num) 

64 

65 

66def get_listen_ports(pid): 

67 try: 

68 p = psutil.Process(pid) 

69 cons = p.net_connections() 

70 cons = [x.laddr.port for x in cons] 

71 except Exception: 

72 return [] 

73 return cons 

74 

75 

76def is_pid_listen_port(pid, port): 

77 ports = get_listen_ports(pid) 

78 return int(port) in ports