# selenium_bot.py
  1. import os
  2. import subprocess
  3. import time
  4. from datetime import datetime
  5. import json
  6. def main():
  7. script_dir = os.environ.get('SELENIUM_SCRIPT_DIR', '/selenium_bot/')
  8. interval = int(os.environ.get('SELENIUM_SCRIPT_INTERVAL', '10'))
  9. forks = int(os.environ.get('SELENIUM_SCRIPT_FORKS', '1'))
  10. chromedriver_path = os.environ.get('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')
  11. while True:
  12. scripts = [f for f in os.listdir(script_dir) if f.endswith('.py')]
  13. runs = []
  14. for script in scripts:
  15. for i in range(1, forks + 1):
  16. proc_env = os.environ.copy()
  17. proc_env['SELENIUM_FORK_NUM'] = str(i)
  18. proc_env['PATH'] = f"{proc_env['PATH']}:{chromedriver_path}"
  19. proc = subprocess.Popen(
  20. ["pytest", f"{script_dir}/{script}"],
  21. env=proc_env
  22. )
  23. runs.append({
  24. "script": script,
  25. "fork": i,
  26. "start_time": datetime.now().timestamp(),
  27. "end_time": None,
  28. "exit_status": None,
  29. "pid": proc.pid
  30. })
  31. time.sleep(interval)
  32. for run in runs:
  33. if run["end_time"] is not None:
  34. continue
  35. proc = subprocess.Popen(
  36. ["ps", "-o", "etime=", "-o", "stat=", "-p", str(run["pid"])],
  37. stdout=subprocess.PIPE
  38. )
  39. out, _ = proc.communicate()
  40. if proc.returncode == 0:
  41. run["current_time"] = out.decode().strip().split(" ")[0]
  42. run["exit_status"] = out.decode().strip().split(" ")[1]
  43. run["end_time"] = datetime.now().timestamp()
  44. else:
  45. run["current_time"] = None
  46. run["exit_status"] = "ERROR"
  47. run["end_time"] = datetime.now().timestamp()
  48. # Print the results as JSON
  49. print(json.dumps(runs, indent=4, default=str))
  50. if __name__ == '__main__':
  51. main()