Source code for Basic.Cluster_Computation

# Yuncong Ma, 2/12/2024
# pNet
# Provide functions to submit jobs to cluster environment


#########################################
# Packages
from datetime import datetime
import os
from sys import platform

from Module.Data_Input import write_json_setting, load_json_setting, setup_result_folder

#########################################


[docs]def setup_cluster(dir_pnet_result: str, dir_env: str, dir_pnet: str, dir_python: str, submit_command='qsub -terse -j y', thread_command='-pe threaded ', memory_command='-l h_vmem=', log_command='-o ', computation_resource=None or dict): """ Setup cluster environment and commands to submit jobs :param dir_pnet_result: directory of pNet result folder :param dir_env: directory of the desired virtual environment :param dir_pnet: directory of the pNet toolbox :param dir_python: absolute directory to the python folder, ex. /Users/YuncongMa/.conda/envs/pnet/bin/python :param submit_command: command to submit a cluster job :param thread_command: command to setup number of threads for each job :param memory_command: command to setup memory allowance for each job :param log_command: command to specify the logfile :param computation_resource: None or a dict which specifies the number of threads and memory for different processes :return: None Yuncong Ma, 2/12/2024 """ dir_pnet_dataInput, _, _, _, _, _ = setup_result_folder(dir_pnet_result) setting = {'dir_env': dir_env, 'dir_pnet': dir_pnet, 'dir_python': dir_python, 'submit_command': submit_command, 'thread_command': thread_command, 'memory_command': memory_command, 'log_command': log_command, 'computation_resource': computation_resource} write_json_setting(setting, os.path.join(dir_pnet_dataInput, 'Cluster_Setting.json'))
[docs]def submit_bash_job(dir_pnet_result: str, python_command: str or None, memory=50, n_thread=4, logFile=None, bashFile=None, pythonFile=None, create_python_file=True): """ submit a bash job to the desired cluster environment Generate bash and python files automatically :param dir_pnet_result: directory of pNet result folder :param python_command: the Python function to run, with dir_pnet_result as a preset variable :param memory: a real number in GB :param n_thread: number of threads to use :param logFile: full directory of a log file :param bashFile: full directory of the bash file to generate :param pythonFile: full directory of the python file to generate :param create_python_file: bool, create a new Python file or not :return: None Yuncong Ma, 2/12/2024 """ # load cluster setting dir_pnet_dataInput, _, _, _, _, _ = setup_result_folder(dir_pnet_result) setting = load_json_setting(os.path.join(dir_pnet_dataInput, 'Cluster_Setting.json')) dir_env = setting['dir_env'] dir_pnet = setting['dir_pnet'] dir_python = setting['dir_python'] submit_command = setting['submit_command'] thread_command = setting['thread_command'] memory_command = setting['memory_command'] log_command = setting['log_command'] # current date and time now = datetime.now() date_time = now.strftime("%m/%d/%Y, %H:%M:%S") # create a new bash file if os.path.isfile(bashFile): os.remove(bashFile) bashFile = open(bashFile, 'w') # header print('#!/bin/sh\n', file=bashFile, flush=True) print('# This bash script is to run a pNet job in the desired cluster environment', file=bashFile, flush=True) print(f'# created on {date_time}\n', file=bashFile, flush=True) print(f'# Use command to submit this job:\n# $ {submit_command} {thread_command}{n_thread} {memory_command}{memory} {log_command}{logFile} {bashFile.name}\n', file=bashFile, flush=True) print(f'source activate {dir_env}\n', file=bashFile, flush=True) print(r'echo -e "Start time : `date +%F-%H:%M:%S`\n" ', file=bashFile, flush=True) print(f'\n{dir_python} {pythonFile}\n', file=bashFile, flush=True) print(r'echo -e "Finished time : `date +%F-%H:%M:%S`\n" ', file=bashFile, flush=True) bashFile.close() bashFile = bashFile.name if create_python_file: # create a Python job file if os.path.isfile(pythonFile): os.remove(pythonFile) pythonFile = open(pythonFile, 'w') print('# This python file is to run a pNet job', file=pythonFile, flush=True) print(f'# created on {date_time}\n', file=pythonFile, flush=True) print('import sys\nimport os\n', file=pythonFile, flush=True) print(f"dir_pnet = '{dir_pnet}'", file=pythonFile, flush=True) #print(f"sys.path.append(os.path.join(dir_pnet, 'Python'))", file=pythonFile, flush=True) print(f"sys.path.append(dir_pnet)", file=pythonFile, flush=True) # modified by FY, 07/26/2024 #print('import pNet\n', file=pythonFile, flush=True) print('import pnet\n', file=pythonFile, flush=True) # mod by hm, 07/19/2024 print(f"dir_pnet_result = '{dir_pnet_result}'\n", file=pythonFile, flush=True) print(f"{python_command}\n", file=pythonFile, flush=True) pythonFile.close() # execute a shell command to submit a cluster job, only for linux based systems if platform == "linux": os.system(f'{submit_command} {thread_command}{n_thread} {memory_command}{memory} {log_command}{logFile} {bashFile}')