Source code for Module.Quality_Control_torch

# Yuncong Ma, 6/5/2024
# Quality control module of pNet using PyTorch

#########################################
# Packages

# to disable warnings
import logging
logging.getLogger('matplotlib.font_manager').setLevel(level=logging.CRITICAL)
# added by Yong Fan on July 23, 2024

# other functions of pNet
import numpy as np

from Module.Data_Input import *
from Basic.Matrix_Computation import *
from Module.Quality_Control import *
from Basic.Cluster_Computation import submit_bash_job


[docs]def run_quality_control_torch(dir_pnet_result: str): """ Run the quality control module, which computes spatial correspondence and functional coherence The quality control result folder has consistent sub-folder organization as Personalized_FN Quality control results of each scan or combined scans are stored into sub-folders A single matlab file named Result.mat stores all quantitative values, including Spatial_Correspondence: spatial correspondence between pFNs and gFNs Delta_Spatial_Correspondence: the difference between spatial correspondence of matched pFNs-gFNs and miss-matched pFNs-gFNS Miss_Match: A 2D matrix, [N, 2], each row specifies which pFN is miss matched to a different gFN Functional_Coherence: weighted average of Pearson correlation between time series of pFNs and all nodes Functional_Coherence_Control: weighted average of Pearson correlation between time series of gFNs and all nodes A final report in txt format saved in the root directory of quality control folder It summaries the number of miss matched FNs for each failed scan :param dir_pnet_result: the directory of pNet result folder :return: None Yuncong Ma, 12/20/2023 """ # Setup sub-folders in pNet result dir_pnet_dataInput, dir_pnet_FNC, dir_pnet_gFN, dir_pnet_pFN, dir_pnet_QC, _ = setup_result_folder(dir_pnet_result) # Log file file_Final_Report = open(os.path.join(dir_pnet_QC, 'Final_Report.txt'), 'w') print('\nStart QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', file=file_Final_Report, flush=True) # Description of QC if file_Final_Report is not None: print_description_QC(file_Final_Report) setting = load_json_setting(os.path.join(dir_pnet_dataInput, 'Setting.json')) Data_Type = setting['Data_Type'] Data_Format = setting['Data_Format'] setting = load_json_setting(os.path.join(dir_pnet_FNC, 'Setting.json')) combineScan = setting['Combine_Scan'] dataPrecision = setting['Computation']['dataPrecision'] # Information about scan list file_scan = os.path.join(dir_pnet_dataInput, 'Scan_List.txt') file_subject_ID = os.path.join(dir_pnet_dataInput, 'Subject_ID.txt') file_subject_folder = os.path.join(dir_pnet_dataInput, 'Subject_Folder.txt') list_scan = np.array([line.replace('\n', '') for line in open(file_scan, 'r')]) list_subject_ID = np.array([line.replace('\n', '') for line in open(file_subject_ID, 'r')]) subject_ID_unique = np.unique(list_subject_ID) N_Subject = subject_ID_unique.shape[0] list_subject_folder = np.array([line.replace('\n', '') for line in open(file_subject_folder, 'r')]) list_subject_folder_unique = np.unique(list_subject_folder) # Load gFNs gFN = load_matlab_single_array(os.path.join(dir_pnet_gFN, 'FN.mat')) # [dim_space, K] if Data_Type == 'Volume': Brain_Mask = load_brain_template(os.path.join(dir_pnet_dataInput, 'Brain_Template.json.zip'))['Brain_Mask'] gFN = reshape_FN(gFN, dataType=Data_Type, Brain_Mask=Brain_Mask) # data precision torch_float, torch_eps = set_data_precision_torch(dataPrecision) # compute spatial correspondence and functional coherence for each scan if combineScan == 0: N_pFN = list_scan.shape[0] else: N_pFN = subject_ID_unique.shape[0] # Compute quality control measurement for each scan or scans combined flag_QC = 0 for i in range(N_pFN): dir_pFN_indv = os.path.join(dir_pnet_pFN, list_subject_folder_unique[i]) pFN = load_matlab_single_array(os.path.join(dir_pFN_indv, 'FN.mat')) if Data_Type == 'Volume': pFN = reshape_FN(pFN, dataType=Data_Type, Brain_Mask=Brain_Mask) # Get the scan list file_scan_list = os.path.join(dir_pFN_indv, 'Scan_List.txt') # Load the data if Data_Type == 'Surface': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Normalization=None) elif Data_Type == 'Volume': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Brain_Mask=Brain_Mask, Normalization=None) elif Data_Type == 'Surface-Volume': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Normalization=None) else: raise ValueError('Unknown data type: ' + Data_Type) scan_data = torch.tensor(scan_data, dtype=torch_float) # Compute quality control measurement Spatial_Correspondence, Delta_Spatial_Correspondence, Miss_Match, Functional_Coherence, Functional_Coherence_Control =\ compute_quality_control_torch(scan_data, gFN, pFN, dataPrecision=dataPrecision, logFile=None) # Finalize results Result = {'Spatial_Correspondence': Spatial_Correspondence, 'Delta_Spatial_Correspondence': Delta_Spatial_Correspondence, 'Miss_Match': Miss_Match, 'Functional_Coherence': Functional_Coherence, 'Functional_Coherence_Control': Functional_Coherence_Control} # Report the failed scans in the final report if Miss_Match.shape[0] > 0: flag_QC += 1 print(' ' + str(Miss_Match.shape[0]) + ' miss matched FNs in sub folder: ' + list_subject_folder_unique[i], # edited by hm, edit before: list_subject_folder[i], file=file_Final_Report, flush=True) # Save results dir_pFN_indv_QC = os.path.join(dir_pnet_QC, list_subject_folder_unique[i]) # edited by hm, edit before: dir_pFN_indv_QC = os.path.join(dir_pnet_QC, list_subject_folder[i]) if not os.path.exists(dir_pFN_indv_QC): os.makedirs(dir_pFN_indv_QC) scipy.io.savemat(os.path.join(dir_pFN_indv_QC, 'Result.mat'), {'Result': Result}, do_compression=True) # Finish the final report if flag_QC == 0: print(f'\nSummary\n All {N_pFN} scans passed QC\n' f' This ensures that personalized FNs show highest spatial similarity to their group-level counterparts\n', file=file_Final_Report, flush=True) else: print(f'\nSummary\n Number of failed scans = {flag_QC}\n' f' This means those scans have at least one pFN show higher spatial similarity to a different group-level FN\n', file=file_Final_Report, flush=True) # Generate visualization visualize_quality_control(dir_pnet_result) print('\nFinished QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', file=file_Final_Report, flush=True) file_Final_Report.close()
[docs]def compute_quality_control_torch(scan_data, gFN, pFN, dataPrecision='double', logFile=None): """ Compute quality control measurements, including spatial correspondence and functional coherence The spatial correspondence ensures one-to-one match between gFNs and pFNs The functional coherence ensures that pFNs gives better data fitting :param scan_data: 2D matrix, [dim_time, dim_space], np.ndarray or torch.Tensor :param gFN: 2D matrix, [dim_space, K], K is the number of FNs, np.ndarray or torch.Tensor :param pFN: 2D matrix, [dim_space, K], K is the number of FNs, np.ndarray or torch.Tensor :param dataPrecision: 'double' or 'single' :param logFile: None :return: Spatial_Correspondence, Delta_Spatial_Correspondence, Miss_Match, Functional_Coherence, Functional_Coherence_Control Outputs are numpy.ndarray Spatial correspondence is a 2D symmetric matrix [K, K], which measures the spatial correlation between gFNs and pFNs Delta_Spatial_Correspondence is a vector [K, ], which measures minimum difference of spatial correlation between matched and unmatched gFNs and pFNs Miss_Match is a 2D matrix [N, 2]. Each row notes a pair of miss-matched gFN and pFN. Functional_Coherence is a vector [K, ], which measures the weighted average correlation between node-wise fMRI signal in scan_data and time series of pFNs Functional_Coherence_Control is a vector [K, ], which measures the weighted average correlation between node-wise fMRI signal in scan_data and time series of gFNs Yuncong Ma, 2/9/2024 """ # data precision torch_float, torch_eps = set_data_precision_torch(dataPrecision) if not isinstance(scan_data, torch.Tensor): scan_data = torch.tensor(scan_data, dtype=torch_float) else: scan_data = scan_data.type(torch_float) if not isinstance(gFN, torch.Tensor): gFN = torch.tensor(gFN, dtype=torch_float) else: gFN = gFN.type(torch_float) if not isinstance(pFN, torch.Tensor): pFN = torch.tensor(pFN, dtype=torch_float) else: pFN = pFN.type(torch_float) # Spatial correspondence K = gFN.shape[1] temp = mat_corr_torch(gFN, pFN, dataPrecision=dataPrecision) s_c = temp.cpu().numpy().copy() QC_Spatial_Correspondence = torch.clone(torch.diag(temp)) temp -= torch.diag(2 * torch.ones(K)) # set diagonal values to lower than -1 QC_Spatial_Correspondence_Control = torch.max(temp, dim=0)[0] QC_Delta_Sim = QC_Spatial_Correspondence - QC_Spatial_Correspondence_Control # Convert back to Numpy array Spatial_Correspondence = QC_Spatial_Correspondence.cpu().numpy() Delta_Spatial_Correspondence = QC_Delta_Sim.cpu().numpy() # Miss match between gFNs and pFNs # Index starts from 1 if np.min(Delta_Spatial_Correspondence) >= 0: Miss_Match = np.empty((0,)) else: ps = np.where(Delta_Spatial_Correspondence < 0)[0] #ps2 = np.asarray(np.argmax(Spatial_Correspondence, axis=0)).reshape(1, -1)[:, 0] ps2 = np.asarray(np.argmax(s_c, axis=0)) Miss_Match = np.concatenate((ps[:, np.newaxis] + 1, ps2[ps[:], np.newaxis] + 1), axis=1) # Functional coherence # in case of negative spatial weightings, use np.abs pFN_signal = scan_data @ pFN / torch.sum(np.abs(pFN), dim=0, keepdims=True) Corr_FH = mat_corr_torch(pFN_signal, scan_data, dataPrecision=dataPrecision) Corr_FH[torch.isnan(Corr_FH)] = 0 # in case of zero signals Functional_Coherence = torch.sum(Corr_FH.T * pFN, dim=0) / torch.sum(np.abs(pFN), dim=0) # Use gFN as control gFN_signal = scan_data @ gFN / torch.sum(np.abs(pFN), dim=0, keepdims=True) Corr_FH = mat_corr_torch(gFN_signal, scan_data, dataPrecision=dataPrecision) Corr_FH[torch.isnan(Corr_FH)] = 0 # in case of zero signals Functional_Coherence_Control = torch.sum(Corr_FH.T * gFN, dim=0) / torch.sum(np.abs(gFN), dim=0) # Convert back to Numpy array Functional_Coherence = Functional_Coherence.numpy() Functional_Coherence_Control = Functional_Coherence_Control.numpy() return Spatial_Correspondence, Delta_Spatial_Correspondence, Miss_Match, Functional_Coherence, Functional_Coherence_Control
[docs]def run_quality_control_torch_cluster(dir_pnet_result: str): """ Run the quality control module in cluster computation :param dir_pnet_result: the directory of pNet result folder :return: None Yuncong Ma, 6/5/2024 """ # Setup sub-folders in pNet result dir_pnet_dataInput, dir_pnet_FNC, dir_pnet_gFN, dir_pnet_pFN, dir_pnet_QC, _ = setup_result_folder(dir_pnet_result) # load setting settingCluster = load_json_setting(os.path.join(dir_pnet_dataInput, 'Cluster_Setting.json')) setting = {'Cluster': settingCluster} # Log file print('\nStart QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', flush=True) settingCluster = load_json_setting(os.path.join(dir_pnet_dataInput, 'Cluster_Setting.json')) setting = {'Cluster': settingCluster} # Information about scan list file_subject_folder = os.path.join(dir_pnet_dataInput, 'Subject_Folder.txt') list_subject_folder = np.array([line.replace('\n', '') for line in open(file_subject_folder, 'r')]) list_subject_folder_unique = np.unique(list_subject_folder) nFolder = list_subject_folder_unique.shape[0] # submit jobs memory = setting['Cluster']['computation_resource']['memory_qc'] n_thread = setting['Cluster']['computation_resource']['thread_qc'] for rep in range(1, 1+nFolder): time.sleep(0.1) dir_indv = os.path.join(dir_pnet_QC, list_subject_folder_unique[rep-1]) os.makedirs(dir_indv, exist_ok=True) if os.path.isfile(os.path.join(dir_indv, 'Result.mat')): continue submit_bash_job(dir_pnet_result, python_command=f'pnet.compute_quality_control_torch_cluster(dir_pnet_result,{rep})', memory=memory, n_thread=n_thread, bashFile=os.path.join(dir_indv, 'Cluster_job_bootstrap.sh'), pythonFile=os.path.join(dir_indv, 'Cluster_job_bootstrap.py'), logFile=os.path.join(dir_indv, 'Cluster_job_bootstrap.log') ) # check completion wait_time = 30 flag_complete = np.zeros(nFolder) report_interval = 20 Count = 0 while np.sum(flag_complete) < nFolder: time.sleep(wait_time) Count += 1 if Count % report_interval == 0: print(f'--> Found {np.sum(flag_complete)} finished jobs out of {nFolder} at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), flush=True) for rep in range(1, 1+nFolder): if flag_complete[rep-1] == 0 and os.path.isfile(os.path.join(dir_pnet_QC, list_subject_folder_unique[rep-1], 'Result.mat')): flag_complete[rep-1] = 1 # gather results and generate final report file_Final_Report = open(os.path.join(dir_pnet_QC, 'Final_Report.txt'), 'w') print('\nStart QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', file=file_Final_Report, flush=True) # Description of QC if file_Final_Report is not None: print_description_QC(file_Final_Report) flag_QC = 0 for i in range(1, 1+nFolder): Result = load_matlab_single_variable(os.path.join(dir_pnet_QC, list_subject_folder_unique[i-1], 'Result.mat')) if Result['Miss_Match'][0, 0].shape[0] > 0: flag_QC += 1 print(' ' + str(Result['Miss_Match'][0, 0].shape[0]) + ' miss matched FNs in sub folder: ' + list_subject_folder_unique[i-1], file=file_Final_Report, flush=True) # Finish the final report if flag_QC == 0: print(f'\nSummary\n All {nFolder} scans passed QC\n' f' This ensures that personalized FNs show highest spatial similarity to their group-level counterparts\n', file=file_Final_Report, flush=True) else: print(f'\nSummary\n Number of failed scans = {flag_QC}\n' f' This means those scans have at least one pFN show higher spatial similarity to a different group-level FN\n', file=file_Final_Report, flush=True) file_Final_Report.close() # Generate visualization visualize_quality_control(dir_pnet_result) print('\nFinished QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', flush=True)
[docs]def compute_quality_control_torch_cluster(dir_pnet_result: str, jobID=1): """ Run the QC in cluster computation :param dir_pnet_result: directory of pNet result folder :param jobID: jobID starting from 1 :return: None Yuncong Ma, 12/1/2023 """ # Setup sub-folders in pNet result dir_pnet_dataInput, dir_pnet_FNC, dir_pnet_gFN, dir_pnet_pFN, dir_pnet_QC, _ = setup_result_folder(dir_pnet_result) print('\nStart QC at ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + '\n', flush=True) setting = load_json_setting(os.path.join(dir_pnet_dataInput, 'Setting.json')) Data_Type = setting['Data_Type'] Data_Format = setting['Data_Format'] setting = load_json_setting(os.path.join(dir_pnet_FNC, 'Setting.json')) dataPrecision = setting['Computation']['dataPrecision'] # Information about scan list file_subject_folder = os.path.join(dir_pnet_dataInput, 'Subject_Folder.txt') list_subject_folder = np.array([line.replace('\n', '') for line in open(file_subject_folder, 'r')]) list_subject_folder_unique = np.unique(list_subject_folder) dir_pFN_indv = os.path.join(dir_pnet_pFN, list_subject_folder_unique[jobID-1]) # Load gFNs gFN = load_matlab_single_array(os.path.join(dir_pnet_gFN, 'FN.mat')) # [dim_space, K] if Data_Type == 'Volume': Brain_Mask = load_brain_template(os.path.join(dir_pnet_dataInput, 'Brain_Template.json.zip'))['Brain_Mask'] gFN = reshape_FN(gFN, dataType=Data_Type, Brain_Mask=Brain_Mask) # data precision torch_float, torch_eps = set_data_precision_torch(dataPrecision) pFN = load_matlab_single_array(os.path.join(dir_pFN_indv, 'FN.mat')) if Data_Type == 'Volume': pFN = reshape_FN(pFN, dataType=Data_Type, Brain_Mask=Brain_Mask) # Get the scan list file_scan_list = os.path.join(dir_pFN_indv, 'Scan_List.txt') # Load the data if Data_Type == 'Surface': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Normalization=None) elif Data_Type == 'Volume': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Brain_Mask=Brain_Mask, Normalization=None) elif Data_Type == 'Surface-Volume': scan_data, _, _ = load_fmri_scan(file_scan_list, dataType=Data_Type, dataFormat=Data_Format, Reshape=True, Normalization=None) else: raise ValueError('Unknown data type: ' + Data_Type) scan_data = torch.tensor(scan_data, dtype=torch_float) # Compute quality control measurement Spatial_Correspondence, Delta_Spatial_Correspondence, Miss_Match, Functional_Coherence, Functional_Coherence_Control =\ compute_quality_control_torch(scan_data, gFN, pFN, dataPrecision=dataPrecision, logFile=None) # Finalize results Result = {'Spatial_Correspondence': Spatial_Correspondence, 'Delta_Spatial_Correspondence': Delta_Spatial_Correspondence, 'Miss_Match': Miss_Match, 'Functional_Coherence': Functional_Coherence, 'Functional_Coherence_Control': Functional_Coherence_Control} # Save results dir_pFN_indv_QC = os.path.join(dir_pnet_QC, list_subject_folder_unique[jobID-1]) os.makedirs(dir_pFN_indv_QC, exist_ok=True) scipy.io.savemat(os.path.join(dir_pFN_indv_QC, 'Result.mat'), {'Result': Result}, do_compression=True)