import os
import re
import json
import pydicom
from tqdm import tqdm
from pydicom import dcmread
from resomapper.core.misc import NotStudiesToConvertError
from resomapper.core.misc import auto_innited_logger as lggr
import resomapper.core.utils as ut
import resomapper.core.misc as msc
import resomapper.format_conversion.DICOM_conversion as dcmconv
import warnings
warnings.filterwarnings("ignore")
[docs]
def get_modality_MRS(params, conditions_dict=msc.conditions_dict_MRS):
for modality in conditions_dict:
if is_modality_MRS(modality, params, conditions_dict):
return modality
return "etc"
[docs]
def is_modality_MRS(modality, params, conditions_dict=msc.conditions_dict_MRS):
return all(cond(params) for cond in conditions_dict[modality])
[docs]
def convert_single_study_MRS(path, output_dir, acq_categories=msc.acq_categories_BIDS):
# brk_dir_info = check_valid_dir_MRS(path)
# if brk_dir_info is None:
# return None
# else:
if not os.path.exists(output_dir):
os.mkdir(output_dir)
output_dir = os.path.join(output_dir, "sourcedata")
if not os.path.exists(output_dir):
os.mkdir(output_dir)
# output_subject_dir = os.path.join(output_dir, subj_sess_name)
# if not os.path.exists(output_subject_dir):
# os.mkdir(output_subject_dir)
dicom_dir = os.path.join(path, "DICOM")
sur_dir = os.path.join(path, "Image")
# nifti_dir = os.path.join(path, "Nifti")
first_acq = True
for acq_id in os.listdir(dicom_dir):
acq_folder_path = os.path.join(dicom_dir, acq_id)
if ut.is_folder_and_not_occult(acq_folder_path):
for reco_id in os.listdir(acq_folder_path):
reco_folder_path = os.path.join(acq_folder_path, reco_id)
if ut.is_folder_and_not_occult(reco_folder_path):
# Check type of sequence
first_file = os.listdir(reco_folder_path)[0]
ds_first = dcmread(os.path.join(reco_folder_path, first_file))
params = {"seq_name": ds_first[0x0018, 0x0024].value,
"patient_name": ds_first[0x0010, 0x0010].value,
"patient_ID": ds_first[0x0010, 0x0020].value,
"accs_num": ds_first[0x0008, 0x0050].value,
"modality": ds_first[0x0008, 0x0060].value, # either MR or PET
"image_type": ds_first[0x0008, 0x0008].value, # ORIGINAL or DERIVED
}
# TODO: delete recoID =1 if every derived image is marked as so
if "DERIVED" not in params["image_type"] and reco_id == "1":
acq_modality = get_modality_MRS(params)
if first_acq:
sub_name = "_".join(map(str, [params["patient_ID"], params["accs_num"], params["patient_name"]]))
first_acq = False
acq_filename = (
f"sub-{sub_name}_acq-{acq_id}_run-{reco_id}_{acq_modality}"
)
output_subject_dir = os.path.join(output_dir, sub_name)
if not os.path.exists(output_subject_dir):
os.mkdir(output_subject_dir)
acq_category = acq_categories[acq_modality]
acq_output_dir = os.path.join(output_subject_dir, acq_category)
if not os.path.exists(acq_output_dir):
os.mkdir(acq_output_dir)
nii_output_path = os.path.join(acq_output_dir, acq_filename+".nii.gz")
json_output_path = os.path.join(acq_output_dir, acq_filename+".json")
if acq_modality == "localizer":
dcmconv.convert_dicom_localizer(reco_folder_path, nii_output_path)
else:
dcmconv.convert_dicom_series(reco_folder_path, nii_output_path)
create_metadata_json_MRS(reco_folder_path,json_output_path,acq_modality,metadata_ref_dict=msc.metadata_ref_DICOM_MRS)
if acq_modality == "dwi":
btable_path = os.path.join(sur_dir, acq_id, reco_id, "btable.txt")
btable_to_bval_bvlec(btable_path, acq_filename, acq_output_dir)
return sub_name
[docs]
def convert_studies_from_MRS(root_path, output_dir, return_list=False):
converted_studies = []
studies_to_convert, present_studies = get_studies_to_convert_MRS(
root_path, output_dir
)
if len(studies_to_convert) != 0:
progress_message = (
f"{lggr.info}Converting studies from MRSolutions DICOM and SUR data"
)
print()
for folder in tqdm(studies_to_convert, desc=progress_message):
folder_path = os.path.join(root_path, folder)
if ut.is_folder_and_not_occult(folder_path):
study_folder_name = convert_single_study_MRS(
folder_path, output_dir, acq_categories=msc.acq_categories_BIDS
)
if (
study_folder_name is not None
and study_folder_name not in converted_studies
):
converted_studies.append(study_folder_name)
if return_list:
return converted_studies
elif not present_studies:
raise NotStudiesToConvertError
[docs]
def get_studies_to_convert_MRS(root_path, output_dir):
root_path_content = os.listdir(root_path)
try:
output_dir_content = os.listdir(os.path.join(output_dir, "sourcedata"))
except FileNotFoundError:
output_dir_content = []
root_path_content = [
x
for x in root_path_content
if (
ut.is_folder_and_not_occult(os.path.join(root_path, x))
and study_files_present_MRS(os.path.join(root_path, x))
)
]
if len(root_path_content) != 0:
present_studies = True
else:
present_studies = False
output_dir_content = [
x
for x in output_dir_content
if ut.is_folder_and_not_occult(os.path.join(output_dir, "sourcedata", x))
]
output_dir_content = [x.replace("sub-", "") for x in output_dir_content]
studies_already_converted = [
x for x in output_dir_content if any(x in item for item in root_path_content)
]
studies_not_converted = [
x
for x in root_path_content
if not any(x in item for item in output_dir_content)
]
n_studies_already_converted = len(studies_already_converted)
if n_studies_already_converted == 0:
studies_to_convert = root_path_content
elif len(studies_not_converted) == 0:
print(
f"\n{lggr.info}All studies ({n_studies_already_converted}) have already been converted before."
)
studies_to_convert = []
else:
print(
f"\n{lggr.info}Some studies ({n_studies_already_converted}) have already been converted before. The rest will be converted now."
)
studies_to_convert = studies_not_converted
return studies_to_convert, present_studies
[docs]
def btable_to_bval_bvlec(btable_path, output_filename, output_path):
with open(btable_path, 'r') as f:
lines = f.readlines()
# Initialize empty lists for bvals and bvecs
bvals = []
bvec_x = []
bvec_y = []
bvec_z = []
# Process each line of the btable.txt file
for line in lines:
# Split the line into individual values (tab-separated)
values = line.strip().split('\t')
# The first value is the b-value
bvals.append(values[0])
# The second, third, and fourth values are the x, y, z components of the b-vector
bvec_x.append(values[1])
bvec_y.append(values[2])
bvec_z.append(values[3])
bval_output_path = os.path.join(output_path,output_filename+".bval")
# Write the bvals to a .bval file (space-separated on one line)
with open(bval_output_path, 'w') as f:
f.write(' '.join(bvals) + '\n')
bvec_output_path = os.path.join(output_path,output_filename+".bvec")
# Write the bvecs to a .bvec file (each component on a separate line)
with open(bvec_output_path, 'w') as f:
f.write(' '.join(bvec_x) + '\n')
f.write(' '.join(bvec_y) + '\n')
f.write(' '.join(bvec_z) + '\n')
[docs]
def dicom_to_json_serializable(value):
if isinstance(value, pydicom.valuerep.PersonName) or isinstance(value, pydicom.valuerep.DSfloat) or isinstance(value, pydicom.valuerep.IS):
return str(value)
elif isinstance(value, pydicom.uid.UID):
return str(value)
elif isinstance(value, list):
return [dicom_to_json_serializable(v) for v in value]
return value
[docs]
def study_files_present_MRS(path):
# TODO: is a valid directory??
return True
MRsolutionsKeys = ['mtc_on']
[docs]
def parse_PPR_keywords(file_path, keys_list=MRsolutionsKeys):
with open(file_path, 'rb') as f:
ppr_text = f.read()
ppr_text = ppr_text.decode('ascii', errors='ignore').replace('\r\n', '')
par = {}
if len(ppr_text) > 0:
for i in keys_list:
exp = ''.join(["(", i, ")[^_](.*?)(:)"])
match = re.findall(exp, ppr_text, flags=re.S)
if match:
matches = re.findall(exp, ppr_text, flags=re.S)[0]
tmp = dict(zip(matches[::2], matches[1::2]))
par.update(tmp)
else:
par = {}
return par
[docs]
def add_SUR_info_to_json():
pass