(Assay) Classes¶
Experiment
¶
Superclass for all experiments. Reads rawdata into a DataFrame.
Attributes¶
rawdata : pd.DataFrame DataFrame containing the rawdata
Methods¶
save_plots Save all the resulting plots to figuredir save_tables Save all the resulting tables to tabledir save Save all plots and tables to resultdir
Source code in rda_toolbox/experiment_classes.py
class Experiment:
"""
Superclass for all experiments.
Reads rawdata into a DataFrame.
Attributes
----------
rawdata : pd.DataFrame
DataFrame containing the rawdata
Methods
----------
save_plots
Save all the resulting plots to figuredir
save_tables
Save all the resulting tables to tabledir
save
Save all plots and tables to resultdir
"""
def __init__(self, rawfiles_folderpath: str | None, plate_type: int):
self._plate_type = plate_type
self._rows, self._columns = get_rows_cols(plate_type)
self._rawfiles_folderpath = rawfiles_folderpath
self.rawdata = parse_readerfiles(
rawfiles_folderpath
) # Get rawdata, this will later be overwritting by adding precipitation, if available
MIC
¶
Bases: Experiment
Source code in rda_toolbox/experiment_classes.py
class MIC(Experiment): # Minimum Inhibitory Concentration
def __init__(
self,
rawfiles_folderpath,
inputfile_path,
mp_ast_mapping_filepath,
ast_acd_mapping_filepath,
plate_type=384, # Define default plate_type for experiment
measurement_label: str = "Raw Optical Density",
map_rowname: str = "Row_96",
map_colname: str = "Col_96",
q_name: str = "Quadrant",
substance_id: str = "Internal ID",
negative_controls: str = "Bacteria + Medium",
blanks: str = "Medium",
norm_by_barcode: str = "AcD Barcode 384",
thresholds: list[float] = [50.0],
exclude_negative_zfactors: bool = False,
precipitation_rawfilepath: str | None = None,
precip_background_locations: pd.DataFrame | list[str] = [
f"{row}24" for row in string.ascii_uppercase[:16]
],
precip_exclude_outlier: bool = False,
precip_conc_multiplicator: float = 2.0,
):
super().__init__(rawfiles_folderpath, plate_type)
self._inputfile_path = inputfile_path
self._mp_ast_mapping_filepath = mp_ast_mapping_filepath
self._ast_acd_mapping_filepath = ast_acd_mapping_filepath
self._measurement_label = measurement_label
self.precipitation = (
Precipitation(
precipitation_rawfilepath,
background_locations=precip_background_locations,
exclude_outlier=precip_exclude_outlier,
)
# if precipitation_rawfilepath
# else None
)
self.precip_conc_multiplicator = precip_conc_multiplicator
self.rawdata = ( # Overwrite rawdata if precipitation data is available
# self.rawdata
# if self.precipitation is None
# else
add_precipitation(
self.rawdata, self.precipitation.results, self._mapping_dict
)
)
self._substances_unmapped, self._organisms, self._dilutions, self._controls = (
read_inputfile(inputfile_path, substance_id)
)
# self._substance_id = substance_id
self._negative_controls = negative_controls
self._blanks = blanks
self._norm_by_barcode = norm_by_barcode
self.thresholds = thresholds
self._processed_only_substances = (
self.processed[ # Negative Control is still there!
(self.processed["Dataset"] != "Reference")
& (self.processed["Dataset"] != "Positive Control")
& (self.processed["Dataset"] != "Blank")
]
)
self._references_results = self.processed.loc[
self.processed["Dataset"] == "Reference"
]
self.substances_precipitation = (
None
if self.precipitation.results.empty
else (
self._processed_only_substances[
self._processed_only_substances["Dataset"] != "Negative Control"
]
.drop_duplicates(
["Internal ID", "AsT Barcode 384", "Row_384", "Col_384"]
)
.loc[
:,
[
"Internal ID",
"AsT Barcode 384",
"Row_384",
"Col_384",
"Concentration",
"Precipitated",
],
]
.reset_index(drop=True)
)
)
def get_min_precip_conc_df(self):
if (self.precipitation.results.empty) and (not self.substances_precipitation):
return None
else:
precip_grps = []
# precip_df = self.substances_precipitation
for (int_id, ast_barcode), grp in self.substances_precipitation.groupby(
["Internal ID", "AsT Barcode 384"]
):
grp = grp.sort_values("Concentration")
min_precip_conc = None
if grp.Precipitated.any():
min_precip_conc = grp["Concentration"][grp["Precipitated"].idxmax()] * self.precip_conc_multiplicator
grp["Minimum Precipitation Concentration"] = min_precip_conc
precip_grps.append(grp)
precip_df = pd.concat(precip_grps)
precip_df = precip_df[["Internal ID", "Minimum Precipitation Concentration"]]
return precip_df
self.substances_minimum_precipitation_conc = get_min_precip_conc_df(self)
self._exclude_negative_zfactor = exclude_negative_zfactors
self.mic_df = self.get_mic_df(
# self.processed.copy()
df = self.processed[
(self.processed["Dataset"] != "Negative Control") & (self.processed["Dataset"] != "Blank")
].dropna(subset=["Concentration"]).copy()
).reset_index(drop=True)
@property
def _mapping_dict(self):
mp_ast_mapping_dict = get_mapping_dict(
parse_mappingfile(
self._mp_ast_mapping_filepath,
motherplate_column="MP Barcode 96",
childplate_column="AsT Barcode 384",
),
mother_column="MP Barcode 96",
child_column="AsT Barcode 384",
)
ast_acd_mapping_dict = get_mapping_dict(
parse_mappingfile(
self._ast_acd_mapping_filepath,
motherplate_column="AsT Barcode 384",
childplate_column="AcD Barcode 384",
),
mother_column="AsT Barcode 384",
child_column="AcD Barcode 384",
)
mapping_dict = {}
for mp_barcode, ast_barcodes in mp_ast_mapping_dict.items():
tmp_dict = {}
for ast_barcode in ast_barcodes:
tmp_dict[ast_barcode] = ast_acd_mapping_dict[ast_barcode]
mapping_dict[mp_barcode] = tmp_dict
return mapping_dict
@cached_property
def mapped_input_df(self):
"""
Does mapping of the inputfile describing the tested substances with the
corresponding mappingfile(s).
*Basically replaces rda.process.mic_process_inputs() function so all the variables and intermediate results are available via the class*
"""
# Sorting of organisms via Rack is **very** important, otherwise data gets attributed to wrong organisms
organisms = list(self._organisms.sort_values(by="Rack")["Organism"])
formatted_organisms = list(self._organisms.sort_values(by="Rack")["Organism formatted"])
ast_platemapping, _ = read_platemapping(
self._mp_ast_mapping_filepath,
self._substances_unmapped["MP Barcode 96"].unique(),
)
# Do some sanity checks:
necessary_columns = [
"Dataset",
"Internal ID",
"MP Barcode 96",
"MP Position 96",
]
# Check if all necessary column are present in the input table:
if not all(
column in self._substances_unmapped.columns for column in necessary_columns
):
raise ValueError(
f"Not all necessary columns are present in the input table.\n(Necessary columns: {necessary_columns})"
)
# Check if all of the necessary column are complete:
if self._substances_unmapped[necessary_columns].isnull().values.any():
raise ValueError("Input table incomplete, contains NA (missing) values.")
# Check if there are duplicates in the internal IDs (apart from references)
if any(
self._substances_unmapped[
self._substances_unmapped["Dataset"] != "Reference"
]["Internal ID"].duplicated()
):
raise ValueError("Duplicate Internal IDs.")
# Map AssayTransfer barcodes to the motherplate barcodes:
(
self._substances_unmapped["Row_384"],
self._substances_unmapped["Col_384"],
self._substances_unmapped["AsT Barcode 384"],
) = zip(
*self._substances_unmapped.apply(
lambda row: mic_assaytransfer_mapping(
row["MP Position 96"],
row["MP Barcode 96"],
ast_platemapping,
),
axis=1,
)
)
acd_platemapping, replicates_dict = read_platemapping(
self._ast_acd_mapping_filepath,
self._substances_unmapped["AsT Barcode 384"].unique(),
)
num_replicates = list(set(replicates_dict.values()))[0]
single_subst_concentrations = []
for substance, subst_row in self._substances_unmapped.groupby("Internal ID"):
# Collect the concentrations each as rows for a single substance:
single_subst_conc_rows = []
init_pos = int(subst_row["Col_384"].iloc[0]) - 1
col_positions_384 = [list(range(1, 23, 2)), list(range(2, 23, 2))]
for col_i, conc in enumerate(
list(self._dilutions["Concentration"].unique())
):
# Add concentration:
subst_row["Concentration"] = conc
# Add corresponding column:
subst_row["Col_384"] = int(col_positions_384[init_pos][col_i])
single_subst_conc_rows.append(subst_row.copy())
# Concatenate all concentrations rows for a substance in a dataframe
single_subst_concentrations.append(pd.concat(single_subst_conc_rows))
# Concatenate all self._substances_unmapped dataframes to one whole
input_w_concentrations = pd.concat(single_subst_concentrations)
acd_dfs_list = []
for ast_barcode, ast_plate in input_w_concentrations.groupby("AsT Barcode 384"):
self._controls["AsT Barcode 384"] = list(
ast_plate["AsT Barcode 384"].unique()
)[0]
ast_plate = pd.concat([ast_plate, self._controls.copy()])
for org_i, organism in enumerate(organisms):
for replicate in range(num_replicates):
# Add the AcD barcode
ast_plate["AcD Barcode 384"] = acd_platemapping[ast_barcode][
replicate
][org_i]
ast_plate["Replicate"] = replicate + 1
# Add the scientific Organism name
ast_plate["Organism formatted"] = formatted_organisms[org_i]
ast_plate["Organism"] = organism
acd_dfs_list.append(ast_plate.copy())
# Add concentrations:
acd_single_concentrations_df = pd.concat(acd_dfs_list)
# merge rawdata with input specifications
df = pd.merge(self.rawdata, acd_single_concentrations_df, how="outer")
return df
@cached_property
def processed(self):
return preprocess(
self.mapped_input_df,
substance_id="Internal ID",
measurement=self._measurement_label.strip(
"Raw "
), # I know this is weird, its because of how background_normalize_zfactor works,
negative_controls=self._negative_controls,
blanks=self._blanks,
norm_by_barcode=self._norm_by_barcode,
)
@cached_property
def plateheatmap(self):
return plateheatmaps(
self.processed,
substance_id="Internal ID",
barcode=self._norm_by_barcode,
negative_control=self._negative_controls,
blank=self._blanks,
)
# def lineplots_facet(self):
# return lineplots_facet(self.processed)
@cached_property
def _resultfigures(self) -> list[Result]:
result_figures = []
result_figures.append(
Result("QualityControl", "plateheatmaps", figure=self.plateheatmap)
)
if (self.substances_precipitation is not None) and (
not self.substances_precipitation.empty
):
result_figures.append(
Result(
"QualityControl",
"Precipitation_Heatmap",
figure=self.precipitation.plateheatmap(),
)
)
# Save plots per dataset:
processed_negative_zfactor = self._processed_only_substances[
self._processed_only_substances["Z-Factor"] < 0
]
if (
not processed_negative_zfactor.empty
and self._exclude_negative_zfactor == True
):
print(
f"{len(processed_negative_zfactor["AsT Barcode 384"].unique())} plate(s) with negative Z-Factor detected for organisms '{", ".join(processed_negative_zfactor["Organism formatted"].unique())}'.\n",
"These plates will be excluded from the lineplots visualization!\n (If you want to include them, use the `exclude_negative_zfactors=False` flag of the MIC class)",
)
for dataset, dataset_data in self._processed_only_substances.groupby("Dataset"):
# Look for and add the corresponding references for each dataset:
if "AcD Barcode 384" in dataset_data:
dataset_barcodes = list(dataset_data["AcD Barcode 384"].unique())
corresponding_dataset_references = self._references_results.loc[
(
self._references_results["AcD Barcode 384"].isin(
dataset_barcodes
)
),
:,
]
else:
corresponding_dataset_references = pd.DataFrame()
lineplots_input_df = pd.concat(
[dataset_data, corresponding_dataset_references]
)
lineplots_input_df = lineplots_input_df.dropna(
subset=["Concentration"]
).loc[
(lineplots_input_df["Dataset"] != "Negative Control")
& (lineplots_input_df["Dataset"] != "Blank"),
:,
]
if not lineplots_input_df.empty:
for threshold in self.thresholds:
result_figures.append(
Result(
dataset,
f"{dataset}_lineplots_facet_thrsh{threshold}",
figure=lineplots_facet(
lineplots_input_df,
exclude_negative_zfactors=self._exclude_negative_zfactor,
threshold=threshold,
),
)
)
# Save plots per threshold:
for threshold in self.thresholds:
for dataset, sub_df in self.mic_df.groupby("Dataset"):
dummy_df = get_upsetplot_df(
sub_df.dropna(subset=f"MIC{threshold} in µM"),
counts_column="Internal ID",
set_column="Organism",
)
result_figures.append(
Result(
dataset,
f"{dataset}_UpSetPlot",
figure=UpSetAltair(dummy_df, title=dataset),
)
)
result_figures.append(
Result(
dataset,
f"{dataset}_PotencyDistribution",
figure=potency_distribution(sub_df, threshold, dataset),
)
)
return result_figures
def get_mic_df(self, df):
pivot_df = pd.pivot_table(
df,
values=["Relative Optical Density", "Replicate", "Z-Factor", "Robust Z-Factor"],
index=[
"Internal ID",
# "External ID",
"Organism formatted",
"Organism",
"Concentration",
"Dataset",
],
aggfunc={
"Relative Optical Density": ["mean"],
"Replicate": ["count"],
"Z-Factor": [
"mean",
"std",
],
"Robust Z-Factor": [
"mean",
"std"
]
, # does this make sense? with std its usable.
# "Z-Factor": ["std"],
},
# margins=True
fill_value=0 # This might result in confusion, if there are no replicates (1)
).reset_index()
pivot_df.columns = [" ".join(x).strip() for x in pivot_df.columns.ravel()]
mic_records = []
for group_names, grp in pivot_df.groupby(
["Internal ID", "Organism formatted", "Dataset"]
):
internal_id, organism_formatted, dataset = group_names
# Sort by concentration just to be sure:
grp = grp[
[
"Concentration",
"Relative Optical Density mean",
"Z-Factor mean",
"Z-Factor std",
"Robust Z-Factor mean",
"Robust Z-Factor std",
]
].sort_values(by=["Concentration"])
# Get rows where the OD is below the given threshold:
record = {
"Internal ID": internal_id,
"Organism formatted": organism_formatted,
"Dataset": dataset,
"Z-Factor mean": list(grp["Z-Factor mean"])[0],
"Z-Factor std": list(grp["Z-Factor std"])[0],
"Robust Z-Factor mean": list(grp["Robust Z-Factor mean"])[0],
"Robust Z-Factor std": list(grp["Robust Z-Factor std"])[0],
}
for threshold in self.thresholds:
values_below_threshold = grp[
grp["Relative Optical Density mean"] < threshold
]
# thx to jonathan - check if the OD at maximum concentration is below threshold (instead of any concentration)
max_conc_below_threshold = list(
grp[grp["Concentration"] == max(grp["Concentration"])][
"Relative Optical Density mean"
]
< threshold
)[0]
if not max_conc_below_threshold:
mic = None
else:
mic = values_below_threshold.iloc[0]["Concentration"]
record[f"MIC{threshold} in µM"] = mic
mic_records.append(record)
# Drop entries where no MIC could be determined
mic_df = pd.DataFrame.from_records(mic_records)
# Merge inconsistent (but maybe necessary) columns again
mic_df = pd.merge(mic_df, df[["Internal ID", "External ID"]], on=["Internal ID"])
mic_df = pd.merge(mic_df, self._organisms[["Organism", "Organism formatted"]], on=["Organism formatted"])
mic_df = mic_df.drop_duplicates()
return mic_df
@cached_property
def _resulttables(self) -> list[Result]:
"""
Retrieves result tables and returns them like list[Result]
where Resulttable is a dataclass collecting meta information about the plot.
"""
result_tables = []
df = self.processed.copy()
# mic_df = self.get_mic_df(df)
references_mic_results = self.get_mic_df(
self.processed[self.processed["Dataset"] == "Reference"].copy()
).reset_index(drop=True)
result_tables.append(
Result(
"Reference",
"References_MIC_results_eachRefID",
table=references_mic_results,
)
)
mic_df = self.mic_df
# If precipitation has been done, merge MPC results on long mic_df
if not self.precipitation.results.empty:
mic_df = pd.merge(self.mic_df, self.substances_minimum_precipitation_conc, on="Internal ID", how="left")
result_tables.append(
Result("All", "MIC_Results_AllDatasets_longformat", table=self.mic_df)
)
for dataset, dataset_grp in mic_df.groupby("Dataset"):
print(f"Preparing tables for dataset: {dataset}")
pivot_multiindex_df = pd.pivot_table(
dataset_grp,
values=[f"MIC{threshold} in µM" for threshold in self.thresholds]
+ ["Z-Factor mean", "Z-Factor std"],
index=["Internal ID", "Dataset"],
columns="Organism",
).reset_index()
# print(pivot_multiindex_df)
# self.pivot_multiindex_df = pivot_multiindex_df
for threshold in self.thresholds:
# print(pivot_multiindex_df.columns)
# print(pivot_multiindex_df)
if pivot_multiindex_df.empty:
continue
organisms_thresholded_mics = pivot_multiindex_df[
["Internal ID", f"MIC{threshold} in µM"]
]
cols = list(organisms_thresholded_mics.columns.droplevel())
cols[0] = "Internal ID"
organisms_thresholded_mics.columns = cols
organisms_thresholded_mics = organisms_thresholded_mics.sort_values(
by=list(organisms_thresholded_mics.columns)[1:],
na_position="last",
)
# Fill with nan if not available
organisms_thresholded_mics = organisms_thresholded_mics.round(2)
organisms_thresholded_mics = organisms_thresholded_mics.astype(str)
organisms_thresholded_mics = pd.merge(organisms_thresholded_mics, self.mic_df[["Internal ID", "External ID"]], on=["Internal ID"], how="left")
# organisms_thresholded_mics.fillna("NA", inplace=True)
if not self.precipitation.results.empty:
organisms_thresholded_mics = pd.merge(
organisms_thresholded_mics,
self.substances_minimum_precipitation_conc,
how="left"
)
organisms_thresholded_mics = organisms_thresholded_mics.reset_index(drop=True)
organisms_thresholded_mics = organisms_thresholded_mics.drop_duplicates()
result_tables.append(
Result(
dataset,
f"{dataset}_MIC{int(round(threshold))}_results",
table=organisms_thresholded_mics.reset_index(drop=True)
)
)
return result_tables
@cached_property
def results(self):
"""
Retrieves result tables (from self._resulttables)
and returns them in a dictionary like:
{"<filepath>": pd.DataFrame}
"""
return {tbl.file_basename: tbl.table for tbl in self._resulttables}
def save_figures(self, result_path, fileformats: list[str] = ["svg", "html"]):
_save_figures(result_path, self._resultfigures, fileformats=fileformats)
def save_tables(
self, result_path, processed_path, fileformats: list[str] = ["xlsx", "csv"]
):
# Create folder if not existent:
pathlib.Path(processed_path).mkdir(parents=True, exist_ok=True)
self.processed.to_csv(os.path.join(processed_path, "processed.csv"))
_save_tables(result_path, self._resulttables, fileformats=fileformats)
def save_results(
self,
tables_path: str,
figures_path: str,
processed_path: str,
figureformats: list[str] = ["svg", "html"],
tableformats: list[str] = ["xlsx", "csv"],
):
self.save_figures(figures_path, fileformats=figureformats)
self.save_tables(tables_path, processed_path, fileformats=tableformats)
mapped_input_df
cached
property
¶
Does mapping of the inputfile describing the tested substances with the corresponding mappingfile(s). Basically replaces rda.process.mic_process_inputs() function so all the variables and intermediate results are available via the class
results
cached
property
¶
Retrieves result tables (from self._resulttables) and returns them in a dictionary like:
PrimaryScreen
¶
Bases: Experiment
Primary screen experiment. Usually done using only 1 concentration.
Source code in rda_toolbox/experiment_classes.py
class PrimaryScreen(Experiment):
"""
Primary screen experiment. Usually done using only 1 concentration.
"""
def __init__(
self,
rawfiles_folderpath: str,
inputfile_path: str,
mappingfile_path: str,
plate_type: int = 384, # Define default plate_type for experiment
measurement_label: str = "Raw Optical Density",
map_rowname: str = "Row_96",
map_colname: str = "Col_96",
q_name: str = "Quadrant",
substance_id: str = "Internal ID",
negative_controls: str = "Bacteria + Medium",
blanks: str = "Medium",
norm_by_barcode: str = "AcD Barcode 384",
thresholds: list[float] = [50.0],
b_score_threshold: float = -3.0,
precipitation_rawfilepath: str | None = None,
background_locations: pd.DataFrame | list[str] = [
f"{row}24" for row in string.ascii_uppercase[:16]
],
precip_exclude_outlier: bool = False,
):
super().__init__(rawfiles_folderpath, plate_type)
self._measurement_label = measurement_label
self._mappingfile_path = mappingfile_path
self._inputfile_path = inputfile_path
self._substances_unmapped, self._organisms, self._dilutions, self._controls = (
read_inputfile(inputfile_path, substance_id)
)
self.substances = mapapply_96_to_384(
self._substances_unmapped,
rowname=map_rowname,
colname=map_colname,
q_name=q_name,
)
self._mapping_df = parse_mappingfile(
mappingfile_path,
motherplate_column="AsT Barcode 384",
childplate_column="AcD Barcode 384",
)
self._mapping_dict = get_mapping_dict(self._mapping_df)
# self._substance_id = substance_id
self._negative_controls = negative_controls
self._blanks = blanks
self._norm_by_barcode = norm_by_barcode
self.thresholds = thresholds
self.b_score_threshold = b_score_threshold
self.precipitation = (
Precipitation(
precipitation_rawfilepath,
background_locations=background_locations,
exclude_outlier=precip_exclude_outlier,
)
# if precipitation_rawfilepath
# else None
)
self.rawdata = ( # Overwrite rawdata if precipitation data is available
# self.rawdata
# if self.precipitation is None
# else
add_precipitation(
self.rawdata, self.precipitation.results, self._mapping_dict
)
)
self._processed_only_substances = self.processed[
(self.processed["Dataset"] != "Reference")
& (self.processed["Dataset"] != "Positive Control")
& (self.processed["Dataset"] != "Blank")
]
self.substances_precipitation = (
None
if self.precipitation.results.empty
else (
self._processed_only_substances[
self._processed_only_substances["Dataset"] != "Negative Control"
]
.drop_duplicates(
["Internal ID", "AsT Barcode 384", "Row_384", "Col_384"]
)
.loc[
:,
[
"Internal ID",
# "AsT Barcode 384",
# "Row_384",
# "Col_384",
"Concentration",
"Precipitated",
f"Precipitated at {measurement_label}",
],
]
.reset_index(drop=True)
)
)
def check_substances(self):
"""
Do some sanity checks for the substances table.
- Check if all necessary columns are present.
- Check if substances contains missing values.
- Check if there are duplicate Internal IDs (references excluded)
"""
# if not all(column in self._substances_unmapped.columns for column in necessary_columns):
# raise ValueError(
# f"Not all necessary columns are present in the input table.\n(Necessary columns: {necessary_columns})"
# )
# # Check if all of the necessary column are complete:
# if substances[necessary_columns].isnull().values.any():
# raise ValueError(
# "Input table incomplete, contains NA (missing) values."
# )
# # Check if there are duplicates in the internal IDs (apart from references)
# if any(substances[substances["Dataset"] != "Reference"]["Internal ID"].duplicated()):
# raise ValueError("Duplicate Internal IDs.")
pass
@cached_property
def mapped_input_df(self):
"""
Does mapping of the inputfile describing the tested substances with the
corresponding mappingfile(s).
*Basically replaces rda.process.primary_process_inputs() function so all the variables and intermediate results are available via the class*
"""
control_wbarcodes = []
# multiply controls with number of AsT plates to later merge them with substances df
for origin_barcode in list(self.substances["AsT Barcode 384"].unique()):
controls_subdf = self._controls.copy()
controls_subdf["AsT Barcode 384"] = origin_barcode
control_wbarcodes.append(controls_subdf)
controls_n_barcodes = pd.concat(control_wbarcodes)
ast_plate_df = pd.merge(
pd.concat([self.substances, controls_n_barcodes]),
self._dilutions,
how="outer",
)
mapped_organisms = pd.merge(self._mapping_df, self._organisms)
result_df = pd.concat(
[
pd.merge(org_df, ast_plate_df)
for _, org_df in pd.merge(mapped_organisms, self.rawdata).groupby(
"Organism formatted"
)
]
)
for ast_barcode, ast_plate in result_df.groupby("AsT Barcode 384"):
print(
f"AsT Plate {ast_barcode} has size: {
len(ast_plate) // len(ast_plate['AcD Barcode 384'].unique())
}"
)
print(f"{ast_barcode} -> {ast_plate['AcD Barcode 384'].unique()}")
# result_df = result_df.rename({self._substance_id: "Internal ID"}) # rename whatever substance ID was given to Internal ID
return result_df
@cached_property
def processed(self):
processed = preprocess(
self.mapped_input_df,
substance_id="Internal ID",
measurement=self._measurement_label.strip(
"Raw "
), # I know this is weird, its because of how background_normalize_zfactor works,
negative_controls=self._negative_controls,
blanks=self._blanks,
norm_by_barcode=self._norm_by_barcode,
)
# Add B-Scores to plates without negative controls and blanks
proc_wo_controls = processed[
~processed["Internal ID"].isin([self._negative_controls, self._blanks])
]
# We add b_scores here since we only want them in a primary screen and preprocess() is used generally
b_scores = (
proc_wo_controls.groupby(self._norm_by_barcode)[
[self._norm_by_barcode, "Row_384", "Col_384", self._measurement_label]
]
.apply(lambda plate_grp: add_b_score(plate_grp))
.reset_index(drop=True)
)
processed = pd.merge(processed, b_scores, how="outer")
return processed
@cached_property
def plateheatmap(self):
return plateheatmaps(
self.processed,
substance_id="Internal ID",
negative_control=self._negative_controls,
blank=self._blanks,
barcode=self._norm_by_barcode,
)
@cached_property
def _resultfigures(self):
result_figures = []
# Add QualityControl overview of the plates as heatmaps:
result_figures.append(
Result("QualityControl", "plateheatmaps", figure=self.plateheatmap)
)
# If precipitation testing was done, add it to QC result figures:
if not self.precipitation.results.empty:
result_figures.append(
Result(
"QualityControl",
"Heatmap_Precipitation",
figure=self.precipitation.plateheatmap(),
)
)
for threshold in self.thresholds:
result_figures.append(
Result(
"QualityControl",
"Scatter_Measurement_vs_BScore_Substances",
figure=measurement_vs_bscore_scatter(
self._processed_only_substances,
measurement_header="Relative Optical Density",
measurement_title="Relative Optical Density",
bscore_header="b_scores",
bscore_title="B-Score",
color_header="Dataset",
measurement_threshold=threshold,
b_score_threshold=self.b_score_threshold,
).facet(row="Organism", column="Dataset"),
)
)
result_figures.append(
Result(
"QualityControl",
"Scatter_Measurement_vs_BScore_References",
figure=measurement_vs_bscore_scatter(
self.processed[
self.processed["Dataset"] == "Reference"
].replace({np.nan: None}),
measurement_header="Relative Optical Density",
measurement_title="Relative Optical Density",
bscore_header="b_scores",
bscore_title="B-Score",
color_header="Dataset",
measurement_threshold=threshold,
b_score_threshold=self.b_score_threshold,
).facet(row="Organism", column="Dataset"),
)
)
subset = get_thresholded_subset(
self._processed_only_substances,
id_column="Internal ID",
negative_controls=self._negative_controls,
blanks=self._blanks,
threshold=threshold,
)
for dataset, sub_df in subset.groupby("Dataset"):
dummy_df = get_upsetplot_df(sub_df, counts_column="Internal ID")
result_figures.append(
Result(
dataset,
f"UpSetPlot_{dataset}",
figure=UpSetAltair(dummy_df, title=dataset),
)
)
# ---
only_actives = self.results[f"{dataset}_all_results"][
self.results[f"{dataset}_all_results"]
.groupby("Organism")["Relative Optical Density mean"]
.transform(lambda x: x < threshold)
]
result_figures.append(
Result(
dataset,
f"Scatterplot_BScores_{dataset}",
figure=measurement_vs_bscore_scatter(
only_actives, show_area=False
),
)
)
return result_figures
@cached_property
def _resulttables(self):
"""
Retrieves result tables and returns them like list[Resulttable]
where Resulttable is a dataclass collecting meta information about the plot.
"""
# result_plots = dict() # {"filepath": plot}
result_tables = []
# result_tables.append(Result("All", ))
df = self.processed.copy()
df = df[
(df["Dataset"] != "Reference")
& (df["Dataset"] != "Positive Control")
& (df["Dataset"] != "Blank")
].dropna(subset=["Concentration"])
pivot_df = pd.pivot_table(
df,
values=[
"Relative Optical Density",
"Replicate",
"Z-Factor",
"Robust Z-Factor",
"b_scores",
],
index=[
"Internal ID",
"Organism formatted",
"Organism",
"Concentration",
"Dataset",
],
aggfunc={
"Relative Optical Density": ["mean"],
"Replicate": ["count"],
"b_scores": ["mean"],
},
).reset_index()
pivot_df.columns = [" ".join(x).strip() for x in pivot_df.columns.ravel()]
for threshold in self.thresholds:
# Apply Threshold to % Growth:
# pivot_df[f"Relative Growth < {threshold}"] = pivot_df.groupby(
# ["Internal ID", "Organism", "Dataset"]
# )["Relative Optical Density mean"].transform(lambda x: x < threshold)
# Apply B-Score Treshold:
# B-Scores <= -3: https://doi.org/10.1128/mbio.00205-25
# pivot_df[f"B Score <= {self.b_score_threshold}"] = pivot_df.groupby(
# ["Internal ID", "Organism", "Dataset"]
# )["b_scores mean"].transform(lambda x: x <= self.b_score_threshold)
for dataset, dataset_grp in pivot_df.groupby("Dataset"):
# dataset = dataset[0]
# resultpath = os.path.join(filepath, dataset)
# result_tables[f"{dataset}_all_results"] = dataset_grp
if not self.precipitation.results.empty:
dataset_grp = pd.merge(dataset_grp, self.substances_precipitation)
result_tables.append(
Result(dataset, f"{dataset}_all_results", table=dataset_grp)
)
# Apply threshold conditions:
thresholded_dataset_grp = dataset_grp.groupby("Internal ID").filter(
lambda x: check_activity_conditions(
x["Relative Optical Density mean"],
x["b_scores mean"],
threshold,
self.b_score_threshold,
)
)
# Pivot the long table for excel viewability:
pivot_multiindex_df = pd.pivot_table(
thresholded_dataset_grp,
values=["Relative Optical Density mean", "b_scores mean"],
index=["Internal ID", "Dataset", "Concentration", "Organism"],
columns="Organism formatted",
).reset_index()
# pivot_multiindex_df = pd.pivot_table(
# dataset_grp,
# values=["Relative Optical Density mean"],
# index=["Internal ID", "Dataset", "Concentration"],
# columns="Organism",
# ).reset_index()
# cols = list(pivot_multiindex_df.columns.droplevel())
# cols[:3] = list(map(lambda x: x[0], pivot_multiindex_df.columns[:3]))
# pivot_multiindex_df.columns = cols
# # Apply threshold (active in any organism)
# thresholded_pivot = pivot_multiindex_df.iloc[
# list(
# pivot_multiindex_df.iloc[:, 3:].apply(
# lambda x: any(list(map(lambda i: i < threshold, x))), axis=1
# )
# )
# ]
# Sort by columns each organism after the other
# return pivot_multiindex_df.sort_values(by=cols[3:])
# Sort rows by mean between the organisms (lowest mean activity first)
# results_sorted_by_mean_activity = pivot_multiindex_df.iloc[
# pivot_multiindex_df.iloc[:, 3:].mean(axis=1).argsort()
# ]
# Sort rows by mean between the organisms (lowest mean measurement first)
results_sorted_by_mean_activity = pivot_multiindex_df.loc[
pivot_multiindex_df.loc[
:,
list(
filter(
lambda x: x[0].startswith("Relative Optical Density"),
pivot_multiindex_df.columns,
)
),
]
.mean(axis=1)
.argsort()
]
if not self.precipitation.results.empty:
results_sorted_by_mean_activity = pd.merge(
results_sorted_by_mean_activity, self.substances_precipitation
)
# Correct "mean" header if its only one replicate (remove 'mean')
if sum(thresholded_dataset_grp["Replicate count"].unique()) == 1:
results_sorted_by_mean_activity = results_sorted_by_mean_activity.rename(
columns={
"Relative Optical Density mean": "Relative Optical Density",
"b_scores mean": "B-Score",
}
)
results_sorted_by_mean_activity = (
results_sorted_by_mean_activity.rename(
columns={"b_scores mean": "B-Score mean"}
)
)
result_tables.append(
Result(
dataset,
f"{dataset}_threshold{round(threshold)}_results",
table=results_sorted_by_mean_activity,
)
)
return result_tables
@cached_property
def results(self):
"""
Retrieves result tables (from self._resulttables)
and returns them in a dictionary like:
{"<filepath>": pd.DataFrame}
"""
return {tbl.file_basename: tbl.table for tbl in self._resulttables}
def save_figures(self, resultpath, fileformats: list[str] = ["svg", "html"]):
_save_figures(resultpath, self._resultfigures, fileformats=fileformats)
def save_tables(
self, result_path, processed_path, fileformats: list[str] = ["xlsx", "csv"]
):
pathlib.Path(processed_path).mkdir(parents=True, exist_ok=True)
self.processed.to_csv(os.path.join(processed_path, "processed.csv"))
_save_tables(result_path, self._resulttables, fileformats=fileformats)
def save_results(
self,
tables_path: str,
figures_path: str,
processed_path: str,
figureformats: list[str] = ["svg", "html"],
tableformats: list[str] = ["xlsx", "csv"],
):
self.save_figures(figures_path, fileformats=figureformats)
self.save_tables(tables_path, processed_path, fileformats=tableformats)
mapped_input_df
cached
property
¶
Does mapping of the inputfile describing the tested substances with the corresponding mappingfile(s). Basically replaces rda.process.primary_process_inputs() function so all the variables and intermediate results are available via the class
results
cached
property
¶
Retrieves result tables (from self._resulttables) and returns them in a dictionary like:
check_substances()
¶
Do some sanity checks for the substances table. - Check if all necessary columns are present. - Check if substances contains missing values. - Check if there are duplicate Internal IDs (references excluded)
Source code in rda_toolbox/experiment_classes.py
def check_substances(self):
"""
Do some sanity checks for the substances table.
- Check if all necessary columns are present.
- Check if substances contains missing values.
- Check if there are duplicate Internal IDs (references excluded)
"""
# if not all(column in self._substances_unmapped.columns for column in necessary_columns):
# raise ValueError(
# f"Not all necessary columns are present in the input table.\n(Necessary columns: {necessary_columns})"
# )
# # Check if all of the necessary column are complete:
# if substances[necessary_columns].isnull().values.any():
# raise ValueError(
# "Input table incomplete, contains NA (missing) values."
# )
# # Check if there are duplicates in the internal IDs (apart from references)
# if any(substances[substances["Dataset"] != "Reference"]["Internal ID"].duplicated()):
# raise ValueError("Duplicate Internal IDs.")
pass