1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- import os
- import subprocess
- import time
- from datetime import datetime
- import json
def _build_child_env(fork_num, chromedriver_path):
    """Return a copy of the current environment prepared for one pytest fork.

    Args:
        fork_num: 1-based index of the fork, exported as ``SELENIUM_FORK_NUM``.
        chromedriver_path: Path to the chromedriver binary, or to the
            directory that contains it.

    Returns:
        A new ``dict`` suitable for the ``env`` argument of ``subprocess.Popen``.
    """
    child_env = os.environ.copy()
    child_env['SELENIUM_FORK_NUM'] = str(fork_num)

    # PATH entries are directories; appending the path of the binary itself
    # has no effect, so use the directory that contains it instead.
    if os.path.isdir(chromedriver_path):
        driver_dir = chromedriver_path
    else:
        driver_dir = os.path.dirname(chromedriver_path)

    # PATH may be unset in minimal environments; do not fail on that.
    path_entries = [
        entry for entry in (child_env.get('PATH', ''), driver_dir) if entry
    ]
    child_env['PATH'] = os.pathsep.join(path_entries)
    return child_env


def _record_status(run, proc):
    """Update *run* in place with the current state of its child process.

    A child that is still running gets ``current_time`` set to the seconds
    elapsed since it was started and keeps ``end_time`` / ``exit_status`` as
    ``None``.  A child that has finished gets its real return code in
    ``exit_status``, the time of this check in ``end_time`` and
    ``current_time`` set to ``None``.

    Args:
        run: The bookkeeping dict created when the child was launched.
        proc: The ``subprocess.Popen`` handle of that child.
    """
    now = datetime.now().timestamp()
    # poll() reaps the child if it has exited and returns its exit code,
    # or returns None while it is still running.
    return_code = proc.poll()
    if return_code is None:
        run["current_time"] = round(now - run["start_time"], 3)
        return
    run["current_time"] = None
    run["exit_status"] = return_code
    run["end_time"] = now


def main():
    """Repeatedly launch every pytest script in a directory and report on it.

    Configuration is read from environment variables:

    * ``SELENIUM_SCRIPT_DIR``: directory scanned for ``*.py`` files
      (default ``/selenium_bot/``).
    * ``SELENIUM_SCRIPT_INTERVAL``: seconds to wait between launching a batch
      and inspecting it (default ``10``).
    * ``SELENIUM_SCRIPT_FORKS``: number of copies of each script started per
      batch (default ``1``).
    * ``CHROMEDRIVER_PATH``: chromedriver binary or its directory
      (default ``/usr/bin/chromedriver``).

    Each cycle starts ``pytest`` once per script and fork, sleeps for the
    interval, takes one status snapshot of every child and prints the
    snapshots to stdout as a JSON list.  The loop never returns.

    Raises:
        ValueError: If the interval or fork count is not an integer.
        OSError: If the script directory cannot be listed or ``pytest``
            cannot be started.
    """
    script_dir = os.environ.get('SELENIUM_SCRIPT_DIR', '/selenium_bot/')
    interval = int(os.environ.get('SELENIUM_SCRIPT_INTERVAL', '10'))
    forks = int(os.environ.get('SELENIUM_SCRIPT_FORKS', '1'))
    chromedriver_path = os.environ.get('CHROMEDRIVER_PATH', '/usr/bin/chromedriver')

    while True:
        scripts = [f for f in os.listdir(script_dir) if f.endswith('.py')]

        # Keep each Popen handle next to its record so the real exit code
        # can be read later; the handle itself is not JSON-serialisable.
        launched = []
        for script in scripts:
            for fork_num in range(1, forks + 1):
                proc = subprocess.Popen(
                    ["pytest", os.path.join(script_dir, script)],
                    env=_build_child_env(fork_num, chromedriver_path),
                )
                run = {
                    "script": script,
                    "fork": fork_num,
                    "start_time": datetime.now().timestamp(),
                    "end_time": None,
                    "exit_status": None,
                    "pid": proc.pid,
                }
                launched.append((run, proc))

        time.sleep(interval)

        # One snapshot per child; children still running after the interval
        # are left running and are not tracked in later cycles.
        for run, proc in launched:
            _record_status(run, proc)

        # Print the results as JSON
        runs = [run for run, _ in launched]
        print(json.dumps(runs, indent=4, default=str))
# Start the launcher loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|