Source code for resomapper.processing.processing

import os
import json
import resomapper.core.utils as ut
from resomapper.core.misc import NoModalsSelectedError
from resomapper.core.misc import NotStudiesToProcessError
import nibabel as nib
import numpy as np
from dipy.io.gradients import read_bvals_bvecs

from resomapper.core.misc import auto_innited_logger as lggr
import resomapper.model_fitting.Tmapfit as tfit
import resomapper.model_fitting.DTIfit as dtifit
import resomapper.model_fitting.MTRfit as mtrfit

import warnings

warnings.filterwarnings("ignore")

#### COMMON UTILS ####


[docs] def get_studies_to_process(path, modals_to_process, auto_mode=False): sourcedata_folder = os.path.join(path, "sourcedata") derivatives_folder = os.path.join(path, "derivatives") studies_to_process = {} if len(modals_to_process) == 0: raise NoModalsSelectedError for study_folder in os.listdir(sourcedata_folder): if ut.is_folder_and_not_occult(os.path.join(sourcedata_folder, study_folder)): studies_to_process[study_folder] = [] for study_bids_folder in os.listdir( os.path.join(sourcedata_folder, study_folder) ): if ut.is_folder_and_not_occult( os.path.join(sourcedata_folder, study_folder, study_bids_folder) ): for data_file in os.listdir( os.path.join(sourcedata_folder, study_folder, study_bids_folder) ): if ut.is_nii(data_file): modal = ut.get_modality_nii_acq(data_file) if modal in modals_to_process: studies_to_process[study_folder].append( (modal, data_file) ) acqs_to_process = {} for study in studies_to_process: acqs_to_process[study] = [ (x[0], ut.get_acq(x[1])) for x in studies_to_process[study] ] acqs_to_process[study] = list(set(acqs_to_process[study])) if not any_studies_in_dict(studies_to_process): raise NotStudiesToProcessError else: if not os.path.exists(derivatives_folder): # No studies have already been processed os.mkdir(derivatives_folder) return acqs_to_process else: # Some studies might have already been processed processed_acqs = {} for study_folder in os.listdir(derivatives_folder): if ut.is_folder_and_not_occult( os.path.join(derivatives_folder, study_folder) ): processed_acqs[study_folder] = [] for modal_folder in os.listdir( os.path.join(derivatives_folder, study_folder) ): if ut.is_folder_and_not_occult( os.path.join(derivatives_folder, study_folder, modal_folder) ): for data_file in os.listdir( os.path.join( derivatives_folder, study_folder, modal_folder ) ): if ut.is_nii(data_file) and "processedmap" in data_file: processed_acqs[study_folder].append( ( ut.get_modality_nii_map(data_file), ut.get_acq(data_file), ) ) processed_acqs[study_folder] = list( set(processed_acqs[study_folder]) ) coincident_processed_acqs = {} for study, acqs in processed_acqs.items(): if study in acqs_to_process: filtered_list = [ acq for acq in acqs if acq in acqs_to_process[study] ] if filtered_list: coincident_processed_acqs[study] = filtered_list if any_studies_in_dict(coincident_processed_acqs): if auto_mode is False: acqs_to_process = choose_if_reprocess( acqs_to_process, coincident_processed_acqs ) if not any_studies_in_dict(acqs_to_process): raise NotStudiesToProcessError return acqs_to_process
[docs] def any_studies_in_dict(studies_dict): for array in studies_dict.values(): if array: return True return False
[docs] def choose_if_reprocess(acqs_to_process, processed_acqs): print( f"\n{lggr.warn}Some studies have already been processed. Please choose if you want to reprocess them or not." ) print("Already processed studies:") i = 0 index_map = {} for study in processed_acqs: if processed_acqs[study] != []: print(f"\nStudy - {study}:") for x in processed_acqs[study]: print(f"({i}) - Modality: {x[0]}, Acquisition: {x[1]}") index_map[i] = (study, x) i += 1 question = "Select the option you prefer." options = { "a": "Process again all studies (previous results will be overwritten).", # "s": "Select which studies to reprocess.", "n": "Do not reprocess any studies already processed.", } selection = ut.ask_user_options(question, options) if selection == "a": return acqs_to_process elif selection == "n": new_acqs_to_process = {} for study in acqs_to_process: if study in processed_acqs: new_acqs_to_process[study] = [ x for x in acqs_to_process[study] if x not in processed_acqs[study] ] else: new_acqs_to_process[study] = acqs_to_process[study] return new_acqs_to_process
# TODO: give the option to select some # elif selection == "s": # pass
[docs] def cli_ask_indexes_to_rm(n_max, map_type, n_basal=None, n_b_val=None): if map_type == "T1": text_to_remove = "repetition times" elif "T2" in map_type: text_to_remove = "echo times" elif map_type == "DTI": text_to_remove = "directions" while True: try: n_dirs_to_rm = int( input( f"\n{lggr.ask}How many {text_to_remove} do you want to remove?" f"\n{lggr.pointer}" ) ) if 0 <= n_dirs_to_rm < n_max: # n_dirs = n_max - n_dirs_to_rm break else: print(f"{lggr.error}Please enter a number between 0 and {int(n_max)}.") except ValueError: print(f"{lggr.error}Please enter only integer numbers.") indexes_to_rm = [] if n_dirs_to_rm != 0: print(f"\n{lggr.info}{n_dirs_to_rm} {text_to_remove} will be removed.") print( f"\n{lggr.ask}Please specify the index of the {text_to_remove} to remove " f"(from 1 to {n_max}, one at a time)." ) dirs_to_rm = [] for i in range(n_dirs_to_rm): while True: try: temp = int(input(f"({i+1}) {lggr.pointer}")) if (1 <= temp <= n_max) and (temp not in dirs_to_rm): dirs_to_rm.append(temp) break elif temp in dirs_to_rm: print(f"{lggr.error}You have already specified that number.") else: print( f"{lggr.error}You must enter a number between " f"1 and {n_max}." ) except ValueError: print(f"{lggr.error}Please enter integer numbers.") if map_type == "DTI": for i_dir in dirs_to_rm: # Index of the first bvalue for one direction temp = [i_dir + n_basal - 1 + (n_b_val - 1) * (i_dir - 1)] # Add the next indexes for the same direction (the rest of bvalues) temp.extend(temp[-1] + 1 for _ in range(n_b_val - 1)) indexes_to_rm += temp else: indexes_to_rm = [index - 1 for index in dirs_to_rm] # Sort to remove up to down indexes_to_rm.sort(reverse=True) return indexes_to_rm
#### Tmap fitting ####
[docs] def process_Tmap(nifti_file_path, mask_nifti_path, T_type, ask_remove_times=False): study_nii = nib.load(nifti_file_path) nii_data = study_nii.get_fdata() json_file_path = nifti_file_path.replace(".nii.gz", ".json") with open(json_file_path) as f: json_info = json.load(f) if T_type == "T1": times_array = json_info["RepetitionTime"] elif "T2" in T_type: times_array = json_info["EchoTime"] if np.shape(nii_data)[3] != len(times_array): print( f"\n{lggr.error}Nifti shape ({np.shape(nii_data)}) does not " f"match the length of the times array ({len(times_array)}). " "Fitting aborted." ) return else: if ask_remove_times: nifti_file_path, times_array = cli_Tmap_remove_times( nifti_file_path, json_file_path, T_type ) output_basename = nifti_file_path.replace(".nii.gz", "") tfit.Tmapfit_image( nifti_file_path, times_array, output_basename, T_type, non_linear_fitting=True, ncpu=None, mask_nifti=mask_nifti_path, ) return output_basename
[docs] def cli_Tmap_remove_times(nifti_file_path, json_file_path, T_type): with open(json_file_path) as f: json_info = json.load(f) if T_type == "T1": text_to_rm = "repetition times" times_array = json_info["RepetitionTime"] elif "T2" in T_type: text_to_rm = "echo times" times_array = json_info["EchoTime"] if ut.ask_user( f"Do you want to remove any {text_to_rm} from the original acquisition?" ): if "_preproc" not in nifti_file_path: new_nifti_file_path = nifti_file_path.replace(".nii.gz", "_preproc.nii.gz") new_json_file_path = json_file_path.replace(".json", "_preproc.json") else: new_nifti_file_path = nifti_file_path new_json_file_path = json_file_path study_nii = nib.load(nifti_file_path) nii_data = study_nii.get_fdata() indexes_to_rm = cli_ask_indexes_to_rm(len(times_array), T_type) nii_data = np.delete(nii_data, indexes_to_rm, axis=3) nii_ima = nib.Nifti1Image( nii_data.astype(np.float64), study_nii.affine, study_nii.header ) nib.save(nii_ima, new_nifti_file_path) times_array = np.delete(times_array, indexes_to_rm) if T_type == "T1": json_info["RepetitionTime"] = times_array.tolist() elif "T2" in T_type: json_info["EchoTime"] = times_array.tolist() with open(new_json_file_path, "w") as f: json.dump(json_info, f, indent=4) return new_nifti_file_path, times_array else: return nifti_file_path, times_array
#### MTRmap fitting ####
[docs] def process_MTRmap(mton_nifti_file_path, mtoff_nifti_file_path, mask_nifti_path=None): mtr_map_output_path = mton_nifti_file_path.replace( ".nii.gz", "_MTR-processedmap.nii.gz" ) mtr_map_output_path = mtr_map_output_path.replace("_MTon", "") mton_nii = nib.load(mton_nifti_file_path) mton_nii_data = mton_nii.get_fdata() mtoff_nii = nib.load(mtoff_nifti_file_path) mtoff_nii_data = mtoff_nii.get_fdata() mtrfit.check_slices_MT_images() if mask_nifti_path is not None: mask_nii = nib.load(mask_nifti_path) mask_nii_data = mask_nii.get_fdata() mton_nii_data = mton_nii_data * mask_nii_data mtoff_nii_data = mtoff_nii_data * mask_nii_data mtrfit.check_slopes_MT_images() mtr_map_data = mtrfit.compute_MTR_map(mton_nii_data, mtoff_nii_data) mtr_map_nii = nib.Nifti1Image( mtr_map_data.astype(np.float64), mton_nii.affine, mton_nii.header ) nib.save(mtr_map_nii, mtr_map_output_path)
#### DWI-DTI fitting ####
[docs] def process_DTI( nifti_file_path, mask_nifti_path, ask_user_info=False, ask_remove_dirs=False ): study_nii = nib.load(nifti_file_path) nii_data = study_nii.get_fdata() b_vals_fname = nifti_file_path.replace(".nii.gz", ".bval") b_vecs_fname = b_vals_fname.replace(".bval", ".bvec") b_vals, b_vecs = read_bvals_bvecs(b_vals_fname, b_vecs_fname) n_bval, n_basal, n_dirs = dtifit.count_basal_dirs_bvals(b_vals, b_vecs) if np.shape(nii_data)[3] != (n_basal + (n_bval * n_dirs)): print( f"\n{lggr.error}Nifti shape ({np.shape(nii_data)}) does not " f"match with the information of the .bval and .bvec files. " "Fitting aborted." ) return if ask_user_info: dtifit.cli_ask_dti_info(n_bval, n_basal, n_dirs) if ask_remove_dirs: nifti_file_path, b_vals, b_vecs = cli_DTI_remove_directions( nifti_file_path, b_vals, b_vecs, n_bval, n_basal, n_dirs ) print(f"\n{lggr.info}B values and gradient directions info:") dtifit.print_bvals_bvecs(b_vals, b_vecs) output_basename = nifti_file_path.replace(".nii.gz", "") if n_dirs < 6: print( f"\n{lggr.warn}To accurately fit the DTI model you need an acquisition " "of at least 6 gradient directions. Classic monoexponential ADC fitting " "will be performed for this study." ) # TODO: ADC fitting # return output_basename else: dtifit.process_dti( nifti_file_path, b_vals, b_vecs, output_basename, mask_fname=mask_nifti_path ) return output_basename
[docs] def cli_DTI_remove_directions(nifti_file_path, b_vals, b_vecs, n_bval, n_basal, n_dirs): if ut.ask_user( "Do you want to remove any directions from the original acquisition?" ): indexes_to_rm = cli_ask_indexes_to_rm( n_dirs, "DTI", n_basal=n_basal, n_b_val=n_bval ) if "_preproc" not in nifti_file_path: new_nifti_file_path = nifti_file_path.replace(".nii.gz", "_preproc.nii.gz") else: new_nifti_file_path = nifti_file_path bv_output_basename = new_nifti_file_path.replace(".nii.gz", "") study_nii = nib.load(nifti_file_path) nii_data = study_nii.get_fdata() nii_data = np.delete(nii_data, indexes_to_rm, axis=3) nii_ima = nib.Nifti1Image( nii_data.astype(np.float64), study_nii.affine, study_nii.header ) nib.save(nii_ima, new_nifti_file_path) b_vals = np.delete(b_vals, indexes_to_rm) b_vecs = np.delete(b_vecs, indexes_to_rm, axis=0) dtifit.save_bvals_bvecs(b_vals, b_vecs, bv_output_basename) return new_nifti_file_path, b_vals, b_vecs else: return nifti_file_path, b_vals, b_vecs