Skip to content
Snippets Groups Projects
Compare_clusters.py 10.3 KiB
Newer Older
mcoth's avatar
mcoth committed
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Sep  1 18:52:23 2024

@author: Maya Coulson Theodorsen (mcoth@dtu.dk)

This function performs statistical comparisons across clusters derived from prior analysis.
It evaluates differences between clusters using both categorical and continuous variables,
applies appropriate statistical tests, and adjusts p-values for multiple testing.

Overview:
Comparison of Categorical Variables:
  - Chi-square or Fisher's Exact Test
  - Effect size calculations using Cramér's V
  - Post-hoc chi-square comparisons for significant results

Comparison of Continuous Variables:
  - Tests for normality using Shapiro-Wilk
  - Bartlett's test or Levene's test for homogeneity of variances
  - Welch's ANOVA, One-way ANOVA, or Kruskal-Wallis depending on normality and variance equality
  - Effect size calculations 
  - Dunn's post-hoc test for significant results

Multiple Testing Correction:
  - False Discovery Rate (FDR) adjustment using the Benjamini-Hochberg method

Output:
  - P-values and corrected p-values for all variables
  - Summary of significant variables
  - Post-hoc comparisons

"""

import pandas as pd
import pingouin as pg
from scipy import stats
from itertools import combinations
from scipy.stats import bartlett, levene
import scikit_posthocs as sp
from statsmodels.stats.multitest import multipletests

def compare_clusters(data_complete, questionnaireClusters):
    # Categorical variables
    categorical_variables = [
        ('PTSD_t0_DSMIV', 'Probable PTSD diagnosis', data_complete),
        ('q0002', 'Sex (male)', data_complete),
        ('q0006', 'Children', data_complete),
        ('civil_status', 'Civil status (single)', data_complete),
        ('Psychoanaleptica', 'Psychoanaleptica', data_complete),
        ('Psycholeptica', 'Psycholeptica', data_complete),
        ('q0033_0001', 'Suicidal history', data_complete),
        ('binge', 'Excessive alcohol intake', data_complete),
        ('ADHD_total_GROUP_t0', 'Probable childhood ADHD', data_complete),
        ('drugs', 'Current drug usage', data_complete),
        ('Military_trauma', 'Exposed to war', data_complete),
        ('Unemployed', 'Unemployed', data_complete)
    ]

    # Continuous variables
    continuous_variables = [
        ('q0003_0001', 'Age', data_complete),
        ('q0008_0001', 'Self-rated health', data_complete),
        ('PCL_t0', 'PCL total score', data_complete),
        ('Intrusion_t0', 'PCL Intrusion', data_complete),
        ('PCL Avoidance', 'PCL Avoidance', questionnaireClusters),
        ('PCL Numbing', 'PCL Numbing', questionnaireClusters),
        ('PCL Hyperarousal', 'PCL Hyperarousal', questionnaireClusters),
        ('DASS Anxiety', 'DASS Anxiety', questionnaireClusters),
        ('DASS Depression', 'DASS Depression', questionnaireClusters),
        ('DASS Stress', 'DASS Stress', questionnaireClusters),
        ('Traume_t0_total', 'Total traumas', data_complete),
        ('Antal_unikke_traumer_t0', 'Total unique traumas', data_complete)
    ]
    
    # Store p-values for all comparisons
    p_values = []
    significant_tests = []

    
    # Compare each categorical variable
    for outcome_column, outcome_label, df in categorical_variables:
        result = compare_categorical_clusters(df, 'clusters', outcome_column, outcome_label)
        p_values.append(result) # Add each p value to the vector
        significant_tests.append((outcome_column, outcome_label, df, 'categorical')) # Add variable info

    # Compare each continuous variable
    for outcome_column, outcome_label, df in continuous_variables:
        result = compare_continuous_clusters(df, 'clusters', outcome_column, outcome_label)
        p_values.append(result) # Add each p value to the vector
        significant_tests.append((outcome_column, outcome_label, df, 'continuous')) # Add variable info
    
    # Extract the original p-values from the dictionaries
    original_p_values = [result['p_value'] for result in p_values]

    # Apply FDR with Benjamini-Hochberg multiple testing correction
    reject, corrected_p_values, _, _ = multipletests(original_p_values, alpha=0.05, method='fdr_bh')
    

    # Add corrected p_values to dict
    for i, result in enumerate(p_values):
        result['corrected_p_value'] = corrected_p_values[i]

    # Get significant tests for post-hoc tests
    overview = []
    significant_tests_filtered = []
    for result, corr_p, test_info in zip(p_values, corrected_p_values, significant_tests):
        uncorr_p = result['p_value']  # Extract the uncorrected p-value

        if corr_p < 0.05:
            overview.append('Sig')
            significant_tests_filtered.append(test_info)
        elif uncorr_p < 0.05 and corr_p >= 0.05:
                overview.append('No longer sig')
        else:
            overview.append('Not sig')


    #Post hoc tests
    posthoc_p_values = []
    # Perform post hoc tests on significant results
    for outcome_column, outcome_label, df, var_type in significant_tests_filtered:
        if var_type == 'categorical':
            posthoc_p_value = posthoc_categorical(df, 'clusters', outcome_column, outcome_label)
        else:
            posthoc_p_value = posthoc_continuous(df, 'clusters', outcome_column, outcome_label)
        
        posthoc_p_values.extend(posthoc_p_value)


    # Extract the original p-values from the posthoc results
    original_p_values = [result['p_value'] for result in posthoc_p_values]

    # Apply FDR with Benjamini-Hochberg multiple testing correction
    reject, posthoc_pvals_corrected, _, _ = multipletests(original_p_values, alpha=0.05, method='fdr_bh')

    # Add corrected p-values
    for i, result in enumerate(posthoc_p_values):
        result['corrected_p_value'] = posthoc_pvals_corrected[i]

    return p_values, posthoc_p_values, categorical_variables, continuous_variables



def compare_categorical_clusters(df, cluster_column, outcome_column, outcome_label):
    
    contingency_table = pd.crosstab(df[cluster_column], df[outcome_column])
    chi2, p, dof, expected = stats.chi2_contingency(contingency_table)

    if (expected >= 5).all():
        test_type = 'Chi-Square'
    else:
        test_type = 'Fishers Exact Test'

    contingency_table, observed_table, result_df = pg.chi2_independence(data=df, x=cluster_column, y=outcome_column)
    p_value = result_df.loc[result_df['test'] == 'pearson', 'pval'].values[0]
    chi2_stat = result_df.loc[result_df['test'] == 'pearson', 'chi2'].values[0]
    dof = result_df.loc[result_df['test'] == 'pearson', 'dof'].values[0]
    cramers_v = result_df.loc[result_df['test'] == 'pearson', 'cramer'].values[0]

    return {
        'outcome_label': outcome_label,
        'test_type': test_type,
        'p_value': p_value,
        'test_statistic': chi2_stat,
        'dof': dof,
        'effect_size': cramers_v,
        'corrected_p_value': None  # Placeholder, will be filled in after FDR_bh
    }


def compare_continuous_clusters(df, cluster_column, outcome_column, outcome_label):
    clusters = df[cluster_column].unique()
    cluster_data = [df[df[cluster_column] == cluster][outcome_column] for cluster in clusters]

    # Test for normality
    normality_results = pg.normality(df, dv=outcome_column, group=cluster_column)
    
    if normality_results['normal'].all():
        
        # Perform Bartlett's test for equal variances (for normal data)
        bartlett_stat, bartlett_p = bartlett(*cluster_data)
        
        if bartlett_p < 0.05:
            # Unequal variances, use Welch's ANOVA
            test_type = "Welchs ANOVA"
            welch_anova_results = pg.welch_anova(dv=outcome_column, between=cluster_column, data=df)
            p_value = welch_anova_results['p-unc'][0]
            dof = welch_anova_results['dof'][0]
        else:
            # Equal variances, use one-way ANOVA
            test_type = "ANOVA"
            anova = pg.anova(data=df, dv=outcome_column, between=cluster_column)
            p_value = anova['p-unc'][0]
            dof = anova['dof'][0]
            pd.set_option('display.float_format', lambda x: '%.10f' % x)
    else:
        # Not normally distributed, perform Levene's test for equal variances
        levene_stat, levene_p = levene(*cluster_data, center='median')  # 'median' used for robustness
        test_type = 'Kruskal-Wallis'
        kruskal = pg.kruskal(data=df, dv=outcome_column, between=cluster_column)
        p_value = kruskal['p-unc'].values[0]
        dof = kruskal['ddof1'][0]
        H_stat = kruskal['H'][0]
        # Calculate epsilon-squared
        N = len(df)
        epsilon_squared = max((H_stat - dof + 1) / (N - dof), 0)

    return {
    'outcome_label': outcome_label,
    'test_type': test_type,
    'p_value': p_value,
    'test_statistic': H_stat,
    'dof': dof,
    'effect_size': epsilon_squared, 
    'corrected_p_value': None  # Placeholder, will be filled in after FDR_bh
    }
            
# Post hoc tests for variables significant after FDR_bh

def posthoc_categorical(df, cluster_column, outcome_column, outcome_label):
    clusters = sorted(df[cluster_column].unique())
    posthoc_p_values = []
    for (cluster1, cluster2) in combinations(clusters, 2):
        subset = df[df[cluster_column].isin([cluster1, cluster2])]
        posthoc_table = pd.crosstab(subset[cluster_column], subset[outcome_column])
        chi2, p, dof, _ = stats.chi2_contingency(posthoc_table)
        posthoc_p_values.append({
            'outcome_label': outcome_label,
            'cluster1': cluster1,
            'cluster2': cluster2,
            'test_type': 'Chi-squared',
            'p_value': p  
        })
    return posthoc_p_values

def posthoc_continuous(df, cluster_column, outcome_column, outcome_label):
    # Use Dunn's test for pairwise group comparisons
    dunn = sp.posthoc_dunn(df, val_col= outcome_column, group_col = cluster_column) 
    clusters = sorted(df[cluster_column].unique())
    posthoc_p_values = []

    # Iterate through the Dunn's test result matrix
    for i in range(len(clusters)):
        for j in range(i + 1, len(clusters)):  
            cluster1 = clusters[i]
            cluster2 = clusters[j]
            p_value = dunn.iloc[i, j]
            posthoc_p_values.append({
                'outcome_label': outcome_label,
                'cluster1': cluster1,
                'cluster2': cluster2,
                'test_type': 'Dunns',
                'p_value': p_value
                
            })
    return posthoc_p_values