#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Sep 1 18:52:23 2024

@author: Maya Coulson Theodorsen (mcoth@dtu.dk)

This function performs statistical comparisons across clusters derived from
prior analysis. It evaluates differences between clusters using both
categorical and continuous variables, applies appropriate statistical tests,
and adjusts p-values for multiple testing.

Overview:

Comparison of Categorical Variables:
- Chi-square or Fisher's Exact Test
- Effect size calculations using Cramér's V
- Post-hoc chi-square comparisons for significant results

Comparison of Continuous Variables:
- Tests for normality using Shapiro-Wilk
- Bartlett's test or Levene's test for homogeneity of variances
- Welch's ANOVA, one-way ANOVA, or Kruskal-Wallis depending on normality and variance equality
- Effect size calculations
- Dunn's post-hoc test for significant results

Multiple Testing Correction:
- False Discovery Rate (FDR) adjustment using the Benjamini-Hochberg method

Output:
- P-values and corrected p-values for all variables
- Summary of significant variables
- Post-hoc comparisons
"""

import pandas as pd
import pingouin as pg
import scikit_posthocs as sp
from itertools import combinations
from scipy import stats
from scipy.stats import bartlett, levene
from statsmodels.stats.multitest import multipletests
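
# --- Illustrative sketch (not part of the pipeline) ------------------------
# A minimal, hypothetical demo of the Benjamini-Hochberg FDR correction used
# throughout this module. The p-values below are made up for demonstration;
# only the multipletests() call mirrors what compare_clusters() does.
def _fdr_bh_demo():
    """Show how multipletests() adjusts a toy set of p-values."""
    toy_p_values = [0.001, 0.008, 0.039, 0.041, 0.20]
    # BH ranks the m p-values and compares the k-th smallest against
    # (k / m) * alpha; `reject` holds the decisions, `corrected` the
    # adjusted p-values.
    reject, corrected, _, _ = multipletests(toy_p_values, alpha=0.05, method='fdr_bh')
    return reject, corrected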

def compare_clusters(data_complete, questionnaireClusters):
    # Categorical variables: (column name, display label, source DataFrame)
    categorical_variables = [
        ('PTSD_t0_DSMIV', 'Probable PTSD diagnosis', data_complete),
        ('q0002', 'Sex (male)', data_complete),
        ('q0006', 'Children', data_complete),
        ('civil_status', 'Civil status (single)', data_complete),
        ('Psychoanaleptica', 'Psychoanaleptica', data_complete),
        ('Psycholeptica', 'Psycholeptica', data_complete),
        ('q0033_0001', 'Suicidal history', data_complete),
        ('binge', 'Excessive alcohol intake', data_complete),
        ('ADHD_total_GROUP_t0', 'Probable childhood ADHD', data_complete),
        ('drugs', 'Current drug usage', data_complete),
        ('Military_trauma', 'Exposed to war', data_complete),
        ('Unemployed', 'Unemployed', data_complete)
    ]

    # Continuous variables: (column name, display label, source DataFrame)
    continuous_variables = [
        ('q0003_0001', 'Age', data_complete),
        ('q0008_0001', 'Self-rated health', data_complete),
        ('PCL_t0', 'PCL total score', data_complete),
        ('Intrusion_t0', 'PCL Intrusion', data_complete),
        ('PCL Avoidance', 'PCL Avoidance', questionnaireClusters),
        ('PCL Numbing', 'PCL Numbing', questionnaireClusters),
        ('PCL Hyperarousal', 'PCL Hyperarousal', questionnaireClusters),
        ('DASS Anxiety', 'DASS Anxiety', questionnaireClusters),
        ('DASS Depression', 'DASS Depression', questionnaireClusters),
        ('DASS Stress', 'DASS Stress', questionnaireClusters),
        ('Traume_t0_total', 'Total traumas', data_complete),
        ('Antal_unikke_traumer_t0', 'Total unique traumas', data_complete)
    ]

    # Store result dicts (p-value, statistic, effect size) for all comparisons
    p_values = []
    significant_tests = []

    # Compare each categorical variable
    for outcome_column, outcome_label, df in categorical_variables:
        result = compare_categorical_clusters(df, 'clusters', outcome_column, outcome_label)
        p_values.append(result)  # Add each result to the list
        significant_tests.append((outcome_column, outcome_label, df, 'categorical'))  # Add variable info

    # Compare each continuous variable
    for outcome_column, outcome_label, df in continuous_variables:
        result = compare_continuous_clusters(df, 'clusters', outcome_column, outcome_label)
        p_values.append(result)  # Add each result to the list
        significant_tests.append((outcome_column, outcome_label, df, 'continuous'))  # Add variable info

    # Extract the original (uncorrected) p-values from the result dictionaries
    original_p_values = [result['p_value'] for result in p_values]

    # Apply FDR (Benjamini-Hochberg) multiple testing correction
    reject, corrected_p_values, _, _ = multipletests(original_p_values, alpha=0.05, method='fdr_bh')

    # Add corrected p-values to each result dict
    for i, result in enumerate(p_values):
        result['corrected_p_value'] = corrected_p_values[i]

    # Keep only tests still significant after correction for post-hoc testing
    overview = []
    significant_tests_filtered = []
    for result, corr_p, test_info in zip(p_values, corrected_p_values, significant_tests):
        uncorr_p = result['p_value']  # Extract the uncorrected p-value
        if corr_p < 0.05:
            overview.append('Sig')
            significant_tests_filtered.append(test_info)
        elif uncorr_p < 0.05 and corr_p >= 0.05:
            overview.append('No longer sig')
        else:
            overview.append('Not sig')

    # Post-hoc tests on significant results
    posthoc_p_values = []
    for outcome_column, outcome_label, df, var_type in significant_tests_filtered:
        if var_type == 'categorical':
            posthoc_p_value = posthoc_categorical(df, 'clusters', outcome_column, outcome_label)
        else:
            posthoc_p_value = posthoc_continuous(df, 'clusters', outcome_column, outcome_label)
        posthoc_p_values.extend(posthoc_p_value)

    # Extract the original p-values from the post-hoc results
    original_p_values = [result['p_value'] for result in posthoc_p_values]

    # Apply FDR (Benjamini-Hochberg) correction to the post-hoc p-values
    reject, posthoc_pvals_corrected, _, _ = multipletests(original_p_values, alpha=0.05, method='fdr_bh')

    # Add corrected p-values
    for i, result in enumerate(posthoc_p_values):
        result['corrected_p_value'] = posthoc_pvals_corrected[i]

    return p_values, posthoc_p_values, categorical_variables, continuous_variables


def compare_categorical_clusters(df, cluster_column, outcome_column, outcome_label):
    contingency_table = pd.crosstab(df[cluster_column], df[outcome_column])
    chi2, p, dof, expected = stats.chi2_contingency(contingency_table)

    # Label the appropriate test based on expected cell counts.
    # Note: test_type only flags low expected counts; the statistics reported
    # below come from the Pearson chi-square test either way.
    if (expected >= 5).all():
        test_type = 'Chi-Square'
    else:
        test_type = "Fisher's Exact Test"

    # pg.chi2_independence returns (expected, observed, stats)
    expected_table, observed_table, result_df = pg.chi2_independence(data=df, x=cluster_column, y=outcome_column)
    p_value = result_df.loc[result_df['test'] == 'pearson', 'pval'].values[0]
    chi2_stat = result_df.loc[result_df['test'] == 'pearson', 'chi2'].values[0]
    dof = result_df.loc[result_df['test'] == 'pearson', 'dof'].values[0]
    cramers_v = result_df.loc[result_df['test'] == 'pearson', 'cramer'].values[0]

    return {
        'outcome_label': outcome_label,
        'test_type': test_type,
        'p_value': p_value,
        'test_statistic': chi2_stat,
        'dof': dof,
        'effect_size': cramers_v,
        'corrected_p_value': None  # Placeholder, filled in after FDR-BH correction
    }
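
# --- Illustrative sketch (not part of the pipeline) ------------------------
# compare_categorical_clusters() takes Cramér's V from pingouin's output. As a
# hypothetical cross-check, the standard (uncorrected) formula can be computed
# by hand from a contingency table: V = sqrt(chi2 / (n * (min(r, c) - 1))).
def _cramers_v_demo(contingency_table):
    """Hand computation of Cramér's V for a pandas crosstab."""
    chi2, _, _, _ = stats.chi2_contingency(contingency_table)
    n = contingency_table.to_numpy().sum()       # total observations
    min_dim = min(contingency_table.shape) - 1   # min(rows, cols) - 1
    return (chi2 / (n * min_dim)) ** 0.5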

def compare_continuous_clusters(df, cluster_column, outcome_column, outcome_label):
    clusters = df[cluster_column].unique()
    cluster_data = [df[df[cluster_column] == cluster][outcome_column] for cluster in clusters]

    # Test for normality (Shapiro-Wilk, per cluster)
    normality_results = pg.normality(df, dv=outcome_column, group=cluster_column)

    if normality_results['normal'].all():
        # Bartlett's test for equal variances (appropriate for normal data)
        bartlett_stat, bartlett_p = bartlett(*cluster_data)
        if bartlett_p < 0.05:
            # Unequal variances: use Welch's ANOVA
            test_type = "Welch's ANOVA"
            welch_anova_results = pg.welch_anova(dv=outcome_column, between=cluster_column, data=df)
            p_value = welch_anova_results['p-unc'][0]
            dof = welch_anova_results['ddof1'][0]
            test_statistic = welch_anova_results['F'][0]
            effect_size = welch_anova_results['np2'][0]  # Partial eta-squared
        else:
            # Equal variances: use one-way ANOVA
            test_type = 'ANOVA'
            anova = pg.anova(data=df, dv=outcome_column, between=cluster_column)
            p_value = anova['p-unc'][0]
            dof = anova['ddof1'][0]
            test_statistic = anova['F'][0]
            effect_size = anova['np2'][0]  # Partial eta-squared
            pd.set_option('display.float_format', lambda x: '%.10f' % x)
    else:
        # Not normally distributed: Levene's test for equal variances
        levene_stat, levene_p = levene(*cluster_data, center='median')  # 'median' used for robustness
        test_type = 'Kruskal-Wallis'
        kruskal = pg.kruskal(data=df, dv=outcome_column, between=cluster_column)
        p_value = kruskal['p-unc'].values[0]
        dof = kruskal['ddof1'][0]
        test_statistic = kruskal['H'][0]
        # Calculate epsilon-squared from the H statistic
        N = len(df)
        effect_size = max((test_statistic - dof + 1) / (N - dof), 0)

    return {
        'outcome_label': outcome_label,
        'test_type': test_type,
        'p_value': p_value,
        'test_statistic': test_statistic,
        'dof': dof,
        'effect_size': effect_size,
        'corrected_p_value': None  # Placeholder, filled in after FDR-BH correction
    }


# Post-hoc tests for variables significant after FDR-BH correction
def posthoc_categorical(df, cluster_column, outcome_column, outcome_label):
    clusters = sorted(df[cluster_column].unique())
    posthoc_p_values = []
    # Pairwise chi-squared tests between each pair of clusters
    for (cluster1, cluster2) in combinations(clusters, 2):
        subset = df[df[cluster_column].isin([cluster1, cluster2])]
        posthoc_table = pd.crosstab(subset[cluster_column], subset[outcome_column])
        chi2, p, dof, _ = stats.chi2_contingency(posthoc_table)
        posthoc_p_values.append({
            'outcome_label': outcome_label,
            'cluster1': cluster1,
            'cluster2': cluster2,
            'test_type': 'Chi-squared',
            'p_value': p
        })
    return posthoc_p_values


def posthoc_continuous(df, cluster_column, outcome_column, outcome_label):
    # Use Dunn's test for pairwise group comparisons
    dunn = sp.posthoc_dunn(df, val_col=outcome_column, group_col=cluster_column)
    clusters = sorted(df[cluster_column].unique())
    posthoc_p_values = []
    # Iterate through the upper triangle of the Dunn's test result matrix,
    # indexing by cluster label for robustness
    for i in range(len(clusters)):
        for j in range(i + 1, len(clusters)):
            cluster1 = clusters[i]
            cluster2 = clusters[j]
            p_value = dunn.loc[cluster1, cluster2]
            posthoc_p_values.append({
                'outcome_label': outcome_label,
                'cluster1': cluster1,
                'cluster2': cluster2,
                'test_type': "Dunn's",
                'p_value': p_value
            })
    return posthoc_p_values
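
# --- Hypothetical usage example ---------------------------------------------
# The real entry point is compare_clusters(data_complete, questionnaireClusters),
# which expects the study-specific columns listed at the top of this module.
# The synthetic data below only exercises compare_continuous_clusters().
if __name__ == '__main__':
    import numpy as np

    rng = np.random.default_rng(0)
    demo = pd.DataFrame({
        'clusters': np.repeat([0, 1, 2], 50),
        'score': np.concatenate([
            rng.normal(0.0, 1.0, 50),  # cluster 0
            rng.normal(0.5, 1.0, 50),  # cluster 1
            rng.normal(1.0, 1.0, 50),  # cluster 2
        ]),
    })
    print(compare_continuous_clusters(demo, 'clusters', 'score', 'Demo score'))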