Skip to content
Snippets Groups Projects
Compare_clusters.py 10.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • mcoth's avatar
    mcoth committed
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    Created on Sun Sep  1 18:52:23 2024
    
    @author: Maya Coulson Theodorsen (mcoth@dtu.dk)
    
    This function performs statistical comparisons across clusters derived from prior analysis.
    It evaluates differences between clusters using both categorical and continuous variables,
    applies appropriate statistical tests, and adjusts p-values for multiple testing.
    
    Overview:
    Comparison of Categorical Variables:
      - Chi-square or Fisher's Exact Test
      - Effect size calculations using Cramér's V
      - Post-hoc chi-square comparisons for significant results
    
    Comparison of Continuous Variables:
      - Tests for normality using Shapiro-Wilk
      - Bartlett's test or Levene's test for homogeneity of variances
      - Welch's ANOVA, One-way ANOVA, or Kruskal-Wallis depending on normality and variance equality
      - Effect size calculations 
      - Dunn's post-hoc test for significant results
    
    Multiple Testing Correction:
      - False Discovery Rate (FDR) adjustment using the Benjamini-Hochberg method
    
    Output:
      - P-values and corrected p-values for all variables
      - Summary of significant variables
      - Post-hoc comparisons
    
    """
    
    import pandas as pd
    import pingouin as pg
    from scipy import stats
    from itertools import combinations
    from scipy.stats import bartlett, levene
    import scikit_posthocs as sp
    from statsmodels.stats.multitest import multipletests
    
    def compare_clusters(data_complete, questionnaireClusters):
        """Compare clusters on a fixed set of categorical and continuous variables.

        For every variable an omnibus test is run (chi-square/Fisher for
        categorical, ANOVA / Welch's ANOVA / Kruskal-Wallis for continuous),
        the omnibus p-values are FDR-corrected (Benjamini-Hochberg), and
        pairwise post-hoc tests are then performed for the variables that
        remain significant after correction. The post-hoc p-values are
        FDR-corrected as a separate family.

        Parameters
        ----------
        data_complete : pandas.DataFrame
            Main dataset; must contain a 'clusters' column plus the raw
            variables referenced below.
        questionnaireClusters : pandas.DataFrame
            Questionnaire subscale scores; must also contain a 'clusters'
            column.

        Returns
        -------
        tuple
            (p_values, posthoc_p_values, categorical_variables,
            continuous_variables) where the first two are lists of result
            dicts carrying both uncorrected and corrected p-values.
        """
        # (column, display label, source dataframe) triples — categorical
        categorical_variables = [
            ('PTSD_t0_DSMIV', 'Probable PTSD diagnosis', data_complete),
            ('q0002', 'Sex (male)', data_complete),
            ('q0006', 'Children', data_complete),
            ('civil_status', 'Civil status (single)', data_complete),
            ('Psychoanaleptica', 'Psychoanaleptica', data_complete),
            ('Psycholeptica', 'Psycholeptica', data_complete),
            ('q0033_0001', 'Suicidal history', data_complete),
            ('binge', 'Excessive alcohol intake', data_complete),
            ('ADHD_total_GROUP_t0', 'Probable childhood ADHD', data_complete),
            ('drugs', 'Current drug usage', data_complete),
            ('Military_trauma', 'Exposed to war', data_complete),
            ('Unemployed', 'Unemployed', data_complete)
        ]

        # (column, display label, source dataframe) triples — continuous
        continuous_variables = [
            ('q0003_0001', 'Age', data_complete),
            ('q0008_0001', 'Self-rated health', data_complete),
            ('PCL_t0', 'PCL total score', data_complete),
            ('Intrusion_t0', 'PCL Intrusion', data_complete),
            ('PCL Avoidance', 'PCL Avoidance', questionnaireClusters),
            ('PCL Numbing', 'PCL Numbing', questionnaireClusters),
            ('PCL Hyperarousal', 'PCL Hyperarousal', questionnaireClusters),
            ('DASS Anxiety', 'DASS Anxiety', questionnaireClusters),
            ('DASS Depression', 'DASS Depression', questionnaireClusters),
            ('DASS Stress', 'DASS Stress', questionnaireClusters),
            ('Traume_t0_total', 'Total traumas', data_complete),
            ('Antal_unikke_traumer_t0', 'Total unique traumas', data_complete)
        ]

        # Omnibus results plus a parallel registry of (col, label, df, kind)
        # describing every test in the same order.
        p_values = []
        test_registry = []

        for outcome_column, outcome_label, df in categorical_variables:
            p_values.append(compare_categorical_clusters(df, 'clusters', outcome_column, outcome_label))
            test_registry.append((outcome_column, outcome_label, df, 'categorical'))

        for outcome_column, outcome_label, df in continuous_variables:
            p_values.append(compare_continuous_clusters(df, 'clusters', outcome_column, outcome_label))
            test_registry.append((outcome_column, outcome_label, df, 'continuous'))

        # FDR (Benjamini-Hochberg) correction across all omnibus tests
        original_p_values = [result['p_value'] for result in p_values]
        reject, corrected_p_values, _, _ = multipletests(original_p_values, alpha=0.05, method='fdr_bh')

        for result, corr_p in zip(p_values, corrected_p_values):
            result['corrected_p_value'] = corr_p

        # Classify each test and keep those still significant after correction
        overview = []
        significant_tests_filtered = []
        for result, corr_p, test_info in zip(p_values, corrected_p_values, test_registry):
            if corr_p < 0.05:
                overview.append('Sig')
                significant_tests_filtered.append(test_info)
            elif result['p_value'] < 0.05:
                # Significant before, but not after, FDR correction
                overview.append('No longer sig')
            else:
                overview.append('Not sig')

        # Pairwise post-hoc tests on the significant variables only
        posthoc_p_values = []
        for outcome_column, outcome_label, df, var_type in significant_tests_filtered:
            if var_type == 'categorical':
                posthoc_p_values.extend(posthoc_categorical(df, 'clusters', outcome_column, outcome_label))
            else:
                posthoc_p_values.extend(posthoc_continuous(df, 'clusters', outcome_column, outcome_label))

        # FDR-correct the post-hoc p-values as their own family. Guard the
        # empty case (no omnibus test survived correction) so multipletests
        # is never called on an empty list.
        if posthoc_p_values:
            posthoc_originals = [result['p_value'] for result in posthoc_p_values]
            _, posthoc_pvals_corrected, _, _ = multipletests(posthoc_originals, alpha=0.05, method='fdr_bh')
            for result, corr_p in zip(posthoc_p_values, posthoc_pvals_corrected):
                result['corrected_p_value'] = corr_p

        return p_values, posthoc_p_values, categorical_variables, continuous_variables
    
    
    
    def compare_categorical_clusters(df, cluster_column, outcome_column, outcome_label):
        """Omnibus test of association between cluster membership and a
        categorical outcome.

        Uses Pearson's chi-square (scipy default: Yates continuity correction
        for 2x2 tables). When any expected cell count is below 5 and the table
        is 2x2, Fisher's exact test is used instead — the original code only
        *labelled* such tests "Fishers Exact Test" while still reporting the
        chi-square p-value. For larger tables with low expected counts scipy
        offers no exact test, so chi-square is kept and flagged in the label.

        Parameters
        ----------
        df : pandas.DataFrame
        cluster_column : str
            Column holding cluster labels.
        outcome_column : str
            Column holding the categorical outcome.
        outcome_label : str
            Human-readable name used in the result dict.

        Returns
        -------
        dict
            Keys: outcome_label, test_type, p_value, test_statistic, dof,
            effect_size (Cramer's V), corrected_p_value (filled in later).
        """
        contingency_table = pd.crosstab(df[cluster_column], df[outcome_column])
        chi2_stat, p_value, dof, expected = stats.chi2_contingency(contingency_table)

        # Effect size: Cramer's V = sqrt(chi2 / (n * (min(rows, cols) - 1))).
        # Computed from the (possibly Yates-corrected) chi-square statistic.
        n = contingency_table.to_numpy().sum()
        cramers_v = (chi2_stat / (n * (min(contingency_table.shape) - 1))) ** 0.5

        test_statistic = chi2_stat
        if (expected >= 5).all():
            test_type = 'Chi-Square'
        elif contingency_table.shape == (2, 2):
            # Chi-square approximation unreliable with low expected counts;
            # use the exact test. fisher_exact returns (odds ratio, p-value).
            test_type = 'Fishers Exact Test'
            test_statistic, p_value = stats.fisher_exact(contingency_table)
        else:
            # No exact test available for R x C tables in scipy — keep the
            # chi-square result but make the limitation visible.
            test_type = 'Chi-Square (low expected counts)'

        return {
            'outcome_label': outcome_label,
            'test_type': test_type,
            'p_value': p_value,
            'test_statistic': test_statistic,
            'dof': dof,
            'effect_size': cramers_v,
            'corrected_p_value': None  # Placeholder, filled in after FDR_bh
        }
    
    
    def compare_continuous_clusters(df, cluster_column, outcome_column, outcome_label):
        """Omnibus comparison of a continuous outcome across clusters.

        Test selection follows the data: one-way ANOVA when every cluster is
        normal (Shapiro-Wilk) with equal variances (Bartlett), Welch's ANOVA
        when normal with unequal variances, Kruskal-Wallis otherwise.

        Fixes a crash in the original: in the ANOVA/Welch branches the
        returned 'test_statistic' and 'effect_size' referred to variables
        (H_stat, epsilon_squared) only defined in the Kruskal-Wallis branch,
        raising NameError. The ANOVA branches now report the F statistic and
        pingouin's partial eta-squared ('np2'); Kruskal-Wallis reports H and
        epsilon-squared. The discarded Levene test and the global
        pd.set_option display side effect were removed.

        Parameters
        ----------
        df : pandas.DataFrame
        cluster_column : str
            Column holding cluster labels.
        outcome_column : str
            Column holding the continuous outcome.
        outcome_label : str
            Human-readable name used in the result dict.

        Returns
        -------
        dict
            Keys: outcome_label, test_type, p_value, test_statistic, dof,
            effect_size, corrected_p_value (filled in later).
        """
        clusters = df[cluster_column].unique()
        cluster_data = [df[df[cluster_column] == cluster][outcome_column] for cluster in clusters]

        # Shapiro-Wilk normality per cluster
        normality_results = pg.normality(df, dv=outcome_column, group=cluster_column)

        if normality_results['normal'].all():
            # Bartlett's test for homogeneity of variances (normal data)
            _, bartlett_p = bartlett(*cluster_data)

            if bartlett_p < 0.05:
                # Unequal variances -> Welch's ANOVA
                test_type = "Welchs ANOVA"
                anova_results = pg.welch_anova(dv=outcome_column, between=cluster_column, data=df)
            else:
                # Equal variances -> classic one-way ANOVA
                test_type = "ANOVA"
                anova_results = pg.anova(data=df, dv=outcome_column, between=cluster_column)

            p_value = anova_results['p-unc'][0]
            dof = anova_results['dof'][0]
            test_statistic = anova_results['F'][0]
            # Partial eta-squared as reported by pingouin
            effect_size = anova_results['np2'][0]
        else:
            # Not normally distributed -> rank-based Kruskal-Wallis
            test_type = 'Kruskal-Wallis'
            kruskal = pg.kruskal(data=df, dv=outcome_column, between=cluster_column)
            p_value = kruskal['p-unc'].values[0]
            dof = kruskal['ddof1'][0]
            test_statistic = kruskal['H'][0]
            # Epsilon-squared effect size, floored at 0
            N = len(df)
            effect_size = max((test_statistic - dof + 1) / (N - dof), 0)

        return {
            'outcome_label': outcome_label,
            'test_type': test_type,
            'p_value': p_value,
            'test_statistic': test_statistic,
            'dof': dof,
            'effect_size': effect_size,
            'corrected_p_value': None  # Placeholder, filled in after FDR_bh
        }
                
    # Post hoc tests for variables significant after FDR_bh
    
    def posthoc_categorical(df, cluster_column, outcome_column, outcome_label):
        """Pairwise chi-square tests between every pair of clusters for one
        categorical outcome.

        Returns a list of result dicts (one per cluster pair) carrying the
        uncorrected p-value; FDR correction is applied later by the caller.
        """
        ordered_clusters = sorted(df[cluster_column].unique())
        results = []
        for first, second in combinations(ordered_clusters, 2):
            pair_rows = df[df[cluster_column].isin([first, second])]
            pair_table = pd.crosstab(pair_rows[cluster_column], pair_rows[outcome_column])
            _, p, _, _ = stats.chi2_contingency(pair_table)
            results.append({
                'outcome_label': outcome_label,
                'cluster1': first,
                'cluster2': second,
                'test_type': 'Chi-squared',
                'p_value': p
            })
        return results
    
    def posthoc_continuous(df, cluster_column, outcome_column, outcome_label):
        """Dunn's post-hoc test for all pairwise cluster comparisons on one
        continuous outcome.

        Fix: the original read the Dunn matrix positionally (dunn.iloc[i, j]),
        which silently assumes posthoc_dunn orders its rows/columns exactly
        like sorted(unique()). Label-based .loc lookup is used instead, so the
        p-value for a pair is correct regardless of the matrix's ordering.

        Returns a list of result dicts (one per cluster pair) carrying the
        uncorrected p-value; FDR correction is applied later by the caller.
        """
        # Symmetric matrix of pairwise Dunn p-values, indexed by group label
        dunn = sp.posthoc_dunn(df, val_col=outcome_column, group_col=cluster_column)
        clusters = sorted(df[cluster_column].unique())
        posthoc_p_values = []

        for cluster1, cluster2 in combinations(clusters, 2):
            posthoc_p_values.append({
                'outcome_label': outcome_label,
                'cluster1': cluster1,
                'cluster2': cluster2,
                'test_type': 'Dunns',
                'p_value': dunn.loc[cluster1, cluster2]
            })
        return posthoc_p_values